Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

fix: make sure return "maxFetchCount" record if valid record is enough when multiGetSortKeys #148

Merged
merged 17 commits into from
Jan 29, 2021
12 changes: 4 additions & 8 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetSortKeysResult;
import com.xiaomi.infra.pegasus.rpc.*;
import com.xiaomi.infra.pegasus.tools.Tools;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -287,9 +288,7 @@ public boolean multiGet(
PegasusTable tb = getTable(tableName);
PegasusTableInterface.MultiGetResult res =
tb.multiGet(hashKey, startSortKey, stopSortKey, options, maxFetchCount, maxFetchSize, 0);
for (Pair<byte[], byte[]> kv : res.values) {
values.add(kv);
}
values.addAll(res.values);
return res.allFetched;
}

Expand Down Expand Up @@ -331,11 +330,8 @@ public boolean multiGetSortKeys(
throw new PException("Invalid parameter: sortKeys should not be null");
}
PegasusTable table = getTable(tableName);
PegasusTableInterface.MultiGetSortKeysResult result =
table.multiGetSortKeys(hashKey, maxFetchCount, maxFetchSize, 0);
for (byte[] key : result.keys) {
sortKeys.add(key);
}
MultiGetSortKeysResult result = table.multiGetSortKeys(hashKey, maxFetchCount, maxFetchSize, 0);
sortKeys.addAll(result.keys);
return result.allFetched;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,19 @@ public int batchMultiGet2(
*
* @param tableName table name
* @param hashKey used to decide which partition to put this k-v, should not be null or empty.
* @param maxFetchCount max count of k-v pairs to be fetched. max_fetch_count <= 0 means no limit.
* @param maxFetchCount max count of sortKeys to be fetched. max_fetch_count <= 0 means no limit.
* default value is 100.
* @param maxFetchSize max size of k-v pairs to be fetched. max_fetch_size <= 0 means no limit.
* default value is 1000000.
* @param maxFetchSize deprecated argument, can be ignored
* @param sortKeys output sort keys.
* @return true if all data is fetched; false if only partial data is fetched.
* @throws PException throws exception if any error occurs.
*/
public boolean multiGetSortKeys(
String tableName, byte[] hashKey, int maxFetchCount, int maxFetchSize, List<byte[]> sortKeys)
String tableName,
byte[] hashKey,
int maxFetchCount,
int maxFetchSize /*Deprecated*/,
List<byte[]> sortKeys)
throws PException;

