diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index a2a13eba26e..28e72c6d41d 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -17,6 +17,7 @@ limitations under the License. package apiserver import ( + "context" "time" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -51,7 +52,11 @@ type apiServerAdapter struct { name string // TODO: who dis? } -func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error { +func (a *apiServerAdapter) Start(ctx context.Context) error { + return a.start(ctx.Done()) +} + +func (a *apiServerAdapter) start(stopCh <-chan struct{}) error { // Local stop channel. stop := make(chan struct{}) diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index fc360b41593..a5e67203b12 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -17,6 +17,7 @@ limitations under the License. package apiserver import ( + "context" "testing" "time" @@ -65,10 +66,11 @@ func TestAdapter_StartRef(t *testing.T) { } err := errors.New("test never ran") - stopCh := make(chan struct{}) + + ctx, cancel := context.WithCancel(ctx) done := make(chan struct{}) go func() { - err = a.Start(stopCh) + err = a.Start(ctx) done <- struct{}{} }() @@ -77,7 +79,7 @@ func TestAdapter_StartRef(t *testing.T) { // don't have access to it. time.Sleep(1 * time.Second) - stopCh <- struct{}{} + cancel() <-done if err != nil { @@ -112,10 +114,10 @@ func TestAdapter_StartResource(t *testing.T) { } err := errors.New("test never ran") - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(ctx) done := make(chan struct{}) go func() { - err = a.Start(stopCh) + err = a.Start(ctx) done <- struct{}{} }() @@ -124,7 +126,7 @@ func TestAdapter_StartResource(t *testing.T) { // don't have access to it. time.Sleep(1 * time.Second) - stopCh <- struct{}{} + cancel() <-done if err != nil { @@ -159,10 +161,10 @@ func TestAdapter_StartNonNamespacedResource(t *testing.T) { } err := errors.New("test never ran") - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(ctx) done := make(chan struct{}) go func() { - err = a.Start(stopCh) + err = a.Start(ctx) done <- struct{}{} }() @@ -171,7 +173,7 @@ func TestAdapter_StartNonNamespacedResource(t *testing.T) { // don't have access to it. time.Sleep(1 * time.Second) - stopCh <- struct{}{} + cancel() <-done if err != nil { diff --git a/pkg/adapter/ping/adapter.go b/pkg/adapter/ping/adapter.go index fd50add99e2..e5c4f6ca3bd 100644 --- a/pkg/adapter/ping/adapter.go +++ b/pkg/adapter/ping/adapter.go @@ -79,7 +79,11 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie } } -func (a *pingAdapter) Start(stopCh <-chan struct{}) error { +func (a *pingAdapter) Start(ctx context.Context) error { + return a.start(ctx.Done()) +} + +func (a *pingAdapter) start(stopCh <-chan struct{}) error { sched, err := cron.ParseStandard(a.Schedule) if err != nil { return fmt.Errorf("unparseable schedule %s: %v", a.Schedule, err) diff --git a/pkg/adapter/ping/adapter_test.go b/pkg/adapter/ping/adapter_test.go index 1278f3f3755..a0cb30f2a36 100644 --- a/pkg/adapter/ping/adapter_test.go +++ b/pkg/adapter/ping/adapter_test.go @@ -17,6 +17,7 @@ limitations under the License. package ping import ( + "context" "encoding/json" "log" "net/http" @@ -55,7 +56,7 @@ func TestStart_ServeHTTP(t *testing.T) { Client: ce, } - stop := make(chan struct{}) + stop := context.Background() go func() { if err := a.Start(stop); err != nil { if tc.error { @@ -81,7 +82,7 @@ func TestStartBadCron(t *testing.T) { Schedule: schedule, } - stop := make(chan struct{}) + stop := context.Background() if err := a.Start(stop); err == nil { t.Errorf("failed to fail, %v", err) diff --git a/pkg/adapter/v2/main.go b/pkg/adapter/v2/main.go index a0ae3c98d4d..ccbb663e71c 100644 --- a/pkg/adapter/v2/main.go +++ b/pkg/adapter/v2/main.go @@ -35,7 +35,7 @@ import ( ) type Adapter interface { - Start(stopCh <-chan struct{}) error + Start(ctx context.Context) error } type AdapterConstructor func(ctx context.Context, env EnvConfigAccessor, client cloudevents.Client) Adapter @@ -113,7 +113,7 @@ func MainWithContext(ctx context.Context, component string, ector EnvConfigConst logger.Info("Starting Receive Adapter", zap.Any("adapter", adapter)) - if err := adapter.Start(ctx.Done()); err != nil { + if err := adapter.Start(ctx); err != nil { logger.Warn("start returned an error", zap.Error(err)) } } diff --git a/pkg/adapter/v2/main_message.go b/pkg/adapter/v2/main_message.go index 662c9f82463..902ff79005b 100644 --- a/pkg/adapter/v2/main_message.go +++ b/pkg/adapter/v2/main_message.go @@ -36,7 +36,7 @@ import ( ) type MessageAdapter interface { - Start(stopCh <-chan struct{}) error + Start(ctx context.Context) error } type MessageAdapterConstructor func(ctx context.Context, env EnvConfigAccessor, adapter *kncloudevents.HttpMessageSender, reporter source.StatsReporter) MessageAdapter @@ -116,7 +116,7 @@ func MainMessageAdapterWithContext(ctx context.Context, component string, ector logger.Info("Starting Receive MessageAdapter", zap.Any("adapter", adapter)) - if err := adapter.Start(ctx.Done()); err != nil { + if err := adapter.Start(ctx); err != nil { logger.Warn("start returned an error", zap.Error(err)) } } diff --git a/pkg/adapter/v2/main_message_test.go b/pkg/adapter/v2/main_message_test.go index 5cd82f54c27..06c53d8572a 100644 --- a/pkg/adapter/v2/main_message_test.go +++ b/pkg/adapter/v2/main_message_test.go @@ -59,6 +59,6 @@ func TestMainMessageAdapter(t *testing.T) { defer view.Unregister(metrics.NewMemStatsAll().DefaultViews()...) } -func (m *myAdapterBindings) Start(_ <-chan struct{}) error { +func (m *myAdapterBindings) Start(_ context.Context) error { return nil } diff --git a/pkg/adapter/v2/main_test.go b/pkg/adapter/v2/main_test.go index b270b1f3878..92cd07baf5d 100644 --- a/pkg/adapter/v2/main_test.go +++ b/pkg/adapter/v2/main_test.go @@ -57,6 +57,6 @@ func TestMainWithContext(t *testing.T) { defer view.Unregister(metrics.NewMemStatsAll().DefaultViews()...) } -func (m *myAdapter) Start(stopCh <-chan struct{}) error { +func (m *myAdapter) Start(_ context.Context) error { return nil }