diff --git a/libpod/events/journal_linux.go b/libpod/events/journal_linux.go index d341ca7b5f..7c2a3e0f2d 100644 --- a/libpod/events/journal_linux.go +++ b/libpod/events/journal_linux.go @@ -90,6 +90,13 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { return err } for { + select { + case <-ctx.Done(): + // the consumer has cancelled + return nil + default: + // fallthrough + } if _, err := j.Next(); err != nil { return err } diff --git a/libpod/events/logfile.go b/libpod/events/logfile.go index 28d0dc07ee..b701024500 100644 --- a/libpod/events/logfile.go +++ b/libpod/events/logfile.go @@ -63,6 +63,14 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error { } }() for line := range t.Lines { + select { + case <-ctx.Done(): + // the consumer has cancelled + return nil + default: + // fallthrough + } + event, err := newEventFromJSONString(line.Text) if err != nil { return err diff --git a/pkg/api/handlers/compat/events.go b/pkg/api/handlers/compat/events.go index 5acc94153f..3fc8248d6b 100644 --- a/pkg/api/handlers/compat/events.go +++ b/pkg/api/handlers/compat/events.go @@ -1,9 +1,9 @@ package compat import ( - "context" "fmt" "net/http" + "sync" "github.com/containers/libpod/v2/libpod" "github.com/containers/libpod/v2/libpod/events" @@ -17,10 +17,10 @@ import ( func GetEvents(w http.ResponseWriter, r *http.Request) { var ( - fromStart bool - eventsError error - decoder = r.Context().Value("decoder").(*schema.Decoder) - runtime = r.Context().Value("runtime").(*libpod.Runtime) + fromStart bool + decoder = r.Context().Value("decoder").(*schema.Decoder) + runtime = r.Context().Value("runtime").(*libpod.Runtime) + json = jsoniter.ConfigCompatibleWithStandardLibrary // FIXME: this should happen on the package level ) query := struct { @@ -33,11 +33,16 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { } if err := decoder.Decode(&query, r.URL.Query()); err != nil { utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String())) + return } var libpodFilters = []string{} if _, found := r.URL.Query()["filters"]; found { for k, v := range query.Filters { + if len(v) == 0 { + utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Errorf("empty value for filter %q", k)) + return + } libpodFilters = append(libpodFilters, fmt.Sprintf("%s=%s", k, v[0])) } } @@ -46,46 +51,57 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { fromStart = true } - eventCtx, eventCancel := context.WithCancel(r.Context()) eventChannel := make(chan *events.Event) + errorChannel := make(chan error) + + // Start reading events. go func() { - readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until} - eventsError = runtime.Events(eventCtx, readOpts) + readOpts := events.ReadOptions{ + FromStart: fromStart, + Stream: query.Stream, + Filters: libpodFilters, + EventChannel: eventChannel, + Since: query.Since, + Until: query.Until, + } + errorChannel <- runtime.Events(r.Context(), readOpts) }() - if eventsError != nil { - utils.InternalServerError(w, eventsError) - eventCancel() - close(eventChannel) - return - } - // If client disappears we need to stop listening for events - go func(done <-chan struct{}) { - <-done - eventCancel() - if _, ok := <-eventChannel; ok { - close(eventChannel) - } - }(r.Context().Done()) + var coder *jsoniter.Encoder + var writeHeader sync.Once - // Headers need to be written out before turning Writer() over to json encoder - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() - } + for stream := true; stream; stream = query.Stream { + select { + case err := <-errorChannel: + if err != nil { + utils.InternalServerError(w, err) + return + } + case evt := <-eventChannel: + writeHeader.Do(func() { + // Use a sync.Once so that we write the header + // only once. + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + coder = json.NewEncoder(w) + coder.SetEscapeHTML(true) + }) - json := jsoniter.ConfigCompatibleWithStandardLibrary - coder := json.NewEncoder(w) - coder.SetEscapeHTML(true) + if evt == nil { + continue + } - for event := range eventChannel { - e := entities.ConvertToEntitiesEvent(*event) - if err := coder.Encode(e); err != nil { - logrus.Errorf("unable to write json: %q", err) - } - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() + e := entities.ConvertToEntitiesEvent(*evt) + if err := coder.Encode(e); err != nil { + logrus.Errorf("unable to write json: %q", err) + } + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } } + } } diff --git a/pkg/bindings/test/system_test.go b/pkg/bindings/test/system_test.go index 93141400bf..430184f4a6 100644 --- a/pkg/bindings/test/system_test.go +++ b/pkg/bindings/test/system_test.go @@ -1,6 +1,7 @@ package test_bindings import ( + "sync" "time" "github.com/containers/libpod/v2/pkg/bindings" @@ -38,22 +39,28 @@ var _ = Describe("Podman system", func() { }) It("podman events", func() { - eChan := make(chan entities.Event, 1) - var messages []entities.Event - cancelChan := make(chan bool, 1) + var name = "top" + _, err := bt.RunTopContainer(&name, bindings.PFalse, nil) + Expect(err).To(BeNil()) + + filters := make(map[string][]string) + filters["container"] = []string{name} + + binChan := make(chan entities.Event) + done := sync.Mutex{} + done.Lock() + eventCounter := 0 go func() { - for e := range eChan { - messages = append(messages, e) + defer done.Unlock() + for range binChan { + eventCounter++ } }() - go func() { - system.Events(bt.conn, eChan, cancelChan, nil, nil, nil, bindings.PFalse) - }() - _, err := bt.RunTopContainer(nil, nil, nil) + err = system.Events(bt.conn, binChan, nil, nil, nil, filters, bindings.PFalse) Expect(err).To(BeNil()) - cancelChan <- true - Expect(len(messages)).To(BeNumerically("==", 5)) + done.Lock() + Expect(eventCounter).To(BeNumerically(">", 0)) }) It("podman system prune - pod,container stopped", func() { diff --git a/test/e2e/events_test.go b/test/e2e/events_test.go index 7f95502558..3996b32a9e 100644 --- a/test/e2e/events_test.go +++ b/test/e2e/events_test.go @@ -136,6 +136,7 @@ var _ = Describe("Podman events", func() { Expect(ec).To(Equal(0)) test := podmanTest.Podman([]string{"events", "--stream=false", "--format", "json"}) test.WaitWithDefaultTimeout() + Expect(test.ExitCode()).To(BeZero()) jsonArr := test.OutputToStringArray() Expect(len(jsonArr)).To(Not(BeZero())) eventsMap := make(map[string]string) @@ -143,10 +144,10 @@ var _ = Describe("Podman events", func() { Expect(err).To(BeNil()) _, exist := eventsMap["Status"] Expect(exist).To(BeTrue()) - Expect(test.ExitCode()).To(BeZero()) test = podmanTest.Podman([]string{"events", "--stream=false", "--format", "{{json.}}"}) test.WaitWithDefaultTimeout() + Expect(test.ExitCode()).To(BeZero()) jsonArr = test.OutputToStringArray() Expect(len(jsonArr)).To(Not(BeZero())) eventsMap = make(map[string]string) @@ -154,6 +155,5 @@ var _ = Describe("Podman events", func() { Expect(err).To(BeNil()) _, exist = eventsMap["Status"] Expect(exist).To(BeTrue()) - Expect(test.ExitCode()).To(BeZero()) }) })