Skip to content

Commit

Permalink
Merge pull request #230 from rabbitmq/query-sequence
Browse files Browse the repository at this point in the history
QuerySequence v2
  • Loading branch information
ablease authored Sep 28, 2023
2 parents 17b210b + 2a2d7f4 commit 3eabadd
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 1 deletion.
45 changes: 45 additions & 0 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,48 @@ func (e *Environment) QueryPartitions(ctx context.Context, superstream string) (
}
return nil, lastError
}

// QuerySequence retrieves the last publishingID for a given producer
// (reference) and stream name.
func (e *Environment) QuerySequence(ctx context.Context, reference, stream string) (uint64, error) {
logger := raw.LoggerFromCtxOrDiscard(ctx)
rn := rand.Intn(100)
n := len(e.locators)
var lastError error

if !validateStringParameter(reference) {
lastError = fmt.Errorf("producer reference invalid: %s", reference)
return uint64(0), lastError
}

var l *locator
for i := 0; i < n; i++ {
if e.locatorSelectSequential {
// round robin / sequential
l = e.locators[i]
} else {
// pick at random
l = e.pickLocator((i + rn) % n)
}
if err := l.maybeInitializeLocator(); err != nil {
lastError = err
logger.Error("error initializing locator", slog.Any("error", err))
continue
}

result := l.locatorOperation((*locator).operationQuerySequence, ctx, reference, stream)
if result[1] != nil {
lastError = result[1].(error)
if isNonRetryableError(lastError) {
return uint64(0), lastError
}
logger.Error("locator operation failed", slog.Any("error", lastError))
continue
}

pubId := result[0].(uint64)
return pubId, nil
}

return uint64(0), lastError
}
117 changes: 117 additions & 0 deletions pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,4 +622,121 @@ var _ = Describe("Environment", func() {
Eventually(logBuffer).Within(time.Millisecond * 500).Should(gbytes.Say(`"locator operation failed" error="err maybe later"`))
})
})

Context("query sequence", func() {
Context("input validation succeeds", func() {

BeforeEach(func() {
mockRawClient.EXPECT().
IsOpen().
Return(true) // from maybeInitializeLocator
})

It("queries last publishingid for a given producer and stream", func() {
// setup
publishingId := uint64(42)
mockRawClient.EXPECT().
QueryPublisherSequence(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(publishingId, nil)

// act
pubId, err := environment.QuerySequence(rootCtx, "producer-id", "stream-id")
Expect(err).ToNot(HaveOccurred())
Expect(pubId).To(BeNumerically("==", 42))

})

When("there is an error", func() {
It("bubbles up the error", func() {
// setup
publishingId := uint64(0)

mockRawClient.EXPECT().
QueryPublisherSequence(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(publishingId, errors.New("err not today")).
Times(3)

_, err := environment.QuerySequence(rootCtx, "producer-id", "stream-id")
Expect(err).To(MatchError("err not today"))
})

})

When("there are multiple locators", func() {
var (
locator2rawClient *stream.MockRawClient
)

BeforeEach(func() {
locator2rawClient = stream.NewMockRawClient(mockCtrl)
environment.AppendLocatorRawClient(locator2rawClient)
environment.SetBackoffPolicy(backOffPolicyFn)
environment.SetLocatorSelectSequential(true)
})

It("uses different locators when one fails", func() {
// setup
locator2rawClient.EXPECT().
IsOpen().
Return(true)
locator2rawClient.EXPECT().
QueryPublisherSequence(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(uint64(42), nil)

mockRawClient.EXPECT().
QueryPublisherSequence(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(uint64(0), errors.New("something went wrong")).
Times(3)

// act
pubId, err := environment.QuerySequence(rootCtx, "retried-stream-stats", "stream-id")
Expect(err).ToNot(HaveOccurred())
Expect(pubId).To(BeNumerically("==", 42))
})

It("gives up on non-retryable errors", func() {
// setup
mockRawClient.EXPECT().
QueryPublisherSequence(gomock.AssignableToTypeOf(ctxType), gomock.Eq("non-retryable"), gomock.AssignableToTypeOf("string")).
Return(uint64(0), raw.ErrInternalError)

// act
_, err := environment.QuerySequence(rootCtx, "non-retryable", "stream")
Expect(err).To(HaveOccurred())
})
})

It("logs intermediate error messages", func() {
// setup
logBuffer := gbytes.NewBuffer()
logger := slog.New(slog.NewTextHandler(logBuffer))
ctx := raw.NewContextWithLogger(context.Background(), *logger)

mockRawClient.EXPECT().
QueryPublisherSequence(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(uint64(0), errors.New("err maybe later")).
Times(3)

// act
_, err := environment.QuerySequence(ctx, "log-things", "stream")
Expect(err).To(HaveOccurred())

Eventually(logBuffer).Within(time.Millisecond * 500).Should(gbytes.Say(`"locator operation failed" error="err maybe later"`))
})
})

Context("input validation fails", func() {
It("validates that reference is not a nil string", func() {

_, err := environment.QuerySequence(rootCtx, "", "stream-id")
Expect(err).To(MatchError("producer reference invalid: "))
})

It("validates that reference is not a whitespace char", func() {
// setup
_, err := environment.QuerySequence(rootCtx, " ", "stream-id")
Expect(err).To(MatchError("producer reference invalid: "))
})
})
})
})
9 changes: 8 additions & 1 deletion pkg/stream/locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,17 @@ func (l *locator) operationQueryOffset(args ...any) []any {
offset, err := l.client.QueryOffset(ctx, reference, stream)
return []any{offset, err}
}

func (l *locator) operationPartitions(args ...any) []any {
ctx := args[0].(context.Context)
superstream := args[1].(string)
partitions, err := l.client.Partitions(ctx, superstream)
return []any{partitions, err}
}

func (l *locator) operationQuerySequence(args ...any) []any {
ctx := args[0].(context.Context)
reference := args[1].(string)
stream := args[2].(string)
pubId, err := l.client.QueryPublisherSequence(ctx, reference, stream)
return []any{pubId, err}
}
8 changes: 8 additions & 0 deletions pkg/stream/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,11 @@ func maybeApplyDefaultTimeout(ctx context.Context) (context.Context, context.Can
}
return ctx, nil
}

func validateStringParameter(p string) bool {
if len(p) == 0 || p == " " {
return false
}

return true
}

0 comments on commit 3eabadd

Please sign in to comment.