Skip to content

Commit

Permalink
Avoid thread-local state when computing CRCs (#3811)
Browse files Browse the repository at this point in the history
### Motivation

In `DigestManager` there are several accesses to ThreadLocal variable per each entry processed.

The reason is the mainly due to `DigestManager` API which exposes a stateful `update()` method which can be invoked multiple times and keeps the current checksum as a thread-local variable.

If we exclude MAC digest which is 20 bytes, for other digests we can instead keep the current checksum in a local variable and pass it each time, avoiding all the thread-locals and also the need for writing the checksum result into a buffer.

### Benchmarks

#### Before #3810 

```
Benchmark                            (entrySize)   Mode  Cnt   Score   Error   Units
DigestManagerBenchmark.verifyDigest           64  thrpt    3  13.450 ± 3.634  ops/us
DigestManagerBenchmark.verifyDigest         1024  thrpt    3   7.908 ± 2.637  ops/us
DigestManagerBenchmark.verifyDigest         4086  thrpt    3   3.233 ± 0.882  ops/us
DigestManagerBenchmark.verifyDigest         8192  thrpt    3   1.846 ± 0.047  ops/us
```

#### After #3810 

```
Benchmark                            (entrySize)   Mode  Cnt   Score   Error   Units
DigestManagerBenchmark.verifyDigest           64  thrpt    3  46.312 ± 7.414  ops/us
DigestManagerBenchmark.verifyDigest         1024  thrpt    3  13.379 ± 1.069  ops/us
DigestManagerBenchmark.verifyDigest         4086  thrpt    3   3.787 ± 0.059  ops/us
DigestManagerBenchmark.verifyDigest         8192  thrpt    3   1.956 ± 0.052  ops/us
```

#### After this change



```
Benchmark                            (entrySize)   Mode  Cnt    Score   Error   Units
DigestManagerBenchmark.verifyDigest           64  thrpt    3  130.108 ± 4.854  ops/us
DigestManagerBenchmark.verifyDigest         1024  thrpt    3   17.744 ± 0.238  ops/us
DigestManagerBenchmark.verifyDigest         4086  thrpt    3    4.104 ± 0.181  ops/us
DigestManagerBenchmark.verifyDigest         8192  thrpt    3    2.050 ± 0.066  ops/us
```
  • Loading branch information
merlimat authored Feb 27, 2023
1 parent 1f8de8f commit 1e02853
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,11 @@
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableInt;

@Slf4j
class CRC32CDigestManager extends DigestManager {

private static final FastThreadLocal<MutableInt> currentCrc = new FastThreadLocal<MutableInt>() {
@Override
protected MutableInt initialValue() throws Exception {
return new MutableInt(0);
}
};

public CRC32CDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
super(ledgerId, useV2Protocol, allocator);
}
Expand All @@ -45,16 +36,17 @@ int getMacCodeLength() {
}

@Override
void populateValueAndReset(ByteBuf buf) {
MutableInt current = currentCrc.get();
buf.writeInt(current.intValue());
current.setValue(0);
boolean isInt32Digest() {
return true;
}

@Override
void populateValueAndReset(int digest, ByteBuf buf) {
buf.writeInt(digest);
}

@Override
void update(ByteBuf data, int offset, int len) {
MutableInt current = currentCrc.get();
final int lastCrc = current.intValue();
current.setValue(Crc32cIntChecksum.resumeChecksum(lastCrc, data, offset, len));
int update(int digest, ByteBuf data, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,19 @@ int getMacCodeLength() {
}

@Override
void populateValueAndReset(ByteBuf buf) {
void populateValueAndReset(int digest, ByteBuf buf) {
buf.writeLong(crc.get().getValueAndReset());
}

@Override
void update(ByteBuf data, int offset, int len) {
int update(int digest, ByteBuf data, int offset, int len) {
crc.get().update(data, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
// This is stored as 8 bytes
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,11 @@ public abstract class DigestManager {

abstract int getMacCodeLength();

void update(byte[] data) {
update(Unpooled.wrappedBuffer(data), 0, data.length);
}
abstract int update(int digest, ByteBuf buffer, int offset, int len);

abstract void update(ByteBuf buffer, int offset, int len);
abstract void populateValueAndReset(int digest, ByteBuf buffer);

abstract void populateValueAndReset(ByteBuf buffer);
abstract boolean isInt32Digest();

final int macCodeLength;

Expand Down Expand Up @@ -112,7 +110,7 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
headersBuffer.writeLong(lastAddConfirmed);
headersBuffer.writeLong(length);

update(headersBuffer, 0, METADATA_LENGTH);
int digest = update(0, headersBuffer, 0, METADATA_LENGTH);

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
Expand All @@ -121,11 +119,15 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
ReferenceCountUtil.safeRelease(data);

if (unwrapped instanceof CompositeByteBuf) {
((CompositeByteBuf) unwrapped).forEach(b -> update(b, b.readerIndex(), b.readableBytes()));
CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
update(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
populateValueAndReset(headersBuffer);
populateValueAndReset(digest, headersBuffer);

return ByteBufList.get(headersBuffer, unwrapped);
}
Expand All @@ -147,8 +149,8 @@ public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
headersBuffer.writeLong(ledgerId);
headersBuffer.writeLong(lac);

update(headersBuffer, 0, LAC_METADATA_LENGTH);
populateValueAndReset(headersBuffer);
int digest = update(0, headersBuffer, 0, LAC_METADATA_LENGTH);
populateValueAndReset(digest, headersBuffer);

return ByteBufList.get(headersBuffer);
}
Expand Down Expand Up @@ -183,18 +185,26 @@ private void verifyDigest(long entryId, ByteBuf dataReceived, boolean skipEntryI
this.getClass().getName(), dataReceived.readableBytes());
throw new BKDigestMatchException();
}
update(dataReceived, 0, METADATA_LENGTH);
int digest = update(0, dataReceived, 0, METADATA_LENGTH);

int offset = METADATA_LENGTH + macCodeLength;
update(dataReceived, offset, dataReceived.readableBytes() - offset);

ByteBuf digest = DIGEST_BUFFER.get();
digest.clear();
populateValueAndReset(digest);

if (!ByteBufUtil.equals(digest, 0, dataReceived, METADATA_LENGTH, macCodeLength)) {
logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
throw new BKDigestMatchException();
digest = update(digest, dataReceived, offset, dataReceived.readableBytes() - offset);

if (isInt32Digest()) {
int receivedDigest = dataReceived.getInt(METADATA_LENGTH);
if (receivedDigest != digest) {
logger.error("Digest mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
throw new BKDigestMatchException();
}
} else {
ByteBuf digestBuf = DIGEST_BUFFER.get();
digestBuf.clear();
populateValueAndReset(digest, digestBuf);

if (!ByteBufUtil.equals(digestBuf, 0, dataReceived, METADATA_LENGTH, macCodeLength)) {
logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
throw new BKDigestMatchException();
}
}

long actualLedgerId = dataReceived.readLong();
Expand Down Expand Up @@ -223,17 +233,25 @@ public long verifyDigestAndReturnLac(ByteBuf dataReceived) throws BKDigestMatchE
throw new BKDigestMatchException();
}

update(dataReceived, 0, LAC_METADATA_LENGTH);
int digest = update(0, dataReceived, 0, LAC_METADATA_LENGTH);

ByteBuf digest = DIGEST_BUFFER.get();
digest.clear();

populateValueAndReset(digest);

if (!ByteBufUtil.equals(digest, 0, dataReceived, LAC_METADATA_LENGTH, macCodeLength)) {
logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
throw new BKDigestMatchException();
if (isInt32Digest()) {
int receivedDigest = dataReceived.getInt(LAC_METADATA_LENGTH);
if (receivedDigest != digest) {
logger.error("Digest mismatch for ledger-id LAC: " + ledgerId);
throw new BKDigestMatchException();
}
} else {
ByteBuf digestBuf = DIGEST_BUFFER.get();
digestBuf.clear();
populateValueAndReset(digest, digestBuf);

if (!ByteBufUtil.equals(digestBuf, 0, dataReceived, LAC_METADATA_LENGTH, macCodeLength)) {
logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
throw new BKDigestMatchException();
}
}

long actualLedgerId = dataReceived.readLong();
long lac = dataReceived.readLong();
if (actualLedgerId != ledgerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,15 @@ int getMacCodeLength() {
}

@Override
void update(ByteBuf buffer, int offset, int len) {}
int update(int digest, ByteBuf buffer, int offset, int len) {
return 0;
}

@Override
void populateValueAndReset(ByteBuf buffer) {}
void populateValueAndReset(int digest, ByteBuf buffer) {}

@Override
boolean isInt32Digest() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,18 @@ int getMacCodeLength() {


@Override
void populateValueAndReset(ByteBuf buffer) {
void populateValueAndReset(int digest, ByteBuf buffer) {
buffer.writeBytes(mac.get().doFinal());
}

@Override
void update(ByteBuf data, int offset, int len) {
int update(int digest, ByteBuf data, int offset, int len) {
mac.get().update(data.slice(offset, len).nioBuffer());
return 0;
}


@Override
boolean isInt32Digest() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ public DigestManager getDigestManager(Digest digest) {
public void digestManager(MyState state) {
final ByteBuf buff = state.getByteBuff(state.bufferType);
final DigestManager dm = state.getDigestManager(state.digest);
dm.update(buff, 0, buff.readableBytes());
int digest = dm.update(0, buff, 0, buff.readableBytes());
state.digestBuf.clear();
dm.populateValueAndReset(state.digestBuf);
dm.populateValueAndReset(digest, state.digestBuf);
}

}

0 comments on commit 1e02853

Please sign in to comment.