diff --git a/cmd/podman/system/events.go b/cmd/podman/system/events.go index b4e1317d9e..fa4ce5f95e 100644 --- a/cmd/podman/system/events.go +++ b/cmd/podman/system/events.go @@ -12,6 +12,7 @@ import ( "github.com/containers/podman/v5/cmd/podman/validate" "github.com/containers/podman/v5/libpod/events" "github.com/containers/podman/v5/pkg/domain/entities" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -139,9 +140,8 @@ func eventsCmd(cmd *cobra.Command, _ []string) error { if len(eventOptions.Since) > 0 || len(eventOptions.Until) > 0 { eventOptions.FromStart = true } - eventChannel := make(chan *events.Event, 1) + eventChannel := make(chan events.ReadResult, 1) eventOptions.EventChan = eventChannel - errChannel := make(chan error) var ( rpt *report.Formatter @@ -161,40 +161,31 @@ func eventsCmd(cmd *cobra.Command, _ []string) error { } } - go func() { - errChannel <- registry.ContainerEngine().Events(context.Background(), eventOptions) - close(errChannel) - }() - - for { - select { - case event, ok := <-eventChannel: - if !ok { - // channel was closed we can exit - // read the error channel blocking to make sure we are not missing any errors (#23165) - return <-errChannel - } - switch { - case doJSON: - e := newEventFromLibpodEvent(event) - jsonStr, err := e.ToJSONString() - if err != nil { - return err - } - fmt.Println(jsonStr) - case cmd.Flags().Changed("format"): - if err := rpt.Execute(newEventFromLibpodEvent(event)); err != nil { - return err - } - default: - fmt.Println(event.ToHumanReadable(!noTrunc)) - } - case err := <-errChannel: - // only exit in case of an error, - // otherwise keep reading events until the event channel is closed + err := registry.ContainerEngine().Events(context.Background(), eventOptions) + if err != nil { + return err + } + + for evt := range eventChannel { + if evt.Error != nil { + logrus.Errorf("Failed to read event: %v", evt.Error) + continue + } + switch { + case doJSON: + e := newEventFromLibpodEvent(evt.Event) + jsonStr, err := e.ToJSONString() if err != nil { return err } + fmt.Println(jsonStr) + case cmd.Flags().Changed("format"): + if err := rpt.Execute(newEventFromLibpodEvent(evt.Event)); err != nil { + return err + } + default: + fmt.Println(evt.Event.ToHumanReadable(!noTrunc)) } } + return nil } diff --git a/libpod/events.go b/libpod/events.go index 6189ee895a..daaacdd868 100644 --- a/libpod/events.go +++ b/libpod/events.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "path/filepath" - "sync" "github.com/containers/podman/v5/libpod/define" "github.com/containers/podman/v5/libpod/events" @@ -187,7 +186,7 @@ func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error // GetEvents reads the event log and returns events based on input filters func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) { - eventChannel := make(chan *events.Event) + eventChannel := make(chan events.ReadResult) options := events.ReadOptions{ EventChannel: eventChannel, Filters: filters, @@ -195,45 +194,21 @@ func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Ev Stream: false, } - logEvents := make([]*events.Event, 0, len(eventChannel)) - readLock := sync.Mutex{} - readLock.Lock() - go func() { - for e := range eventChannel { - logEvents = append(logEvents, e) - } - readLock.Unlock() - }() - - readErr := r.eventer.Read(ctx, options) - readLock.Lock() // Wait for the events to be consumed. - return logEvents, readErr -} - -// GetLastContainerEvent takes a container name or ID and an event status and returns -// the last occurrence of the container event -func (r *Runtime) GetLastContainerEvent(ctx context.Context, nameOrID string, containerEvent events.Status) (*events.Event, error) { - // FIXME: events should be read in reverse order! - // https://github.com/containers/podman/issues/14579 - - // check to make sure the event.Status is valid - if _, err := events.StringToStatus(containerEvent.String()); err != nil { - return nil, err - } - filters := []string{ - fmt.Sprintf("container=%s", nameOrID), - fmt.Sprintf("event=%s", containerEvent), - "type=container", - } - containerEvents, err := r.GetEvents(ctx, filters) + err := r.eventer.Read(ctx, options) if err != nil { return nil, err } - if len(containerEvents) < 1 { - return nil, fmt.Errorf("%s not found: %w", containerEvent.String(), events.ErrEventNotFound) + + logEvents := make([]*events.Event, 0, len(eventChannel)) + for evt := range eventChannel { + // we ignore any error here, this is only used on the backup + // GetExecDiedEvent() died path as best effort anyway + if evt.Error == nil { + logEvents = append(logEvents, evt.Event) + } } - // return the last element in the slice - return containerEvents[len(containerEvents)-1], nil + + return logEvents, nil } // GetExecDiedEvent takes a container name or ID, exec session ID, and returns diff --git a/libpod/events/config.go b/libpod/events/config.go index 9e3ae6bae8..6337e4c155 100644 --- a/libpod/events/config.go +++ b/libpod/events/config.go @@ -85,10 +85,15 @@ type Eventer interface { String() string } +type ReadResult struct { + Event *Event + Error error +} + // ReadOptions describe the attributes needed to read event logs type ReadOptions struct { // EventChannel is the comm path back to user - EventChannel chan *Event + EventChannel chan ReadResult // Filters are key/value pairs that describe to limit output Filters []string // FromStart means you start reading from the start of the logs diff --git a/libpod/events/journal_linux.go b/libpod/events/journal_linux.go index fe23a40379..147d1ea60a 100644 --- a/libpod/events/journal_linux.go +++ b/libpod/events/journal_linux.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "runtime" "strconv" "time" @@ -97,8 +98,9 @@ func (e EventJournalD) Write(ee Event) error { } // Read reads events from the journal and sends qualified events to the event channel -func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { - defer close(options.EventChannel) +func (e EventJournalD) Read(ctx context.Context, options ReadOptions) (retErr error) { + runtime.LockOSThread() + defer runtime.UnlockOSThread() filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until) if err != nil { return fmt.Errorf("failed to parse event filters: %w", err) @@ -117,13 +119,15 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { return err } defer func() { - if err := j.Close(); err != nil { - logrus.Errorf("Unable to close journal :%v", err) + if retErr != nil { + if err := j.Close(); err != nil { + logrus.Errorf("Unable to close journal :%v", err) + } } }() err = j.SetDataThreshold(0) if err != nil { - logrus.Warnf("cannot set data threshold: %v", err) + return fmt.Errorf("cannot set data threshold for journal: %v", err) } // match only podman journal entries podmanJournal := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"} @@ -158,30 +162,40 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { } } - for { - entry, err := GetNextEntry(ctx, j, options.Stream, untilTime) - if err != nil { - return err - } - // no entry == we hit the end - if entry == nil { - return nil - } + go func() { + defer close(options.EventChannel) + defer func() { + if err := j.Close(); err != nil { + logrus.Errorf("Unable to close journal :%v", err) + } + }() + for { + entry, err := GetNextEntry(ctx, j, options.Stream, untilTime) + if err != nil { + options.EventChannel <- ReadResult{Error: err} + break + } + // no entry == we hit the end + if entry == nil { + break + } - newEvent, err := newEventFromJournalEntry(entry) - if err != nil { - // We can't decode this event. - // Don't fail hard - that would make events unusable. - // Instead, log and continue. - if !errors.Is(err, ErrEventTypeBlank) { - logrus.Errorf("Unable to decode event: %v", err) + newEvent, err := newEventFromJournalEntry(entry) + if err != nil { + // We can't decode this event. + // Don't fail hard - that would make events unusable. + // Instead, log and continue. + if !errors.Is(err, ErrEventTypeBlank) { + options.EventChannel <- ReadResult{Error: fmt.Errorf("unable to decode event: %v", err)} + } + continue + } + if applyFilters(newEvent, filterMap) { + options.EventChannel <- ReadResult{Event: newEvent} } - continue - } - if applyFilters(newEvent, filterMap) { - options.EventChannel <- newEvent } - } + }() + return nil } func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) { diff --git a/libpod/events/logfile.go b/libpod/events/logfile.go index 9b69d15f61..4c0dd5e226 100644 --- a/libpod/events/logfile.go +++ b/libpod/events/logfile.go @@ -108,7 +108,6 @@ func (e EventLogFile) readRotateEvent(event *Event) (begin bool, end bool, err e // Reads from the log file func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error { - defer close(options.EventChannel) filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until) if err != nil { return fmt.Errorf("failed to parse event filters: %w", err) @@ -148,56 +147,65 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error { return err } - var line *tail.Line - var ok bool - var skipRotate bool - for { - select { - case <-ctx.Done(): - // the consumer has cancelled - t.Kill(errors.New("hangup by client")) - return nil - case line, ok = <-t.Lines: - if !ok { - // channel was closed - return nil + go func() { + defer close(options.EventChannel) + var line *tail.Line + var ok bool + var skipRotate bool + for { + select { + case <-ctx.Done(): + // the consumer has cancelled + t.Kill(errors.New("hangup by client")) + return + case line, ok = <-t.Lines: + if !ok { + // channel was closed + return + } + // fallthrough } - // fallthrough - } - event, err := newEventFromJSONString(line.Text) - if err != nil { - return err - } - switch event.Type { - case Image, Volume, Pod, Container, Network: - // no-op - case System: - begin, end, err := e.readRotateEvent(event) + event, err := newEventFromJSONString(line.Text) if err != nil { - return err + err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath) + options.EventChannel <- ReadResult{Error: err} + continue } - if begin && event.Time.After(readTime) { - // If the rotation event happened _after_ we - // started reading, we need to ignore/skip - // subsequent event until the end of the - // rotation. - skipRotate = true - logrus.Debugf("Skipping already read events after log-file rotation: %v", event) - } else if end { - // This rotate event - skipRotate = false + switch event.Type { + case Image, Volume, Pod, Container, Network: + // no-op + case System: + begin, end, err := e.readRotateEvent(event) + if err != nil { + options.EventChannel <- ReadResult{Error: err} + continue + } + if begin && event.Time.After(readTime) { + // If the rotation event happened _after_ we + // started reading, we need to ignore/skip + // subsequent event until the end of the + // rotation. + skipRotate = true + logrus.Debugf("Skipping already read events after log-file rotation: %v", event) + } else if end { + // This rotate event + skipRotate = false + } + default: + err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath) + options.EventChannel <- ReadResult{Error: err} + continue + } + if skipRotate { + continue + } + if applyFilters(event, filterMap) { + options.EventChannel <- ReadResult{Event: event} } - default: - return fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath) - } - if skipRotate { - continue - } - if applyFilters(event, filterMap) { - options.EventChannel <- event } - } + }() + return nil } // String returns a string representation of the logger diff --git a/pkg/api/handlers/compat/events.go b/pkg/api/handlers/compat/events.go index e449aa0cab..fa398a4a13 100644 --- a/pkg/api/handlers/compat/events.go +++ b/pkg/api/handlers/compat/events.go @@ -48,21 +48,21 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { utils.Error(w, http.StatusBadRequest, fmt.Errorf("failed to parse filters for %s: %w", r.URL.String(), err)) return } - eventChannel := make(chan *events.Event) - errorChannel := make(chan error) + eventChannel := make(chan events.ReadResult) - // Start reading events. - go func() { - readOpts := events.ReadOptions{ - FromStart: fromStart, - Stream: query.Stream, - Filters: libpodFilters, - EventChannel: eventChannel, - Since: query.Since, - Until: query.Until, - } - errorChannel <- runtime.Events(r.Context(), readOpts) - }() + readOpts := events.ReadOptions{ + FromStart: fromStart, + Stream: query.Stream, + Filters: libpodFilters, + EventChannel: eventChannel, + Since: query.Since, + Until: query.Until, + } + err = runtime.Events(r.Context(), readOpts) + if err != nil { + utils.InternalServerError(w, err) + return + } flush := func() {} if flusher, ok := w.(http.Flusher); ok { @@ -70,31 +70,29 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Content-Type", "application/json") - wroteContent := false - defer func() { - if !wroteContent { - w.WriteHeader(http.StatusOK) - flush() - } - }() + w.WriteHeader(http.StatusOK) + flush() coder := json.NewEncoder(w) coder.SetEscapeHTML(true) for { select { - case err := <-errorChannel: - if err != nil { - utils.InternalServerError(w, err) - wroteContent = true - } + case <-r.Context().Done(): return - case evt := <-eventChannel: - if evt == nil { + case evt, ok := <-eventChannel: + if !ok { + return + } + if evt.Error != nil { + logrus.Errorf("Unable to read event: %q", err) + continue + } + if evt.Event == nil { continue } - e := entities.ConvertToEntitiesEvent(*evt) + e := entities.ConvertToEntitiesEvent(*evt.Event) // Some events differ between Libpod and Docker endpoints. // Handle these differences for Docker-compat. if !utils.IsLibpodRequest(r) && e.Type == "image" && e.Status == "remove" { @@ -110,10 +108,7 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { if err := coder.Encode(e); err != nil { logrus.Errorf("Unable to write json: %q", err) } - wroteContent = true flush() - case <-r.Context().Done(): - return } } } diff --git a/pkg/api/handlers/utils/containers.go b/pkg/api/handlers/utils/containers.go index 585899509f..363fc044fd 100644 --- a/pkg/api/handlers/utils/containers.go +++ b/pkg/api/handlers/utils/containers.go @@ -236,8 +236,7 @@ func waitRemoved(ctrWait containerWaitFn) (int32, error) { func waitNextExit(ctx context.Context, containerName string) (int32, error) { runtime := ctx.Value(api.RuntimeKey).(*libpod.Runtime) containerEngine := &abi.ContainerEngine{Libpod: runtime} - eventChannel := make(chan *events.Event) - errChannel := make(chan error) + eventChannel := make(chan events.ReadResult) opts := entities.EventsOptions{ EventChan: eventChannel, Filter: []string{"event=died", fmt.Sprintf("container=%s", containerName)}, @@ -247,21 +246,22 @@ func waitNextExit(ctx context.Context, containerName string) (int32, error) { // ctx is used to cancel event watching goroutine ctx, cancel := context.WithCancel(ctx) defer cancel() - go func() { - errChannel <- containerEngine.Events(ctx, opts) - }() - - evt, ok := <-eventChannel - if ok { - if evt.ContainerExitCode != nil { - return int32(*evt.ContainerExitCode), nil + err := containerEngine.Events(ctx, opts) + if err != nil { + return -1, err + } + + for evt := range eventChannel { + if evt.Error == nil { + if evt.Event.ContainerExitCode != nil { + return int32(*evt.Event.ContainerExitCode), nil + } } - return -1, nil } - // if ok == false then containerEngine.Events() has exited + // if we are here then containerEngine.Events() has exited // it may happen if request was canceled (e.g. client closed connection prematurely) or // the server is in process of shutting down - return -1, <-errChannel + return -1, nil } func waitNotRunning(ctrWait containerWaitFn) (int32, error) { diff --git a/pkg/bindings/system/system.go b/pkg/bindings/system/system.go index a41bfb1f8e..b2eacd4a69 100644 --- a/pkg/bindings/system/system.go +++ b/pkg/bindings/system/system.go @@ -3,9 +3,7 @@ package system import ( "context" "encoding/json" - "errors" "fmt" - "io" "net/http" "time" @@ -31,7 +29,6 @@ func Events(ctx context.Context, eventChan chan types.Event, cancelChan chan boo if err != nil { return err } - defer response.Body.Close() if cancelChan != nil { go func() { @@ -43,26 +40,23 @@ func Events(ctx context.Context, eventChan chan types.Event, cancelChan chan boo } if response.StatusCode != http.StatusOK { + defer response.Body.Close() return response.Process(nil) } - dec := json.NewDecoder(response.Body) - for err = (error)(nil); err == nil; { - var e = types.Event{} - err = dec.Decode(&e) - if err == nil { - eventChan <- e + go func() { + defer response.Body.Close() + defer close(eventChan) + dec := json.NewDecoder(response.Body) + for err = (error)(nil); err == nil; { + var e = types.Event{} + err = dec.Decode(&e) + if err == nil { + eventChan <- e + } } - } - close(eventChan) - switch { - case err == nil: - return nil - case errors.Is(err, io.EOF): - return nil - default: - return fmt.Errorf("unable to decode event response: %w", err) - } + }() + return nil } // Prune removes all unused system data. diff --git a/pkg/domain/entities/types.go b/pkg/domain/entities/types.go index 52901c03f2..277460ed75 100644 --- a/pkg/domain/entities/types.go +++ b/pkg/domain/entities/types.go @@ -90,7 +90,7 @@ type DiffReport struct { type EventsOptions struct { FromStart bool - EventChan chan *events.Event + EventChan chan events.ReadResult Filter []string Stream bool Since string diff --git a/pkg/domain/infra/tunnel/events.go b/pkg/domain/infra/tunnel/events.go index f893e9508c..14af2e8546 100644 --- a/pkg/domain/infra/tunnel/events.go +++ b/pkg/domain/infra/tunnel/events.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/containers/podman/v5/libpod/events" "github.com/containers/podman/v5/pkg/bindings/system" "github.com/containers/podman/v5/pkg/domain/entities" ) @@ -23,7 +24,7 @@ func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptio binChan := make(chan entities.Event) go func() { for e := range binChan { - opts.EventChan <- entities.ConvertToLibpodEvent(e) + opts.EventChan <- events.ReadResult{Event: entities.ConvertToLibpodEvent(e)} } close(opts.EventChan) }() diff --git a/test/apiv2/27-containersEvents.at b/test/apiv2/27-containersEvents.at index e57ee46777..1f7ac08014 100644 --- a/test/apiv2/27-containersEvents.at +++ b/test/apiv2/27-containersEvents.at @@ -32,4 +32,8 @@ t GET "events?stream=false&since=$START&type=remove" 200 \ 'select(.status | contains("remove")).Action=remove' \ 'select(.status | contains("remove")).Actor.Attributes.containerExitCode=1' +APIV2_TEST_EXPECT_TIMEOUT=1 t GET "events?stream=true" 999 +like "$(<$WORKDIR/curl.headers.out)" ".*HTTP.* 200 OK.*" \ + "Received headers from /events" + # vim: filetype=sh