public boolean multiGetSortKeys(String tableName, byte[] hashKey, List<byte[]> sortKeys)
Expand Down
93 changes: 72 additions & 21 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1173,31 +1173,21 @@ public int batchMultiGet2(
public MultiGetSortKeysResult multiGetSortKeys(
byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout) throws PException {
if (timeout <= 0) timeout = defaultTimeout;
try {
return asyncMultiGetSortKeys(hashKey, maxFetchCount, maxFetchSize, timeout)
.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw PException.threadInterrupted(table.getTableName(), e);
} catch (TimeoutException e) {
throw PException.timeout(
metaList, table.getTableName(), new Request(hashKey, maxFetchCount), timeout, e);
} catch (ExecutionException e) {
throw new PException(e);
}
MultiGetSortKeysResult sortKeysResult = new MultiGetSortKeysResult();
sortKeysResult.keys = new ArrayList<>();
ScanOptions options = new ScanOptions();
options.noValue = true;
ScanRangeResult result = scanRange(hashKey, null, null, options, maxFetchCount, timeout);
for (Pair<Pair<byte[], byte[]>, byte[]> pair : result.results) {
sortKeysResult.keys.add(pair.getLeft().getValue());
}
sortKeysResult.allFetched = result.allFetched;
return sortKeysResult;
}

@Override
public MultiGetSortKeysResult multiGetSortKeys(byte[] hashKey, int timeout) throws PException {
if (timeout <= 0) timeout = defaultTimeout;
try {
return asyncMultiGetSortKeys(hashKey, timeout).get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw PException.threadInterrupted(table.getTableName(), e);
} catch (TimeoutException e) {
throw PException.timeout(metaList, table.getTableName(), new Request(hashKey), timeout, e);
} catch (ExecutionException e) {
throw new PException(e);
}
return multiGetSortKeys(hashKey, 100, -1, timeout);
}

@Override
Expand Down Expand Up @@ -1828,6 +1818,67 @@ public List<PegasusScannerInterface> getUnorderedScanners(
return ret;
}

/**
 * Holder for what {@linkplain #scanRange(byte[], byte[], byte[], ScanOptions, int, int)} returns:
 * the fetched records plus a flag telling whether the requested range was read completely.
 */
static class ScanRangeResult {
  /** True when all data for {startSortKey, stopSortKey} has been fetched. */
  public boolean allFetched;

  /** The fetched records, each shaped as pair((hashKey, sortKey), value). */
  public List<Pair<Pair<byte[], byte[]>, byte[]>> results;
}

/**
 * Scans the records stored under {@code hashKey} whose sort keys lie in {startSortKey,
 * stopSortKey} and collects at most {@code maxFetchCount} of them.
 *
 * <p>The count limit is checked before a record is pulled from the scanner, so no record is
 * consumed and then dropped. When the limit stops the scan, exactly one extra record is probed to
 * decide whether anything is left in the range.
 *
 * @param hashKey hash key whose records are scanned
 * @param startSortKey sort key to start scanning from; null or empty means start from the
 *     beginning
 * @param stopSortKey sort key to stop scanning at; null or empty means scan to the end
 * @param options scan options, such as whether the endpoints are inclusive or exclusive
 * @param maxFetchCount max count of k-v pairs to be fetched; {@code <= 0} means fetch all data
 *     for {startSortKey, stopSortKey}
 * @param timeout timeout in milliseconds; {@code <= 0} means use the "timeout" of the config
 * @return the fetched records as pair((hashKey, sortKey), value); {@code allFetched} is true only
 *     when no record of the range is left unread
 * @throws PException if the deadline is exceeded; exceptions raised while creating or advancing
 *     the scanner propagate unchanged
 */
ScanRangeResult scanRange(
    byte[] hashKey,
    byte[] startSortKey,
    byte[] stopSortKey,
    ScanOptions options,
    int maxFetchCount,
    int timeout /*ms*/)
    throws PException {
  if (timeout <= 0) {
    timeout = defaultTimeout;
  }
  long deadlineTime = System.currentTimeMillis() + timeout;

  // NOTE(review): the scanner is never released here; if PegasusScannerInterface exposes a
  // close method it should probably be called once scanning is done - confirm.
  PegasusScannerInterface pegasusScanner =
      getScanner(hashKey, startSortKey, stopSortKey, options);
  ScanRangeResult scanRangeResult = new ScanRangeResult();
  scanRangeResult.allFetched = false;
  scanRangeResult.results = new ArrayList<>();
  if (System.currentTimeMillis() >= deadlineTime) {
    throw PException.timeout(
        metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException());
  }

  boolean exhausted = false;
  // Test the limit BEFORE calling next(): every record pulled from the scanner is kept.
  while (maxFetchCount <= 0 || scanRangeResult.results.size() < maxFetchCount) {
    Pair<Pair<byte[], byte[]>, byte[]> pair = pegasusScanner.next();
    if (pair == null) {
      exhausted = true;
      break;
    }
    if (System.currentTimeMillis() >= deadlineTime) {
      throw PException.timeout(
          metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException());
    }
    scanRangeResult.results.add(pair);
  }

  if (!exhausted) {
    // The limit ended the loop: look at exactly one more record to learn whether data remains.
    exhausted = pegasusScanner.next() == null;
  }
  scanRangeResult.allFetched = exhausted;
  return scanRangeResult;
}

public void handleReplicaException(
Request request, DefaultPromise promise, client_operator op, Table table, int timeout) {
if (timeout <= 0) timeout = defaultTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,11 +858,20 @@ public int batchMultiGet2(
throws PException;

/**
* sync version of MultiGetSortKeys, please refer to the async version {@link
* #asyncMultiGetSortKeys(byte[], int, int, int)} and {@link #asyncMultiGetSortKeys(byte[], int)}
* Synchronously fetches the sortKeys under the same hash key; this is a wrapper around
* {@linkplain #getScanner(byte[], byte[], byte[], ScanOptions)}
*
* @param hashKey hashKey, should not be null or empty
* @param maxFetchCount max count of sortKeys to be fetched. max_fetch_count <= 0 means no limit.
* default value is 100.
* @param maxFetchSize deprecated argument, can be ignored
* @param timeout timeout
* @return MultiGetSortKeysResult
* @throws PException if the timeout is exceeded
*/
public MultiGetSortKeysResult multiGetSortKeys(
byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout /*ms*/) throws PException;
byte[] hashKey, int maxFetchCount, int maxFetchSize /*Deprecated*/, int timeout /*ms*/)
throws PException;

public MultiGetSortKeysResult multiGetSortKeys(byte[] hashKey, int timeout /*ms*/)
throws PException;
Expand Down
129 changes: 129 additions & 0 deletions src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package com.xiaomi.infra.pegasus.client;

/** @author qinzuoyan */
import com.xiaomi.infra.pegasus.client.PegasusTable.ScanRangeResult;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetSortKeysResult;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -2629,4 +2631,131 @@ private void testWriteSizeLimit(PegasusClientInterface client) {
e.getMessage().contains("Exceed the multi value length threshold = 1048576"));
}
}

