From 63487783482b451d482e394b92b43ac34c46fd75 Mon Sep 17 00:00:00 2001 From: Paul Holzinger Date: Tue, 29 Oct 2024 14:19:28 +0100 Subject: [PATCH 1/4] libpod: log file use Wait() over event API Using the internal Wait() API over the events API as this is much more efficient. Reading events will need to read a lot of data otherwise. For the function here it should work fine and it is even better as it does not depend on the event logger at all. Signed-off-by: Paul Holzinger --- libpod/container_log.go | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/libpod/container_log.go b/libpod/container_log.go index ea76c7ea39..91c0a50ccf 100644 --- a/libpod/container_log.go +++ b/libpod/container_log.go @@ -10,7 +10,6 @@ import ( "time" "github.com/containers/podman/v5/libpod/define" - "github.com/containers/podman/v5/libpod/events" "github.com/containers/podman/v5/libpod/logs" systemdDefine "github.com/containers/podman/v5/pkg/systemd/define" "github.com/nxadm/tail" @@ -139,20 +138,10 @@ func (c *Container) readFromLogFile(ctx context.Context, options *logs.LogOption // The container is running, so we need to wait until the container exited go func() { - eventChannel := make(chan *events.Event) - eventOptions := events.ReadOptions{ - EventChannel: eventChannel, - Filters: []string{"event=died", "container=" + c.ID()}, - Stream: true, + _, err = c.Wait(ctx) + if err != nil && !errors.Is(err, define.ErrNoSuchCtr) { + logrus.Errorf("Waiting for container to exit: %v", err) } - go func() { - if err := c.runtime.Events(ctx, eventOptions); err != nil { - logrus.Errorf("Waiting for container to exit: %v", err) - } - }() - // Now wait for the died event and signal to finish - // reading the log until EOF. - <-eventChannel // Make sure to wait at least for the poll duration // before stopping the file logger (see #10675). time.Sleep(watch.POLL_DURATION) From e3abf5c9e87de06a934aa025a0b4538ef241a3a7 Mon Sep 17 00:00:00 2001 From: Paul Holzinger Date: Tue, 29 Oct 2024 15:30:07 +0100 Subject: [PATCH 2/4] events: remove memory eventer This type is unsused, undocumented and basically broken. If this would be used anywhere it will just deadlock after writing 100+ events without reading as the channel will just be full. It was added in commit 8da5f3f733 but never used there nor is there any justification why this was added in the commit message or PR comments. Signed-off-by: Paul Holzinger --- libpod/events/config.go | 2 -- libpod/events/events.go | 4 --- libpod/events/events_freebsd.go | 2 -- libpod/events/events_linux.go | 2 -- libpod/events/memory.go | 49 --------------------------------- 5 files changed, 59 deletions(-) delete mode 100644 libpod/events/memory.go diff --git a/libpod/events/config.go b/libpod/events/config.go index 1927c5a6d3..9e3ae6bae8 100644 --- a/libpod/events/config.go +++ b/libpod/events/config.go @@ -16,8 +16,6 @@ const ( Journald EventerType = iota // Null is a no-op events logger. It does not read or write events. Null EventerType = iota - // Memory indicates the event logger will hold events in memory - Memory EventerType = iota ) // Event describes the attributes of a libpod event diff --git a/libpod/events/events.go b/libpod/events/events.go index 084be84cab..9f9b6f6f8e 100644 --- a/libpod/events/events.go +++ b/libpod/events/events.go @@ -20,8 +20,6 @@ func (et EventerType) String() string { return "file" case Journald: return "journald" - case Memory: - return "memory" case Null: return "none" default: @@ -36,8 +34,6 @@ func IsValidEventer(eventer string) bool { return true case Journald.String(): return true - case Memory.String(): - return true case Null.String(): return true default: diff --git a/libpod/events/events_freebsd.go b/libpod/events/events_freebsd.go index 90933fa2cc..116398d06e 100644 --- a/libpod/events/events_freebsd.go +++ b/libpod/events/events_freebsd.go @@ -15,8 +15,6 @@ func NewEventer(options EventerOptions) (Eventer, error) { return EventLogFile{options}, nil case strings.ToUpper(Null.String()): return newNullEventer(), nil - case strings.ToUpper(Memory.String()): - return NewMemoryEventer(), nil default: return nil, fmt.Errorf("unknown event logger type: %s", strings.ToUpper(options.EventerType)) } diff --git a/libpod/events/events_linux.go b/libpod/events/events_linux.go index 66b125dd5f..f17f9708a8 100644 --- a/libpod/events/events_linux.go +++ b/libpod/events/events_linux.go @@ -21,8 +21,6 @@ func NewEventer(options EventerOptions) (Eventer, error) { return newLogFileEventer(options) case strings.ToUpper(Null.String()): return newNullEventer(), nil - case strings.ToUpper(Memory.String()): - return NewMemoryEventer(), nil default: return nil, fmt.Errorf("unknown event logger type: %s", strings.ToUpper(options.EventerType)) } diff --git a/libpod/events/memory.go b/libpod/events/memory.go deleted file mode 100644 index b3e03d86bc..0000000000 --- a/libpod/events/memory.go +++ /dev/null @@ -1,49 +0,0 @@ -package events - -import ( - "context" -) - -// EventMemory is the structure for event writing to a channel. It contains the eventer -// options and the event itself. Methods for reading and writing are also defined from it. -type EventMemory struct { - options EventerOptions - elements chan *Event -} - -// Write event to memory queue -func (e EventMemory) Write(event Event) (err error) { - e.elements <- &event - return -} - -// Read event(s) from memory queue -func (e EventMemory) Read(ctx context.Context, options ReadOptions) (err error) { - select { - case <-ctx.Done(): - return - default: - } - - select { - case event := <-e.elements: - options.EventChannel <- event - default: - } - return nil -} - -// String returns eventer type -func (e EventMemory) String() string { - return e.options.EventerType -} - -// NewMemoryEventer returns configured MemoryEventer -func NewMemoryEventer() Eventer { - return EventMemory{ - options: EventerOptions{ - EventerType: Memory.String(), - }, - elements: make(chan *Event, 100), - } -} From 768ad8653af4cebd069a865a5dbf4a7ec8517444 Mon Sep 17 00:00:00 2001 From: Paul Holzinger Date: Tue, 29 Oct 2024 15:26:45 +0100 Subject: [PATCH 3/4] rework event code to improve API errors One of the problems with the Events() API was that you had to call it in a new goroutine. This meant the the error returned by it had to be read back via a second channel. This cuased other bugs in the past but here the biggest problem is that basic errors such as invalid since/until options were not directly returned to the caller. It meant in the API we were not able to write http code 200 quickly because we always waited for the first event or error from the channels. This in turn made some clients not happy as they assume the server hangs on time out if no such events are generated. To fix this we resturcture the entire event flow. First we spawn the goroutine inside the eventer Read() function so not all the callers have to. Then we can return the basic error quickly without the goroutine. The caller then checks the error like any normal function and the API can use this one to decide which status code to return. Second we now return errors/event in one channel then the callers can decide to ignore or log them which makes it a bit more clear. Fixes c46884aa93 ("podman events: check for an error after we finish reading events") Fixes #23712 Signed-off-by: Paul Holzinger --- cmd/podman/system/events.go | 57 +++++++--------- libpod/events.go | 49 ++++---------- libpod/events/config.go | 7 +- libpod/events/journal_linux.go | 66 +++++++++++-------- libpod/events/logfile.go | 98 +++++++++++++++------------- pkg/api/handlers/compat/events.go | 59 ++++++++--------- pkg/api/handlers/utils/containers.go | 26 ++++---- pkg/bindings/system/system.go | 32 ++++----- pkg/domain/entities/types.go | 2 +- pkg/domain/infra/tunnel/events.go | 3 +- test/apiv2/27-containersEvents.at | 4 ++ 11 files changed, 195 insertions(+), 208 deletions(-) diff --git a/cmd/podman/system/events.go b/cmd/podman/system/events.go index b4e1317d9e..fa4ce5f95e 100644 --- a/cmd/podman/system/events.go +++ b/cmd/podman/system/events.go @@ -12,6 +12,7 @@ import ( "github.com/containers/podman/v5/cmd/podman/validate" "github.com/containers/podman/v5/libpod/events" "github.com/containers/podman/v5/pkg/domain/entities" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -139,9 +140,8 @@ func eventsCmd(cmd *cobra.Command, _ []string) error { if len(eventOptions.Since) > 0 || len(eventOptions.Until) > 0 { eventOptions.FromStart = true } - eventChannel := make(chan *events.Event, 1) + eventChannel := make(chan events.ReadResult, 1) eventOptions.EventChan = eventChannel - errChannel := make(chan error) var ( rpt *report.Formatter @@ -161,40 +161,31 @@ func eventsCmd(cmd *cobra.Command, _ []string) error { } } - go func() { - errChannel <- registry.ContainerEngine().Events(context.Background(), eventOptions) - close(errChannel) - }() - - for { - select { - case event, ok := <-eventChannel: - if !ok { - // channel was closed we can exit - // read the error channel blocking to make sure we are not missing any errors (#23165) - return <-errChannel - } - switch { - case doJSON: - e := newEventFromLibpodEvent(event) - jsonStr, err := e.ToJSONString() - if err != nil { - return err - } - fmt.Println(jsonStr) - case cmd.Flags().Changed("format"): - if err := rpt.Execute(newEventFromLibpodEvent(event)); err != nil { - return err - } - default: - fmt.Println(event.ToHumanReadable(!noTrunc)) - } - case err := <-errChannel: - // only exit in case of an error, - // otherwise keep reading events until the event channel is closed + err := registry.ContainerEngine().Events(context.Background(), eventOptions) + if err != nil { + return err + } + + for evt := range eventChannel { + if evt.Error != nil { + logrus.Errorf("Failed to read event: %v", evt.Error) + continue + } + switch { + case doJSON: + e := newEventFromLibpodEvent(evt.Event) + jsonStr, err := e.ToJSONString() if err != nil { return err } + fmt.Println(jsonStr) + case cmd.Flags().Changed("format"): + if err := rpt.Execute(newEventFromLibpodEvent(evt.Event)); err != nil { + return err + } + default: + fmt.Println(evt.Event.ToHumanReadable(!noTrunc)) } } + return nil } diff --git a/libpod/events.go b/libpod/events.go index 6189ee895a..daaacdd868 100644 --- a/libpod/events.go +++ b/libpod/events.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "path/filepath" - "sync" "github.com/containers/podman/v5/libpod/define" "github.com/containers/podman/v5/libpod/events" @@ -187,7 +186,7 @@ func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error // GetEvents reads the event log and returns events based on input filters func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) { - eventChannel := make(chan *events.Event) + eventChannel := make(chan events.ReadResult) options := events.ReadOptions{ EventChannel: eventChannel, Filters: filters, @@ -195,45 +194,21 @@ func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Ev Stream: false, } - logEvents := make([]*events.Event, 0, len(eventChannel)) - readLock := sync.Mutex{} - readLock.Lock() - go func() { - for e := range eventChannel { - logEvents = append(logEvents, e) - } - readLock.Unlock() - }() - - readErr := r.eventer.Read(ctx, options) - readLock.Lock() // Wait for the events to be consumed. - return logEvents, readErr -} - -// GetLastContainerEvent takes a container name or ID and an event status and returns -// the last occurrence of the container event -func (r *Runtime) GetLastContainerEvent(ctx context.Context, nameOrID string, containerEvent events.Status) (*events.Event, error) { - // FIXME: events should be read in reverse order! - // https://github.com/containers/podman/issues/14579 - - // check to make sure the event.Status is valid - if _, err := events.StringToStatus(containerEvent.String()); err != nil { - return nil, err - } - filters := []string{ - fmt.Sprintf("container=%s", nameOrID), - fmt.Sprintf("event=%s", containerEvent), - "type=container", - } - containerEvents, err := r.GetEvents(ctx, filters) + err := r.eventer.Read(ctx, options) if err != nil { return nil, err } - if len(containerEvents) < 1 { - return nil, fmt.Errorf("%s not found: %w", containerEvent.String(), events.ErrEventNotFound) + + logEvents := make([]*events.Event, 0, len(eventChannel)) + for evt := range eventChannel { + // we ignore any error here, this is only used on the backup + // GetExecDiedEvent() died path as best effort anyway + if evt.Error == nil { + logEvents = append(logEvents, evt.Event) + } } - // return the last element in the slice - return containerEvents[len(containerEvents)-1], nil + + return logEvents, nil } // GetExecDiedEvent takes a container name or ID, exec session ID, and returns diff --git a/libpod/events/config.go b/libpod/events/config.go index 9e3ae6bae8..6337e4c155 100644 --- a/libpod/events/config.go +++ b/libpod/events/config.go @@ -85,10 +85,15 @@ type Eventer interface { String() string } +type ReadResult struct { + Event *Event + Error error +} + // ReadOptions describe the attributes needed to read event logs type ReadOptions struct { // EventChannel is the comm path back to user - EventChannel chan *Event + EventChannel chan ReadResult // Filters are key/value pairs that describe to limit output Filters []string // FromStart means you start reading from the start of the logs diff --git a/libpod/events/journal_linux.go b/libpod/events/journal_linux.go index fe23a40379..147d1ea60a 100644 --- a/libpod/events/journal_linux.go +++ b/libpod/events/journal_linux.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "runtime" "strconv" "time" @@ -97,8 +98,9 @@ func (e EventJournalD) Write(ee Event) error { } // Read reads events from the journal and sends qualified events to the event channel -func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { - defer close(options.EventChannel) +func (e EventJournalD) Read(ctx context.Context, options ReadOptions) (retErr error) { + runtime.LockOSThread() + defer runtime.UnlockOSThread() filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until) if err != nil { return fmt.Errorf("failed to parse event filters: %w", err) @@ -117,13 +119,15 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { return err } defer func() { - if err := j.Close(); err != nil { - logrus.Errorf("Unable to close journal :%v", err) + if retErr != nil { + if err := j.Close(); err != nil { + logrus.Errorf("Unable to close journal :%v", err) + } } }() err = j.SetDataThreshold(0) if err != nil { - logrus.Warnf("cannot set data threshold: %v", err) + return fmt.Errorf("cannot set data threshold for journal: %v", err) } // match only podman journal entries podmanJournal := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"} @@ -158,30 +162,40 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { } } - for { - entry, err := GetNextEntry(ctx, j, options.Stream, untilTime) - if err != nil { - return err - } - // no entry == we hit the end - if entry == nil { - return nil - } + go func() { + defer close(options.EventChannel) + defer func() { + if err := j.Close(); err != nil { + logrus.Errorf("Unable to close journal :%v", err) + } + }() + for { + entry, err := GetNextEntry(ctx, j, options.Stream, untilTime) + if err != nil { + options.EventChannel <- ReadResult{Error: err} + break + } + // no entry == we hit the end + if entry == nil { + break + } - newEvent, err := newEventFromJournalEntry(entry) - if err != nil { - // We can't decode this event. - // Don't fail hard - that would make events unusable. - // Instead, log and continue. - if !errors.Is(err, ErrEventTypeBlank) { - logrus.Errorf("Unable to decode event: %v", err) + newEvent, err := newEventFromJournalEntry(entry) + if err != nil { + // We can't decode this event. + // Don't fail hard - that would make events unusable. + // Instead, log and continue. + if !errors.Is(err, ErrEventTypeBlank) { + options.EventChannel <- ReadResult{Error: fmt.Errorf("unable to decode event: %v", err)} + } + continue + } + if applyFilters(newEvent, filterMap) { + options.EventChannel <- ReadResult{Event: newEvent} } - continue - } - if applyFilters(newEvent, filterMap) { - options.EventChannel <- newEvent } - } + }() + return nil } func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) { diff --git a/libpod/events/logfile.go b/libpod/events/logfile.go index 9b69d15f61..4c0dd5e226 100644 --- a/libpod/events/logfile.go +++ b/libpod/events/logfile.go @@ -108,7 +108,6 @@ func (e EventLogFile) readRotateEvent(event *Event) (begin bool, end bool, err e // Reads from the log file func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error { - defer close(options.EventChannel) filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until) if err != nil { return fmt.Errorf("failed to parse event filters: %w", err) @@ -148,56 +147,65 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error { return err } - var line *tail.Line - var ok bool - var skipRotate bool - for { - select { - case <-ctx.Done(): - // the consumer has cancelled - t.Kill(errors.New("hangup by client")) - return nil - case line, ok = <-t.Lines: - if !ok { - // channel was closed - return nil + go func() { + defer close(options.EventChannel) + var line *tail.Line + var ok bool + var skipRotate bool + for { + select { + case <-ctx.Done(): + // the consumer has cancelled + t.Kill(errors.New("hangup by client")) + return + case line, ok = <-t.Lines: + if !ok { + // channel was closed + return + } + // fallthrough } - // fallthrough - } - event, err := newEventFromJSONString(line.Text) - if err != nil { - return err - } - switch event.Type { - case Image, Volume, Pod, Container, Network: - // no-op - case System: - begin, end, err := e.readRotateEvent(event) + event, err := newEventFromJSONString(line.Text) if err != nil { - return err + err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath) + options.EventChannel <- ReadResult{Error: err} + continue } - if begin && event.Time.After(readTime) { - // If the rotation event happened _after_ we - // started reading, we need to ignore/skip - // subsequent event until the end of the - // rotation. - skipRotate = true - logrus.Debugf("Skipping already read events after log-file rotation: %v", event) - } else if end { - // This rotate event - skipRotate = false + switch event.Type { + case Image, Volume, Pod, Container, Network: + // no-op + case System: + begin, end, err := e.readRotateEvent(event) + if err != nil { + options.EventChannel <- ReadResult{Error: err} + continue + } + if begin && event.Time.After(readTime) { + // If the rotation event happened _after_ we + // started reading, we need to ignore/skip + // subsequent event until the end of the + // rotation. + skipRotate = true + logrus.Debugf("Skipping already read events after log-file rotation: %v", event) + } else if end { + // This rotate event + skipRotate = false + } + default: + err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath) + options.EventChannel <- ReadResult{Error: err} + continue + } + if skipRotate { + continue + } + if applyFilters(event, filterMap) { + options.EventChannel <- ReadResult{Event: event} } - default: - return fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath) - } - if skipRotate { - continue - } - if applyFilters(event, filterMap) { - options.EventChannel <- event } - } + }() + return nil } // String returns a string representation of the logger diff --git a/pkg/api/handlers/compat/events.go b/pkg/api/handlers/compat/events.go index e449aa0cab..fa398a4a13 100644 --- a/pkg/api/handlers/compat/events.go +++ b/pkg/api/handlers/compat/events.go @@ -48,21 +48,21 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { utils.Error(w, http.StatusBadRequest, fmt.Errorf("failed to parse filters for %s: %w", r.URL.String(), err)) return } - eventChannel := make(chan *events.Event) - errorChannel := make(chan error) + eventChannel := make(chan events.ReadResult) - // Start reading events. - go func() { - readOpts := events.ReadOptions{ - FromStart: fromStart, - Stream: query.Stream, - Filters: libpodFilters, - EventChannel: eventChannel, - Since: query.Since, - Until: query.Until, - } - errorChannel <- runtime.Events(r.Context(), readOpts) - }() + readOpts := events.ReadOptions{ + FromStart: fromStart, + Stream: query.Stream, + Filters: libpodFilters, + EventChannel: eventChannel, + Since: query.Since, + Until: query.Until, + } + err = runtime.Events(r.Context(), readOpts) + if err != nil { + utils.InternalServerError(w, err) + return + } flush := func() {} if flusher, ok := w.(http.Flusher); ok { @@ -70,31 +70,29 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Content-Type", "application/json") - wroteContent := false - defer func() { - if !wroteContent { - w.WriteHeader(http.StatusOK) - flush() - } - }() + w.WriteHeader(http.StatusOK) + flush() coder := json.NewEncoder(w) coder.SetEscapeHTML(true) for { select { - case err := <-errorChannel: - if err != nil { - utils.InternalServerError(w, err) - wroteContent = true - } + case <-r.Context().Done(): return - case evt := <-eventChannel: - if evt == nil { + case evt, ok := <-eventChannel: + if !ok { + return + } + if evt.Error != nil { + logrus.Errorf("Unable to read event: %q", err) + continue + } + if evt.Event == nil { continue } - e := entities.ConvertToEntitiesEvent(*evt) + e := entities.ConvertToEntitiesEvent(*evt.Event) // Some events differ between Libpod and Docker endpoints. // Handle these differences for Docker-compat. if !utils.IsLibpodRequest(r) && e.Type == "image" && e.Status == "remove" { @@ -110,10 +108,7 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { if err := coder.Encode(e); err != nil { logrus.Errorf("Unable to write json: %q", err) } - wroteContent = true flush() - case <-r.Context().Done(): - return } } } diff --git a/pkg/api/handlers/utils/containers.go b/pkg/api/handlers/utils/containers.go index 585899509f..363fc044fd 100644 --- a/pkg/api/handlers/utils/containers.go +++ b/pkg/api/handlers/utils/containers.go @@ -236,8 +236,7 @@ func waitRemoved(ctrWait containerWaitFn) (int32, error) { func waitNextExit(ctx context.Context, containerName string) (int32, error) { runtime := ctx.Value(api.RuntimeKey).(*libpod.Runtime) containerEngine := &abi.ContainerEngine{Libpod: runtime} - eventChannel := make(chan *events.Event) - errChannel := make(chan error) + eventChannel := make(chan events.ReadResult) opts := entities.EventsOptions{ EventChan: eventChannel, Filter: []string{"event=died", fmt.Sprintf("container=%s", containerName)}, @@ -247,21 +246,22 @@ func waitNextExit(ctx context.Context, containerName string) (int32, error) { // ctx is used to cancel event watching goroutine ctx, cancel := context.WithCancel(ctx) defer cancel() - go func() { - errChannel <- containerEngine.Events(ctx, opts) - }() - - evt, ok := <-eventChannel - if ok { - if evt.ContainerExitCode != nil { - return int32(*evt.ContainerExitCode), nil + err := containerEngine.Events(ctx, opts) + if err != nil { + return -1, err + } + + for evt := range eventChannel { + if evt.Error == nil { + if evt.Event.ContainerExitCode != nil { + return int32(*evt.Event.ContainerExitCode), nil + } } - return -1, nil } - // if ok == false then containerEngine.Events() has exited + // if we are here then containerEngine.Events() has exited // it may happen if request was canceled (e.g. client closed connection prematurely) or // the server is in process of shutting down - return -1, <-errChannel + return -1, nil } func waitNotRunning(ctrWait containerWaitFn) (int32, error) { diff --git a/pkg/bindings/system/system.go b/pkg/bindings/system/system.go index a41bfb1f8e..b2eacd4a69 100644 --- a/pkg/bindings/system/system.go +++ b/pkg/bindings/system/system.go @@ -3,9 +3,7 @@ package system import ( "context" "encoding/json" - "errors" "fmt" - "io" "net/http" "time" @@ -31,7 +29,6 @@ func Events(ctx context.Context, eventChan chan types.Event, cancelChan chan boo if err != nil { return err } - defer response.Body.Close() if cancelChan != nil { go func() { @@ -43,26 +40,23 @@ func Events(ctx context.Context, eventChan chan types.Event, cancelChan chan boo } if response.StatusCode != http.StatusOK { + defer response.Body.Close() return response.Process(nil) } - dec := json.NewDecoder(response.Body) - for err = (error)(nil); err == nil; { - var e = types.Event{} - err = dec.Decode(&e) - if err == nil { - eventChan <- e + go func() { + defer response.Body.Close() + defer close(eventChan) + dec := json.NewDecoder(response.Body) + for err = (error)(nil); err == nil; { + var e = types.Event{} + err = dec.Decode(&e) + if err == nil { + eventChan <- e + } } - } - close(eventChan) - switch { - case err == nil: - return nil - case errors.Is(err, io.EOF): - return nil - default: - return fmt.Errorf("unable to decode event response: %w", err) - } + }() + return nil } // Prune removes all unused system data. diff --git a/pkg/domain/entities/types.go b/pkg/domain/entities/types.go index 52901c03f2..277460ed75 100644 --- a/pkg/domain/entities/types.go +++ b/pkg/domain/entities/types.go @@ -90,7 +90,7 @@ type DiffReport struct { type EventsOptions struct { FromStart bool - EventChan chan *events.Event + EventChan chan events.ReadResult Filter []string Stream bool Since string diff --git a/pkg/domain/infra/tunnel/events.go b/pkg/domain/infra/tunnel/events.go index f893e9508c..14af2e8546 100644 --- a/pkg/domain/infra/tunnel/events.go +++ b/pkg/domain/infra/tunnel/events.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/containers/podman/v5/libpod/events" "github.com/containers/podman/v5/pkg/bindings/system" "github.com/containers/podman/v5/pkg/domain/entities" ) @@ -23,7 +24,7 @@ func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptio binChan := make(chan entities.Event) go func() { for e := range binChan { - opts.EventChan <- entities.ConvertToLibpodEvent(e) + opts.EventChan <- events.ReadResult{Event: entities.ConvertToLibpodEvent(e)} } close(opts.EventChan) }() diff --git a/test/apiv2/27-containersEvents.at b/test/apiv2/27-containersEvents.at index e57ee46777..1f7ac08014 100644 --- a/test/apiv2/27-containersEvents.at +++ b/test/apiv2/27-containersEvents.at @@ -32,4 +32,8 @@ t GET "events?stream=false&since=$START&type=remove" 200 \ 'select(.status | contains("remove")).Action=remove' \ 'select(.status | contains("remove")).Actor.Attributes.containerExitCode=1' +APIV2_TEST_EXPECT_TIMEOUT=1 t GET "events?stream=true" 999 +like "$(<$WORKDIR/curl.headers.out)" ".*HTTP.* 200 OK.*" \ + "Received headers from /events" + # vim: filetype=sh From e6d987882eec98e0ced1b11858921401abecb705 Mon Sep 17 00:00:00 2001 From: Paul Holzinger Date: Tue, 29 Oct 2024 16:29:02 +0100 Subject: [PATCH 4/4] API: container logs flush status code API clients expect the status code quickly otherwise they can time out. If we do not flush we may not write the header immediately and only when futher logs are send. Fixes #23712 Signed-off-by: Paul Holzinger --- pkg/api/handlers/compat/containers_logs.go | 11 ++++++++--- test/apiv2/20-containers.at | 7 +++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/api/handlers/compat/containers_logs.go b/pkg/api/handlers/compat/containers_logs.go index b1440ff9dd..f956a2b05c 100644 --- a/pkg/api/handlers/compat/containers_logs.go +++ b/pkg/api/handlers/compat/containers_logs.go @@ -106,6 +106,13 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) + flush := func() { + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + } + flush() + var frame strings.Builder header := make([]byte, 8) @@ -167,8 +174,6 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { if _, err := io.WriteString(w, frame.String()); err != nil { log.Errorf("unable to write frame string: %q", err) } - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() - } + flush() } } diff --git a/test/apiv2/20-containers.at b/test/apiv2/20-containers.at index 41039f22d7..c2737f006a 100644 --- a/test/apiv2/20-containers.at +++ b/test/apiv2/20-containers.at @@ -150,6 +150,13 @@ podman run --name $CTRNAME -d $IMAGE sleep 25 t GET containers/$CTRNAME/top?stream=false 200 \ .Processes.[0].[6]="00:00:00" \ .Processes.[0].[7]="sleep 25" + +# check logs output, IMPORTANT the container should write no logs to reproduce #23712 +APIV2_TEST_EXPECT_TIMEOUT=1 t GET "containers/${CTRNAME}/logs?follow=true&stdout=true&stderr=true" 999 +is "" "$(<$WORKDIR/curl.result.out)" "Container MUST NOT log output" +like "$(<$WORKDIR/curl.headers.out)" ".*HTTP.* 200 OK.*" \ + "Received headers from /container//logs" + podman rm -f -t0 $CTRNAME CTRNAME=test123