Skip to content

Commit

Permalink
[aggregator] Add ActivePlacement method to TCP client (#2971)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Dec 2, 2020
1 parent 0342a92 commit 38f0545
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/aggregator/client/tcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,23 @@ func (c *TCPClient) WriteForwarded(
return c.write(metric.ID, metric.TimeNanos, payload)
}

// ActivePlacement returns a copy of the currently active placement and its version.
func (c *TCPClient) ActivePlacement() (placement.Placement, int, error) {
stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement()
if err != nil {
return nil, 0, err
}
defer onStagedPlacementDoneFn()

placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement()
if err != nil {
return nil, 0, err
}
defer onPlacementDoneFn()

return placement.Clone(), stagedPlacement.Version(), nil
}

// Flush flushes any remaining data buffered by the client.
func (c *TCPClient) Flush() error {
c.metrics.flush.Inc(1)
Expand Down
25 changes: 25 additions & 0 deletions src/aggregator/client/tcp_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/cluster/kv/mem"
Expand Down Expand Up @@ -786,6 +787,30 @@ func TestTCPClientWriteTimeRangeFor(t *testing.T) {
}
}

func TestTCPClientActivePlacement(t *testing.T) {
var (
c = mustNewTestTCPClient(t, testOptions())
emptyPl = placement.NewPlacement()
ctrl = gomock.NewController(t)
mockPl = placement.NewMockPlacement(ctrl)
stagedPlacement = placement.NewMockActiveStagedPlacement(ctrl)
watcher = placement.NewMockStagedPlacementWatcher(ctrl)
doneCalls int
)

c.placementWatcher = watcher
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() { doneCalls++ }, nil)
stagedPlacement.EXPECT().Version().Return(42)
stagedPlacement.EXPECT().ActivePlacement().Return(mockPl, func() { doneCalls++ }, nil)
mockPl.EXPECT().Clone().Return(emptyPl)

pl, v, err := c.ActivePlacement()
assert.NoError(t, err)
assert.Equal(t, 42, v)
assert.Equal(t, 2, doneCalls)
assert.Equal(t, emptyPl, pl)
}

func TestTCPClientInitAndClose(t *testing.T) {
c := mustNewTestTCPClient(t, testOptions())
require.NoError(t, c.Init())
Expand Down

0 comments on commit 38f0545

Please sign in to comment.