@Test // checks that "maxFetchCount" records are returned whenever enough valid records exist
public void testScanRangeWithValueExpired() throws PException, InterruptedException {
  String tableName = "temp";
  String hashKey = "hashKey";
  byte[] hashKeyBytes = hashKey.getBytes();
  byte[] emptySortKey = "".getBytes();
  // stored sortKeys: [expired_0 ... expired_999, persistent_0 ... persistent_9]
  generateRecordsWithExpired(tableName, hashKey, 1000, 10);

  PegasusTable table =
      (PegasusTable) PegasusClientFactory.getSingletonClient().openTable(tableName);

  // ---- case A: no sort key bounds at all ----
  // A1: persistent record count >= maxFetchCount, exactly maxFetchCount records come back
  assertScanResult(
      0, 4, false, table.scanRange(hashKeyBytes, null, null, new ScanOptions(), 5, 0));
  // A2: persistent record count < maxFetchCount, only the persistent records come back
  assertScanResult(
      0, 9, true, table.scanRange(hashKeyBytes, null, null, new ScanOptions(), 100, 0));

  // ---- case B: bounded by a start sort key, open ended ("") stop sort key ----
  // B1: start at "expired_0", persistent record count >= maxFetchCount
  assertScanResult(
      0,
      4,
      false,
      table.scanRange(
          hashKeyBytes, "expired_0".getBytes(), emptySortKey, new ScanOptions(), 5, 0));
  // B2: start at "expired_0", persistent record count < maxFetchCount
  assertScanResult(
      0,
      9,
      true,
      table.scanRange(
          hashKeyBytes, "expired_0".getBytes(), emptySortKey, new ScanOptions(), 50, 0));
  // B3: start at "persistent_5", remaining persistent record count < maxFetchCount
  assertScanResult(
      5,
      9,
      true,
      table.scanRange(
          hashKeyBytes, "persistent_5".getBytes(), emptySortKey, new ScanOptions(), 50, 0));
  // B4: start at "persistent_5", remaining persistent record count > maxFetchCount
  assertScanResult(
      5,
      7,
      false,
      table.scanRange(
          hashKeyBytes, "persistent_5".getBytes(), emptySortKey, new ScanOptions(), 3, 0));

  // ---- case C: open ("") start sort key, bounded by a stop sort key ----
  // C1: stop at "expired_7": no valid record lies between "" and "expired_7"
  ScanRangeResult onlyExpired =
      table.scanRange(
          hashKeyBytes, emptySortKey, "expired_7".getBytes(), new ScanOptions(), 3, 0);
  Assert.assertTrue(onlyExpired.allFetched);
  Assert.assertEquals(0, onlyExpired.results.size());
  // C2: stop at "persistent_7", valid record count < maxFetchCount
  assertScanResult(
      0,
      6,
      true,
      table.scanRange(
          hashKeyBytes, emptySortKey, "persistent_7".getBytes(), new ScanOptions(), 10, 0));
  // C3: stop at "persistent_7", valid record count > maxFetchCount
  assertScanResult(
      0,
      1,
      false,
      table.scanRange(
          hashKeyBytes, emptySortKey, "persistent_7".getBytes(), new ScanOptions(), 2, 0));

  // ---- case D: multiGetSortKeys, same range as case A but sort keys only ----
  // D1: maxFetchCount is smaller than the valid record count, maxFetchCount keys come back
  MultiGetSortKeysResult firstFive = table.multiGetSortKeys(hashKeyBytes, 5, -1, 0);
  Assert.assertFalse(firstFive.allFetched);
  Assert.assertEquals(5, firstFive.keys.size());
  for (int i = 0; i < 5; i++) {
    Assertions.assertEquals("persistent_" + i, new String(firstFive.keys.get(i)));
  }
  // D2: maxFetchCount equals the valid record count, every valid key comes back
  MultiGetSortKeysResult allTen = table.multiGetSortKeys(hashKeyBytes, 10, -1, 0);
  Assert.assertTrue(allTen.allFetched);
  Assert.assertEquals(10, allTen.keys.size());
  for (int i = 0; i < 10; i++) {
    Assertions.assertEquals("persistent_" + i, new String(allTen.keys.get(i)));
  }
}

