diff --git a/go.mod b/go.mod index 4dd878340..90a753985 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,11 @@ require ( knative.dev/reconciler-test v0.0.0-20240417065737-ca905cbb09a9 ) -require github.com/stretchr/testify v1.8.4 +require ( + github.com/hashicorp/go-retryablehttp v0.6.7 + github.com/rickb777/date v1.13.0 + github.com/stretchr/testify v1.8.4 +) require ( cloud.google.com/go/compute v1.24.0 // indirect @@ -71,7 +75,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/hashicorp/go-hclog v1.5.0 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect - github.com/hashicorp/go-retryablehttp v0.6.7 // indirect github.com/hashicorp/raft v1.5.0 // indirect github.com/imdario/mergo v0.3.9 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -93,7 +96,6 @@ require ( github.com/prometheus/common v0.52.3 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect - github.com/rickb777/date v1.13.0 // indirect github.com/rickb777/plural v1.2.1 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/fastuuid v1.2.0 // indirect diff --git a/pkg/channel/jetstream/dispatcher/consumer.go b/pkg/channel/jetstream/dispatcher/consumer.go index 4a7d0f892..49c56caed 100644 --- a/pkg/channel/jetstream/dispatcher/consumer.go +++ b/pkg/channel/jetstream/dispatcher/consumer.go @@ -19,20 +19,18 @@ package dispatcher import ( "context" "errors" - "net/http" "sync" - "time" + + "knative.dev/eventing/pkg/kncloudevents" cejs "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2" "github.com/cloudevents/sdk-go/v2/binding" - "github.com/cloudevents/sdk-go/v2/protocol" "github.com/nats-io/nats.go" "go.opencensus.io/trace" "go.uber.org/zap" "knative.dev/eventing-natss/pkg/tracing" eventingchannels "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/channel/fanout" - "knative.dev/eventing/pkg/kncloudevents" "knative.dev/pkg/logging" ) @@ -50,7 +48,8 @@ type Consumer struct { reporter eventingchannels.StatsReporter channelNamespace string - jsSub *nats.Subscription + jsSub *nats.Subscription + natsConsumerInfo *nats.ConsumerInfo logger *zap.SugaredLogger ctx context.Context @@ -73,58 +72,11 @@ func (c *Consumer) Close() error { } func (c *Consumer) MsgHandler(msg *nats.Msg) { - logger := c.logger.With(zap.String("msg_id", msg.Header.Get(nats.MsgIdHdr))) - ctx := logging.WithLogger(c.ctx, logger) - tickerCtx, tickerCancel := context.WithCancel(c.ctx) - - tickerDone := make(chan struct{}) - go func() { - defer close(tickerDone) - - // TODO(dan-j): this should be a fraction of the Consumer's AckWait - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - case <-tickerCtx.Done(): - return - case <-ticker.C: - if err := msg.InProgress(nats.Context(tickerCtx)); err != nil && !errors.Is(err, context.Canceled) { - logging.FromContext(ctx).Errorw("failed to mark message as in progress", zap.Error(err)) - } - } - } - }() + logger := c.logger.With(zap.String("msg_id", msg.Header.Get(nats.MsgIdHdr))) + ctx := logging.WithLogger(c.ctx, logger) - go func() { - var result protocol.Result - - // wrap the handler in a local function so that the tickerCtx is cancelled even if a panic occurs. - func() { - defer tickerCancel() - result = c.doHandle(ctx, msg) - }() - - // wait for the ticker to stop to prevent attempts to mark the message as in progress after it has been acked - // or nacked - <-tickerDone - - switch { - case protocol.IsACK(result): - if err := msg.Ack(nats.Context(ctx)); err != nil { - logger.Errorw("failed to Ack message after successful delivery to subscriber", zap.Error(err)) - } - case protocol.IsNACK(result): - if err := msg.Nak(nats.Context(ctx)); err != nil { - logger.Errorw("failed to Nack message after failed delivery to subscriber", zap.Error(err)) - } - default: - if err := msg.Term(nats.Context(ctx)); err != nil { - logger.Errorw("failed to Term message after failed delivery to subscriber", zap.Error(err)) - } - } + c.doHandle(ctx, msg) }() } @@ -132,7 +84,7 @@ func (c *Consumer) MsgHandler(msg *nats.Msg) { // - Ack (includes `nil`): the event was successfully delivered to the subscriber // - Nack: the event was not delivered to the subscriber, but it can be retried // - any other error: the event should be terminated and not retried -func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) protocol.Result { +func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) error { logger := logging.FromContext(ctx) if logger.Desugar().Core().Enabled(zap.DebugLevel) { @@ -165,21 +117,25 @@ func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) protocol.Result te := TypeExtractorTransformer("") - dispatchExecutionInfo, err := c.dispatcher.SendMessage( + dispatchExecutionInfo, err := SendMessage( + c.dispatcher, ctx, message, c.sub.Subscriber, - kncloudevents.WithReply(c.sub.Reply), - kncloudevents.WithDeadLetterSink(c.sub.DeadLetter), - kncloudevents.WithRetryConfig(c.sub.RetryConfig), - kncloudevents.WithTransformers(&te), - kncloudevents.WithHeader(additionalHeaders), + c.natsConsumerInfo.Config.AckWait, + msg, + WithReply(c.sub.Reply), + WithDeadLetterSink(c.sub.DeadLetter), + WithRetryConfig(c.sub.RetryConfig), + WithTransformers(&te), + WithHeader(additionalHeaders), ) args := eventingchannels.ReportArgs{ Ns: c.channelNamespace, EventType: string(te), } + _ = fanout.ParseDispatchResultAndReportMetrics(fanout.NewDispatchResult(err, dispatchExecutionInfo), c.reporter, args) if err != nil { @@ -187,18 +143,11 @@ func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) protocol.Result zap.Error(err), zap.Any("dispatch_resp_code", dispatchExecutionInfo.ResponseCode)) - code := dispatchExecutionInfo.ResponseCode - if code/100 == 5 || code == http.StatusTooManyRequests || code == http.StatusRequestTimeout { - // tell JSM to redeliver the message later - return protocol.NewReceipt(false, "%w", err) - } - // let knative decide what to do with the message, if it wraps an Ack/Nack then that is what will happen, // otherwise we will Terminate the message return err } logger.Debug("message forwarded to downstream subscriber") - - return protocol.ResultACK + return nil } diff --git a/pkg/channel/jetstream/dispatcher/defaults.go b/pkg/channel/jetstream/dispatcher/defaults.go index af1c5c685..b33a75173 100644 --- a/pkg/channel/jetstream/dispatcher/defaults.go +++ b/pkg/channel/jetstream/dispatcher/defaults.go @@ -17,9 +17,12 @@ limitations under the License. package dispatcher import ( + "time" + "github.com/nats-io/nats.go" "knative.dev/eventing-natss/pkg/apis/messaging/v1alpha1" "knative.dev/eventing-natss/pkg/channel/jetstream/utils" + "knative.dev/eventing/pkg/kncloudevents" ) func buildStreamConfig(streamName, subject string, config *v1alpha1.StreamConfig) *nats.StreamConfig { @@ -55,7 +58,8 @@ func buildStreamConfig(streamName, subject string, config *v1alpha1.StreamConfig } -func buildConsumerConfig(consumerName, deliverSubject string, template *v1alpha1.ConsumerConfigTemplate) *nats.ConsumerConfig { +func buildConsumerConfig(consumerName, deliverSubject string, template *v1alpha1.ConsumerConfigTemplate, retryConfig *kncloudevents.RetryConfig) *nats.ConsumerConfig { + const jitter = time.Millisecond * 500 consumerConfig := nats.ConsumerConfig{ Durable: consumerName, DeliverGroup: consumerName, @@ -63,11 +67,22 @@ func buildConsumerConfig(consumerName, deliverSubject string, template *v1alpha1 AckPolicy: nats.AckExplicitPolicy, } + if template != nil { + consumerConfig.AckWait = template.AckWait.Duration + } + + if retryConfig != nil { + if retryConfig.RequestTimeout > 0 { + consumerConfig.AckWait = retryConfig.RequestTimeout + jitter + } + + consumerConfig.MaxDeliver = retryConfig.RetryMax + 1 + } + if template != nil { consumerConfig.DeliverPolicy = utils.ConvertDeliverPolicy(template.DeliverPolicy, nats.DeliverAllPolicy) consumerConfig.OptStartSeq = template.OptStartSeq - consumerConfig.AckWait = template.AckWait.Duration - consumerConfig.MaxDeliver = template.MaxDeliver + // ignoring template.AckWait and template.MaxDeliver consumerConfig.FilterSubject = template.FilterSubject consumerConfig.ReplayPolicy = utils.ConvertReplayPolicy(template.ReplayPolicy, nats.ReplayInstantPolicy) consumerConfig.RateLimit = template.RateLimitBPS diff --git a/pkg/channel/jetstream/dispatcher/dispatcher.go b/pkg/channel/jetstream/dispatcher/dispatcher.go index 80f8a54ab..b08290e4d 100644 --- a/pkg/channel/jetstream/dispatcher/dispatcher.go +++ b/pkg/channel/jetstream/dispatcher/dispatcher.go @@ -113,7 +113,7 @@ func (d *Dispatcher) Start(ctx context.Context) error { return d.receiver.Start(ctx) } -// RegisterChannelHost registers the Dispatcher to accept HTTP events matching the specified HostName +// RegisterChannelHost registers the NatsDispatcher to accept HTTP events matching the specified HostName func (d *Dispatcher) RegisterChannelHost(config ChannelConfig) error { if old, ok := d.hostToChannelMap.LoadOrStore(config.HostName, config.ChannelReference); ok { // map already contained a channel reference for this hostname, check they both reference the same channel, @@ -222,7 +222,7 @@ func (d *Dispatcher) updateSubscription(ctx context.Context, config ChannelConfi if isLeader { deliverSubject := d.consumerSubjectFunc(config.Namespace, config.Name, string(sub.UID)) - consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate) + consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate, sub.RetryConfig) _, err := d.js.UpdateConsumer(config.StreamName, consumerConfig) if err != nil { @@ -256,6 +256,7 @@ func (d *Dispatcher) subscribe(ctx context.Context, config ChannelConfig, sub Su channelNamespace: config.Namespace, logger: logger, ctx: ctx, + natsConsumerInfo: info, } consumer.jsSub, err = d.js.QueueSubscribe(info.Config.DeliverSubject, info.Config.DeliverGroup, consumer.MsgHandler, @@ -305,7 +306,7 @@ func (d *Dispatcher) getOrEnsureConsumer(ctx context.Context, config ChannelConf if isLeader { deliverSubject := d.consumerSubjectFunc(config.Namespace, config.Name, string(sub.UID)) - consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate) + consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate, sub.RetryConfig) // AddConsumer is idempotent so this will either create the consumer, update to match expected config, or no-op info, err := d.js.AddConsumer(config.StreamName, consumerConfig) diff --git a/pkg/channel/jetstream/dispatcher/dispatcher_test.go b/pkg/channel/jetstream/dispatcher/dispatcher_test.go index 263db6537..d5cb61618 100644 --- a/pkg/channel/jetstream/dispatcher/dispatcher_test.go +++ b/pkg/channel/jetstream/dispatcher/dispatcher_test.go @@ -18,6 +18,10 @@ package dispatcher import ( "context" "testing" + "time" + + "knative.dev/eventing/pkg/channel/fanout" + "knative.dev/eventing/pkg/kncloudevents" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime" @@ -67,8 +71,15 @@ func TestDispatcher_ReconcileConsumers(t *testing.T) { ctx = controller.WithEventRecorder(ctx, eventRecorder) nc := reconciletesting.NewNatsJetStreamChannel(testNS, ncName, reconciletesting.WithNatsJetStreamChannelSubscribers(subscribers)) + sub := fanout.Subscription{ + RetryConfig: &kncloudevents.RetryConfig{ + RequestTimeout: time.Second, + RetryMax: 1, + }, + } config := createChannelConfig(nc, Subscription{ - UID: subscriber1UID, + UID: subscriber1UID, + Subscription: sub, }) d, err := NewDispatcher(ctx, NatsDispatcherArgs{ diff --git a/pkg/channel/jetstream/dispatcher/message_dispatcher.go b/pkg/channel/jetstream/dispatcher/message_dispatcher.go new file mode 100644 index 000000000..b1cec7442 --- /dev/null +++ b/pkg/channel/jetstream/dispatcher/message_dispatcher.go @@ -0,0 +1,279 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dispatcher + +import ( + "context" + "fmt" + "net/http" + "time" + + "knative.dev/pkg/apis" + + "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/nats-io/nats.go" + "go.uber.org/zap" + "knative.dev/eventing/pkg/kncloudevents" + "knative.dev/pkg/logging" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/hashicorp/go-retryablehttp" + "k8s.io/apimachinery/pkg/types" + jsutils "knative.dev/eventing-natss/pkg/channel/jetstream/utils" + + eventingapis "knative.dev/eventing/pkg/apis" + "knative.dev/eventing/pkg/utils" + duckv1 "knative.dev/pkg/apis/duck/v1" + + _ "unsafe" +) + +//go:linkname getClientForAddressable knative.dev/eventing/pkg/kncloudevents.getClientForAddressable +func getClientForAddressable(addressable duckv1.Addressable) (*http.Client, error) + +//go:linkname generateBackoffFn knative.dev/eventing/pkg/kncloudevents.generateBackoffFn +func generateBackoffFn(config *kncloudevents.RetryConfig) retryablehttp.Backoff + +//go:linkname sanitizeAddressable knative.dev/eventing/pkg/kncloudevents.sanitizeAddressable +func sanitizeAddressable(addressable *duckv1.Addressable) *duckv1.Addressable + +//go:linkname dispatchExecutionInfoTransformers knative.dev/eventing/pkg/kncloudevents.dispatchExecutionInfoTransformers +func dispatchExecutionInfoTransformers(destination *apis.URL, dispatchExecutionInfo *kncloudevents.DispatchInfo) binding.Transformers + +//go:linkname executeRequest knative.dev/eventing/pkg/kncloudevents.(*Dispatcher).executeRequest +func executeRequest(d *kncloudevents.Dispatcher, ctx context.Context, target duckv1.Addressable, message cloudevents.Message, additionalHeaders http.Header, retryConfig *kncloudevents.RetryConfig, oidcServiceAccount *types.NamespacedName, transformers ...binding.Transformer) (context.Context, cloudevents.Message, *kncloudevents.DispatchInfo, error) + +type SendOption func(*senderConfig) error + +func WithReply(reply *duckv1.Addressable) SendOption { + return func(sc *senderConfig) error { + sc.reply = reply + + return nil + } +} + +func WithDeadLetterSink(dls *duckv1.Addressable) SendOption { + return func(sc *senderConfig) error { + sc.deadLetterSink = dls + + return nil + } +} + +func WithRetryConfig(retryConfig *kncloudevents.RetryConfig) SendOption { + return func(sc *senderConfig) error { + sc.retryConfig = retryConfig + + return nil + } +} + +func WithHeader(header http.Header) SendOption { + return func(sc *senderConfig) error { + sc.additionalHeaders = header + + return nil + } +} + +func WithTransformers(transformers ...binding.Transformer) SendOption { + return func(sc *senderConfig) error { + sc.transformers = transformers + + return nil + } +} + +type senderConfig struct { + reply *duckv1.Addressable + deadLetterSink *duckv1.Addressable + additionalHeaders http.Header + retryConfig *kncloudevents.RetryConfig + transformers binding.Transformers + oidcServiceAccount *types.NamespacedName +} + +func SendMessage(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message binding.Message, destination duckv1.Addressable, ackWait time.Duration, msg *nats.Msg, options ...SendOption) (*kncloudevents.DispatchInfo, error) { + config := &senderConfig{ + additionalHeaders: make(http.Header), + } + + // apply options + for _, opt := range options { + if err := opt(config); err != nil { + return nil, fmt.Errorf("could not apply option: %w", err) + } + } + + return send(dispatcher, ctx, message, destination, ackWait, msg, config) +} + +func send(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message binding.Message, destination duckv1.Addressable, ackWait time.Duration, msg *nats.Msg, config *senderConfig) (*kncloudevents.DispatchInfo, error) { + logger := logging.FromContext(ctx) + dispatchExecutionInfo := &kncloudevents.DispatchInfo{} + + // All messages that should be finished at the end of this function + // are placed in this slice + messagesToFinish := []binding.Message{message} + defer func() { + for _, msg := range messagesToFinish { + _ = msg.Finish(nil) + } + }() + + if destination.URL == nil { + return dispatchExecutionInfo, fmt.Errorf("can not dispatch message to nil destination.URL") + } + + // sanitize eventual host-only URLs + destination = *sanitizeAddressable(&destination) + config.reply = sanitizeAddressable(config.reply) + config.deadLetterSink = sanitizeAddressable(config.deadLetterSink) + + var noRetires = kncloudevents.NoRetries() + var lastTry bool + + meta, err := msg.Metadata() + retryNumber := 1 + if err != nil { + logger.Errorw("failed to get nats message metadata, assuming it is 1", zap.Error(err)) + } else { + retryNumber = int(meta.NumDelivered) + } + + if retryNumber <= config.retryConfig.RetryMax { + lastTry = false + } else { + lastTry = true + } + + // send to destination + + // Add `Prefer: reply` header no matter if a reply destination is provided. Discussion: https://github.com/knative/eventing/pull/5764 + additionalHeadersForDestination := http.Header{} + if config.additionalHeaders != nil { + additionalHeadersForDestination = config.additionalHeaders.Clone() + } + additionalHeadersForDestination.Set("Prefer", "reply") + + noRetires.RequestTimeout = jsutils.CalcRequestTimeout(msg, ackWait) + + ctx, responseMessage, dispatchExecutionInfo, err := executeRequest(dispatcher, ctx, destination, message, additionalHeadersForDestination, &noRetires, config.oidcServiceAccount, config.transformers) + processDispatchResult(ctx, msg, config.retryConfig, retryNumber, dispatchExecutionInfo, err) + + if err != nil && lastTry { + // If DeadLetter is configured, then send original message with knative error extensions + if config.deadLetterSink != nil { + dispatchTransformers := dispatchExecutionInfoTransformers(destination.URL, dispatchExecutionInfo) + _, deadLetterResponse, dispatchExecutionInfo, deadLetterErr := executeRequest(dispatcher, ctx, *config.deadLetterSink, message, config.additionalHeaders, config.retryConfig, config.oidcServiceAccount, append(config.transformers, dispatchTransformers)) + if deadLetterErr != nil { + return dispatchExecutionInfo, fmt.Errorf("unable to complete request to either %s (%v) or %s (%v)", destination.URL, err, config.deadLetterSink.URL, deadLetterErr) + } + if deadLetterResponse != nil { + messagesToFinish = append(messagesToFinish, deadLetterResponse) + } + + return dispatchExecutionInfo, nil + } + // No DeadLetter, just fail + return dispatchExecutionInfo, fmt.Errorf("unable to complete request to %s: %w, last try failed", destination.URL, err) + } else if err != nil && !lastTry { + return dispatchExecutionInfo, fmt.Errorf("unable to complete request to %s: %w, going for retry", destination.URL, err) + } + + responseAdditionalHeaders := utils.PassThroughHeaders(dispatchExecutionInfo.ResponseHeader) + + if config.additionalHeaders.Get(eventingapis.KnNamespaceHeader) != "" { + if responseAdditionalHeaders == nil { + responseAdditionalHeaders = make(http.Header) + } + responseAdditionalHeaders.Set(eventingapis.KnNamespaceHeader, config.additionalHeaders.Get(eventingapis.KnNamespaceHeader)) + } + + if responseMessage == nil { + // No response, dispatch completed + return dispatchExecutionInfo, nil + } + + messagesToFinish = append(messagesToFinish, responseMessage) + + if config.reply == nil { + return dispatchExecutionInfo, nil + } + + // send reply + + ctx, responseResponseMessage, dispatchExecutionInfo, err := executeRequest(dispatcher, ctx, *config.reply, responseMessage, responseAdditionalHeaders, config.retryConfig, config.oidcServiceAccount, config.transformers) + if err != nil { + // If DeadLetter is configured, then send original message with knative error extensions + if config.deadLetterSink != nil { + dispatchTransformers := dispatchExecutionInfoTransformers(config.reply.URL, dispatchExecutionInfo) + _, deadLetterResponse, dispatchExecutionInfo, deadLetterErr := executeRequest(dispatcher, ctx, *config.deadLetterSink, message, responseAdditionalHeaders, config.retryConfig, config.oidcServiceAccount, append(config.transformers, dispatchTransformers)) + if deadLetterErr != nil { + return dispatchExecutionInfo, fmt.Errorf("failed to forward reply to %s (%v) and failed to send it to the dead letter sink %s (%v)", config.reply.URL, err, config.deadLetterSink.URL, deadLetterErr) + } + if deadLetterResponse != nil { + messagesToFinish = append(messagesToFinish, deadLetterResponse) + } + + return dispatchExecutionInfo, nil + } + // No DeadLetter, just fail + return dispatchExecutionInfo, fmt.Errorf("failed to forward reply to %s: %w", config.reply.URL, err) + } + if responseResponseMessage != nil { + messagesToFinish = append(messagesToFinish, responseResponseMessage) + } + + return dispatchExecutionInfo, nil +} + +func processDispatchResult(ctx context.Context, msg *nats.Msg, retryConfig *kncloudevents.RetryConfig, retryNumber int, dispatchExecutionInfo *kncloudevents.DispatchInfo, err error) { + logger := logging.FromContext(ctx) + result := protocol.ResultACK + + if err != nil { + logger.Error("failed to execute message", + zap.Error(err), + zap.Any("dispatch_resp_code", dispatchExecutionInfo.ResponseCode)) + + code := dispatchExecutionInfo.ResponseCode + if code/100 == 5 || code == http.StatusTooManyRequests || code == http.StatusRequestTimeout { + // tell JSM to redeliver the message later + result = protocol.NewReceipt(false, "%w", err) + } else { + result = err + } + } + + switch { + case protocol.IsACK(result): + if err := msg.Ack(nats.Context(ctx)); err != nil { + logger.Error("failed to Ack message after successful delivery to subscriber", zap.Error(err)) + } + case protocol.IsNACK(result): + if err := msg.NakWithDelay(jsutils.CalculateNakDelayForRetryNumber(retryNumber, retryConfig), nats.Context(ctx)); err != nil { + logger.Error("failed to Nack message after failed delivery to subscriber", zap.Error(err)) + } + default: + if err := msg.Term(nats.Context(ctx)); err != nil { + logger.Error("failed to Term message after failed delivery to subscriber", zap.Error(err)) + } + } +} diff --git a/pkg/channel/jetstream/dispatcher/message_dispatcher_test.go b/pkg/channel/jetstream/dispatcher/message_dispatcher_test.go new file mode 100644 index 000000000..6a727e823 --- /dev/null +++ b/pkg/channel/jetstream/dispatcher/message_dispatcher_test.go @@ -0,0 +1,744 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dispatcher + +import ( + "bytes" + "context" + "io" + "log" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + "github.com/nats-io/nats.go" + + "github.com/nats-io/nats-server/v2/server" + natsserver "github.com/nats-io/nats-server/v2/test" + dispatchertesting "knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/testing" + + "knative.dev/eventing/pkg/eventingtls" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + + v1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/kncloudevents" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/transformer" + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "k8s.io/apimachinery/pkg/util/sets" + + "knative.dev/eventing/pkg/utils" +) + +var ( + // Headers that are added to the response, but we don't want to check in our assertions. + unimportantHeaders = sets.NewString( + "accept-encoding", + "content-length", + "content-type", + "user-agent", + "tracestate", + "ce-tracestate", + ) + + // Headers that should be present, but their value should not be asserted. + ignoreValueHeaders = sets.NewString( + // These are headers added for tracing, they will have random values, so don't bother + // checking them. + "traceparent", + // CloudEvents headers, they will have random values, so don't bother checking them. + "ce-id", + "ce-time", + "ce-traceparent", + ) +) + +const ( + testCeSource = "testsource" + testCeType = "testtype" + ackWait = 30 * time.Second +) + +var ( + retryCount int32 = 3 + backoffDelay = "PT1S" + backoffPolicy = v1.BackoffPolicyExponential +) + +func TestDispatchMessage(t *testing.T) { + testCases := map[string]struct { + sendToDestination bool + sendToReply bool + hasDeadLetterSink bool + eventExtensions map[string]string + header http.Header + body string + fakeResponse *http.Response + fakeReplyResponse *http.Response + fakeDeadLetterResponse *http.Response + expectedErr bool + expectedDestRequest *requestValidation + expectedReplyRequest *requestValidation + expectedDeadLetterRequest *requestValidation + lastReceiver string + delivery *v1.DeliverySpec + }{ + "destination - only": { + sendToDestination: true, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"ignored-value-header"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + lastReceiver: "destination", + }, + "destination - nil additional headers": { + sendToDestination: true, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"ignored-value-header"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + lastReceiver: "destination", + }, + "destination and reply - dest returns bad status code": { + sendToDestination: true, + sendToReply: true, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"ignored-value-header"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + fakeResponse: &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: io.NopCloser(bytes.NewBufferString("destination-response")), + }, + expectedErr: true, + lastReceiver: "reply", + }, + "destination and reply - dest returns empty body": { + sendToDestination: true, + sendToReply: true, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"ignored-value-header"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + fakeResponse: &http.Response{ + StatusCode: http.StatusAccepted, + Header: map[string][]string{ + "do-not-passthrough": {"no"}, + "x-request-id": {"altered-id"}, + "knative-1": {"new-knative-1-value"}, + "ce-abc": {`"new-ce-abc-value"`}, + }, + Body: io.NopCloser(bytes.NewBufferString("")), + }, + lastReceiver: "reply", + }, + "destination and reply": { + sendToDestination: true, + sendToReply: true, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + fakeResponse: &http.Response{ + StatusCode: http.StatusAccepted, + Header: map[string][]string{ + "do-not-passthrough": {"no"}, + "x-request-id": {"altered-id"}, + "knative-1": {"new-knative-1-value"}, + "ce-abc": {`"new-ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: io.NopCloser(bytes.NewBufferString("destination-response")), + }, + expectedReplyRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"altered-id"}, + "knative-1": {"new-knative-1-value"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"new-ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: "destination-response", + }, + lastReceiver: "reply", + }, + "no restriction on message response size": { + sendToDestination: true, + sendToReply: true, + hasDeadLetterSink: false, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + expectedReplyRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: strings.Repeat("a", 2000), + }, + fakeResponse: &http.Response{ + StatusCode: http.StatusAccepted, + Header: map[string][]string{ + "do-not-passthrough": {"no"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: io.NopCloser(bytes.NewBufferString(strings.Repeat("a", 2000))), + }, + + lastReceiver: "reply", + }, + "error response and retries": { + delivery: &v1.DeliverySpec{ + Retry: &retryCount, + BackoffDelay: &backoffDelay, + BackoffPolicy: &backoffPolicy, + }, + sendToDestination: true, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"ignored-value-header"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + fakeResponse: &http.Response{ + StatusCode: http.StatusGatewayTimeout, + Body: io.NopCloser(bytes.NewBufferString("destination-response")), + }, + expectedErr: true, + lastReceiver: "destination", + }, + "error response term message": { + delivery: &v1.DeliverySpec{ + Retry: &retryCount, + BackoffDelay: &backoffDelay, + BackoffPolicy: &backoffPolicy, + }, + sendToDestination: true, + sendToReply: true, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"ignored-value-header"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + fakeResponse: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewBufferString("destination-response")), + }, + expectedErr: true, + lastReceiver: "destination", + }, + } + + s := dispatchertesting.RunBasicJetstreamServer() + defer dispatchertesting.ShutdownJSServerAndRemoveStorage(t, s) + _, js := dispatchertesting.JsClient(t, s) + + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + destHandler := &fakeHandler{ + t: t, + response: tc.fakeResponse, + requests: make([]requestValidation, 0), + } + destServer := httptest.NewServer(destHandler) + defer destServer.Close() + + replyHandler := &fakeHandler{ + t: t, + response: tc.fakeResponse, + requests: make([]requestValidation, 0), + } + replyServer := httptest.NewServer(replyHandler) + defer replyServer.Close() + if tc.fakeReplyResponse != nil { + replyHandler.response = tc.fakeReplyResponse + } + + var deadLetterSinkHandler *fakeHandler + var deadLetterSinkServer *httptest.Server + var deadLetterSinkURL *apis.URL + + if tc.hasDeadLetterSink { + deadLetterSinkHandler = &fakeHandler{ + t: t, + response: tc.fakeDeadLetterResponse, + requests: make([]requestValidation, 0), + } + deadLetterSinkServer = httptest.NewServer(deadLetterSinkHandler) + defer deadLetterSinkServer.Close() + + getOnlyDomainURL(t, true, deadLetterSinkServer.URL) + deadLetterSinkURL, _ = apis.ParseURL(deadLetterSinkServer.URL) + } + + deadLetterSinkDestination := duckv1.Addressable{URL: deadLetterSinkURL} + + event := cloudevents.NewEvent(cloudevents.VersionV1) + event.SetID(uuid.New().String()) + event.SetType("testtype") + event.SetSource("testsource") + for n, v := range tc.eventExtensions { + event.SetExtension(n, v) + } + event.SetData(cloudevents.ApplicationJSON, tc.body) + + ctx := context.Background() + + md := kncloudevents.NewDispatcher(eventingtls.ClientConfig{}, nil) + + getOnlyDomainURL(t, tc.sendToDestination, destServer.URL) + getOnlyDomainURL(t, tc.sendToReply, replyServer.URL) + + destinationURL, _ := apis.ParseURL(destServer.URL) + destination := duckv1.Addressable{URL: destinationURL} + replyURL, _ := apis.ParseURL(replyServer.URL) + replyDestination := duckv1.Addressable{URL: replyURL} + + // We need to do message -> event -> message to emulate the same transformers the event receiver would do + message := binding.ToMessage(&event) + var err error + ev, err := binding.ToEvent(ctx, message, binding.Transformers{transformer.AddTimeNow}) + if err != nil { + t.Fatal(err) + } + message = binding.ToMessage(ev) + finishInvoked := 0 + message = binding.WithFinish(message, func(err error) { + finishInvoked++ + }) + + var headers http.Header = nil + if tc.header != nil { + headers = utils.PassThroughHeaders(tc.header) + } + _, err = js.AddStream(&nats.StreamConfig{ + Name: "test", + Storage: nats.MemoryStorage, + Subjects: []string{"test.>"}, + }) + if err != nil { + log.Fatalf("Failed to add stream, %s", err) + } + _, err = js.Publish("test.data", []byte("Here is a string....")) + if err != nil { + log.Fatalf("Failed to publish msg, %s", err) + } + natsSub, _ := js.SubscribeSync("test.>") + msg, err := natsSub.NextMsg(50 * time.Millisecond) + if err != nil { + log.Fatalf("Failed to read msg: %s", err) + } + + var retryConfig kncloudevents.RetryConfig + if tc.delivery == nil { + retryConfig = kncloudevents.NoRetries() + backoffLinear := v1.BackoffPolicyLinear + retryConfig.BackoffPolicy = &backoffLinear + backoffDelay := "5s" + retryConfig.BackoffDelay = &backoffDelay + } else { + retryConfig = kncloudevents.RetryConfig{ + RetryMax: int(*tc.delivery.Retry), + BackoffPolicy: tc.delivery.BackoffPolicy, + BackoffDelay: tc.delivery.BackoffDelay, + } + } + + te := TypeExtractorTransformer("") + info, err := SendMessage( + md, + ctx, + message, + destination, + ackWait, + msg, + WithReply(&replyDestination), + WithDeadLetterSink(&deadLetterSinkDestination), + WithRetryConfig(&retryConfig), + WithTransformers(&te), + WithHeader(headers)) + + if tc.lastReceiver != "" { + switch tc.lastReceiver { + case "destination": + if tc.fakeResponse != nil { + if tc.fakeResponse.StatusCode != info.ResponseCode { + t.Errorf("Unexpected response code in DispatchResultInfo. Expected %v. Actual: %v", tc.fakeResponse.StatusCode, info.ResponseCode) + } + } + case "deadLetter": + if tc.fakeDeadLetterResponse != nil { + if tc.fakeDeadLetterResponse.StatusCode != info.ResponseCode { + t.Errorf("Unexpected response code in DispatchResultInfo. Expected %v. Actual: %v", tc.fakeDeadLetterResponse.StatusCode, info.ResponseCode) + } + } + case "reply": + if tc.fakeReplyResponse != nil { + if tc.fakeReplyResponse.StatusCode != info.ResponseCode { + t.Errorf("Unexpected response code in DispatchResultInfo. Expected %v. Actual: %v", tc.fakeReplyResponse.StatusCode, info.ResponseCode) + } + } + } + } + + if tc.expectedErr != (err != nil) { + t.Errorf("Unexpected error from DispatchMessage. Expected %v. Actual: %v", tc.expectedErr, err) + } + if finishInvoked != 1 { + t.Error("Finish should be invoked exactly one time. Actual:", finishInvoked) + } + if tc.expectedDestRequest != nil { + rv := destHandler.popRequest(t) + assertEquality(t, destServer.URL, *tc.expectedDestRequest, rv) + } + if tc.expectedReplyRequest != nil { + rv := replyHandler.popRequest(t) + assertEquality(t, replyServer.URL, *tc.expectedReplyRequest, rv) + } + if tc.expectedDeadLetterRequest != nil { + if tc.sendToReply { + tc.expectedDeadLetterRequest.Headers.Set("ce-knativeerrordest", replyServer.URL+"/") + } else if tc.sendToDestination { + tc.expectedDeadLetterRequest.Headers.Set("ce-knativeerrordest", destServer.URL+"/") + } + rv := deadLetterSinkHandler.popRequest(t) + assertEquality(t, deadLetterSinkServer.URL, *tc.expectedDeadLetterRequest, rv) + } + if len(destHandler.requests) != 0 { + t.Errorf("Unexpected destination requests: %+v", destHandler.requests) + } + if len(replyHandler.requests) != 0 { + t.Errorf("Unexpected reply requests: %+v", replyHandler.requests) + } + if deadLetterSinkHandler != nil && len(deadLetterSinkHandler.requests) != 0 { + t.Errorf("Unexpected dead letter sink requests: %+v", deadLetterSinkHandler.requests) + } + }) + } +} + +func getOnlyDomainURL(t *testing.T, shouldSend bool, serverURL string) *url.URL { + if shouldSend { + server, err := url.Parse(serverURL) + if err != nil { + t.Errorf("Bad serverURL: %q", serverURL) + } + return &url.URL{ + Host: server.Host, + } + } + return nil +} + +type requestValidation struct { + Host string + Headers http.Header + Body string +} + +type fakeHandler struct { + t *testing.T + response *http.Response + requests []requestValidation +} + +func (f *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + // Make a copy of the request. + body, err := io.ReadAll(r.Body) + if err != nil { + f.t.Error("Failed to read the request body") + } + f.requests = append(f.requests, requestValidation{ + Host: r.Host, + Headers: r.Header, + Body: string(body), + }) + + // Write the response. + if f.response != nil { + for h, vs := range f.response.Header { + for _, v := range vs { + w.Header().Add(h, v) + } + } + w.WriteHeader(f.response.StatusCode) + if _, err := io.Copy(w, f.response.Body); err != nil { + f.t.Error("Error copying Body:", err) + } + } else { + w.WriteHeader(http.StatusOK) + w.Write([]byte("")) + } +} + +func (f *fakeHandler) popRequest(t *testing.T) requestValidation { + if len(f.requests) == 0 { + t.Error("Unable to pop request") + return requestValidation{ + Host: "MADE UP, no such request", + Body: "MADE UP, no such request", + } + } + rv := f.requests[0] + f.requests = f.requests[1:] + return rv +} + +func assertEquality(t *testing.T, replacementURL string, expected, actual requestValidation) { + t.Helper() + server, err := url.Parse(replacementURL) + if err != nil { + t.Errorf("Bad replacement URL: %q", replacementURL) + } + expected.Host = server.Host + canonicalizeHeaders(expected, actual) + if diff := cmp.Diff(expected, actual); diff != "" { + t.Error("Unexpected difference (-want, +got):", diff) + } +} + +func canonicalizeHeaders(rvs ...requestValidation) { + // HTTP header names are case-insensitive, so normalize them to lower case for comparison. + for _, rv := range rvs { + headers := rv.Headers + for n, v := range headers { + delete(headers, n) + n = strings.ToLower(n) + if unimportantHeaders.Has(n) { + continue + } + if ignoreValueHeaders.Has(n) { + headers[n] = []string{"ignored-value-header"} + } else { + headers[n] = v + } + } + } +} + +func RunServerOnPort(port int) *server.Server { + opts := natsserver.DefaultTestOptions + opts.Port = port + return RunServerWithOptions(&opts) +} + +func RunServerWithOptions(opts *server.Options) *server.Server { + return natsserver.RunServer(opts) +} diff --git a/pkg/channel/jetstream/utils/consumerconfig.go b/pkg/channel/jetstream/utils/consumerconfig.go index 8568e76d0..3b951e1d8 100644 --- a/pkg/channel/jetstream/utils/consumerconfig.go +++ b/pkg/channel/jetstream/utils/consumerconfig.go @@ -17,8 +17,14 @@ limitations under the License. package utils import ( + "math" + "time" + "github.com/nats-io/nats.go" + "github.com/rickb777/date/period" "knative.dev/eventing-natss/pkg/apis/messaging/v1alpha1" + v1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/kncloudevents" ) func ConvertDeliverPolicy(in v1alpha1.DeliverPolicy, def nats.DeliverPolicy) nats.DeliverPolicy { @@ -48,3 +54,58 @@ func ConvertReplayPolicy(in v1alpha1.ReplayPolicy, def nats.ReplayPolicy) nats.R return def } + +func CalcRequestTimeout(msg *nats.Msg, ackWait time.Duration) time.Duration { + const jitter = time.Millisecond * 200 + + // if previous deliveries were explicitly nacked earlier than the deadline, then our actual deadline will be earlier + // than the deadline above + ackDeadlineFromNow := ackWait - jitter + + meta, err := msg.Metadata() + if err != nil { + return ackDeadlineFromNow + } + + // if each delivery has timed out, then multiplying the number of deliveries by the ack wait will give us the + // duration from publish which this attempt will be ack-waited + ackDurationFromPublish := time.Duration(meta.NumDelivered) * ackWait + + // the deadline is the published timestamp plus our duration calculated above + deadline := ackDurationFromPublish - jitter + + if deadline > ackDeadlineFromNow { + deadline = ackDeadlineFromNow + } + return deadline +} + +func CalculateNakDelayForRetryNumber(attemptNum int, config *kncloudevents.RetryConfig) time.Duration { + backoff, backoffDelay := parseBackoffFuncAndDelay(config) + return backoff(attemptNum, backoffDelay) +} + +type backoffFunc func(attemptNum int, delayDuration time.Duration) time.Duration + +func LinearBackoff(attemptNum int, delayDuration time.Duration) time.Duration { + return delayDuration * time.Duration(attemptNum) +} + +func ExpBackoff(attemptNum int, delayDuration time.Duration) time.Duration { + return delayDuration * time.Duration(math.Exp2(float64(attemptNum))) +} + +func parseBackoffFuncAndDelay(config *kncloudevents.RetryConfig) (backoffFunc, time.Duration) { + var backoff backoffFunc + switch *config.BackoffPolicy { + case v1.BackoffPolicyExponential: + backoff = ExpBackoff + case v1.BackoffPolicyLinear: + backoff = LinearBackoff + } + // it should be validated at this point + delay, _ := period.Parse(*config.BackoffDelay) + backoffDelay, _ := delay.Duration() + + return backoff, backoffDelay +} diff --git a/pkg/reconciler/testing/natsjetstreamchannel.go b/pkg/reconciler/testing/natsjetstreamchannel.go index 629742a0c..8eaf2fe7c 100644 --- a/pkg/reconciler/testing/natsjetstreamchannel.go +++ b/pkg/reconciler/testing/natsjetstreamchannel.go @@ -42,7 +42,9 @@ func NewNatsJetStreamChannel(name, namespace string, ncopt ...NatsJetStreamChann Name: name, Namespace: namespace, }, - Spec: v1alpha1.NatsJetStreamChannelSpec{}, + Spec: v1alpha1.NatsJetStreamChannelSpec{ + ConsumerConfigTemplate: &v1alpha1.ConsumerConfigTemplate{}, + }, } for _, opt := range ncopt { opt(nc)