Skip to content

Commit

Permalink
Merge branch 'master' into vilius/tags_to_doc_performace_improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
vpranckaitis authored Jan 18, 2021
2 parents 9114f90 + acbe533 commit ed786ba
Show file tree
Hide file tree
Showing 18 changed files with 461 additions and 515 deletions.
16 changes: 10 additions & 6 deletions src/cluster/etcd/watchmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (
)

func TestWatchChan(t *testing.T) {
t.Parallel()
wh, ecluster, _, _, _, closer := testCluster(t) //nolint:dogsled
defer closer()

ec := ecluster.RandClient()
integration.WaitClientV3(t, ec)

wc, _, err := wh.watchChanWithTimeout("foo", 0)
require.NoError(t, err)
Expand All @@ -67,9 +67,9 @@ func TestWatchChan(t *testing.T) {
}

func TestWatchSimple(t *testing.T) {
t.Parallel()
wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t)
defer closer()
integration.WaitClientV3(t, ec)
require.Equal(t, int32(0), atomic.LoadInt32(updateCalled))

go wh.Watch("foo")
Expand Down Expand Up @@ -115,11 +115,11 @@ func TestWatchSimple(t *testing.T) {
}

func TestWatchRecreate(t *testing.T) {
t.Parallel()
wh, ecluster, updateCalled, shouldStop, doneCh, closer := testCluster(t)
defer closer()

ec := ecluster.RandClient()
integration.WaitClientV3(t, ec)

failTotal := 1
wh.opts = wh.opts.
Expand Down Expand Up @@ -165,7 +165,6 @@ func TestWatchRecreate(t *testing.T) {
}

func TestWatchNoLeader(t *testing.T) {
t.Parallel()
const (
watchInitAndRetryDelay = 200 * time.Millisecond
watchCheckInterval = 50 * time.Millisecond
Expand Down Expand Up @@ -210,6 +209,8 @@ func TestWatchNoLeader(t *testing.T) {
SetWatchChanResetInterval(watchInitAndRetryDelay).
SetWatchChanCheckInterval(watchCheckInterval)

integration.WaitClientV3(t, ec)

wh, err := NewWatchManager(opts)
require.NoError(t, err)

Expand All @@ -234,19 +235,21 @@ func TestWatchNoLeader(t *testing.T) {

require.NoError(t, ecluster.Members[1].Restart(t))
require.NoError(t, ecluster.Members[2].Restart(t))

// wait for leader + election delay just in case
time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration)

leaderIdx = ecluster.WaitLeader(t)
require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader")
integration.WaitClientV3(t, ec) // wait for client to be ready again

_, err = ec.Put(context.Background(), "foo", "baz")
require.NoError(t, err)

// give some time for watch to be updated
require.True(t, clock.WaitUntil(func() bool {
return atomic.LoadInt32(&updateCalled) >= 2
}, 30*time.Second))
}, 10*time.Second))

updates := atomic.LoadInt32(&updateCalled)
if updates < 2 {
Expand All @@ -269,10 +272,11 @@ func TestWatchNoLeader(t *testing.T) {
}

func TestWatchCompactedRevision(t *testing.T) {
t.Parallel()
wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t)
defer closer()

integration.WaitClientV3(t, ec)

ts := tally.NewTestScope("", nil)
errC := ts.Counter("errors")
wh.m.etcdWatchError = errC
Expand Down
22 changes: 0 additions & 22 deletions src/cluster/kv/etcd/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ func TestGetAndSet(t *testing.T) {
}

func TestNoCache(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)

store, err := NewStore(ec, opts)
Expand Down Expand Up @@ -152,8 +150,6 @@ func TestCacheDirCreation(t *testing.T) {
}

func TestCache(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)

f, err := ioutil.TempFile("", "")
Expand Down Expand Up @@ -206,8 +202,6 @@ func TestCache(t *testing.T) {
}

func TestSetIfNotExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand All @@ -227,8 +221,6 @@ func TestSetIfNotExist(t *testing.T) {
}

func TestCheckAndSet(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand All @@ -255,8 +247,6 @@ func TestCheckAndSet(t *testing.T) {
}

func TestWatchClose(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -306,8 +296,6 @@ func TestWatchClose(t *testing.T) {
}

func TestWatchLastVersion(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -350,8 +338,6 @@ func TestWatchLastVersion(t *testing.T) {
}

func TestWatchFromExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -389,8 +375,6 @@ func TestWatchFromExist(t *testing.T) {
}

func TestWatchFromNotExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -434,8 +418,6 @@ func TestGetFromKvNotFound(t *testing.T) {
}

func TestMultipleWatchesFromExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -486,8 +468,6 @@ func TestMultipleWatchesFromExist(t *testing.T) {
}

func TestMultipleWatchesFromNotExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -530,8 +510,6 @@ func TestMultipleWatchesFromNotExist(t *testing.T) {
}

func TestWatchNonBlocking(t *testing.T) {
t.Parallel()

ecluster, opts, closeFn := testCluster(t)
defer closeFn()

Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3857,7 +3857,7 @@ func (c *enqueueCh) enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueD
return nil, nil, errEnqueueChIsClosed
}
c.sending++ // NB(r): This is decremented by calling the returned enqueue done function
c.enqueued += (numToEnqueue)
c.enqueued += numToEnqueue
c.Unlock()
return c.enqueueDelayedFn, c.enqueueDelayedDoneFn, nil
}
Expand Down
6 changes: 4 additions & 2 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,12 +808,14 @@ func (s *service) fetchReadEncoded(ctx context.Context,
// Re-use reader and id for more memory-efficient processing of
// tags from doc.Metadata
reader := docs.NewEncodedDocumentReader()
id := ident.NewReusableBytesID()
for _, entry := range results.Map().Iter() {
idx := i
i++

id.Reset(entry.Key())
// NB(r): Use a bytes ID here so that this ID doesn't need to be
// copied by the blockRetriever in the streamRequest method when
// it checks if the ID is finalizeable or not with IsNoFinalize.
id := ident.BytesID(entry.Key())

d := entry.Value()
metadata, err := docs.MetadataFromDocument(d, reader)
Expand Down
14 changes: 11 additions & 3 deletions src/dbnode/persist/fs/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,17 @@ func (r *blockRetriever) streamRequest(
nsCtx namespace.Context,
) (bool, error) {
req.shard = shard
// NB(r): Clone the ID as we're not positive it will stay valid throughout
// the lifecycle of the async request.
req.id = r.idPool.Clone(id)

// NB(r): If the ID is a ident.BytesID then we can just hold
// onto this ID.
seriesID := id
if !seriesID.IsNoFinalize() {
// NB(r): Clone the ID as we're not positive it will stay valid throughout
// the lifecycle of the async request.
seriesID = r.idPool.Clone(id)
}

req.id = seriesID
req.start = startTime
req.blockSize = r.blockSize

Expand Down
Loading

0 comments on commit ed786ba

Please sign in to comment.