Skip to content

Commit

Permalink
Fix shared EBS discovery implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mye956 committed Sep 12, 2023
1 parent 284375c commit e7e9eca
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 86 deletions.
8 changes: 5 additions & 3 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) er
}

volumeId := ebs.GetAttachmentProperties(apiebs.VolumeIdName)
_, ok := w.agentState.GetEBSByVolumeId(volumeId)
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volumeId)
if ok {
log.Infof("EBS Volume attachment already exists. Skip handling EBS attachment %v.", ebs.EBSToString())
return nil
return ebsAttachment.StartTimer(func() {
w.handleEBSAckTimeout(volumeId)
})
}

if err := w.addEBSAttachmentToState(ebs); err != nil {
Expand All @@ -116,7 +118,7 @@ func (w *EBSWatcher) notifyFoundEBS(volumeId string) {
// TODO: Add the EBS volume to data client
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
if !ok {
log.Warnf("Unable to find EBS volume with volume ID: %v.", volumeId)
log.Warnf("Unable to find EBS volume with volume ID: %v within agent state.", volumeId)
return
}

Expand Down
54 changes: 54 additions & 0 deletions agent/ebs/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ebs

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -313,3 +314,56 @@ func TestHandleEBSAckTimeout(t *testing.T) {
ebsAttachment, ok := taskEngineState.(*dockerstate.DockerTaskEngineState).GetEBSByVolumeId(volumeID)
assert.False(t, ok)
}

// TestHandleMismatchEBSAttachment tests handling an EBS attachment but found a different volume attached
// onto the host during the scanning process.
func TestHandleMismatchEBSAttachment(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

ctx := context.Background()
taskEngineState := dockerstate.NewTaskEngineState()
eventChannel := make(chan statechange.Event)
mockDiscoveryClient := mock_ebs_discovery.NewMockEBSDiscovery(mockCtrl)

watcher := newTestEBSWatcher(ctx, taskEngineState, eventChannel, mockDiscoveryClient)

testAttachmentProperties := map[string]string{
apiebs.ResourceTypeName: apiebs.ElasticBlockStorage,
apiebs.DeviceName: deviceName,
apiebs.VolumeIdName: volumeID,
}

expiresAt := time.Now().Add(time.Millisecond * testconst.WaitTimeoutMillis)
ebsAttachment := &apiebs.ResourceAttachment{
AttachmentInfo: attachmentinfo.AttachmentInfo{
TaskARN: taskARN,
TaskClusterARN: taskClusterARN,
ContainerInstanceARN: containerInstanceARN,
ExpiresAt: expiresAt,
Status: status.AttachmentNone,
AttachmentARN: resourceAttachmentARN,
},
AttachmentProperties: testAttachmentProperties,
}

var wg sync.WaitGroup
wg.Add(1)
mockDiscoveryClient.EXPECT().ConfirmEBSVolumeIsAttached(deviceName, volumeID).
Do(func(deviceName, volumeID string) {
wg.Done()
}).
Return(fmt.Errorf("%w; expected EBS volume %s but found %s", apiebs.ErrInvalidVolumeID, volumeID, "vol-321")).
MinTimes(1)

err := watcher.HandleResourceAttachment(ebsAttachment)
assert.NoError(t, err)

pendingEBS := watcher.agentState.GetAllPendingEBSAttachmentsWithKey()
foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, watcher.discoveryClient)

assert.Empty(t, foundVolumes)
ebsAttachment, ok := taskEngineState.(*dockerstate.DockerTaskEngineState).GetEBSByVolumeId(volumeID)
assert.True(t, ok)
assert.ErrorIs(t, ebsAttachment.GetError(), apiebs.ErrInvalidVolumeID)
}
4 changes: 2 additions & 2 deletions agent/engine/dockerstate/docker_task_engine_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachments() []*apiresource
func (state *DockerTaskEngineState) allPendingEBSAttachmentsUnsafe() []*apiresource.ResourceAttachment {
var pendingEBSAttachments []*apiresource.ResourceAttachment
for _, v := range state.ebsAttachments {
if !v.IsAttached() {
if !v.IsAttached() && !v.IsSent() {
pendingEBSAttachments = append(pendingEBSAttachments, v)
}
}
Expand All @@ -319,7 +319,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachmentsWithKey() map[str
func (state *DockerTaskEngineState) allPendingEBSAttachmentsWithKeyUnsafe() map[string]*apiresource.ResourceAttachment {
pendingEBSAttachments := make(map[string]*apiresource.ResourceAttachment)
for k, v := range state.ebsAttachments {
if !v.IsAttached() {
if !v.IsAttached() && !v.IsSent() {
pendingEBSAttachments[k] = v
}
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 9 additions & 19 deletions ecs-agent/api/resource/ebs_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ package resource

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/aws/amazon-ecs-agent/ecs-agent/logger"

"github.com/pkg/errors"
)

const (
ebsVolumeDiscoveryTimeout = 5 * time.Second
ebsVolumeDiscoveryTimeout = 300 * time.Second
ebsResourceKeyPrefix = "ebs-volume:"
ScanPeriod = 500 * time.Millisecond
)
Expand All @@ -43,27 +41,19 @@ func NewDiscoveryClient(ctx context.Context) *EBSDiscoveryClient {
}
}

// ScanEBSVolumes will iterate through the entire list of pending EBS volume attachments within the agent state and checks if it's attached on the host.
func ScanEBSVolumes[T GenericEBSAttachmentObject](t map[string]T, dc EBSDiscovery) []string {
// ScanEBSVolumes will iterate through the entire list of provided EBS volume attachments within the agent state and checks if it's attached on the host.
func ScanEBSVolumes[T GenericEBSAttachmentObject](pendingAttachments map[string]T, dc EBSDiscovery) []string {
var err error
var foundVolumes []string
for key, ebs := range t {
for key, ebs := range pendingAttachments {
volumeId := strings.TrimPrefix(key, ebsResourceKeyPrefix)
deviceName := ebs.GetAttachmentProperties(DeviceName)
err = dc.ConfirmEBSVolumeIsAttached(deviceName, volumeId)
if err != nil {
if err == ErrInvalidVolumeID || errors.Cause(err) == ErrInvalidVolumeID {
logger.Warn("Found a different EBS volume attached to the host. Expected EBS volume:", logger.Fields{
"volumeId": volumeId,
"deviceName": deviceName,
})
} else {
logger.Warn("Failed to confirm if EBS volume is attached to the host. ", logger.Fields{
"volumeId": volumeId,
"deviceName": deviceName,
"error": err,
})
if !errors.Is(err, ErrInvalidVolumeID) {
err = fmt.Errorf("%w; failed to confirm if EBS volume is attached to the host", err)
}
ebs.SetError(err)
continue
}
foundVolumes = append(foundVolumes, key)
Expand Down
33 changes: 18 additions & 15 deletions ecs-agent/api/resource/ebs_discovery_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ import (

// LsblkOutput is used to manage and track the output of `lsblk`
type LsblkOutput struct {
BlockDevies []BD `json:"blockdevices"`
BlockDevices []BlockDevice `json:"blockdevices"`
}
type BD struct {
Name string `json:"name"`
Serial string `json:"serial"`
Children []BDChild `json:"children"`
}
type BDChild struct {
Name string `json:"name"`
Serial string `json:"serial"`
type BlockDevice struct {
Name string `json:"name"`
Serial string `json:"serial"`
Children []*BlockDevice `json:"children,omitempty"`
}

// ConfirmEBSVolumeIsAttached is used to scan for an EBS volume that's on the host with a specific device name and/or volume ID.
Expand Down Expand Up @@ -76,14 +72,21 @@ func (api *EBSDiscoveryClient) ConfirmEBSVolumeIsAttached(deviceName, volumeID s

// parseLsblkOutput will parse the `lsblk` output and search for a EBS volume with a specific device name.
// Once found we return the volume ID, otherwise we return an empty string along with an error
// The output of the "lsblk -o +SERIAL" command looks like the following:
// NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT SERIAL
// nvme0n1 259:0 0 30G 0 disk vol123
// ├─nvme0n1p1 259:1 0 30G 0 part /
// └─nvme0n1p128 259:2 0 1M 0 part
// The output of the "lsblk -o NAME,SERIAL -J" command looks like the following:
//
// {
// "blockdevices": [
// {"name": "nvme0n1", "serial": "vol087768edff8511a23",
// "children": [
// {"name": "nvme0n1p1", "serial": null},
// {"name": "nvme0n1p128", "serial": null}
// ]
// }
// ]
// }
func parseLsblkOutput(output *LsblkOutput, deviceName string) (string, error) {
actualDeviceName := deviceName[strings.LastIndex(deviceName, "/")+1:]
for _, block := range output.BlockDevies {
for _, block := range output.BlockDevices {
if block.Name == actualDeviceName {
return block.Serial, nil
}
Expand Down
12 changes: 6 additions & 6 deletions ecs-agent/api/resource/ebs_discovery_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ const (
)

func TestParseLsblkOutput(t *testing.T) {
blockDevice := BD{
blockDevice := BlockDevice{
Name: testDeviceName,
Serial: testVolumeID,
Children: make([]BDChild, 0),
Children: make([]*BlockDevice, 0),
}

lsblkOutput := &LsblkOutput{
BlockDevies: []BD{
BlockDevices: []BlockDevice{
blockDevice,
},
}
Expand All @@ -47,14 +47,14 @@ func TestParseLsblkOutput(t *testing.T) {
}

func TestParseLsblkOutputError(t *testing.T) {
blockDevice := BD{
blockDevice := BlockDevice{
Name: "nvme1n1",
Serial: testVolumeID,
Children: make([]BDChild, 0),
Children: make([]*BlockDevice, 0),
}

lsblkOutput := &LsblkOutput{
BlockDevies: []BD{
BlockDevices: []BlockDevice{
blockDevice,
},
}
Expand Down
Loading

0 comments on commit e7e9eca

Please sign in to comment.