diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index cf13aea403..72173a4141 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -59,6 +59,7 @@ import ( xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" + apachethrift "github.com/apache/thrift/lib/go/thrift" "github.com/uber-go/tally" "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" @@ -2265,15 +2266,18 @@ func (s *session) streamBlocksMetadataFromPeer( data.IncRef() data.AppendAll(elem.ID) data.DecRef() - clonedID := idPool.BinaryID(data) + // Return thrift bytes to pool once the ID has been copied. + apachethrift.BytesPoolPut(elem.ID) var encodedTags checked.Bytes - if bytes := elem.EncodedTags; len(bytes) != 0 { - encodedTags = bytesPool.Get(len(bytes)) + if tagBytes := elem.EncodedTags; len(tagBytes) != 0 { + encodedTags = bytesPool.Get(len(tagBytes)) encodedTags.IncRef() - encodedTags.AppendAll(bytes) + encodedTags.AppendAll(tagBytes) encodedTags.DecRef() + // Return thrift bytes to pool once the tags have been copied. + apachethrift.BytesPoolPut(tagBytes) } // Error occurred retrieving block metadata, use default values diff --git a/src/dbnode/storage/repair/metadata.go b/src/dbnode/storage/repair/metadata.go index 3eebc373de..257f136284 100644 --- a/src/dbnode/storage/repair/metadata.go +++ b/src/dbnode/storage/repair/metadata.go @@ -223,11 +223,25 @@ func (m replicaMetadataComparer) Compare() MetadataComparisonResult { for _, hm := range bm { if !originContainsBlock { - if hm.Host.String() == m.origin.String() { + if hm.Host.ID() == m.origin.ID() { originContainsBlock = true } } + if hm.Metadata.Checksum == nil { + // Skip metadata that doesn't have a checksum. This usually means that the + // metadata represents unmerged or pending data. Better to skip for now and + // repair it once it has been merged as opposed to repairing it now and + // ping-ponging the same data back and forth between all the repairing nodes. 
+ // + // The impact of this is that recently modified data may take longer to be + // repaired, but it saves a ton of work by preventing nodes from repairing + // from each other unnecessarily even when they have identical data. + // + // TODO(rartoul): Consider skipping series with duplicate metadata as well? + continue + } + // Check size. if firstSize { sizeVal = hm.Metadata.Size diff --git a/src/dbnode/storage/repair/metadata_test.go b/src/dbnode/storage/repair/metadata_test.go index 8085d5a024..c1cb62d4c7 100644 --- a/src/dbnode/storage/repair/metadata_test.go +++ b/src/dbnode/storage/repair/metadata_test.go @@ -252,6 +252,8 @@ func TestReplicaMetadataComparerCompare(t *testing.T) { Host: hosts[1], Metadata: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, now.Add(time.Second), int64(1), &ten, time.Time{}), }, + // hosts[0] has a checksum but hosts[1] doesn't so this block will not be repaired (skipped until the next attempt) at + // which point hosts[1] will have merged the blocks and an accurate comparison can be made. block.ReplicaMetadata{ Host: hosts[0], Metadata: block.NewMetadata(ident.StringID("baz"), ident.Tags{}, now.Add(2*time.Second), int64(2), &twenty, time.Time{}), @@ -260,6 +262,15 @@ func TestReplicaMetadataComparerCompare(t *testing.T) { Host: hosts[1], Metadata: block.NewMetadata(ident.StringID("baz"), ident.Tags{}, now.Add(2*time.Second), int64(2), nil, time.Time{}), }, + // hosts[0] and hosts[1] both have a checksum, but they differ, so this should trigger a checksum mismatch. + block.ReplicaMetadata{ + Host: hosts[0], + Metadata: block.NewMetadata(ident.StringID("boz"), ident.Tags{}, now.Add(2*time.Second), int64(2), &twenty, time.Time{}), + }, + block.ReplicaMetadata{ + Host: hosts[1], + Metadata: block.NewMetadata(ident.StringID("boz"), ident.Tags{}, now.Add(2*time.Second), int64(2), &ten, time.Time{}), + }, // Block only exists for host[1] but host[0] is the origin so should be consider a size/checksum mismatch. 
block.ReplicaMetadata{ Host: hosts[1], @@ -282,17 +293,17 @@ func TestReplicaMetadataComparerCompare(t *testing.T) { inputs[3], }}, {ident.StringID("gah"), now.Add(3 * time.Second), []block.ReplicaMetadata{ - inputs[6], + inputs[8], }}, } checksumExpected := []testBlock{ - {ident.StringID("baz"), now.Add(2 * time.Second), []block.ReplicaMetadata{ - inputs[4], - inputs[5], + {ident.StringID("boz"), now.Add(2 * time.Second), []block.ReplicaMetadata{ + inputs[6], + inputs[7], }}, {ident.StringID("gah"), now.Add(3 * time.Second), []block.ReplicaMetadata{ - inputs[6], + inputs[8], }}, } @@ -300,8 +311,8 @@ func TestReplicaMetadataComparerCompare(t *testing.T) { m.metadata = metadata res := m.Compare() - require.Equal(t, int64(5), res.NumSeries) - require.Equal(t, int64(5), res.NumBlocks) + require.Equal(t, int64(6), res.NumSeries) + require.Equal(t, int64(6), res.NumBlocks) assertEqual(t, sizeExpected, res.SizeDifferences) assertEqual(t, checksumExpected, res.ChecksumDifferences) } diff --git a/src/dbnode/storage/repair/options.go b/src/dbnode/storage/repair/options.go index 57616b439b..f3ba6e4dde 100644 --- a/src/dbnode/storage/repair/options.go +++ b/src/dbnode/storage/repair/options.go @@ -30,7 +30,9 @@ import ( ) const ( - defaultRepairConsistencyLevel = topology.ReadConsistencyLevelMajority + // Allow repairs to progress when a single peer is down (i.e. during single node failure + // or deployments). + defaultRepairConsistencyLevel = topology.ReadConsistencyLevelUnstrictMajority defaultRepairCheckInterval = time.Minute defaultRepairThrottle = 90 * time.Second defaultRepairShardConcurrency = 1