Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Fixed unit tests and build
Browse files Browse the repository at this point in the history
Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 committed Oct 11, 2023
1 parent 216ca26 commit c8f2136
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
7 changes: 6 additions & 1 deletion pkg/source/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/kncloudevents"

"knative.dev/eventing-kafka/pkg/common/consumer"
Expand Down Expand Up @@ -75,6 +76,7 @@ type Adapter struct {
keyTypeMapper func([]byte) interface{}
rateLimiter *rate.Limiter
extensions map[string]string
dispatcher *kncloudevents.Dispatcher
}

var (
Expand All @@ -89,12 +91,15 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, sink d
logger := logging.FromContext(ctx)
config := processed.(*AdapterConfig)

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

return &Adapter{
config: config,
sink: sink,
reporter: reporter,
logger: logger,
keyTypeMapper: getKeyTypeMapper(config.KeyType),
dispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
}
}
func (a *Adapter) GetConsumerGroup() string {
Expand Down Expand Up @@ -176,7 +181,7 @@ func (a *Adapter) Handle(ctx context.Context, msg *sarama.ConsumerMessage) (bool
return false, fmt.Errorf("failed to get cloud event from consumer message: %w", err)
}

dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, a.sink,
dispatchInfo, err := a.dispatcher.SendEvent(ctx, *event, a.sink,
kncloudevents.WithRetryConfig(retryConfig),
kncloudevents.WithTransformers(extensionAsTransformer(a.extensions)))
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/source/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
duckv1 "knative.dev/pkg/apis/duck/v1"

sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"

fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake"
)

func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
Expand Down Expand Up @@ -342,6 +344,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
URL: sinkURI,
}

ctx, _ := fakekubeclient.With(context.Background())

a := &Adapter{
config: &AdapterConfig{
EnvConfig: adapter.EnvConfig{
Expand All @@ -358,7 +362,7 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
keyTypeMapper: getKeyTypeMapper(tc.keyTypeMapper),
}

_, err = a.Handle(context.TODO(), tc.message)
_, err = a.Handle(ctx, tc.message)
if tc.error && err == nil {
t.Errorf("expected error, but got %v", err)
}
Expand Down Expand Up @@ -434,6 +438,7 @@ func sinkRejected(writer http.ResponseWriter, _ *http.Request) {

func TestAdapter_Start(t *testing.T) { // just increase code coverage
ctx, cancel := context.WithCancel(context.Background())
ctx, _ = fakekubeclient.With(ctx)

// Increasing coverage
_ = os.Setenv("KAFKA_BOOTSTRAP_SERVERS", "my-cluster-kafka-bootstrap.my-kafka-namespace:9092")
Expand Down

0 comments on commit c8f2136

Please sign in to comment.