Skip to content

Commit

Permalink
Fix crash when releasing all blocks
Browse files Browse the repository at this point in the history
If data corruption was found in the newest block, all blocks would be
released and trigger a panic when no block was allocated for writing.
See the stack trace below where lbm.newBlocks=0 and i=0 leads to
lbm.allocationAttemptsRemaining = 1 << -1
which panics.

2023/11/08 14:33:20 rpc error: code = Internal desc = Releasing 1 blocks due to a data integrity error
panic: runtime error: negative shift amount
goroutine 15748 [running]:
github.com/buildbarn/bb-storage/pkg/blobstore/local.(*OldCurrentNewLocationBlobMap).startAllocatingFromBlock(...)
	pkg/blobstore/local/old_current_new_location_blob_map.go:259
github.com/buildbarn/bb-storage/pkg/blobstore/local.(*OldCurrentNewLocationBlobMap).findBlockWithSpace(0xc00025e140, 0x232)
	pkg/blobstore/local/old_current_new_location_blob_map.go:302 +0x66a
github.com/buildbarn/bb-storage/pkg/blobstore/local.(*OldCurrentNewLocationBlobMap).Put(0xc00025e140, 0x232)
	pkg/blobstore/local/old_current_new_location_blob_map.go:363 +0x27

Fixes: #181
  • Loading branch information
moroten authored and EdSchouten committed Nov 11, 2023
1 parent 519a894 commit 670d2c5
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 12 deletions.
31 changes: 19 additions & 12 deletions pkg/blobstore/local/old_current_new_location_blob_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,9 @@ func NewOldCurrentNewLocationBlobMap(blockList BlockList, blockListGrowthPolicy
desiredOldBlocksCount: oldBlocksCount,
desiredNewBlocksCount: newBlocksCount,

allocationBlockIndex: -1,

lastRemovedOldBlockInsertionTime: oldCurrentNewLocationBlobMapLastRemovedOldBlockInsertionTime.WithLabelValues(storageType),
}
lbm.resetAllocationBlockIndex()
now := unixTime()
lbm.lastRemovedOldBlockInsertionTime.Set(now)

Expand Down Expand Up @@ -249,14 +248,22 @@ func (lbm *OldCurrentNewLocationBlobMap) Get(location Location) (LocationBlobGet
}, location.BlockIndex < len(lbm.oldBlocks)
}

// startAllocatingFromBlock resets the counters used to determine from
// which "new" block to allocate data. This function is called whenever
// the list of "new" blocks changes.
func (lbm *OldCurrentNewLocationBlobMap) startAllocatingFromBlock(i int) {
lbm.allocationBlockIndex = i
if i >= lbm.newBlocks-lbm.desiredNewBlocksCount {
// resetAllocationBlockIndex resets the counters used to determine from
// which "new" block to allocate data. This causes the next allocation
// to be performed against the first "new" block.
func (lbm *OldCurrentNewLocationBlobMap) resetAllocationBlockIndex() {
lbm.allocationBlockIndex = -1
lbm.allocationAttemptsRemaining = 0
}

// incrementAllocationBlockIndex increments the counters used to
// determine from which "new" block to allocate data. This function is
// called when space in a "new" block is exhausted.
func (lbm *OldCurrentNewLocationBlobMap) incrementAllocationBlockIndex() {
lbm.allocationBlockIndex = (lbm.allocationBlockIndex + 1) % lbm.newBlocks
if lbm.allocationBlockIndex >= lbm.newBlocks-lbm.desiredNewBlocksCount {
// One of the actual "new" blocks.
lbm.allocationAttemptsRemaining = 1 << (lbm.newBlocks - i - 1)
lbm.allocationAttemptsRemaining = 1 << (lbm.newBlocks - lbm.allocationBlockIndex - 1)
} else {
// One of the "current" blocks, while still in the
// initial phase where we populate all blocks.
Expand Down Expand Up @@ -299,7 +306,7 @@ func (lbm *OldCurrentNewLocationBlobMap) findBlockWithSpace(sizeBytes int64) (in
lbm.currentBlocks--
} else {
lbm.newBlocks--
lbm.startAllocatingFromBlock(0)
lbm.resetAllocationBlockIndex()
}
}

Expand Down Expand Up @@ -342,7 +349,7 @@ func (lbm *OldCurrentNewLocationBlobMap) findBlockWithSpace(sizeBytes int64) (in
lbm.increaseTotalBlocksToBeReleased(lbm.totalBlocksReleased)
}
}
lbm.startAllocatingFromBlock(0)
lbm.resetAllocationBlockIndex()
}

// Repeatedly attempt to allocate a blob within a "new" block.
Expand All @@ -354,7 +361,7 @@ func (lbm *OldCurrentNewLocationBlobMap) findBlockWithSpace(sizeBytes int64) (in
return index, nil
}
}
lbm.startAllocatingFromBlock((lbm.allocationBlockIndex + 1) % lbm.newBlocks)
lbm.incrementAllocationBlockIndex()
}
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/blobstore/local/old_current_new_location_blob_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,81 @@ func TestOldCurrentNewLocationBlobMapDataCorruption(t *testing.T) {
_, err = locationBlobPutWriter(buffer.NewBufferFromError(status.Error(codes.Unknown, "Client hung up")))()
testutil.RequireEqualStatus(t, status.Error(codes.Unknown, "Client hung up"), err)
}

