Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run adapter in a context #3246

Merged
merged 1 commit into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apiserver

import (
"context"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -51,7 +52,11 @@ type apiServerAdapter struct {
name string // TODO: who dis?
}

func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error {
func (a *apiServerAdapter) Start(ctx context.Context) error {
return a.start(ctx.Done())
}

func (a *apiServerAdapter) start(stopCh <-chan struct{}) error {
// Local stop channel.
stop := make(chan struct{})

Expand Down
20 changes: 11 additions & 9 deletions pkg/adapter/apiserver/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apiserver

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -65,10 +66,11 @@ func TestAdapter_StartRef(t *testing.T) {
}

err := errors.New("test never ran")
stopCh := make(chan struct{})

ctx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
go func() {
err = a.Start(stopCh)
err = a.Start(ctx)
done <- struct{}{}
}()

Expand All @@ -77,7 +79,7 @@ func TestAdapter_StartRef(t *testing.T) {
// don't have access to it.
time.Sleep(1 * time.Second)

stopCh <- struct{}{}
cancel()
<-done

if err != nil {
Expand Down Expand Up @@ -112,10 +114,10 @@ func TestAdapter_StartResource(t *testing.T) {
}

err := errors.New("test never ran")
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
go func() {
err = a.Start(stopCh)
err = a.Start(ctx)
done <- struct{}{}
}()

Expand All @@ -124,7 +126,7 @@ func TestAdapter_StartResource(t *testing.T) {
// don't have access to it.
time.Sleep(1 * time.Second)

stopCh <- struct{}{}
cancel()
<-done

if err != nil {
Expand Down Expand Up @@ -159,10 +161,10 @@ func TestAdapter_StartNonNamespacedResource(t *testing.T) {
}

err := errors.New("test never ran")
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
go func() {
err = a.Start(stopCh)
err = a.Start(ctx)
done <- struct{}{}
}()

Expand All @@ -171,7 +173,7 @@ func TestAdapter_StartNonNamespacedResource(t *testing.T) {
// don't have access to it.
time.Sleep(1 * time.Second)

stopCh <- struct{}{}
cancel()
<-done

if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/adapter/ping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie
}
}

func (a *pingAdapter) Start(stopCh <-chan struct{}) error {
func (a *pingAdapter) Start(ctx context.Context) error {
return a.start(ctx.Done())
}

func (a *pingAdapter) start(stopCh <-chan struct{}) error {
sched, err := cron.ParseStandard(a.Schedule)
if err != nil {
return fmt.Errorf("unparseable schedule %s: %v", a.Schedule, err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/adapter/ping/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package ping

import (
"context"
"encoding/json"
"log"
"net/http"
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestStart_ServeHTTP(t *testing.T) {
Client: ce,
}

stop := make(chan struct{})
stop := context.Background()
go func() {
if err := a.Start(stop); err != nil {
if tc.error {
Expand All @@ -81,7 +82,7 @@ func TestStartBadCron(t *testing.T) {
Schedule: schedule,
}

stop := make(chan struct{})
stop := context.Background()
if err := a.Start(stop); err == nil {

t.Errorf("failed to fail, %v", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/adapter/v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

type Adapter interface {
Start(stopCh <-chan struct{}) error
Start(ctx context.Context) error
}

type AdapterConstructor func(ctx context.Context, env EnvConfigAccessor, client cloudevents.Client) Adapter
Expand Down Expand Up @@ -113,7 +113,7 @@ func MainWithContext(ctx context.Context, component string, ector EnvConfigConst

logger.Info("Starting Receive Adapter", zap.Any("adapter", adapter))

if err := adapter.Start(ctx.Done()); err != nil {
if err := adapter.Start(ctx); err != nil {
logger.Warn("start returned an error", zap.Error(err))
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/adapter/v2/main_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
)

type MessageAdapter interface {
Start(stopCh <-chan struct{}) error
Start(ctx context.Context) error
}

type MessageAdapterConstructor func(ctx context.Context, env EnvConfigAccessor, adapter *kncloudevents.HttpMessageSender, reporter source.StatsReporter) MessageAdapter
Expand Down Expand Up @@ -116,7 +116,7 @@ func MainMessageAdapterWithContext(ctx context.Context, component string, ector

logger.Info("Starting Receive MessageAdapter", zap.Any("adapter", adapter))

if err := adapter.Start(ctx.Done()); err != nil {
if err := adapter.Start(ctx); err != nil {
logger.Warn("start returned an error", zap.Error(err))
}
}
2 changes: 1 addition & 1 deletion pkg/adapter/v2/main_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ func TestMainMessageAdapter(t *testing.T) {
defer view.Unregister(metrics.NewMemStatsAll().DefaultViews()...)
}

func (m *myAdapterBindings) Start(_ <-chan struct{}) error {
func (m *myAdapterBindings) Start(_ context.Context) error {
return nil
}
2 changes: 1 addition & 1 deletion pkg/adapter/v2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ func TestMainWithContext(t *testing.T) {
defer view.Unregister(metrics.NewMemStatsAll().DefaultViews()...)
}

func (m *myAdapter) Start(stopCh <-chan struct{}) error {
func (m *myAdapter) Start(_ context.Context) error {
return nil
}