Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: handle scan iteration exceed limit and refactor code #96

Merged
merged 8 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ public PegasusScannerInterface getScanner(

@Override
public List<PegasusScannerInterface> getUnorderedScanners(
String tableName, int maxSplitCount, ScanOptions options) throws PException {
String tableName, int maxScannerCount, ScanOptions options) throws PException {
PegasusTable tb = getTable(tableName);
return tb.getUnorderedScanners(maxSplitCount, options);
return tb.getUnorderedScanners(maxScannerCount, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -595,11 +595,11 @@ public PegasusScannerInterface getScanner(
* Get Scanners for all data in database
*
* @param tableName TableHandler name
* @param maxSplitCount how many scanner expected
* @param maxScannerCount the maximum number of scanners expected
* @param options scan options like batchSize
* @return scanners, count of which would be no more than maxSplitCount
* @return scanners, count of which would be no more than maxScannerCount
* @throws PException throws exception if any error occurs.
*/
public List<PegasusScannerInterface> getUnorderedScanners(
String tableName, int maxSplitCount, ScanOptions options) throws PException;
String tableName, int maxScannerCount, ScanOptions options) throws PException;
}
97 changes: 61 additions & 36 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,39 @@ public class PegasusScanner implements PegasusScannerInterface {
private static final int CONTEXT_ID_NOT_EXIST = -2;

public PegasusScanner(
Table table, gpid[] splitGpid, ScanOptions options, long[] splitHash, boolean needCheckHash) {
this(table, splitGpid, options, min, max, splitHash, needCheckHash);
Table table,
gpid[] partitions,
ScanOptions options,
long[] partitionHashes,
boolean needCheckHash) {
this(table, partitions, options, min, max, partitionHashes, needCheckHash);
options.startInclusive = true;
options.stopInclusive = false;
}

public PegasusScanner(
Table table,
gpid[] splitGpid,
gpid[] partitions,
ScanOptions options,
blob startKey,
blob stopKey,
long[] splitHash,
long[] partitionHashes,
boolean needCheckHash) {
_table = table;
_split_hash = splitHash;
_split_gpid = splitGpid == null ? new gpid[0] : splitGpid;
_partitionHashes = partitionHashes;
_partitions = partitions == null ? new gpid[0] : partitions;
_options = options;
_startKey = startKey;
_stopKey = stopKey;
_p = -1;
_context = CONTEXT_ID_COMPLETED;
_hash_p = _split_gpid.length;
_readKvIter = -1;
_contextId = CONTEXT_ID_COMPLETED;
_partitionIter = _partitions.length;
_kvs = new ArrayList<key_value>();
_promises = new LinkedList<DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>>>();
_rpcRunning = false;
_encounterError = false;
_needCheckHash = needCheckHash;
_incomplete = false;
}

public Pair<Pair<byte[], byte[]>, byte[]> next() throws PException {
Expand Down Expand Up @@ -96,17 +101,17 @@ public Future<Pair<Pair<byte[], byte[]>, byte[]>> asyncNext() {

@Override
public void close() {
if (_context >= CONTEXT_ID_VALID_MIN) {
if (_contextId >= CONTEXT_ID_VALID_MIN) {
try {
rrdb_clear_scanner_operator op =
new rrdb_clear_scanner_operator(_gpid, _table.getTableName(), _context, _hash);
new rrdb_clear_scanner_operator(_gpid, _table.getTableName(), _contextId, _hash);
_table.operate(op, 0);
} catch (Throwable e) {
// ignore
}
_context = CONTEXT_ID_COMPLETED;
_contextId = CONTEXT_ID_COMPLETED;
}
_hash_p = 0;
_partitionIter = 0;
}

private void asyncStartScan() {
Expand Down Expand Up @@ -170,7 +175,7 @@ private void asyncNextBatch() {
return;
}
_rpcRunning = true;
scan_request request = new scan_request(_context);
scan_request request = new scan_request(_contextId);
rrdb_scan_operator op = new rrdb_scan_operator(_gpid, _table.getTableName(), request, _hash);
Table.ClientOPCallback callback =
new Table.ClientOPCallback() {
Expand Down Expand Up @@ -203,11 +208,16 @@ private void onRecvRpcResponse(error_code err, scan_response response) {
if (err.errno == error_code.error_types.ERR_OK) {
if (response.error == 0) { // ERR_OK
_kvs = response.kvs;
_p = -1;
_context = response.context_id;
_readKvIter = -1;
_contextId = response.context_id;
} else if (response.error
== 1) { // rocksDB error kNotFound, that scan context has been removed
_context = CONTEXT_ID_NOT_EXIST;
_contextId = CONTEXT_ID_NOT_EXIST;
} else if (response.error == 7) { // rocksDB error kIncomplete

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if a server with scan-limit enabled talks to a client of the previous version? Will the client throw an exception when it receives rocksdb::Status::kIncomplete, or will it just continue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current version, the client throws an exception immediately when it receives rocksdb::Status::kIncomplete, so it cannot get any result from the scanner. In this version, the client still throws an exception, but only after it has returned the scan results already received in that response.

_kvs = response.kvs;
_readKvIter = -1;
_contextId = CONTEXT_ID_COMPLETED;
_incomplete = true;
} else { // rpc succeed, but operation encounter some error in server side
_encounterError = true;
_cause = new PException("rocksDB error: " + response.error);
Expand All @@ -225,25 +235,37 @@ private void asyncNextInternal() {
}
_promises.clear();
// we don't reset the flag, just abandon this scan operation
// _encounterError = false;
return;
}
while (!_promises.isEmpty()) {
while (++_p >= _kvs.size()) {
if (_context == CONTEXT_ID_COMPLETED) {
// reach the end of one partition
if (_hash_p <= 0) {
while (++_readKvIter >= _kvs.size()) {
if (_contextId == CONTEXT_ID_COMPLETED) {
// this scan operation got incomplete from server, abandon scan operation
if (_incomplete) {
for (DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>> p : _promises) {
logger.error(
"scan got incomplete error, " + "tableName({}), {}",
_table.getTableName(),
_gpid.toString());
p.setFailure(new PException("scan got incomplete error, retry later"));
}
_promises.clear();
return;
}

// reach the end of one partition, finish scan operation
if (_partitionIter <= 0) {
for (DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>> p : _promises) {
p.setSuccess(null);
}
_promises.clear();
return;
} else {
_gpid = _split_gpid[--_hash_p];
_hash = _split_hash[_hash_p];
splitReset();
}
} else if (_context == CONTEXT_ID_NOT_EXIST) {

_gpid = _partitions[--_partitionIter];
_hash = _partitionHashes[_partitionIter];
contextReset();
} else if (_contextId == CONTEXT_ID_NOT_EXIST) {
// no valid context_id found
asyncStartScan();
return;
Expand All @@ -255,32 +277,33 @@ private void asyncNextInternal() {
DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>> p = _promises.getFirst();
p.setSuccess(
new ImmutablePair<Pair<byte[], byte[]>, byte[]>(
PegasusClient.restoreKey(_kvs.get(_p).key.data), _kvs.get(_p).value.data));
PegasusClient.restoreKey(_kvs.get(_readKvIter).key.data),
_kvs.get(_readKvIter).value.data));
_promises.removeFirst();
}
}

private void splitReset() {
private void contextReset() {
_kvs.clear();
_p = -1;
_context = CONTEXT_ID_NOT_EXIST;
_readKvIter = -1;
_contextId = CONTEXT_ID_NOT_EXIST;
}

private Table _table;
private blob _startKey;
private blob _stopKey;
private ScanOptions _options;
private gpid[] _split_gpid;
private long[] _split_hash;
private int _hash_p;
private gpid[] _partitions;
private long[] _partitionHashes;
private int _partitionIter;

private gpid _gpid;
private long _hash;

private List<key_value> _kvs;
private int _p;
private int _readKvIter;

private long _context;
private long _contextId;

private final Object _promisesLock = new Object();
private Deque<DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>>> _promises;
Expand All @@ -290,6 +313,8 @@ private void splitReset() {
Throwable _cause;

private boolean _needCheckHash;
// whether scan operation got incomplete error
private boolean _incomplete;

private static final Logger logger = org.slf4j.LoggerFactory.getLogger(PegasusScanner.class);
}
96 changes: 50 additions & 46 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1696,88 +1696,92 @@ public PegasusScannerInterface getScanner(
options.timeoutMillis = defaultTimeout;
}

ScanOptions o = new ScanOptions(options);
ScanOptions scanOptions = new ScanOptions(options);

// generate key range by start_sort_key and stop_sort_key
byte[] start = PegasusClient.generateKey(hashKey, startSortKey);
byte[] stop;
if (stopSortKey == null || stopSortKey.length == 0) {
stop = PegasusClient.generateNextBytes(hashKey);
o.stopInclusive = false;
scanOptions.stopInclusive = false;
} else {
stop = PegasusClient.generateKey(hashKey, stopSortKey);
}

// limit key range by prefix filter
if (o.sortKeyFilterType == FilterType.FT_MATCH_PREFIX
&& o.sortKeyFilterPattern != null
&& o.sortKeyFilterPattern.length > 0) {
byte[] prefix_start = PegasusClient.generateKey(hashKey, o.sortKeyFilterPattern);
if (scanOptions.sortKeyFilterType == FilterType.FT_MATCH_PREFIX
&& scanOptions.sortKeyFilterPattern != null
&& scanOptions.sortKeyFilterPattern.length > 0) {
byte[] prefix_start = PegasusClient.generateKey(hashKey, scanOptions.sortKeyFilterPattern);
if (PegasusClient.bytesCompare(prefix_start, start) > 0) {
start = prefix_start;
o.startInclusive = true;
scanOptions.startInclusive = true;
}
byte[] prefix_stop = PegasusClient.generateNextBytes(hashKey, o.sortKeyFilterPattern);
byte[] prefix_stop =
PegasusClient.generateNextBytes(hashKey, scanOptions.sortKeyFilterPattern);
if (PegasusClient.bytesCompare(prefix_stop, stop) <= 0) {
stop = prefix_stop;
o.stopInclusive = false;
scanOptions.stopInclusive = false;
}
}

// check if range is empty
int cmp = PegasusClient.bytesCompare(start, stop);

long[] hash;
gpid[] v;
if (cmp < 0 || cmp == 0 && o.startInclusive && o.stopInclusive) {
long[] partitionHashes;
gpid[] partitions;
if (cmp < 0 || cmp == 0 && scanOptions.startInclusive && scanOptions.stopInclusive) {
long startHash = table.getHash(start);
hash = new long[] {startHash};
v = new gpid[] {table.getGpidByHash(startHash)};
partitionHashes = new long[] {startHash};
partitions = new gpid[] {table.getGpidByHash(startHash)};
} else {
hash = new long[] {0};
v = new gpid[0];
partitionHashes = new long[] {0};
partitions = new gpid[0];
}

return new PegasusScanner(table, v, o, new blob(start), new blob(stop), hash, false);
return new PegasusScanner(
table, partitions, scanOptions, new blob(start), new blob(stop), partitionHashes, false);
}

@Override
public List<PegasusScannerInterface> getUnorderedScanners(int maxSplitCount, ScanOptions options)
throws PException {
if (maxSplitCount <= 0) {
public List<PegasusScannerInterface> getUnorderedScanners(
int maxScannerCount, ScanOptions options) throws PException {
if (maxScannerCount <= 0) {
throw new PException("Invalid parameter: the max count of splits must be greater than 0");
}
if (options.timeoutMillis <= 0) {
options.timeoutMillis = defaultTimeout;
}

gpid[] all = table.getAllGpid();
int count = all.length;
int split = count < maxSplitCount ? count : maxSplitCount;
List<PegasusScannerInterface> ret = new ArrayList<PegasusScannerInterface>(split);

int size = count / split;
int more = count - size * split;

// use default value for other fields in scan_options
ScanOptions opt = new ScanOptions();
opt.timeoutMillis = options.timeoutMillis;
opt.batchSize = options.batchSize;
opt.noValue = options.noValue;
opt.sortKeyFilterType = options.sortKeyFilterType;
opt.sortKeyFilterPattern = options.sortKeyFilterPattern;
opt.hashKeyFilterPattern = options.hashKeyFilterPattern;
opt.hashKeyFilterType = options.hashKeyFilterType;
for (int i = 0; i < split; i++) {
int s = i < more ? size + 1 : size;
gpid[] v = new gpid[s];
long[] hash = new long[s];
for (int j = 0; j < s; j++) {
--count;
v[j] = all[count];
hash[j] = count;
gpid[] allPartitions = table.getAllGpid();
int partitionCount = allPartitions.length;
int scannerCount = partitionCount < maxScannerCount ? partitionCount : maxScannerCount;
List<PegasusScannerInterface> ret = new ArrayList<PegasusScannerInterface>(scannerCount);

int averageSize = partitionCount / scannerCount;
int remainder = partitionCount % scannerCount;

ScanOptions scanOption = new ScanOptions(options);
scanOption.startInclusive = true;
scanOption.stopInclusive = false;

/*
* For example, if partitionCount = 16 and maxScannerCount = 9,
* then scannerCount = 9, averageSize = 1, remainder = 7.
* It means that we should return nine scanners:
* the first seven scanners will each serve two partitions' data,
* and the remaining two scanners will each serve only one partition's data.
* */
for (int i = 0; i < scannerCount; i++) {
int size = i < remainder ? averageSize + 1 : averageSize;
gpid[] gpidArray = new gpid[size];
long[] hashArray = new long[size];
for (int j = 0; j < size; j++) {
--partitionCount;
gpidArray[j] = allPartitions[partitionCount];
hashArray[j] = partitionCount;
}
PegasusScanner scanner = new PegasusScanner(table, v, opt, hash, true);
PegasusScanner scanner = new PegasusScanner(table, gpidArray, scanOption, hashArray, true);
ret.add(scanner);
}
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1122,11 +1122,11 @@ public PegasusScannerInterface getScanner(
/**
* Get Scanners for all data in database
*
* @param maxSplitCount how many scanner expected
* @param maxScannerCount the maximum number of scanners expected
* @param options scan options like batchSize
* @return scanners, count of which would be no more than maxSplitCount
* @return scanners, count of which would be no more than maxScannerCount
* @throws PException throw exception if any error occurs.
*/
public List<PegasusScannerInterface> getUnorderedScanners(int maxSplitCount, ScanOptions options)
throws PException;
public List<PegasusScannerInterface> getUnorderedScanners(
int maxScannerCount, ScanOptions options) throws PException;
}