diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java index 446e0b35ab8..1fb3d47b3e0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java @@ -21,20 +21,11 @@ import com.scurrilous.circe.checksum.Crc32cIntChecksum; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.util.concurrent.FastThreadLocal; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.mutable.MutableInt; @Slf4j class CRC32CDigestManager extends DigestManager { - private static final FastThreadLocal currentCrc = new FastThreadLocal() { - @Override - protected MutableInt initialValue() throws Exception { - return new MutableInt(0); - } - }; - public CRC32CDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) { super(ledgerId, useV2Protocol, allocator); } @@ -45,16 +36,17 @@ int getMacCodeLength() { } @Override - void populateValueAndReset(ByteBuf buf) { - MutableInt current = currentCrc.get(); - buf.writeInt(current.intValue()); - current.setValue(0); + boolean isInt32Digest() { + return true; + } + + @Override + void populateValueAndReset(int digest, ByteBuf buf) { + buf.writeInt(digest); } @Override - void update(ByteBuf data, int offset, int len) { - MutableInt current = currentCrc.get(); - final int lastCrc = current.intValue(); - current.setValue(Crc32cIntChecksum.resumeChecksum(lastCrc, data, offset, len)); + int update(int digest, ByteBuf data, int offset, int len) { + return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java index ea34f130699..21be2651a7a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java @@ -57,12 +57,19 @@ int getMacCodeLength() { } @Override - void populateValueAndReset(ByteBuf buf) { + void populateValueAndReset(int digest, ByteBuf buf) { buf.writeLong(crc.get().getValueAndReset()); } @Override - void update(ByteBuf data, int offset, int len) { + int update(int digest, ByteBuf data, int offset, int len) { crc.get().update(data, offset, len); + return 0; + } + + @Override + boolean isInt32Digest() { + // This is stored as 8 bytes + return false; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index b142448aa46..87fd58eebf1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -53,13 +53,11 @@ public abstract class DigestManager { abstract int getMacCodeLength(); - void update(byte[] data) { - update(Unpooled.wrappedBuffer(data), 0, data.length); - } + abstract int update(int digest, ByteBuf buffer, int offset, int len); - abstract void update(ByteBuf buffer, int offset, int len); + abstract void populateValueAndReset(int digest, ByteBuf buffer); - abstract void populateValueAndReset(ByteBuf buffer); + abstract boolean isInt32Digest(); final int macCodeLength; @@ -112,7 +110,7 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC headersBuffer.writeLong(lastAddConfirmed); headersBuffer.writeLong(length); - update(headersBuffer, 0, METADATA_LENGTH); + int digest = update(0, headersBuffer, 0, METADATA_LENGTH); // don't unwrap slices final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf @@ -121,11 +119,15 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC ReferenceCountUtil.safeRelease(data); if (unwrapped instanceof CompositeByteBuf) { - ((CompositeByteBuf) unwrapped).forEach(b -> update(b, b.readerIndex(), b.readableBytes())); + CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped); + for (int i = 0; i < cbb.numComponents(); i++) { + ByteBuf b = cbb.component(i); + digest = update(digest, b, b.readerIndex(), b.readableBytes()); + } } else { - update(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); + digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); } - populateValueAndReset(headersBuffer); + populateValueAndReset(digest, headersBuffer); return ByteBufList.get(headersBuffer, unwrapped); } @@ -147,8 +149,8 @@ public ByteBufList computeDigestAndPackageForSendingLac(long lac) { headersBuffer.writeLong(ledgerId); headersBuffer.writeLong(lac); - update(headersBuffer, 0, LAC_METADATA_LENGTH); - populateValueAndReset(headersBuffer); + int digest = update(0, headersBuffer, 0, LAC_METADATA_LENGTH); + populateValueAndReset(digest, headersBuffer); return ByteBufList.get(headersBuffer); } @@ -183,18 +185,26 @@ private void verifyDigest(long entryId, ByteBuf dataReceived, boolean skipEntryI this.getClass().getName(), dataReceived.readableBytes()); throw new BKDigestMatchException(); } - update(dataReceived, 0, METADATA_LENGTH); + int digest = update(0, dataReceived, 0, METADATA_LENGTH); int offset = METADATA_LENGTH + macCodeLength; - update(dataReceived, offset, dataReceived.readableBytes() - offset); - - ByteBuf digest = DIGEST_BUFFER.get(); - digest.clear(); - populateValueAndReset(digest); - - if (!ByteBufUtil.equals(digest, 0, dataReceived, METADATA_LENGTH, macCodeLength)) { - logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId); - throw new BKDigestMatchException(); + digest = update(digest, dataReceived, offset, dataReceived.readableBytes() - offset); + + if (isInt32Digest()) { + int receivedDigest = dataReceived.getInt(METADATA_LENGTH); + if (receivedDigest != digest) { + logger.error("Digest mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId); + throw new BKDigestMatchException(); + } + } else { + ByteBuf digestBuf = DIGEST_BUFFER.get(); + digestBuf.clear(); + populateValueAndReset(digest, digestBuf); + + if (!ByteBufUtil.equals(digestBuf, 0, dataReceived, METADATA_LENGTH, macCodeLength)) { + logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId); + throw new BKDigestMatchException(); + } } long actualLedgerId = dataReceived.readLong(); @@ -223,17 +233,25 @@ public long verifyDigestAndReturnLac(ByteBuf dataReceived) throws BKDigestMatchE throw new BKDigestMatchException(); } - update(dataReceived, 0, LAC_METADATA_LENGTH); + int digest = update(0, dataReceived, 0, LAC_METADATA_LENGTH); - ByteBuf digest = DIGEST_BUFFER.get(); - digest.clear(); - - populateValueAndReset(digest); - - if (!ByteBufUtil.equals(digest, 0, dataReceived, LAC_METADATA_LENGTH, macCodeLength)) { - logger.error("Mac mismatch for ledger-id LAC: " + ledgerId); - throw new BKDigestMatchException(); + if (isInt32Digest()) { + int receivedDigest = dataReceived.getInt(LAC_METADATA_LENGTH); + if (receivedDigest != digest) { + logger.error("Digest mismatch for ledger-id LAC: " + ledgerId); + throw new BKDigestMatchException(); + } + } else { + ByteBuf digestBuf = DIGEST_BUFFER.get(); + digestBuf.clear(); + populateValueAndReset(digest, digestBuf); + + if (!ByteBufUtil.equals(digestBuf, 0, dataReceived, LAC_METADATA_LENGTH, macCodeLength)) { + logger.error("Mac mismatch for ledger-id LAC: " + ledgerId); + throw new BKDigestMatchException(); + } } + long actualLedgerId = dataReceived.readLong(); long lac = dataReceived.readLong(); if (actualLedgerId != ledgerId) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java index 9414c3ee55c..b15499f0cc5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java @@ -38,8 +38,15 @@ int getMacCodeLength() { } @Override - void update(ByteBuf buffer, int offset, int len) {} + int update(int digest, ByteBuf buffer, int offset, int len) { + return 0; + } @Override - void populateValueAndReset(ByteBuf buffer) {} + void populateValueAndReset(int digest, ByteBuf buffer) {} + + @Override + boolean isInt32Digest() { + return false; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java index 515e0f24845..c04c411c6c7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java @@ -91,14 +91,18 @@ int getMacCodeLength() { @Override - void populateValueAndReset(ByteBuf buffer) { + void populateValueAndReset(int digest, ByteBuf buffer) { buffer.writeBytes(mac.get().doFinal()); } @Override - void update(ByteBuf data, int offset, int len) { + int update(int digest, ByteBuf data, int offset, int len) { mac.get().update(data.slice(offset, len).nioBuffer()); + return 0; } - + @Override + boolean isInt32Digest() { + return false; + } } diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java index 59bb6708bf6..7201aba8cf0 100644 --- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java +++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java @@ -172,9 +172,9 @@ public DigestManager getDigestManager(Digest digest) { public void digestManager(MyState state) { final ByteBuf buff = state.getByteBuff(state.bufferType); final DigestManager dm = state.getDigestManager(state.digest); - dm.update(buff, 0, buff.readableBytes()); + int digest = dm.update(0, buff, 0, buff.readableBytes()); state.digestBuf.clear(); - dm.populateValueAndReset(state.digestBuf); + dm.populateValueAndReset(digest, state.digestBuf); } }