diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e7994d337686..4c046134ddb9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Metricbeat* +- Fix and improve AWS metric period calculation to avoid zero-length intervals {pull}32724[32724] *Packetbeat* diff --git a/x-pack/metricbeat/module/aws/billing/billing.go b/x-pack/metricbeat/module/aws/billing/billing.go index c1c66ac05294..6e2a122162e5 100644 --- a/x-pack/metricbeat/module/aws/billing/billing.go +++ b/x-pack/metricbeat/module/aws/billing/billing.go @@ -118,7 +118,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { startDate, endDate := getStartDateEndDate(m.Period) // Get startTime and endTime - startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency) + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency) // get cost metrics from cost explorer awsConfig := m.MetricSet.AwsConfig.Copy() diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go index 48245e9118c8..bc688ced3642 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go @@ -141,7 +141,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // of an error set the Error field of mb.Event or simply call report.Error(). 
func (m *MetricSet) Fetch(report mb.ReporterV2) error { // Get startTime and endTime - startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency) + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency) m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime) // Check statistic method in config diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go index eb769456359d..d7615f4117c1 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go @@ -1468,7 +1468,7 @@ func TestCreateEventsWithIdentifier(t *testing.T) { Value: "test-ec2", }, } - startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency) + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) assert.NoError(t, err) @@ -1508,7 +1508,7 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) { } resourceTypeTagFilters := map[string][]aws.Tag{} - startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency) + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) assert.NoError(t, err) @@ -1554,7 +1554,7 @@ func TestCreateEventsWithTagsFilter(t *testing.T) { }, } - startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency) + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, 
endTime) assert.NoError(t, err) assert.Equal(t, 1, len(events)) @@ -1706,7 +1706,7 @@ func TestCreateEventsTimestamp(t *testing.T) { } resourceTypeTagFilters := map[string][]aws.Tag{} - startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency) + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) events, err := m.createEvents(&MockCloudWatchClientWithoutDim{}, &MockResourceGroupsTaggingClient{}, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) assert.NoError(t, err) @@ -1718,6 +1718,6 @@ func TestGetStartTimeEndTime(t *testing.T) { m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}} m.MetricSet = &aws.MetricSet{Period: 5 * time.Minute} m.logger = logp.NewLogger("test") - startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency) + startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency) assert.Equal(t, 5*time.Minute, endTime.Sub(startTime)) } diff --git a/x-pack/metricbeat/module/aws/utils.go b/x-pack/metricbeat/module/aws/utils.go index 8fc46b74383e..043b1c9e79e9 100644 --- a/x-pack/metricbeat/module/aws/utils.go +++ b/x-pack/metricbeat/module/aws/utils.go @@ -18,22 +18,22 @@ import ( "github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface" ) -// GetStartTimeEndTime function uses durationString to create startTime and endTime for queries. -func GetStartTimeEndTime(period time.Duration, latency time.Duration) (time.Time, time.Time) { - endTime := time.Now() - if latency != 0 { - // add latency if config is not 0 - endTime = endTime.Add(latency * -1) - } - - // Set startTime to be one period earlier than the endTime. If metrics are - // not being collected, use latency config parameter to offset the startTime - // and endTime. 
// GetStartTimeEndTime calculates the start and end times for a query window
// from a reference time, a collection period and an optional latency.
//
// Whilst the inputs to this function are continuous, the maximum period
// granularity we can consistently use is 1 minute. The resulting interval is
// also aligned to the period for best performance. This means that if a period
// of 3 minutes is requested at 12:05, for example, the calculated times are
// 12:00->12:03. See
// https://github.com/aws/aws-sdk-go-v2/blob/fdbd882cdf5c63a578caed14688cf9a456c75f2b/service/cloudwatch/api_op_GetMetricData.go#L88
// for more information about granularity and period alignment.
//
// Parameters:
//   - now: the reference instant the window is computed from.
//   - period: the requested window length. It is rounded up to the next whole
//     minute (e.g. 90s becomes 120s); zero, negative and sub-minute values are
//     clamped to one minute so the returned interval is never empty.
//   - latency: shifts the reference instant back in time by this duration
//     before the period alignment is applied.
//
// Returns startTime and endTime, where endTime is (now - latency) truncated to
// a multiple of the effective period (time.Time.Truncate semantics, i.e.
// counted from Go's zero time) and startTime is exactly one effective period
// before endTime.
func GetStartTimeEndTime(now time.Time, period time.Duration, latency time.Duration) (time.Time, time.Time) {
	// Round the period up to whole minutes. Duration.Truncate rounds toward
	// zero, so any remainder (even a sub-second one) bumps to the next minute.
	periodInMinutes := period.Truncate(time.Minute)
	if periodInMinutes < period {
		periodInMinutes += time.Minute
	}

	// Guard against zero/negative periods: Time.Truncate(d) with d <= 0 is a
	// no-op, which would otherwise yield an unaligned, zero-length interval.
	if periodInMinutes < time.Minute {
		periodInMinutes = time.Minute
	}

	endTime := now.Add(-latency).Truncate(periodInMinutes)
	startTime := endTime.Add(-periodInMinutes)
	return startTime, endTime
}
diff --git a/x-pack/metricbeat/module/aws/utils_test.go b/x-pack/metricbeat/module/aws/utils_test.go index 32d3fd295a26..02cf629bbf95 100644 --- a/x-pack/metricbeat/module/aws/utils_test.go +++ b/x-pack/metricbeat/module/aws/utils_test.go @@ -222,7 +222,7 @@ func TestGetListMetricsOutputWithWildcard(t *testing.T) { } func TestGetMetricDataPerRegion(t *testing.T) { - startTime, endTime := GetStartTimeEndTime(10*time.Minute, 0) + startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0) mockSvc := &MockCloudWatchClient{} var metricDataQueries []cloudwatch.MetricDataQuery @@ -257,7 +257,7 @@ func TestGetMetricDataPerRegion(t *testing.T) { } func TestGetMetricDataResults(t *testing.T) { - startTime, endTime := GetStartTimeEndTime(10*time.Minute, 0) + startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0) mockSvc := &MockCloudWatchClient{} metricInfo := cloudwatch.Metric{ @@ -497,3 +497,135 @@ func TestGetResourcesTags(t *testing.T) { } assert.Equal(t, expectedResourceTagMap, resourceTagMap) } + +func parseTime(t *testing.T, in string) time.Time { + time, err := time.Parse(time.RFC3339, in) + if err != nil { + t.Errorf("test setup failed - could not parse time with time.RFC3339: %s", in) + } + return time +} + +func TestGetStartTimeEndTime(t *testing.T) { + var cases = []struct { + title string + start string + period time.Duration + latency time.Duration + expectedStart string + expectedEnd string + }{ + // window should align with period e.g. 
requesting a 5 minute period at 10:27 gives 10:20->10:25 + {"1 minute", "2022-08-15T13:38:45Z", time.Second * 60, 0, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"}, + {"2 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 2, 0, "2022-08-15T13:36:00Z", "2022-08-15T13:38:00Z"}, + {"3 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 3, 0, "2022-08-15T13:33:00Z", "2022-08-15T13:36:00Z"}, + {"5 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 5, 0, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"}, + {"30 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 30, 0, "2022-08-15T13:00:00Z", "2022-08-15T13:30:00Z"}, + + // latency should shift the time *before* period alignment + // e.g. requesting a 5 minute period at 10:27 with 1 minutes latency still gives 10:20->10:25, + // but with 3 minutes latency gives 10:15->10:20 + {"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"}, + {"2 minutes, 1 minute latency", "2022-08-15T13:38:45Z", time.Second * 60 * 2, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"}, + {"5 minutes, 4 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 4, "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"}, + {"30 minutes, 30 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 30, time.Second * 60 * 30, "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"}, + + // non-whole-minute periods should be rounded up to the nearest minute; latency is applied as-is before period adjustment + {"20 seconds, 45 second latency", "2022-08-15T13:38:45Z", time.Second * 20, time.Second * 45, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"}, + {"1.5 minutes, 60 second latency", "2022-08-15T13:38:45Z", time.Second * 90, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"}, + {"just less than 5 minutes, 3 minute latency", "2022-08-15T13:38:45Z", time.Second * 59 * 5, time.Second * 90, "2022-08-15T13:30:00Z", 
"2022-08-15T13:35:00Z"}, + } + + for _, tt := range cases { + t.Run(tt.title, func(t *testing.T) { + startTime, expectedStartTime, expectedEndTime := parseTime(t, tt.start), parseTime(t, tt.expectedStart), parseTime(t, tt.expectedEnd) + + start, end := GetStartTimeEndTime(startTime, tt.period, tt.latency) + + if expectedStartTime != start || expectedEndTime != end { + t.Errorf("got (%s, %s), want (%s, %s)", start, end, tt.expectedStart, tt.expectedEnd) + } + }) + } +} + +func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) { + type interval struct { + start, end string + } + + startTime := parseTime(t, "2022-08-24T11:01:00Z") + numCalls := 5 + + var cases = []struct { + title string + period time.Duration + latency time.Duration + expectedIntervals []interval + }{ + // with no latency + {"1 minute", time.Second * 60, 0, []interval{ + {"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"}, + {"2022-08-24T11:01:00Z", "2022-08-24T11:02:00Z"}, + {"2022-08-24T11:02:00Z", "2022-08-24T11:03:00Z"}, + {"2022-08-24T11:03:00Z", "2022-08-24T11:04:00Z"}, + {"2022-08-24T11:04:00Z", "2022-08-24T11:05:00Z"}, + }}, + {"2 minutes", time.Second * 60 * 2, 0, []interval{ + {"2022-08-24T10:58:00Z", "2022-08-24T11:00:00Z"}, + {"2022-08-24T11:00:00Z", "2022-08-24T11:02:00Z"}, + {"2022-08-24T11:02:00Z", "2022-08-24T11:04:00Z"}, + {"2022-08-24T11:04:00Z", "2022-08-24T11:06:00Z"}, + {"2022-08-24T11:06:00Z", "2022-08-24T11:08:00Z"}, + }}, + {"3 minutes", time.Second * 60 * 3, 0, []interval{ + {"2022-08-24T10:57:00Z", "2022-08-24T11:00:00Z"}, + {"2022-08-24T11:00:00Z", "2022-08-24T11:03:00Z"}, + {"2022-08-24T11:03:00Z", "2022-08-24T11:06:00Z"}, + {"2022-08-24T11:06:00Z", "2022-08-24T11:09:00Z"}, + {"2022-08-24T11:09:00Z", "2022-08-24T11:12:00Z"}, + }}, + {"5 minutes", time.Second * 60 * 5, 0, []interval{ + {"2022-08-24T10:55:00Z", "2022-08-24T11:00:00Z"}, + {"2022-08-24T11:00:00Z", "2022-08-24T11:05:00Z"}, + {"2022-08-24T11:05:00Z", "2022-08-24T11:10:00Z"}, + 
{"2022-08-24T11:10:00Z", "2022-08-24T11:15:00Z"}, + {"2022-08-24T11:15:00Z", "2022-08-24T11:20:00Z"}, + }}, + {"30 minutes", time.Second * 60 * 30, 0, []interval{ + {"2022-08-24T10:30:00Z", "2022-08-24T11:00:00Z"}, + {"2022-08-24T11:00:00Z", "2022-08-24T11:30:00Z"}, + {"2022-08-24T11:30:00Z", "2022-08-24T12:00:00Z"}, + {"2022-08-24T12:00:00Z", "2022-08-24T12:30:00Z"}, + {"2022-08-24T12:30:00Z", "2022-08-24T13:00:00Z"}, + }}, + + // with 90s latency (sanity check) + {"1 minute with 2 minute latency", time.Second * 60, time.Second * 90, []interval{ + {"2022-08-24T10:58:00Z", "2022-08-24T10:59:00Z"}, + {"2022-08-24T10:59:00Z", "2022-08-24T11:00:00Z"}, + {"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"}, + {"2022-08-24T11:01:00Z", "2022-08-24T11:02:00Z"}, + {"2022-08-24T11:02:00Z", "2022-08-24T11:03:00Z"}, + }}, + } + + for _, tt := range cases { + t.Run(tt.title, func(t *testing.T) { + // get a few repeated intervals + intervals := make([]interval, numCalls) + for i := range intervals { + adjustedStartTime := startTime.Add(tt.period * time.Duration(i)) + start, end := GetStartTimeEndTime(adjustedStartTime, tt.period, tt.latency) + intervals[i] = interval{start.Format(time.RFC3339), end.Format(time.RFC3339)} + } + + for i, val := range intervals { + if val != tt.expectedIntervals[i] { + t.Errorf("got %v, want %v", intervals, tt.expectedIntervals) + break + } + } + }) + } +}