Skip to content

Commit

Permalink
[BP-62] Add more tests for BookKeeper. (#4210)
Browse files Browse the repository at this point in the history
* Add tests to cover BookKeeper batch reads.

* fix ci.

* Fix ci.
  • Loading branch information
horizonzy authored Feb 20, 2024
1 parent 978414f commit eb27e6e
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public int compare(String o1, String o2) {
}
};

private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] passwd) {
private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] passwd, int batchEntries) {
LOG.info("Reading ledger {}", ledgerId);
BookKeeper bk = null;
long time = 0;
Expand Down Expand Up @@ -102,17 +102,23 @@ private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] p
}
long starttime = System.nanoTime();

while (lastRead < lastConfirmed) {
while (entriesRead <= lastConfirmed) {
long nextLimit = lastRead + 100000;
long readTo = Math.min(nextLimit, lastConfirmed);
Enumeration<LedgerEntry> entries = lh.readEntries(lastRead + 1, readTo);
lastRead = readTo;
Enumeration<LedgerEntry> entries;
if (batchEntries <= 0) {
long readTo = Math.min(nextLimit, lastConfirmed);
entries = lh.readEntries(lastRead + 1, readTo);
} else {
entries = lh.batchReadEntries(lastRead, batchEntries, -1);
}
while (entries.hasMoreElements()) {
LedgerEntry e = entries.nextElement();
entriesRead++;
lastRead = e.getEntryId();
if ((entriesRead % 10000) == 0) {
LOG.info("{} entries read", entriesRead);
LOG.info("{} entries read from ledger {}", entriesRead, ledgerId);
}
e.getEntryBuffer().release();
}
}
long endtime = System.nanoTime();
Expand Down Expand Up @@ -159,6 +165,8 @@ public static void main(String[] args) throws Exception {
options.addOption("sockettimeout", true, "Socket timeout for bookkeeper client. In seconds. Default 5");
options.addOption("useV2", false, "Whether use V2 protocol to read ledgers from the bookie server.");
options.addOption("help", false, "This message");
options.addOption("batchentries", true, "The batch read entries count. "
+ "If the value is greater than 0, uses batch read. Or uses the single read. Default 1000");

CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args);
Expand All @@ -171,6 +179,7 @@ public static void main(String[] args) throws Exception {
final String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes(UTF_8);
final int sockTimeout = Integer.parseInt(cmd.getOptionValue("sockettimeout", "5"));
final int batchentries = Integer.parseInt(cmd.getOptionValue("batchentries", "1000"));
if (cmd.hasOption("ledger") && cmd.hasOption("listen")) {
LOG.error("Cannot used -ledger and -listen together");
usage(options);
Expand Down Expand Up @@ -210,7 +219,7 @@ public void process(WatchedEvent event) {
try {
if (event.getType() == Event.EventType.NodeCreated
&& event.getPath().equals(nodepath)) {
readLedger(conf, ledger.get(), passwd);
readLedger(conf, ledger.get(), passwd, batchentries);
shutdownLatch.countDown();
} else if (event.getType() == Event.EventType.NodeChildrenChanged) {
if (numLedgers.get() < 0) {
Expand All @@ -236,7 +245,7 @@ public void process(WatchedEvent event) {
Thread t = new Thread() {
@Override
public void run() {
readLedger(conf, ledgerId, passwd);
readLedger(conf, ledgerId, passwd, batchentries);
}
};
t.start();
Expand All @@ -259,7 +268,7 @@ public void run() {

if (ledger.get() != 0) {
if (zk.exists(nodepath, true) != null) {
readLedger(conf, ledger.get(), passwd);
readLedger(conf, ledger.get(), passwd, batchentries);
shutdownLatch.countDown();
} else {
LOG.info("Watching for creation of" + nodepath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
* the total entries count.
* @param maxSize
* the total entries size.
* @see #asyncBatchReadEntries(long, int, long, boolean, ReadCallback, Object)
* @see #asyncBatchReadEntries(long, int, long, ReadCallback, Object)
*/
public Enumeration<LedgerEntry> batchReadEntries(long startEntry, int maxCount, long maxSize)
throws InterruptedException, BKException {
Expand Down Expand Up @@ -688,7 +688,7 @@ public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long las
/**
* Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.<br>
* This is the same of
* {@link #asyncBatchReadUnconfirmedEntries(long, int, long, boolean, ReadCallback, Object) }
* {@link #asyncBatchReadUnconfirmedEntries(long, int, long, ReadCallback, Object) }
*
* @param firstEntry
* id of first entry of sequence (included)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2098,6 +2098,11 @@ public long getClientConnectBookieUnavailableLogThrottlingMs() {
return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING, 5_000L);
}

/**
 * Enables or disables batch reads for this client.
 *
 * <p>When disabled, batch read requests fall back to single-entry reads.
 *
 * @param enable true to allow batch read requests, false to force single-entry reads
 * @return this {@code ClientConfiguration}, for call chaining
 */
public ClientConfiguration setBatchReadEnabled(boolean enable) {
setProperty(BATCH_READ_ENABLED, enable);
return this;
}

/**
 * Whether batch read is enabled for this client.
 *
 * @return the configured value of {@code BATCH_READ_ENABLED}; defaults to {@code true}
 */
public boolean isBatchReadEnabled() {
return getBoolean(BATCH_READ_ENABLED, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,185 @@ public void testReadWriteWithV2WireProtocol() throws Exception {
}
}

/**
 * Verifies batch-read fallback behavior based on ledger topology:
 * <ol>
 *   <li>With the default (v3) protocol, {@code batchReadEntries} is unsupported and must
 *       throw {@link UnsupportedOperationException}.</li>
 *   <li>When the ledger's ensemble size (3) differs from its write quorum (2), the client
 *       falls back to single-entry reads and still returns all entries correctly.</li>
 * </ol>
 */
@Test
public void testBatchReadFailBackToSingleRead1() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
int numEntries = 100;
byte[] data = "foobar".getBytes();
try (BookKeeper bkc = new BookKeeper(conf)) {
// Write entries to a ledger whose ensemble equals its write quorum (2, 2, 2).
{
long ledgerId;
try (LedgerHandle lh = bkc.createLedger(2, 2, 2,
digestType, "testPasswd".getBytes())) {
ledgerId = lh.getId();
for (int i = 0; i < numEntries; i++) {
lh.addEntry(data);
}
}
try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes())) {
assertEquals(numEntries - 1, lh.readLastConfirmed());
// The v3 protocol does not support batch reads, so this call is expected to
// throw UnsupportedOperationException.
try {
lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
fail("Should throw UnsupportedOperationException.");
} catch (UnsupportedOperationException e) {
assertEquals("Unsupported batch read entry operation for v3 protocol.", e.getMessage());
}
}
}
}

try (BookKeeper bkc = new BookKeeper(conf)) {
// Write entries to a ledger whose ensemble (3) differs from its write quorum (2).
{
long ledgerId;
try (LedgerHandle lh = bkc.createLedger(3, 2, 2,
digestType, "testPasswd".getBytes())) {
ledgerId = lh.getId();
for (int i = 0; i < numEntries; i++) {
lh.addEntry(data);
}
}
try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes())) {
assertEquals(numEntries - 1, lh.readLastConfirmed());
// The ensemble size does not equal the write quorum, so the client falls back
// to single-entry reads; all data should still be read successfully.
for (Enumeration<LedgerEntry> readEntries = lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
readEntries.hasMoreElements();) {
LedgerEntry entry = readEntries.nextElement();
assertArrayEquals(data, entry.getEntry());
}
}
}
}
}

/**
 * Verifies batch-read fallback behavior based on client configuration:
 * <ol>
 *   <li>With the default (v3) protocol, {@code batchReadEntries} is unsupported and must
 *       throw {@link UnsupportedOperationException}.</li>
 *   <li>When batch reads are explicitly disabled via
 *       {@code ClientConfiguration#setBatchReadEnabled(false)}, the client falls back to
 *       single-entry reads and still returns all entries correctly.</li>
 * </ol>
 */
@Test
public void testBatchReadFailBackToSingleRead2() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
int numEntries = 100;
byte[] data = "foobar".getBytes();
try (BookKeeper bkc = new BookKeeper(conf)) {
// Write entries, then attempt a batch read over the v3 protocol.
{
long ledgerId;
try (LedgerHandle lh = bkc.createLedger(2, 2, 2,
digestType, "testPasswd".getBytes())) {
ledgerId = lh.getId();
for (int i = 0; i < numEntries; i++) {
lh.addEntry(data);
}
}
try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes())) {
assertEquals(numEntries - 1, lh.readLastConfirmed());
// The v3 protocol does not support batch reads, so this call must throw
// UnsupportedOperationException.
try {
lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
fail("Should throw UnsupportedOperationException.");
} catch (UnsupportedOperationException e) {
assertEquals("Unsupported batch read entry operation for v3 protocol.", e.getMessage());
}
}
}
}

conf.setBatchReadEnabled(false);
try (BookKeeper bkc = new BookKeeper(conf)) {
// Repeat with batch read disabled in the configuration.
{
long ledgerId;
try (LedgerHandle lh = bkc.createLedger(2, 2, 2,
digestType, "testPasswd".getBytes())) {
ledgerId = lh.getId();
for (int i = 0; i < numEntries; i++) {
lh.addEntry(data);
}
}
try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes())) {
assertEquals(numEntries - 1, lh.readLastConfirmed());
// Batch read is disabled by configuration, so the client falls back to
// single-entry reads; all data should still be read successfully.
for (Enumeration<LedgerEntry> readEntries = lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
readEntries.hasMoreElements();) {
LedgerEntry entry = readEntries.nextElement();
assertArrayEquals(data, entry.getEntry());
}
}
}
}
}

/**
 * Verifies batch reads over the v2 wire protocol, covering the interaction of the
 * {@code maxCount} and {@code maxSize} limits of {@code LedgerHandle#batchReadEntries}:
 * <ul>
 *   <li>maxCount = numEntries: all entries are returned;</li>
 *   <li>maxCount = 0: only maxSize limits the result;</li>
 *   <li>maxSize sized for exactly 5 entries: only 5 entries come back, whether
 *       maxCount is 0 or larger than 5.</li>
 * </ul>
 */
@Test
public void testBatchReadWithV2Protocol() throws Exception {
ClientConfiguration conf = new ClientConfiguration().setUseV2WireProtocol(true);
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
int numEntries = 100;
byte[] data = "foobar".getBytes();
try (BookKeeper bkc = new BookKeeper(conf)) {
// Write entries, then exercise batch reads with varying count/size limits.
{
long ledgerId;
try (LedgerHandle lh = bkc.createLedger(2, 2, 2, digestType, "testPasswd".getBytes())) {
ledgerId = lh.getId();
for (int i = 0; i < numEntries; i++) {
lh.addEntry(data);
}
}
try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes())) {
assertEquals(numEntries - 1, lh.readLastConfirmed());
// maxCount covers the whole ledger and maxSize (5 MiB) is ample: expect all entries.
int entries = 0;
for (Enumeration<LedgerEntry> readEntries = lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
readEntries.hasMoreElements();) {
LedgerEntry entry = readEntries.nextElement();
assertArrayEquals(data, entry.getEntry());
entries++;
}
assertEquals(numEntries, entries);

// maxCount is 0, so the result is limited only by maxSize (still ample here).
entries = 0;
for (Enumeration<LedgerEntry> readEntries = lh.batchReadEntries(0, 0, 5 * 1024 * 1024);
readEntries.hasMoreElements();) {
LedgerEntry entry = readEntries.nextElement();
assertArrayEquals(data, entry.getEntry());
entries++;
}
assertEquals(numEntries, entries);

// Per-entry wire size: 8 (ledgerId) + 8 (entryId) + 8 (lac) + 8 (length)
// + 8 (digest) + payload size.
long entrySize = 8 + 8 + 8 + 8 + 8 + data.length;
// Response header size (24 + 8 + 4) — assumes v2 batch-read response framing;
// confirm against the protocol codec if these constants change.
int headerSize = 24 + 8 + 4;
// maxCount is 0, so only maxSize limits the result; the size budget below
// admits exactly expectEntriesNum (5) entries.
entries = 0;
int expectEntriesNum = 5;
for (Enumeration<LedgerEntry> readEntries = lh.batchReadEntries(0, 0,
expectEntriesNum * entrySize + headerSize + (expectEntriesNum * 4));
readEntries.hasMoreElements();) {
LedgerEntry entry = readEntries.nextElement();
assertArrayEquals(data, entry.getEntry());
entries++;
}
assertEquals(expectEntriesNum, entries);

// maxCount is 20, but the maxSize limit is reached first, so only
// expectEntriesNum (5) entries are returned.
entries = 0;
for (Enumeration<LedgerEntry> readEntries = lh.batchReadEntries(0, 20,
expectEntriesNum * entrySize + headerSize + (expectEntriesNum * 4));
readEntries.hasMoreElements();) {
LedgerEntry entry = readEntries.nextElement();
assertArrayEquals(data, entry.getEntry());
entries++;
}
assertEquals(expectEntriesNum, entries);
}
}
}
}

@SuppressWarnings("deprecation")
@Test
public void testReadEntryReleaseByteBufs() throws Exception {
Expand Down

0 comments on commit eb27e6e

Please sign in to comment.