Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
Enable filtering acorn events output to a given time span
Browse files Browse the repository at this point in the history
Add two new options to the `acorn events` subcommand:
- `--since` will determine the start of the time span; excludes events
  observed before this point when provided
- `--until` will determine the end of the time span; excludes events
  observed after this point when provided

Each option accepts a single argument; arguments can be Unix timestamps,
date formatted timestamps, or Go duration strings(relative to system time).

Signed-off-by: Nick Hale <[email protected]>
  • Loading branch information
njhale committed Jul 14, 2023
1 parent 0a30e5c commit fd9b4d2
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 10 deletions.
10 changes: 10 additions & 0 deletions docs/docs/100-reference/01-command-line/acorn_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ acorn events [flags] [PREFIX]
# Get a single event by name
acorn events 4b2ba097badf2031c4718609b9179fb5
# Filtering by Time
# The --since and --until options can be Unix timestamps, date formatted timestamps, or Go duration strings (relative to system time).
# List events observed within the last 15 minutes
acorn events --since 15m
# List events observed between 2023-05-08T15:04:05 and 2023-05-08T15:05:05 (inclusive)
acorn events --since '2023-05-08T15:04:05' --until '2023-05-08T15:05:05'
```

### Options
Expand All @@ -52,7 +60,9 @@ acorn events [flags] [PREFIX]
-f, --follow Follow the event log
-h, --help help for events
-o, --output string Output format (json, yaml, {{gotemplate}})
-s, --since string Show all events created since timestamp
-t, --tail int Return this number of latest events
-u, --until string Stream events until this timestamp
```