/**
 * Fills {@code hashKey} of {@code tableName} with test data: "expired_N" records written with a
 * trailing argument of 1 (presumably a TTL in seconds - confirm against the client API), a one
 * second pause, then "persistent_N" records written without that argument. Indices are written
 * from the highest down to 0, and the singleton client is closed at the end.
 *
 * @param tableName table to write into
 * @param hashKey hash key shared by every generated record
 * @param expiredCount number of "expired_N" records to write
 * @param persistentCount number of "persistent_N" records to write
 * @throws PException if a write fails
 * @throws InterruptedException if the pause between the two batches is interrupted
 */
private void generateRecordsWithExpired(
    String tableName, String hashKey, int expiredCount, int persistentCount)
    throws PException, InterruptedException {
  PegasusClientInterface client = PegasusClientFactory.getSingletonClient();

  // The prefixes are chosen so that the expiring records are stored in front of the persistent
  // ones.
  for (int remaining = expiredCount; remaining > 0; remaining--) {
    String sortKey = "expired_" + (remaining - 1);
    client.set(
        tableName, hashKey.getBytes(), sortKey.getBytes(), (sortKey + "_value").getBytes(), 1);
  }

  // Wait so that the records written above have expired before the test reads them.
  Thread.sleep(1000);

  for (int remaining = persistentCount; remaining > 0; remaining--) {
    String sortKey = "persistent_" + (remaining - 1);
    client.set(
        tableName, hashKey.getBytes(), sortKey.getBytes(), (sortKey + "_value").getBytes());
  }

  PegasusClientFactory.closeSingletonClient();
}

/**
 * Asserts that a scan returned exactly the records persistent_{startIndex} ..
 * persistent_{stopIndex}, in that order, under hash key "hashKey", each carrying the matching
 * "_value" payload, and that its allFetched flag has the expected state.
 *
 * @param startIndex index of the first expected persistent record
 * @param stopIndex index of the last expected persistent record (inclusive)
 * @param expectAllFetched expected value of {@code allFetched}
 * @param actuallyRes scan result under test
 */
private void assertScanResult(
    int startIndex, int stopIndex, boolean expectAllFetched, ScanRangeResult actuallyRes) {
  int expectedSize = stopIndex - startIndex + 1;
  Assertions.assertEquals(expectAllFetched, actuallyRes.allFetched);
  Assertions.assertEquals(expectedSize, actuallyRes.results.size());

  for (int i = startIndex; i <= stopIndex; i++) {
    // Position of record i inside the result list.
    int offset = i - startIndex;
    String expectedSortKey = "persistent_" + i;
    String expectedValue = "persistent_" + i + "_value";

    Assertions.assertEquals(
        "hashKey", new String(actuallyRes.results.get(offset).getLeft().getKey()));
    Assertions.assertEquals(
        expectedSortKey, new String(actuallyRes.results.get(offset).getLeft().getValue()));
    Assertions.assertEquals(
        expectedValue, new String(actuallyRes.results.get(offset).getRight()));
  }
}
}