From e1e7f50e51d96ba8883f624cda78419a35e01c0e Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Fri, 8 Jan 2021 18:48:20 -0500 Subject: [PATCH] [aggregator] Add ActivePlacementVersion to tcp client --- src/aggregator/client/tcp_client.go | 12 ++++++++++++ src/aggregator/client/tcp_client_test.go | 9 +++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/aggregator/client/tcp_client.go b/src/aggregator/client/tcp_client.go index 40a519457c..38209e20f7 100644 --- a/src/aggregator/client/tcp_client.go +++ b/src/aggregator/client/tcp_client.go @@ -239,6 +239,18 @@ func (c *TCPClient) ActivePlacement() (placement.Placement, int, error) { return placement.Clone(), stagedPlacement.Version(), nil } +// ActivePlacementVersion returns a copy of the currently active placement version. It is a far less expensive call +// than ActivePlacement, as it does not clone the placement. +func (c *TCPClient) ActivePlacementVersion() (int, error) { + stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement() + if err != nil { + return 0, err + } + defer onStagedPlacementDoneFn() + + return stagedPlacement.Version(), nil +} + // Flush flushes any remaining data buffered by the client. func (c *TCPClient) Flush() error { c.metrics.flush.Inc(1) diff --git a/src/aggregator/client/tcp_client_test.go b/src/aggregator/client/tcp_client_test.go index b1e5db411d..a6f4a4af36 100644 --- a/src/aggregator/client/tcp_client_test.go +++ b/src/aggregator/client/tcp_client_test.go @@ -799,8 +799,8 @@ func TestTCPClientActivePlacement(t *testing.T) { ) c.placementWatcher = watcher - watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() { doneCalls++ }, nil) - stagedPlacement.EXPECT().Version().Return(42) + watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() { doneCalls++ }, nil).Times(2) + stagedPlacement.EXPECT().Version().Return(42).Times(2) stagedPlacement.EXPECT().ActivePlacement().Return(mockPl, func() { doneCalls++ }, nil) mockPl.EXPECT().Clone().Return(emptyPl) @@ -809,6 +809,11 @@ func TestTCPClientActivePlacement(t *testing.T) { assert.Equal(t, 42, v) assert.Equal(t, 2, doneCalls) assert.Equal(t, emptyPl, pl) + + v, err = c.ActivePlacementVersion() + assert.NoError(t, err) + assert.Equal(t, 42, v) + assert.Equal(t, 3, doneCalls) } func TestTCPClientInitAndClose(t *testing.T) {