Skip to content

Commit

Permalink
Delay topic creation logic on subscription creation (#446)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakshi-goyal-razorpay authored Nov 8, 2022
1 parent 3eec241 commit 707fc42
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 13 deletions.
7 changes: 5 additions & 2 deletions internal/subscriber/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/razorpay/metro/internal/offset"
"github.com/razorpay/metro/internal/subscriber/retry"
"github.com/razorpay/metro/internal/subscription"
"github.com/razorpay/metro/internal/topic"
"github.com/razorpay/metro/pkg/cache"
"github.com/razorpay/metro/pkg/logger"
"github.com/razorpay/metro/pkg/messagebroker"
Expand All @@ -26,11 +27,12 @@ type Core struct {
subscriptionCore subscription.ICore
offsetCore offset.ICore
ch cache.ICache
topicCore topic.ICore
}

// NewCore returns a new subscriber core
func NewCore(bs brokerstore.IBrokerStore, subscriptionCore subscription.ICore, offsetCore offset.ICore, ch cache.ICache) ICore {
return &Core{bs: bs, subscriptionCore: subscriptionCore, offsetCore: offsetCore, ch: ch}
func NewCore(bs brokerstore.IBrokerStore, subscriptionCore subscription.ICore, offsetCore offset.ICore, ch cache.ICache, topicCore topic.ICore) ICore {
return &Core{bs: bs, subscriptionCore: subscriptionCore, offsetCore: offsetCore, ch: ch, topicCore: topicCore}
}

// NewSubscriber initiates a new subscriber for a given topic
Expand Down Expand Up @@ -67,6 +69,7 @@ func (c *Core) NewSubscriber(ctx context.Context,
WithMessageHandler(retry.NewPushToPrimaryRetryTopicHandler(c.bs)).
WithSubscriberID(subscriberID).
WithErrChan(errChan).
WithTopicCore(c.topicCore).
Build()

err = retrier.Start(subsCtx)
Expand Down
18 changes: 14 additions & 4 deletions internal/subscriber/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
oCore "github.com/razorpay/metro/internal/offset/mocks/core"
"github.com/razorpay/metro/internal/subscription"
sCore "github.com/razorpay/metro/internal/subscription/mocks/core"
"github.com/razorpay/metro/internal/topic"
tCore "github.com/razorpay/metro/internal/topic/mocks/core"
"github.com/razorpay/metro/pkg/cache"
mCache "github.com/razorpay/metro/pkg/cache/mocks"
"github.com/razorpay/metro/pkg/messagebroker"
Expand All @@ -36,6 +38,7 @@ func setupCore(t *testing.T) (
core *sCore.MockICore,
offsetCore *oCore.MockICore,
cache *mCache.MockICache,
topicCore *tCore.MockICore,
) {
ctrl := gomock.NewController(t)
ctx = context.Background()
Expand All @@ -45,16 +48,19 @@ func setupCore(t *testing.T) (
core = sCore.NewMockICore(ctrl)
offsetCore = oCore.NewMockICore(ctrl)
cache = mCache.NewMockICache(ctrl)
topicCore = tCore.NewMockICore(ctrl)

cache.EXPECT().Get(gomock.Any(), gomock.Any()).Return([]byte{'0'}, nil).AnyTimes()
cache.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
store.EXPECT().GetProducer(gomock.Any(), gomock.Any()).Return(getMockProducer(ctrl), nil).AnyTimes()
cs.EXPECT().Close(gomock.Any()).Return(nil).AnyTimes()
store.EXPECT().RemoveConsumer(gomock.Any(), gomock.Any()).Return(true).AnyTimes()
topicCore.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes()
return
}

func TestCore_NewSubscriber(t *testing.T) {
ctx, cs, store, _, core, offsetCore, ch := setupCore(t)
ctx, cs, store, _, core, offsetCore, ch, topicCore := setupCore(t)
ackCh := make(chan *AckMessage)
modAckCh := make(chan *ModAckMessage)
requestCh := make(chan *PullRequest)
Expand All @@ -81,7 +87,7 @@ func TestCore_NewSubscriber(t *testing.T) {
{
name: "Create new subscriber and run with retry policy",
fields: fields{
core: NewCore(store, core, offsetCore, ch),
core: NewCore(store, core, offsetCore, ch, topicCore),
},
args: args{
ctx: ctx,
Expand All @@ -100,7 +106,7 @@ func TestCore_NewSubscriber(t *testing.T) {
{
name: "Create new subscriber with getConsumer error Failure",
fields: fields{
core: NewCore(store, core, offsetCore, ch),
core: NewCore(store, core, offsetCore, ch, topicCore),
},
args: args{
ctx: ctx,
Expand Down Expand Up @@ -152,11 +158,13 @@ func TestNewCore(t *testing.T) {
mockSubscriptionCore := sCore.NewMockICore(ctrl)
mockOffsetCore := oCore.NewMockICore(ctrl)
mockCache := mCache.NewMockICache(ctrl)
mockTopicCore := tCore.NewMockICore(ctrl)

type args struct {
bs brokerstore.IBrokerStore
subscriptionCore subscription.ICore
offsetCore offset.ICore
topicCore topic.ICore
ch cache.ICache
}
tests := []struct {
Expand All @@ -170,19 +178,21 @@ func TestNewCore(t *testing.T) {
bs: mockBrokerStoreCore,
subscriptionCore: mockSubscriptionCore,
offsetCore: mockOffsetCore,
topicCore: mockTopicCore,
ch: mockCache,
},
want: &Core{
bs: mockBrokerStoreCore,
subscriptionCore: mockSubscriptionCore,
offsetCore: mockOffsetCore,
topicCore: mockTopicCore,
ch: mockCache,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := NewCore(tt.args.bs, tt.args.subscriptionCore, tt.args.offsetCore, tt.args.ch)
got := NewCore(tt.args.bs, tt.args.subscriptionCore, tt.args.offsetCore, tt.args.ch, tt.args.topicCore)
assert.Equal(t, tt.want, got)
})
}
Expand Down
8 changes: 8 additions & 0 deletions internal/subscriber/retry/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/razorpay/metro/internal/brokerstore"
"github.com/razorpay/metro/internal/subscription"
"github.com/razorpay/metro/internal/topic"
"github.com/razorpay/metro/pkg/cache"
)

Expand All @@ -18,6 +19,7 @@ type Builder interface {
WithMessageHandler(handler MessageHandler) Builder
WithSubscriberID(subscriberID string) Builder
WithErrChan(chan error) Builder
WithTopicCore(topicCore topic.ICore) Builder
Build() IRetrier
}

Expand Down Expand Up @@ -78,3 +80,9 @@ func (retrier *Retrier) WithErrChan(errChan chan error) Builder {
retrier.errChan = errChan
return retrier
}

// WithTopicCore ...
func (retrier *Retrier) WithTopicCore(topicCore topic.ICore) Builder {
retrier.topicCore = topicCore
return retrier
}
6 changes: 6 additions & 0 deletions internal/subscriber/retry/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Retrier struct {
handler MessageHandler
delayConsumers sync.Map
errChan chan error
topicCore topic.ICore
}

// Start starts a new retrier which internally takes care of spawning the needed delay-consumers.
Expand All @@ -55,8 +56,13 @@ func (r *Retrier) Start(ctx context.Context) error {
nextDelayInterval,
topic.Intervals,
))

for interval, topic := range r.subs.GetDelayTopicsMap() {
if uint(interval) <= uint(predefinedInterval) {
if _, err := r.topicCore.Get(ctx, topic); err != nil {
logger.Ctx(ctx).Infow("Start: error while getting delay topic", "topic", topic, "error", err.Error())
continue
}
dc, err := NewDelayConsumer(ctx, r.subscriberID, topic, r.subs, r.bs, r.handler, r.ch, r.errChan)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions internal/subscriber/retry/retrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/razorpay/metro/internal/brokerstore/mocks"
"github.com/razorpay/metro/internal/subscription"
"github.com/razorpay/metro/internal/topic"
topicCore "github.com/razorpay/metro/internal/topic/mocks/core"
mocks2 "github.com/razorpay/metro/pkg/cache/mocks"
"github.com/razorpay/metro/pkg/messagebroker"
mocks3 "github.com/razorpay/metro/pkg/messagebroker/mocks"
Expand Down Expand Up @@ -121,6 +122,8 @@ func TestRetrier_Start(t *testing.T) {
).Return(dcs, nil).AnyTimes()
mockBrokerStore.EXPECT().RemoveConsumer(ctx, gomock.Any()).AnyTimes()
mockBrokerStore.EXPECT().GetProducer(gomock.Any(), gomock.Any()).Return(producer, nil).AnyTimes()
mockTopicCore := topicCore.NewMockICore(ctrl)
mockTopicCore.EXPECT().Get(ctx, gomock.Any()).AnyTimes().Return(nil, nil)

r := &Retrier{
subscriberID: "subscription-id",
Expand All @@ -131,6 +134,7 @@ func TestRetrier_Start(t *testing.T) {
finder: subscription.NewClosestIntervalWithCeil(),
handler: NewPushToPrimaryRetryTopicHandler(mockBrokerStore),
delayConsumers: sync.Map{},
topicCore: mockTopicCore,
errChan: make(chan error),
}
expDelayTopics := r.subs.GetDelayTopics()
Expand Down
2 changes: 1 addition & 1 deletion internal/subscription/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (m *Model) GetDelayTopicsByBackoff() []string {
))

delayTopics = append(delayTopics, delayTopicsMap[closestInterval])
currentInterval = int(closestInterval)
currentInterval = int(nextDelayInterval)
currentRetryCount++
}
return delayTopics
Expand Down
4 changes: 2 additions & 2 deletions internal/subscription/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ func TestModel_GetDelayTopicsByBackoff(t *testing.T) {
deadLetterPolicy: &DeadLetterPolicy{MaxDeliveryAttempts: 5},
expected: []string{
subModel.GetDelay30secTopic(),
subModel.GetDelay60secTopic(),
subModel.GetDelay30secTopic(),
subModel.GetDelay150secTopic(),
subModel.GetDelay150secTopic(),
subModel.GetDelay300secTopic(),
subModel.GetDelay600secTopic(),
},
},
Expand Down
2 changes: 1 addition & 1 deletion service/web/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (svc *Service) Start(ctx context.Context) error {

offsetCore := offset.NewCore(offset.NewRepo(r))

streamManager := stream.NewStreamManager(ctx, subscriptionCore, offsetCore, brokerStore, svc.webConfig.Interfaces.API.GrpcServerAddress)
streamManager := stream.NewStreamManager(ctx, subscriptionCore, offsetCore, brokerStore, svc.webConfig.Interfaces.API.GrpcServerAddress, topicCore)

go func() {
err = <-svc.errChan
Expand Down
7 changes: 5 additions & 2 deletions service/web/stream/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/razorpay/metro/internal/offset"
"github.com/razorpay/metro/internal/subscriber"
"github.com/razorpay/metro/internal/subscription"
"github.com/razorpay/metro/internal/topic"
metrov1 "github.com/razorpay/metro/rpc/proto/v1"
"golang.org/x/sync/errgroup"
)
Expand All @@ -39,10 +40,11 @@ type Manager struct {
mutex *sync.Mutex
grpcServerAddr string
ctx context.Context
topicCore topic.ICore
}

// NewStreamManager ...
func NewStreamManager(ctx context.Context, subscriptionCore subscription.ICore, offsetCore offset.ICore, bs brokerstore.IBrokerStore, grpcServerAddr string) IManager {
func NewStreamManager(ctx context.Context, subscriptionCore subscription.ICore, offsetCore offset.ICore, bs brokerstore.IBrokerStore, grpcServerAddr string, topicCore topic.ICore) IManager {
mgr := &Manager{
pullStreams: make(map[string]IStream),
subscriptionCore: subscriptionCore,
Expand All @@ -53,6 +55,7 @@ func NewStreamManager(ctx context.Context, subscriptionCore subscription.ICore,
mutex: &sync.Mutex{},
grpcServerAddr: grpcServerAddr,
ctx: ctx,
topicCore: topicCore,
}

go mgr.run()
Expand Down Expand Up @@ -104,7 +107,7 @@ func (s *Manager) CreateNewStream(server metrov1.Subscriber_StreamingPullServer,
pullStream, err := newPullStream(server,
req.ClientID,
subModel,
subscriber.NewCore(s.bs, s.subscriptionCore, s.offsetCore, ch),
subscriber.NewCore(s.bs, s.subscriptionCore, s.offsetCore, ch, s.topicCore),
errGroup,
s.cleanupCh,
)
Expand Down
2 changes: 1 addition & 1 deletion service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewService(workerConfig *Config, registryConfig *registry.Config, cacheConf
return nil, err
}

subscriberCore := subscriber.NewCore(brokerStore, subscriptionCore, offsetCore, ch)
subscriberCore := subscriber.NewCore(brokerStore, subscriptionCore, offsetCore, ch, topicCore)

// Init scheduler task, this schedules the subscriptions on nodes
// Leader Task runs this task internally if node is elected as leader
Expand Down

0 comments on commit 707fc42

Please sign in to comment.