### Options inherited from parent commands
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/api.acorn.io/v1/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func AddToSchemeWithGV(scheme *runtime.Scheme, schemeGroupVersion schema.GroupVe
gvk := schemeGroupVersion.WithKind("Event")
flcf := func(label, value string) (string, string, error) {
switch label {
case "prefix", "details", "metadata.name", "metadata.namespace":
case "prefix", "since", "until", "details", "metadata.name", "metadata.namespace":
return label, value, nil
}
return "", "", fmt.Errorf("unsupported field selection [%s]", label)
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/internal.acorn.io/v1/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func (e EventInstance) GetObserved() MicroTime {
// It extends metav1.MicroTime to allow unmarshaling from RFC3339.
type MicroTime metav1.MicroTime

func NewMicroTime(t time.Time) MicroTime {
return MicroTime(metav1.NewMicroTime(t))
}

func NowMicro() MicroTime {
return NewMicroTime(time.Now())
}

// DeepCopyInto returns a deep-copy of the MicroTime value. The underlying time.Time
// type is effectively immutable in the time API, so it is safe to
// copy-by-assign, despite the presence of (unexported) Pointer fields.
Expand Down
12 changes: 12 additions & 0 deletions pkg/cli/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,23 @@ func NewEvent(c CommandContext) *cobra.Command {
# Get a single event by name
acorn events 4b2ba097badf2031c4718609b9179fb5
# Filtering by Time
# The --since and --until options can be Unix timestamps, date formatted timestamps, or Go duration strings (relative to system time).
# List events observed within the last 15 minutes
acorn events --since 15m
# List events observed between 2023-05-08T15:04:05 and 2023-05-08T15:05:05 (inclusive)
acorn events --since '2023-05-08T15:04:05' --until '2023-05-08T15:05:05'
`})
return cmd
}

type Events struct {
Tail int `usage:"Return this number of latest events" short:"t"`
Follow bool `usage:"Follow the event log" short:"f"`
Since string `usage:"Show all events created since timestamp" short:"s"`
Until string `usage:"Stream events until this timestamp" short:"u"`
Output string `usage:"Output format (json, yaml, {{gotemplate}})" short:"o"`
client ClientFactory
}
Expand All @@ -66,6 +76,8 @@ func (e *Events) Run(cmd *cobra.Command, args []string) error {
opts := &client.EventStreamOptions{
Tail: e.Tail,
Follow: e.Follow,
Since: e.Since,
Until: e.Until,
}

if len(args) > 0 {
Expand Down
8 changes: 8 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ type EventStreamOptions struct {
Tail int `json:"tail,omitempty"`
Follow bool `json:"follow,omitempty"`
Prefix string `json:"prefix,omitempty"`
Since string `json:"since,omitempty"`
Until string `json:"until,omitempty"`
ResourceVersion string `json:"resourceVersion,omitempty"`
}

Expand All @@ -346,6 +348,12 @@ func (o EventStreamOptions) ListOptions() *kclient.ListOptions {
if o.Prefix != "" {
fieldSet["prefix"] = o.Prefix
}
if o.Since != "" {
fieldSet["since"] = o.Since
}
if o.Until != "" {
fieldSet["until"] = o.Until
}

// Set details selector to get details from older runtime APIs that don't return details by default.
fieldSet["details"] = strconv.FormatBool(true)
Expand Down
100 changes: 91 additions & 9 deletions pkg/server/registry/apigroups/acorn/events/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ package events

import (
"context"
"errors"
"fmt"
"sort"
"strings"
"time"

"github.com/acorn-io/mink/pkg/strategy"
"github.com/acorn-io/mink/pkg/types"
apiv1 "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1"
v1 "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1"
internalv1 "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1"
"github.com/acorn-io/runtime/pkg/channels"
"github.com/acorn-io/z"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -83,7 +86,7 @@ func setDefaults(ctx context.Context, e *apiv1.Event) *apiv1.Event {
}

if e.Observed.IsZero() {
e.Observed = v1.MicroTime(metav1.NowMicro())
e.Observed = internalv1.NowMicro()
}

return e
Expand All @@ -97,6 +100,12 @@ type query struct {
// Only events with matching names or source strings are included in query results.
// As a special case, the empty string "" matches all events.
prefix prefix

// since excludes events observed before it when not nil.
since *internalv1.MicroTime

// until excludes events observed after it when not nil.
until *internalv1.MicroTime
}

// filterChannel applies the query to every event received from unfiltered and forwards the result to filtered, if any.
Expand Down Expand Up @@ -131,7 +140,7 @@ func (q query) filterEvent(e watch.Event) *watch.Event {
return &e
}

// Attempt to filter
// Filter
obj := e.Object.(*apiv1.Event)
filtered := q.filter(*obj)
if len(filtered) < 1 {
Expand All @@ -144,6 +153,24 @@ func (q query) filterEvent(e watch.Event) *watch.Event {
return &e
}

func (q query) afterWindow(observation internalv1.MicroTime) bool {
if q.until == nil {
// Window includes all future events
return false
}

return observation.After(q.until.Time)
}

func (q query) beforeWindow(observation internalv1.MicroTime) bool {
if q.since == nil {
// Window includes all existing events
return false
}

return observation.Before(q.since.Time)
}

// filter returns the result of applying the query to a slice of events.
func (q query) filter(events ...apiv1.Event) []apiv1.Event {
if len(events) < 1 {
Expand All @@ -161,15 +188,22 @@ func (q query) filter(events ...apiv1.Event) []apiv1.Event {
tail = int(q.tail)
}

if q.prefix.all() {
// Query selects all remaining events
return events[len(events)-tail:]
}

results := make([]apiv1.Event, 0, tail)
for _, event := range events {
observed := event.Observed
if q.beforeWindow(observed) {
// Exclude events observed before the observation window starts
continue
}

if q.afterWindow(observed) {
// Exclude all events observed after the observation window ends.
// Since the slice is sorted chronologically, we can stop filtering here.
break
}

if !q.prefix.matches(event) {
// Exclude from results
// Exclude event, it doesn't match the given prefix
continue
}

Expand All @@ -187,13 +221,18 @@ func (q query) filter(events ...apiv1.Event) []apiv1.Event {
func stripQuery(opts storage.ListOptions) (q query, stripped storage.ListOptions, err error) {
stripped = opts

now := internalv1.MicroTime(metav1.NowMicro())
stripped.Predicate.Field, err = stripped.Predicate.Field.Transform(func(f, v string) (string, string, error) {
var err error
switch f {
case "details":
// Detail elision is deprecated, so clients should always get details.
// We still strip it from the selector here in order to maintain limited backwards compatibility with old
// clients that still specify it.
case "since":
q.since, err = parseTimeBound(v, now, true)
case "until":
q.until, err = parseTimeBound(v, now, false)
case "prefix":
q.prefix = prefix(v)
default:
Expand All @@ -211,6 +250,49 @@ func stripQuery(opts storage.ListOptions) (q query, stripped storage.ListOptions
return
}

func parseTimeBound(raw string, now internalv1.MicroTime, lower bool) (*internalv1.MicroTime, error) {
// Try to parse raw as a duration string
var errs []error
duration, err := time.ParseDuration(raw)
if err == nil {
if lower {
duration *= -1
}

return z.P(internalv1.NewMicroTime(now.Add(duration))), nil
}
errs = append(errs, fmt.Errorf("%s is not a valid duration: %w", raw, err))

t, err := parseTime(raw)
if err == nil {
return t, nil
}
errs = append(errs, fmt.Errorf("%s is not a valid time: %w", raw, err))

return nil, errors.Join(errs...)
}

var (
supportedLayouts = []string{
time.RFC3339,
metav1.RFC3339Micro,
}
)

func parseTime(raw string) (*internalv1.MicroTime, error) {
var errs []error
for _, layout := range supportedLayouts {
since, err := time.Parse(layout, raw)
if err == nil {
return z.P(internalv1.NewMicroTime(since)), nil
}

errs = append(errs, err)
}

return nil, errors.Join(errs...)
}

type prefix string

func (p prefix) matches(e apiv1.Event) bool {
Expand Down

0 comments on commit fd9b4d2

Please sign in to comment.