func TestOldCurrentNewLocationBlobMapDataCorruptionInAllBlocks(t *testing.T) {
ctrl := gomock.NewController(t)

blockList := mock.NewMockBlockList(ctrl)
errorLogger := mock.NewMockErrorLogger(ctrl)
locationBlobMap := local.NewOldCurrentNewLocationBlobMap(
blockList,
local.NewImmutableBlockListGrowthPolicy(
/* currentBlocksCount = */ 4,
/* newBlocksCount = */ 4),
errorLogger,
"cas",
/* blockSizeBytes = */ 16,
/* oldBlocksCount = */ 2,
/* newBlocksCount = */ 4,
/* initialBlocksCount = */ 10)

// Perform a Get() call against the new block 9. Return a buffer that
// will trigger a data integrity error in the last block, as the digest
// corresponds with "Hello", not "xyzzy". This should cause the
// all blocks to be marked for immediate release.
helloDigest := digest.MustNewDigest("example", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5)
blockList.EXPECT().Get(9, helloDigest, int64(10), int64(5), gomock.Any()).DoAndReturn(
func(blockIndex int, digest digest.Digest, offsetBytes, sizeBytes int64, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer {
return buffer.NewCASBufferFromByteSlice(digest, []byte("xyzzy"), buffer.BackendProvided(dataIntegrityCallback))
})
errorLogger.EXPECT().Log(status.Error(codes.Internal, "Releasing 10 blocks due to a data integrity error"))

locationBlobGetter, needsRefresh := locationBlobMap.Get(local.Location{
BlockIndex: 9,
OffsetBytes: 10,
SizeBytes: 5,
})
require.False(t, needsRefresh)
_, err := locationBlobGetter(helloDigest).ToByteSlice(10)
testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Buffer has checksum 1271ed5ef305aadabc605b1609e24c52, while 8b1a9953c4611296a827abf8c47804d7 was expected"), err)

// Get() is not capable of releasing blocks immediately due to
// locking constraints. Still, we should make sure that further
// reads don't end up getting sent to these blocks.
// BlockReferenceToBlockIndex() should hide the results returned
// by the underlying BlockList.
blockList.EXPECT().BlockReferenceToBlockIndex(local.BlockReference{
EpochID: 72,
BlocksFromLast: 7,
}).Return(0, uint64(0xb8e12b9fbe428eba), true)

_, _, found := locationBlobMap.BlockReferenceToBlockIndex(local.BlockReference{
EpochID: 72,
BlocksFromLast: 7,
})
require.False(t, found)

// The next time Put() is called, we should first see that all
// the (corrupted) blocks are released. Eight blocks should be
// created, so that we continue the desired minimum number of
// blocks.
blockList.EXPECT().PopFront().Times(10)
blockList.EXPECT().PushBack().Times(8)

blockList.EXPECT().HasSpace(0, int64(5)).Return(true).Times(2)
blockListPutWriter := mock.NewMockBlockListPutWriter(ctrl)
blockList.EXPECT().Put(0, int64(5)).Return(blockListPutWriter.Call)
blockListPutFinalizer := mock.NewMockBlockListPutFinalizer(ctrl)
blockListPutWriter.EXPECT().Call(gomock.Any()).DoAndReturn(
func(b buffer.Buffer) local.BlockListPutFinalizer {
_, err := b.ToByteSlice(10)
testutil.RequireEqualStatus(t, status.Error(codes.Unknown, "Client hung up"), err)
return blockListPutFinalizer.Call
})
blockListPutFinalizer.EXPECT().Call().Return(int64(0), status.Error(codes.Unknown, "Client hung up"))

locationBlobPutWriter, err := locationBlobMap.Put(5)
require.NoError(t, err)
_, err = locationBlobPutWriter(buffer.NewBufferFromError(status.Error(codes.Unknown, "Client hung up")))()
testutil.RequireEqualStatus(t, status.Error(codes.Unknown, "Client hung up"), err)
}

0 comments on commit 670d2c5

Please sign in to comment.