[improve][tiered storage] Reduce cpu usage when offloading the ledger #15063

Merged · 20 commits · Jun 6, 2022
Changes from 16 commits

@@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.io.IOException;
@@ -28,6 +29,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -46,6 +48,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStream
private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);

static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD };
static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD);

private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(128, 128);

private final ReadHandle ledger;
private final long startEntryId;
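
The new BLOCK_END_PADDING_BYTES constant precomputes the block-end padding as raw big-endian bytes, and paddingBuf gives the stream a small reusable buffer to serve padding from, instead of materializing padding values one at a time on every read. A minimal standalone sketch (class name hypothetical; assumes Guava on the classpath, as the diff's imports suggest) checking that the byte[] form encodes the same sequence as the legacy int[] table:

import com.google.common.primitives.Ints;

public class PaddingBytesCheck {
    public static void main(String[] args) {
        int[] legacy = {0xFE, 0xDC, 0xDE, 0xAD};
        // Ints.toByteArray is big-endian, so this yields FE DC DE AD
        byte[] bytes = Ints.toByteArray(0xFEDCDEAD);
        for (int i = 0; i < legacy.length; i++) {
            if ((bytes[i] & 0xFF) != legacy[i]) {
                throw new AssertionError("mismatch at index " + i);
            }
        }
        System.out.println("BLOCK_END_PADDING_BYTES matches BLOCK_END_PADDING");
    }
}
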
@@ -69,6 +74,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStream
private List<ByteBuf> entriesByteBuf = null;
private LedgerOffloaderStats offloaderStats;
private String topicName;
private int currentOffset = 0;
private final AtomicBoolean close = new AtomicBoolean(false);

public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
this.ledger = ledger;
@@ -87,6 +94,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
this.topicName = ledgerName;
}

private ByteBuf readEntries(int len) throws IOException {
checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
checkState(bytesReadOffset < blockSize);

// once the entry buffer is drained, read more entries from the ledger if any remain
if (bytesReadOffset < dataBlockFullOffset
&& entriesByteBuf.isEmpty()
&& startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
}

if (!entriesByteBuf.isEmpty()
&& bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
@horizonzy (Member) commented on May 10, 2022:
This needn't be checked on every call; it only needs to run when the first element of entriesByteBuf changes.

// always read from the first ByteBuf in the list; once all of its content is read, remove it
ByteBuf entryByteBuf = entriesByteBuf.get(0);
int readableBytes = entryByteBuf.readableBytes();
int read = Math.min(readableBytes, len);
ByteBuf buf = entryByteBuf.slice(currentOffset, read);
buf.retain();
currentOffset += read;
entryByteBuf.readerIndex(currentOffset);
bytesReadOffset += read;

if (entryByteBuf.readableBytes() == 0) {
entryByteBuf.release();
entriesByteBuf.remove(0);
blockEntryCount++;
currentOffset = 0;
}

return buf;
} else {
// no space for a new entry or there are no more entries
// set data block full, return end padding
if (dataBlockFullOffset == blockSize) {
dataBlockFullOffset = bytesReadOffset;
}
paddingBuf.clear();
for (int i = 0; i < Math.min(len, paddingBuf.capacity()); i++) {
paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset)
% BLOCK_END_PADDING_BYTES.length]);
}
return paddingBuf.retain();
}
}
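
Note that this branch copies no entry data: ByteBuf.slice() returns a zero-copy view over the entry buffer, and retain() pins the shared memory so the slice stays valid after the parent entry buffer is released. A minimal sketch of that pattern (class name hypothetical; uses an unpooled buffer for simplicity):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class SliceDemo {
    public static void main(String[] args) {
        ByteBuf entry = Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5});
        ByteBuf slice = entry.slice(0, 3); // zero-copy view of the first 3 bytes
        slice.retain();                    // pin the shared memory: refCnt goes to 2
        entry.release();                   // slice stays valid: refCnt is still 1
        byte first = slice.readByte();     // reads 1, no data was ever copied
        slice.release();                   // last reference; the buffer is deallocated
    }
}
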

// read ledger entries.
private int readEntries() throws IOException {
checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
@@ -161,6 +214,46 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntries
}
}
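
readNextEntriesFromLedger (collapsed above) fetches entries from BookKeeper in batches of ENTRIES_PER_READ rather than one entry per round trip. A minimal sketch of a batched read against the BookKeeper ReadHandle API (class and method names hypothetical; assumes start is not past the last-add-confirmed entry):

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;

public class BatchedLedgerRead {
    static List<ByteBuf> readBatch(ReadHandle ledger, long start, long maxEntries) throws Exception {
        // clamp the batch to the last entry the ledger has confirmed
        long end = Math.min(start + maxEntries - 1, ledger.getLastAddConfirmed());
        List<ByteBuf> bufs = new ArrayList<>();
        try (LedgerEntries entries = ledger.readAsync(start, end).get()) {
            for (LedgerEntry entry : entries) {
                // retain a duplicate so the data outlives LedgerEntries.close()
                bufs.add(entry.getEntryBuffer().retainedDuplicate());
            }
        }
        return bufs;
    }
}
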

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException("The given bytes are null");
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
} else if (len == 0) {
return 0;
}

int offset = off;
int readLen = len;
int readBytes = 0;
// reading header
if (dataBlockHeaderStream.available() > 0) {
int read = dataBlockHeaderStream.read(b, off, len);
offset += read;
readLen -= read;
readBytes += read;
bytesReadOffset += read;
}
if (readLen == 0) {
return readBytes;
}

// reading ledger entries
if (bytesReadOffset < blockSize) {
readLen = Math.min(readLen, blockSize - bytesReadOffset);
ByteBuf readEntries = readEntries(readLen);
int read = readEntries.readableBytes();
readEntries.readBytes(b, offset, read);
readEntries.release();
readBytes += read;
return readBytes;
}

// reached end
return -1;
}
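
This override is the heart of the CPU reduction: InputStream's default bulk read falls back to one virtual read() call per byte, while the implementation above serves the header from the header stream and then copies whole entry slices with a single ByteBuf.readBytes call. A minimal sketch (hypothetical class names) of what the default fallback costs:

import java.io.IOException;
import java.io.InputStream;

class OneByteAtATime extends InputStream {
    private int remaining = 1 << 20; // pretend stream of 1 MiB

    @Override
    public int read() { // the default read(byte[], int, int) calls this once per byte
        return remaining-- > 0 ? 0 : -1;
    }
}

public class BulkReadDemo {
    public static void main(String[] args) throws IOException {
        byte[] buf = new byte[8192];
        try (InputStream in = new OneByteAtATime()) {
            long calls = 0;
            while (in.read(buf, 0, buf.length) != -1) {
                calls++; // each bulk call here still costs up to 8192 read() calls underneath
            }
            System.out.println("bulk reads: " + calls); // 128 bulk calls, ~1M read() calls
        }
    }
}
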

@Override
public int read() throws IOException {
// reading header
@@ -180,11 +273,20 @@ public int read() throws IOException {

@Override
public void close() throws IOException {
// The close method can be triggered twice in BlobStoreManagedLedgerOffloader#offload:
// once by the try-with-resources block that owns the stream, and (as observed while
// debugging) once more by writeBlobStore.uploadMultipartPart inside that method.
// Guard with the close flag so the buffers are only released once.
if (close.compareAndSet(false, true)) {
super.close();
dataBlockHeaderStream.close();
if (!entriesByteBuf.isEmpty()) {
entriesByteBuf.forEach(buf -> buf.release());
entriesByteBuf.clear();
}
paddingBuf.clear();
paddingBuf.release();
}
}
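
The guard makes close() idempotent: compareAndSet(false, true) succeeds exactly once, so cleanup runs on the first call and every later call is a no-op. A minimal sketch of the idiom (class name hypothetical):

import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentClose implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            System.out.println("releasing resources once"); // runs on the first close() only
        }
    }

    public static void main(String[] args) {
        try (IdempotentClose c = new IdempotentClose()) {
            c.close(); // explicit close; try-with-resources closes again, harmlessly
        }
    }
}
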

@@ -203,6 +305,10 @@ public int getBlockSize() {
return blockSize;
}

public int getDataBlockFullOffset() {
return dataBlockFullOffset;
}

@Override
public int getBlockEntryCount() {
return blockEntryCount;