Skip to content

Commit

Permalink
[release-1.14] re-applied backport pkg files from release-1.10 (#561)
Browse files Browse the repository at this point in the history
* re-applied over main

* update codegen

---------

Co-authored-by: astelmashenko <[email protected]>
  • Loading branch information
knative-prow-robot and astelmashenko authored May 14, 2024
1 parent 5e12e7f commit fadfabd
Show file tree
Hide file tree
Showing 9 changed files with 1,145 additions and 81 deletions.
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ require (
knative.dev/reconciler-test v0.0.0-20240417065737-ca905cbb09a9
)

require github.com/stretchr/testify v1.8.4
require (
github.com/hashicorp/go-retryablehttp v0.6.7
github.com/rickb777/date v1.13.0
github.com/stretchr/testify v1.8.4
)

require (
cloud.google.com/go/compute v1.24.0 // indirect
Expand Down Expand Up @@ -71,7 +75,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
github.com/hashicorp/raft v1.5.0 // indirect
github.com/imdario/mergo v0.3.9 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand All @@ -93,7 +96,6 @@ require (
github.com/prometheus/common v0.52.3 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prometheus/statsd_exporter v0.22.7 // indirect
github.com/rickb777/date v1.13.0 // indirect
github.com/rickb777/plural v1.2.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rogpeppe/fastuuid v1.2.0 // indirect
Expand Down
89 changes: 19 additions & 70 deletions pkg/channel/jetstream/dispatcher/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,18 @@ package dispatcher
import (
"context"
"errors"
"net/http"
"sync"
"time"

"knative.dev/eventing/pkg/kncloudevents"

cejs "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/nats-io/nats.go"
"go.opencensus.io/trace"
"go.uber.org/zap"
"knative.dev/eventing-natss/pkg/tracing"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/logging"
)

Expand All @@ -50,7 +48,8 @@ type Consumer struct {
reporter eventingchannels.StatsReporter
channelNamespace string

jsSub *nats.Subscription
jsSub *nats.Subscription
natsConsumerInfo *nats.ConsumerInfo

logger *zap.SugaredLogger
ctx context.Context
Expand All @@ -73,66 +72,19 @@ func (c *Consumer) Close() error {
}

func (c *Consumer) MsgHandler(msg *nats.Msg) {
logger := c.logger.With(zap.String("msg_id", msg.Header.Get(nats.MsgIdHdr)))
ctx := logging.WithLogger(c.ctx, logger)
tickerCtx, tickerCancel := context.WithCancel(c.ctx)

tickerDone := make(chan struct{})

go func() {
defer close(tickerDone)

// TODO(dan-j): this should be a fraction of the Consumer's AckWait
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-tickerCtx.Done():
return
case <-ticker.C:
if err := msg.InProgress(nats.Context(tickerCtx)); err != nil && !errors.Is(err, context.Canceled) {
logging.FromContext(ctx).Errorw("failed to mark message as in progress", zap.Error(err))
}
}
}
}()
logger := c.logger.With(zap.String("msg_id", msg.Header.Get(nats.MsgIdHdr)))
ctx := logging.WithLogger(c.ctx, logger)

go func() {
var result protocol.Result

// wrap the handler in a local function so that the tickerCtx is cancelled even if a panic occurs.
func() {
defer tickerCancel()
result = c.doHandle(ctx, msg)
}()

// wait for the ticker to stop to prevent attempts to mark the message as in progress after it has been acked
// or nacked
<-tickerDone

switch {
case protocol.IsACK(result):
if err := msg.Ack(nats.Context(ctx)); err != nil {
logger.Errorw("failed to Ack message after successful delivery to subscriber", zap.Error(err))
}
case protocol.IsNACK(result):
if err := msg.Nak(nats.Context(ctx)); err != nil {
logger.Errorw("failed to Nack message after failed delivery to subscriber", zap.Error(err))
}
default:
if err := msg.Term(nats.Context(ctx)); err != nil {
logger.Errorw("failed to Term message after failed delivery to subscriber", zap.Error(err))
}
}
c.doHandle(ctx, msg)
}()
}

// doHandle forwards the received event to the subscriber, the return has three outcomes:
// - Ack (includes `nil`): the event was successfully delivered to the subscriber
// - Nack: the event was not delivered to the subscriber, but it can be retried
// - any other error: the event should be terminated and not retried
func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) protocol.Result {
func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) error {
logger := logging.FromContext(ctx)

if logger.Desugar().Core().Enabled(zap.DebugLevel) {
Expand Down Expand Up @@ -165,40 +117,37 @@ func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) protocol.Result

te := TypeExtractorTransformer("")

dispatchExecutionInfo, err := c.dispatcher.SendMessage(
dispatchExecutionInfo, err := SendMessage(
c.dispatcher,
ctx,
message,
c.sub.Subscriber,
kncloudevents.WithReply(c.sub.Reply),
kncloudevents.WithDeadLetterSink(c.sub.DeadLetter),
kncloudevents.WithRetryConfig(c.sub.RetryConfig),
kncloudevents.WithTransformers(&te),
kncloudevents.WithHeader(additionalHeaders),
c.natsConsumerInfo.Config.AckWait,
msg,
WithReply(c.sub.Reply),
WithDeadLetterSink(c.sub.DeadLetter),
WithRetryConfig(c.sub.RetryConfig),
WithTransformers(&te),
WithHeader(additionalHeaders),
)

args := eventingchannels.ReportArgs{
Ns: c.channelNamespace,
EventType: string(te),
}

_ = fanout.ParseDispatchResultAndReportMetrics(fanout.NewDispatchResult(err, dispatchExecutionInfo), c.reporter, args)

if err != nil {
logger.Errorw("failed to forward message to downstream subscriber",
zap.Error(err),
zap.Any("dispatch_resp_code", dispatchExecutionInfo.ResponseCode))

code := dispatchExecutionInfo.ResponseCode
if code/100 == 5 || code == http.StatusTooManyRequests || code == http.StatusRequestTimeout {
// tell JSM to redeliver the message later
return protocol.NewReceipt(false, "%w", err)
}

// let knative decide what to do with the message, if it wraps an Ack/Nack then that is what will happen,
// otherwise we will Terminate the message
return err
}

logger.Debug("message forwarded to downstream subscriber")

return protocol.ResultACK
return nil
}
21 changes: 18 additions & 3 deletions pkg/channel/jetstream/dispatcher/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package dispatcher

import (
"time"

"github.com/nats-io/nats.go"
"knative.dev/eventing-natss/pkg/apis/messaging/v1alpha1"
"knative.dev/eventing-natss/pkg/channel/jetstream/utils"
"knative.dev/eventing/pkg/kncloudevents"
)

func buildStreamConfig(streamName, subject string, config *v1alpha1.StreamConfig) *nats.StreamConfig {
Expand Down Expand Up @@ -55,19 +58,31 @@ func buildStreamConfig(streamName, subject string, config *v1alpha1.StreamConfig

}

func buildConsumerConfig(consumerName, deliverSubject string, template *v1alpha1.ConsumerConfigTemplate) *nats.ConsumerConfig {
func buildConsumerConfig(consumerName, deliverSubject string, template *v1alpha1.ConsumerConfigTemplate, retryConfig *kncloudevents.RetryConfig) *nats.ConsumerConfig {
const jitter = time.Millisecond * 500
consumerConfig := nats.ConsumerConfig{
Durable: consumerName,
DeliverGroup: consumerName,
DeliverSubject: deliverSubject,
AckPolicy: nats.AckExplicitPolicy,
}

if template != nil {
consumerConfig.AckWait = template.AckWait.Duration
}

if retryConfig != nil {
if retryConfig.RequestTimeout > 0 {
consumerConfig.AckWait = retryConfig.RequestTimeout + jitter
}

consumerConfig.MaxDeliver = retryConfig.RetryMax + 1
}

if template != nil {
consumerConfig.DeliverPolicy = utils.ConvertDeliverPolicy(template.DeliverPolicy, nats.DeliverAllPolicy)
consumerConfig.OptStartSeq = template.OptStartSeq
consumerConfig.AckWait = template.AckWait.Duration
consumerConfig.MaxDeliver = template.MaxDeliver
// ignoring template.AckWait and template.MaxDeliver
consumerConfig.FilterSubject = template.FilterSubject
consumerConfig.ReplayPolicy = utils.ConvertReplayPolicy(template.ReplayPolicy, nats.ReplayInstantPolicy)
consumerConfig.RateLimit = template.RateLimitBPS
Expand Down
7 changes: 4 additions & 3 deletions pkg/channel/jetstream/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
return d.receiver.Start(ctx)
}

// RegisterChannelHost registers the Dispatcher to accept HTTP events matching the specified HostName
// RegisterChannelHost registers the NatsDispatcher to accept HTTP events matching the specified HostName
func (d *Dispatcher) RegisterChannelHost(config ChannelConfig) error {
if old, ok := d.hostToChannelMap.LoadOrStore(config.HostName, config.ChannelReference); ok {
// map already contained a channel reference for this hostname, check they both reference the same channel,
Expand Down Expand Up @@ -222,7 +222,7 @@ func (d *Dispatcher) updateSubscription(ctx context.Context, config ChannelConfi

if isLeader {
deliverSubject := d.consumerSubjectFunc(config.Namespace, config.Name, string(sub.UID))
consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate)
consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate, sub.RetryConfig)

_, err := d.js.UpdateConsumer(config.StreamName, consumerConfig)
if err != nil {
Expand Down Expand Up @@ -256,6 +256,7 @@ func (d *Dispatcher) subscribe(ctx context.Context, config ChannelConfig, sub Su
channelNamespace: config.Namespace,
logger: logger,
ctx: ctx,
natsConsumerInfo: info,
}

consumer.jsSub, err = d.js.QueueSubscribe(info.Config.DeliverSubject, info.Config.DeliverGroup, consumer.MsgHandler,
Expand Down Expand Up @@ -305,7 +306,7 @@ func (d *Dispatcher) getOrEnsureConsumer(ctx context.Context, config ChannelConf

if isLeader {
deliverSubject := d.consumerSubjectFunc(config.Namespace, config.Name, string(sub.UID))
consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate)
consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate, sub.RetryConfig)

// AddConsumer is idempotent so this will either create the consumer, update to match expected config, or no-op
info, err := d.js.AddConsumer(config.StreamName, consumerConfig)
Expand Down
13 changes: 12 additions & 1 deletion pkg/channel/jetstream/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package dispatcher
import (
"context"
"testing"
"time"

"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -67,8 +71,15 @@ func TestDispatcher_ReconcileConsumers(t *testing.T) {
ctx = controller.WithEventRecorder(ctx, eventRecorder)

nc := reconciletesting.NewNatsJetStreamChannel(testNS, ncName, reconciletesting.WithNatsJetStreamChannelSubscribers(subscribers))
sub := fanout.Subscription{
RetryConfig: &kncloudevents.RetryConfig{
RequestTimeout: time.Second,
RetryMax: 1,
},
}
config := createChannelConfig(nc, Subscription{
UID: subscriber1UID,
UID: subscriber1UID,
Subscription: sub,
})

d, err := NewDispatcher(ctx, NatsDispatcherArgs{
Expand Down
Loading

0 comments on commit fadfabd

Please sign in to comment.