Skip to content

Commit

Permalink
Merge pull request #235 from rabbitmq/query-partitions
Browse files Browse the repository at this point in the history
QueryPartitions in Environment
  • Loading branch information
Zerpet authored Sep 27, 2023
2 parents a6cced8 + b9c0921 commit 341b937
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 0 deletions.
39 changes: 39 additions & 0 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,42 @@ func (e *Environment) QueryOffset(ctx context.Context, consumer, stream string)
}
return uint64(0), lastError
}

// QueryPartitions returns a list of partition streams for a given superstream name
func (e *Environment) QueryPartitions(ctx context.Context, superstream string) ([]string, error) {
logger := raw.LoggerFromCtxOrDiscard(ctx)
rn := rand.Intn(100)
n := len(e.locators)

var lastError error
var l *locator
for i := 0; i < n; i++ {
if e.locatorSelectSequential {
// round robin / sequential
l = e.locators[i]
} else {
// pick at random
l = e.pickLocator((i + rn) % n)
}

if err := l.maybeInitializeLocator(); err != nil {
lastError = err
logger.Error("error initializing locator", slog.Any("error", err))
continue
}

result := l.locatorOperation((*locator).operationPartitions, ctx, superstream)
if result[1] != nil {
lastError = result[1].(error)
if isNonRetryableError(lastError) {
return nil, lastError
}
logger.Error("locator operation failed", slog.Any("error", lastError))
continue
}

partitions := result[0].([]string)
return partitions, nil
}
return nil, lastError
}
95 changes: 95 additions & 0 deletions pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,101 @@ var _ = Describe("Environment", func() {
_, err := environment.QueryOffset(ctx, "log-things", "stream")
Expect(err).To(HaveOccurred())

Eventually(logBuffer).Within(time.Millisecond * 500).Should(gbytes.Say(`"locator operation failed" error="err maybe later"`))
})

})
Context("query partitions", func() {
BeforeEach(func() {
mockRawClient.EXPECT().
IsOpen().
Return(true) // from maybeInitializeLocator
})

It("queries partition streams for a given superstream", func() {
// setup
mockRawClient.EXPECT().
Partitions(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string")).
Return([]string{"stream1, stream2"}, nil)

// act
partitions, err := environment.QueryPartitions(rootCtx, "superstream")
Expect(err).ToNot(HaveOccurred())
Expect(partitions).To(Equal([]string{"stream1, stream2"}))
})

When("there is an error", func() {
It("bubbles up the error", func() {
// setup
mockRawClient.EXPECT().
Partitions(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string")).
Return(nil, errors.New("err not today")).
Times(3)

_, err := environment.QueryPartitions(rootCtx, "superstream-does-not-exist")
Expect(err).To(MatchError("err not today"))
})
})

When("there are multiple locators", func() {
var (
locator2rawClient *stream.MockRawClient
)

BeforeEach(func() {
locator2rawClient = stream.NewMockRawClient(mockCtrl)
environment.AppendLocatorRawClient(locator2rawClient)
environment.SetBackoffPolicy(backOffPolicyFn)
environment.SetLocatorSelectSequential(true)
})

It("uses different locators when one fails", func() {
// setup
locator2rawClient.EXPECT().
IsOpen().
Return(true)
locator2rawClient.EXPECT().
Partitions(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string")).
Return([]string{"stream1", "stream2"}, nil)

mockRawClient.EXPECT().
Partitions(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string")).
Return(nil, errors.New("something went wrong")).
Times(3)

// act
partitions, err := environment.QueryPartitions(rootCtx, "superstream")
Expect(err).ToNot(HaveOccurred())
Expect(partitions).To(Equal([]string{"stream1", "stream2"}))
})

It("gives up on non-retryable errors", func() {
// setup
mockRawClient.EXPECT().
Partitions(gomock.AssignableToTypeOf(ctxType), gomock.Eq("non-retryable")).
Return(nil, raw.ErrStreamDoesNotExist)

// act
_, err := environment.QueryPartitions(rootCtx, "non-retryable")
Expect(err).To(HaveOccurred())
})
})

It("logs intermediate error messages", func() {
// setup
logBuffer := gbytes.NewBuffer()
logger := slog.New(slog.NewTextHandler(logBuffer))
ctx := raw.NewContextWithLogger(context.Background(), *logger)

mockRawClient.EXPECT().
Partitions(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string")).
Return(nil, errors.New("err maybe later")).
Times(3)

// act
_, err := environment.QueryPartitions(ctx, "log-things")
Expect(err).To(HaveOccurred())

Eventually(logBuffer).Within(time.Millisecond * 500).Should(gbytes.Say(`"locator operation failed" error="err maybe later"`))
})
})
Expand Down
7 changes: 7 additions & 0 deletions pkg/stream/locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,10 @@ func (l *locator) operationQueryOffset(args ...any) []any {
offset, err := l.client.QueryOffset(ctx, reference, stream)
return []any{offset, err}
}

func (l *locator) operationPartitions(args ...any) []any {
ctx := args[0].(context.Context)
superstream := args[1].(string)
partitions, err := l.client.Partitions(ctx, superstream)
return []any{partitions, err}
}

0 comments on commit 341b937

Please sign in to comment.