From 9ad86715475499d7b5f9db998884c591d7816577 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Wed, 2 Dec 2020 15:42:48 -0500 Subject: [PATCH] [aggregator] Add ActivePlacement method to TCP client --- src/aggregator/client/tcp_client.go | 17 ++++++++++++++++ src/aggregator/client/tcp_client_test.go | 25 ++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/src/aggregator/client/tcp_client.go b/src/aggregator/client/tcp_client.go index 18d2fb2c00..725894049b 100644 --- a/src/aggregator/client/tcp_client.go +++ b/src/aggregator/client/tcp_client.go @@ -222,6 +222,23 @@ func (c *TCPClient) WriteForwarded( return c.write(metric.ID, metric.TimeNanos, payload) } +// ActivePlacement returns a copy of the currently active placement and its version. +func (c *TCPClient) ActivePlacement() (placement.Placement, int, error) { + stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement() + if err != nil { + return nil, 0, err + } + defer onStagedPlacementDoneFn() + + placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement() + if err != nil { + return nil, 0, err + } + defer onPlacementDoneFn() + + return placement.Clone(), stagedPlacement.Version(), nil +} + // Flush flushes any remaining data buffered by the client. func (c *TCPClient) Flush() error { c.metrics.flush.Inc(1) diff --git a/src/aggregator/client/tcp_client_test.go b/src/aggregator/client/tcp_client_test.go index 1b4327b003..a36a5c6020 100644 --- a/src/aggregator/client/tcp_client_test.go +++ b/src/aggregator/client/tcp_client_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/m3db/m3/src/cluster/kv/mem" @@ -786,6 +787,30 @@ func TestTCPClientWriteTimeRangeFor(t *testing.T) { } } +func TestTCPClientActivePlacement(t *testing.T) { + var ( + c = mustNewTestTCPClient(t, testOptions()) + emptyPl = placement.NewPlacement() + ctrl = gomock.NewController(t) + mockPl = placement.NewMockPlacement(ctrl) + stagedPlacement = placement.NewMockActiveStagedPlacement(ctrl) + watcher = placement.NewMockStagedPlacementWatcher(ctrl) + doneCalls int + ) + + c.placementWatcher = watcher + watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() { doneCalls++ }, nil) + stagedPlacement.EXPECT().Version().Return(42) + stagedPlacement.EXPECT().ActivePlacement().Return(mockPl, func() { doneCalls++ }, nil) + mockPl.EXPECT().Clone().Return(emptyPl) + + pl, v, err := c.ActivePlacement() + assert.NoError(t, err) + assert.Equal(t, 42, v) + assert.Equal(t, 2, doneCalls) + assert.Equal(t, emptyPl, pl) +} + func TestTCPClientInitAndClose(t *testing.T) { c := mustNewTestTCPClient(t, testOptions()) require.NoError(t, c.Init())