diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 4d919b2..bef5620 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -153,23 +153,26 @@ func (cwl *Client) Tail(ctx context.Context) error { func (cwl *Client) tail(ctx context.Context, logGroupName string, start chan struct{}, ch chan *logEvent, errch chan error) error { - lastSeenTime := aws.Int64(cwl.config.StartTime.UTC().Unix() * 1000) + lastEventTime := aws.Int64(cwl.config.StartTime.UTC().Unix() * 1000) fn := func(res *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool { for _, event := range res.Events { if cache.Cache.Load(logGroupName, event.EventId) { continue } - cache.Cache.Store(logGroupName, event.EventId, event.IngestionTime) + cache.Cache.Store(logGroupName, event.EventId, event.Timestamp) ch <- &logEvent{ logGroupName: logGroupName, event: event, } + + if *event.Timestamp > *lastEventTime { + lastEventTime = event.Timestamp + } } - if lastPage && len(res.Events) > 0 { - lastSeenTime = res.Events[len(res.Events)-1].IngestionTime - cache.Cache.Expire(logGroupName, lastSeenTime) + if lastPage { + cache.Cache.Expire(logGroupName, lastEventTime) } return true @@ -182,7 +185,7 @@ func (cwl *Client) tail(ctx context.Context, logGroupName string, case <-start: } - streams, err := cwl.ListStreams(ctx, logGroupName, *lastSeenTime) + streams, err := cwl.ListStreams(ctx, logGroupName, *lastEventTime) if err != nil { return err } @@ -205,7 +208,7 @@ func (cwl *Client) tail(ctx context.Context, logGroupName string, LogGroupName: aws.String(logGroupName), LogStreamNames: streamNames, Interleaved: aws.Bool(true), - StartTime: lastSeenTime, + StartTime: lastEventTime, } if cwl.config.FilterPattern != "" { input.FilterPattern = aws.String(cwl.config.FilterPattern)