Skip to content

Commit

Permalink
more unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
danehlim committed Jun 7, 2024
1 parent 6e57470 commit cb9a0c5
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 67 deletions.
6 changes: 4 additions & 2 deletions agent/api/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ type Container struct {
// saving state.
// NOTE: Do not access StartedAtUnsafe directly. Instead, use `GetStartedAt` and `SetStartedAt`.
StartedAtUnsafe time.Time `json:"startedAt,omitempty"`
// setStartedAtOnce is used to set the value of the container's started at time only the first time SetStartedAt is invoked.
// setStartedAtOnce is used to set the value of the container's started at time only the first time SetStartedAt is
// invoked.
setStartedAtOnce sync.Once
finishedAt time.Time

Expand Down Expand Up @@ -1582,7 +1583,8 @@ func (c *Container) GetRestartAggregationDataForStats() ContainerRestartAggregat
}

// SetRestartAggregationDataForStats sets the restart aggregation data for stats of a container.
func (c *Container) SetRestartAggregationDataForStats(restartAggregationDataForStats ContainerRestartAggregationDataForStats) {
func (c *Container) SetRestartAggregationDataForStats(
restartAggregationDataForStats ContainerRestartAggregationDataForStats) {
c.lock.Lock()
defer c.lock.Unlock()

Expand Down
3 changes: 3 additions & 0 deletions agent/stats/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const (
taskDefinitionVersion = "1"
containerName = "gremlin-container"
serviceConnectContainerName = "service-connect-container"

testNetworkNameA = "eth0"
testNetworkNameB = "eth1"
)

var (
Expand Down
98 changes: 50 additions & 48 deletions agent/stats/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,54 +93,6 @@ func (queue *Queue) Add(dockerStat *types.StatsJSON, nonDockerStats NonDockerCon
return nil
}

func getAggregatedDockerStatAcrossRestarts(dockerStat, lastStatBeforeLastRestart,
lastStatInStatsQueue *types.StatsJSON) *types.StatsJSON {
logger.Debug("About to aggregate Docker stat across restart(s)", logger.Fields{
loggerfield.DockerId: dockerStat.ID,
"preLastRestartStat": lastStatBeforeLastRestart,
})

aggregateOSIndependentStats(dockerStat, lastStatBeforeLastRestart)
aggregateOSDependentStats(dockerStat, lastStatBeforeLastRestart)

// PreCPU stats.
preCPUStats := types.CPUStats{}
if lastStatInStatsQueue != nil {
preCPUStats = lastStatInStatsQueue.CPUStats
}
dockerStat.PreCPUStats = preCPUStats

logger.Debug("Aggregated Docker stat across restart(s)", logger.Fields{
loggerfield.DockerId: dockerStat.ID,
"aggregatedStat": dockerStat,
})

return dockerStat
}

// aggregateOSIndependentStats aggregates stats that are measured cumulatively against container start time and
// populated regardless of what OS is being used.
func aggregateOSIndependentStats(dockerStat, lastStatBeforeLastRestart *types.StatsJSON) {
// CPU stats.
dockerStat.CPUStats.CPUUsage.TotalUsage += lastStatBeforeLastRestart.CPUStats.CPUUsage.TotalUsage
dockerStat.CPUStats.CPUUsage.UsageInKernelmode += lastStatBeforeLastRestart.CPUStats.CPUUsage.UsageInKernelmode
dockerStat.CPUStats.CPUUsage.UsageInUsermode += lastStatBeforeLastRestart.CPUStats.CPUUsage.UsageInUsermode

// Network stats.
for key, dockerStatNetwork := range dockerStat.Networks {
lastStatBeforeLastRestartNetwork, ok := lastStatBeforeLastRestart.Networks[key]
if ok {
dockerStatNetwork.RxBytes += lastStatBeforeLastRestartNetwork.RxBytes
dockerStatNetwork.RxPackets += lastStatBeforeLastRestartNetwork.RxPackets
dockerStatNetwork.RxDropped += lastStatBeforeLastRestartNetwork.RxDropped
dockerStatNetwork.TxBytes += lastStatBeforeLastRestartNetwork.TxBytes
dockerStatNetwork.TxPackets += lastStatBeforeLastRestartNetwork.TxPackets
dockerStatNetwork.TxDropped += lastStatBeforeLastRestartNetwork.TxDropped
}
dockerStat.Networks[key] = dockerStatNetwork
}
}

func (queue *Queue) setLastStat(stat *types.StatsJSON) {
queue.lock.Lock()
defer queue.lock.Unlock()
Expand Down Expand Up @@ -631,3 +583,53 @@ func (queue *Queue) getUDoubleCWStatsSet(getUsageFloat getUsageFloatFunc) (*ecst
Sum: &sum,
}, nil
}

// getAggregatedDockerStatAcrossRestarts gets the aggregated docker stat for a container across container restarts.
func getAggregatedDockerStatAcrossRestarts(dockerStat, lastStatBeforeLastRestart,
lastStatInStatsQueue *types.StatsJSON) *types.StatsJSON {
logger.Debug("About to aggregate Docker stat across restart(s)", logger.Fields{
loggerfield.DockerId: dockerStat.ID,
"preLastRestartStat": lastStatBeforeLastRestart,
})

dockerStat = aggregateOSIndependentStats(dockerStat, lastStatBeforeLastRestart)
dockerStat = aggregateOSDependentStats(dockerStat, lastStatBeforeLastRestart)

// PreCPU stats.
preCPUStats := types.CPUStats{}
if lastStatInStatsQueue != nil {
preCPUStats = lastStatInStatsQueue.CPUStats
}
dockerStat.PreCPUStats = preCPUStats

logger.Debug("Aggregated Docker stat across restart(s)", logger.Fields{
loggerfield.DockerId: dockerStat.ID,
"aggregatedStat": dockerStat,
})

return dockerStat
}

// aggregateOSIndependentStats aggregates stats that are measured cumulatively against container start time and
// populated regardless of what OS is being used.
func aggregateOSIndependentStats(dockerStat, lastStatBeforeLastRestart *types.StatsJSON) *types.StatsJSON {
// CPU stats.
dockerStat.CPUStats.CPUUsage.TotalUsage += lastStatBeforeLastRestart.CPUStats.CPUUsage.TotalUsage
dockerStat.CPUStats.CPUUsage.UsageInKernelmode += lastStatBeforeLastRestart.CPUStats.CPUUsage.UsageInKernelmode
dockerStat.CPUStats.CPUUsage.UsageInUsermode += lastStatBeforeLastRestart.CPUStats.CPUUsage.UsageInUsermode

// Network stats.
for key, dockerStatNetwork := range dockerStat.Networks {
lastStatBeforeLastRestartNetwork, ok := lastStatBeforeLastRestart.Networks[key]
if ok {
dockerStatNetwork.RxBytes += lastStatBeforeLastRestartNetwork.RxBytes
dockerStatNetwork.RxPackets += lastStatBeforeLastRestartNetwork.RxPackets
dockerStatNetwork.RxDropped += lastStatBeforeLastRestartNetwork.RxDropped
dockerStatNetwork.TxBytes += lastStatBeforeLastRestartNetwork.TxBytes
dockerStatNetwork.TxPackets += lastStatBeforeLastRestartNetwork.TxPackets
dockerStatNetwork.TxDropped += lastStatBeforeLastRestartNetwork.TxDropped
}
dockerStat.Networks[key] = dockerStatNetwork
}
return dockerStat
}
95 changes: 95 additions & 0 deletions agent/stats/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
"github.com/docker/docker/api/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -785,3 +786,97 @@ func TestPerSecNetworkStatSetFailWithOneDatapoint(t *testing.T) {
stats, err := queue.GetNetworkStatsSet()
require.Errorf(t, err, "Received unexpected network stats set %v", stats)
}

func TestAggregateOSIndependentStats(t *testing.T) {
dockerStat := getTestStatsJSONForOSIndependentStats(1, 2, 3, 4, 5, 6, 7, 8, 9)
lastStatBeforeLastRestart := getTestStatsJSONForOSIndependentStats(9, 8, 7, 6, 5, 4, 3, 2, 1)
expectedAggregatedStat := types.StatsJSON{
Stats: types.Stats{
CPUStats: types.CPUStats{
CPUUsage: types.CPUUsage{
TotalUsage: dockerStat.CPUStats.CPUUsage.TotalUsage +
lastStatBeforeLastRestart.CPUStats.CPUUsage.TotalUsage,
UsageInKernelmode: dockerStat.CPUStats.CPUUsage.UsageInKernelmode +
lastStatBeforeLastRestart.CPUStats.CPUUsage.UsageInKernelmode,
UsageInUsermode: dockerStat.CPUStats.CPUUsage.UsageInUsermode +
lastStatBeforeLastRestart.CPUStats.CPUUsage.UsageInUsermode,
},
},
},
Networks: map[string]types.NetworkStats{
testNetworkNameA: {
RxBytes: dockerStat.Networks[testNetworkNameA].RxBytes +
lastStatBeforeLastRestart.Networks[testNetworkNameA].RxBytes,
RxPackets: dockerStat.Networks[testNetworkNameA].RxPackets +
lastStatBeforeLastRestart.Networks[testNetworkNameA].RxPackets,
RxDropped: dockerStat.Networks[testNetworkNameA].RxDropped +
lastStatBeforeLastRestart.Networks[testNetworkNameA].RxDropped,
TxBytes: dockerStat.Networks[testNetworkNameA].TxBytes +
lastStatBeforeLastRestart.Networks[testNetworkNameA].TxBytes,
TxPackets: dockerStat.Networks[testNetworkNameA].TxPackets +
lastStatBeforeLastRestart.Networks[testNetworkNameA].TxPackets,
TxDropped: dockerStat.Networks[testNetworkNameA].TxDropped +
lastStatBeforeLastRestart.Networks[testNetworkNameA].TxDropped,
},
testNetworkNameB: {
RxBytes: dockerStat.Networks[testNetworkNameB].RxBytes +
lastStatBeforeLastRestart.Networks[testNetworkNameB].RxBytes,
RxPackets: dockerStat.Networks[testNetworkNameB].RxPackets +
lastStatBeforeLastRestart.Networks[testNetworkNameB].RxPackets,
RxDropped: dockerStat.Networks[testNetworkNameB].RxDropped +
lastStatBeforeLastRestart.Networks[testNetworkNameB].RxDropped,
TxBytes: dockerStat.Networks[testNetworkNameB].TxBytes +
lastStatBeforeLastRestart.Networks[testNetworkNameB].TxBytes,
TxPackets: dockerStat.Networks[testNetworkNameB].TxPackets +
lastStatBeforeLastRestart.Networks[testNetworkNameB].TxPackets,
TxDropped: dockerStat.Networks[testNetworkNameB].TxDropped +
lastStatBeforeLastRestart.Networks[testNetworkNameB].TxDropped,
},
},
}

dockerStat = aggregateOSIndependentStats(dockerStat, lastStatBeforeLastRestart)
require.Equal(t, *dockerStat, expectedAggregatedStat)
}

func TestGetAggregatedDockerStatAcrossRestarts(t *testing.T) {
var dockerStat, lastStatBeforeLastRestart, lastStatInStatsQueue types.StatsJSON
lastStatInStatsQueue.Stats.CPUStats.CPUUsage.TotalUsage = uint64(123)

dockerStat = *getAggregatedDockerStatAcrossRestarts(&dockerStat, &lastStatBeforeLastRestart, &lastStatInStatsQueue)
require.Equal(t, lastStatInStatsQueue.Stats.CPUStats.CPUUsage.TotalUsage,
dockerStat.PreCPUStats.CPUUsage.TotalUsage)
}

func getTestStatsJSONForOSIndependentStats(totalCPUUsage, usageInKernelMode, usageInUserMode, rxBytes, rxPackets,
rxDropped, txBytes, txPackets, txDropped uint64) *types.StatsJSON {
return &types.StatsJSON{
Stats: types.Stats{
CPUStats: types.CPUStats{
CPUUsage: types.CPUUsage{
TotalUsage: totalCPUUsage,
UsageInKernelmode: usageInKernelMode,
UsageInUsermode: usageInUserMode,
},
},
},
Networks: map[string]types.NetworkStats{
testNetworkNameA: {
RxBytes: rxBytes,
RxPackets: rxPackets,
RxDropped: rxDropped,
TxBytes: txBytes,
TxPackets: txPackets,
TxDropped: txDropped,
},
testNetworkNameB: {
RxBytes: rxBytes + 1,
RxPackets: rxPackets + 1,
RxDropped: rxDropped + 1,
TxBytes: txBytes + 1,
TxPackets: txPackets + 1,
TxDropped: txDropped + 1,
},
},
}
}
44 changes: 29 additions & 15 deletions agent/stats/queue_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,29 @@ type BlockStatValue struct {

// aggregateOSDependentStats aggregates stats that are measured cumulatively against container start time and
// populated only for Linux OS.
func aggregateOSDependentStats(dockerStat, lastStatBeforeLastRestart *types.StatsJSON) {
func aggregateOSDependentStats(dockerStat, lastStatBeforeLastRestart *types.StatsJSON) *types.StatsJSON {
// CPU stats.
aggregateUsagePerCore(&dockerStat.CPUStats.CPUUsage.PercpuUsage, lastStatBeforeLastRestart.CPUStats.CPUUsage.PercpuUsage)
aggregateUsagePerCore(&dockerStat.CPUStats.CPUUsage.PercpuUsage,
lastStatBeforeLastRestart.CPUStats.CPUUsage.PercpuUsage)
dockerStat.CPUStats.ThrottlingData.Periods += lastStatBeforeLastRestart.CPUStats.ThrottlingData.Periods
dockerStat.CPUStats.ThrottlingData.ThrottledPeriods += lastStatBeforeLastRestart.CPUStats.ThrottlingData.ThrottledPeriods
dockerStat.CPUStats.ThrottlingData.ThrottledPeriods +=
lastStatBeforeLastRestart.CPUStats.ThrottlingData.ThrottledPeriods
dockerStat.CPUStats.ThrottlingData.ThrottledTime += lastStatBeforeLastRestart.CPUStats.ThrottlingData.ThrottledTime

// Memory stats.
dockerStat.MemoryStats.MaxUsage = utils.MaxNum(dockerStat.MemoryStats.MaxUsage, lastStatBeforeLastRestart.MemoryStats.MaxUsage)
dockerStat.MemoryStats.MaxUsage = utils.MaxNum(dockerStat.MemoryStats.MaxUsage,
lastStatBeforeLastRestart.MemoryStats.MaxUsage)
dockerStat.MemoryStats.Failcnt += lastStatBeforeLastRestart.MemoryStats.Failcnt

// Block I/O stats.
aggregateBlockStat(&dockerStat.BlkioStats.IoServiceBytesRecursive, lastStatBeforeLastRestart.BlkioStats.IoServiceBytesRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.IoServicedRecursive, lastStatBeforeLastRestart.BlkioStats.IoServicedRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.IoServiceTimeRecursive, lastStatBeforeLastRestart.BlkioStats.IoServiceTimeRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.IoWaitTimeRecursive, lastStatBeforeLastRestart.BlkioStats.IoWaitTimeRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.IoServiceBytesRecursive,
lastStatBeforeLastRestart.BlkioStats.IoServiceBytesRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.IoServicedRecursive,
lastStatBeforeLastRestart.BlkioStats.IoServicedRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.IoServiceTimeRecursive,
lastStatBeforeLastRestart.BlkioStats.IoServiceTimeRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.IoWaitTimeRecursive,
lastStatBeforeLastRestart.BlkioStats.IoWaitTimeRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.IoMergedRecursive, lastStatBeforeLastRestart.BlkioStats.IoMergedRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.IoTimeRecursive, lastStatBeforeLastRestart.BlkioStats.IoTimeRecursive)
aggregateBlockStat(&dockerStat.BlkioStats.SectorsRecursive, lastStatBeforeLastRestart.BlkioStats.SectorsRecursive)
Expand All @@ -63,12 +70,17 @@ func aggregateOSDependentStats(dockerStat, lastStatBeforeLastRestart *types.Stat
}
dockerStat.Networks[key] = dockerStatNetwork
}

return dockerStat
}

// TO-DO: add comment here
// aggregateUsagePerCore aggregates the total CPU time consumed per core.
func aggregateUsagePerCore(dockerStatUsageSlice *[]uint64, lastStatBeforeLastRestartStatUsageSlice []uint64) {
if len(*dockerStatUsageSlice) == 0 && len(lastStatBeforeLastRestartStatUsageSlice) == 0 {
return
}

aggregatedUsageSlice := []uint64{}
// TO-DO: once max is a generic function remove uint64 and int casting below
peakNumCores := utils.MaxNum(len(*dockerStatUsageSlice), len(lastStatBeforeLastRestartStatUsageSlice))
for i := 0; i < peakNumCores; i++ {
coreUsage := uint64(0)
Expand All @@ -83,9 +95,13 @@ func aggregateUsagePerCore(dockerStatUsageSlice *[]uint64, lastStatBeforeLastRes
*dockerStatUsageSlice = aggregatedUsageSlice
}

// TO-DO: add comment here
// TO-DO: test this function with a unit test for sure to get the most confidence (as opposed to trying to figure out how to produce values where addition is actually happening in a real world setup)
func aggregateBlockStat(dockerStatBlockStatSlice *[]types.BlkioStatEntry, lastStatBeforeLastRestartStatBlockStatSlice []types.BlkioStatEntry) {
// aggregateBlockStat aggregates block I/O stats for the specified I/O service stat.
func aggregateBlockStat(dockerStatBlockStatSlice *[]types.BlkioStatEntry,
lastStatBeforeLastRestartStatBlockStatSlice []types.BlkioStatEntry) {
if len(*dockerStatBlockStatSlice) == 0 && len(lastStatBeforeLastRestartStatBlockStatSlice) == 0 {
return
}

aggregatedBlockStatSlice := []types.BlkioStatEntry{}
blockStatsMap := make(map[BlockStatKey]BlockStatValue)

Expand All @@ -102,7 +118,6 @@ func aggregateBlockStat(dockerStatBlockStatSlice *[]types.BlkioStatEntry, lastSt
*dockerStatBlockStatSlice = aggregatedBlockStatSlice
}

// TO-DO: add comment here
func addBlockStatEntriesFromSliceToMap(statSlice []types.BlkioStatEntry, statMap map[BlockStatKey]BlockStatValue) {
for _, blockStat := range statSlice {
blkStatKey := BlockStatKey{blockStat.Major, blockStat.Minor, blockStat.Op}
Expand All @@ -116,7 +131,6 @@ func addBlockStatEntriesFromSliceToMap(statSlice []types.BlkioStatEntry, statMap
}
}

// TO-DO: add comment here
func addCorrespondingMapEntriesOfSliceToAggregatedSlice(statMap map[BlockStatKey]BlockStatValue,
statSlice []types.BlkioStatEntry, aggregatedSlice *[]types.BlkioStatEntry) {
for _, blockStat := range statSlice {
Expand Down
Loading

0 comments on commit cb9a0c5

Please sign in to comment.