Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: handle scan iteration exceed limit and refactor code #96

Merged
merged 8 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 45 additions & 25 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,20 @@ public PegasusScanner(
long[] splitHash,
boolean needCheckHash) {
_table = table;
_split_hash = splitHash;
_split_gpid = splitGpid == null ? new gpid[0] : splitGpid;
_splitHash = splitHash;
hycdong marked this conversation as resolved.
Show resolved Hide resolved
_splitGpid = splitGpid == null ? new gpid[0] : splitGpid;
_options = options;
_startKey = startKey;
_stopKey = stopKey;
_p = -1;
_context = CONTEXT_ID_COMPLETED;
_hash_p = _split_gpid.length;
_contextId = CONTEXT_ID_COMPLETED;
_splitCount = _splitGpid.length;
_kvs = new ArrayList<key_value>();
_promises = new LinkedList<DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>>>();
_rpcRunning = false;
_encounterError = false;
_needCheckHash = needCheckHash;
_incomplete = false;
}

public Pair<Pair<byte[], byte[]>, byte[]> next() throws PException {
Expand Down Expand Up @@ -96,17 +97,17 @@ public Future<Pair<Pair<byte[], byte[]>, byte[]>> asyncNext() {

@Override
public void close() {
if (_context >= CONTEXT_ID_VALID_MIN) {
if (_contextId >= CONTEXT_ID_VALID_MIN) {
try {
rrdb_clear_scanner_operator op =
new rrdb_clear_scanner_operator(_gpid, _table.getTableName(), _context, _hash);
new rrdb_clear_scanner_operator(_gpid, _table.getTableName(), _contextId, _hash);
_table.operate(op, 0);
} catch (Throwable e) {
// ignore
}
_context = CONTEXT_ID_COMPLETED;
_contextId = CONTEXT_ID_COMPLETED;
}
_hash_p = 0;
_splitCount = 0;
}

private void asyncStartScan() {
Expand Down Expand Up @@ -170,7 +171,7 @@ private void asyncNextBatch() {
return;
}
_rpcRunning = true;
scan_request request = new scan_request(_context);
scan_request request = new scan_request(_contextId);
rrdb_scan_operator op = new rrdb_scan_operator(_gpid, _table.getTableName(), request, _hash);
Table.ClientOPCallback callback =
new Table.ClientOPCallback() {
Expand Down Expand Up @@ -204,10 +205,15 @@ private void onRecvRpcResponse(error_code err, scan_response response) {
if (response.error == 0) { // ERR_OK
_kvs = response.kvs;
_p = -1;
_context = response.context_id;
_contextId = response.context_id;
} else if (response.error
== 1) { // rocksDB error kNotFound, that scan context has been removed
_context = CONTEXT_ID_NOT_EXIST;
_contextId = CONTEXT_ID_NOT_EXIST;
} else if (response.error == 7) { // rocksDB error kIncomplete

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if a server with the scan limit interacts with a client of the previous version? Will the client throw an exception when it receives rocksdb::Status::kIncomplete, or just continue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current version, the client throws an exception immediately when it receives rocksdb::Status::kIncomplete and cannot get the scanner's results. In this version, the client will still throw an exception, but it will also return the scan results collected so far.

_kvs = response.kvs;
_p = -1;
_contextId = CONTEXT_ID_COMPLETED;
_incomplete = true;
} else { // rpc succeed, but operation encounter some error in server side
_encounterError = true;
_cause = new PException("rocksDB error: " + response.error);
Expand All @@ -225,25 +231,37 @@ private void asyncNextInternal() {
}
_promises.clear();
// we don't reset the flag, just abandon this scan operation
// _encounterError = false;
return;
}
while (!_promises.isEmpty()) {
while (++_p >= _kvs.size()) {
if (_context == CONTEXT_ID_COMPLETED) {
// reach the end of one partition
if (_hash_p <= 0) {
if (_contextId == CONTEXT_ID_COMPLETED) {
// this scan operation got incomplete from server, abandon scan operation
if (_incomplete) {
for (DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>> p : _promises) {
logger.error(
"scan got incomplete error, " + "tableName({}), {}",
_table.getTableName(),
_gpid.toString());
p.setFailure(new PException("scan got incomplete error, retry later"));
}
_promises.clear();
return;
}

// reach the end of one partition, finish scan operation
if (_splitCount <= 0) {
for (DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>> p : _promises) {
p.setSuccess(null);
}
_promises.clear();
return;
} else {
_gpid = _split_gpid[--_hash_p];
_hash = _split_hash[_hash_p];
splitReset();
}
} else if (_context == CONTEXT_ID_NOT_EXIST) {

_gpid = _splitGpid[--_splitCount];
_hash = _splitHash[_splitCount];
splitReset();
} else if (_contextId == CONTEXT_ID_NOT_EXIST) {
// no valid context_id found
asyncStartScan();
return;
Expand All @@ -263,24 +281,24 @@ private void asyncNextInternal() {
private void splitReset() {
_kvs.clear();
_p = -1;
_context = CONTEXT_ID_NOT_EXIST;
_contextId = CONTEXT_ID_NOT_EXIST;
}

private Table _table;
private blob _startKey;
private blob _stopKey;
private ScanOptions _options;
private gpid[] _split_gpid;
private long[] _split_hash;
private int _hash_p;
private gpid[] _splitGpid;
private long[] _splitHash;
private int _splitCount;
hycdong marked this conversation as resolved.
Show resolved Hide resolved

private gpid _gpid;
private long _hash;

private List<key_value> _kvs;
private int _p;
hycdong marked this conversation as resolved.
Show resolved Hide resolved

private long _context;
private long _contextId;

private final Object _promisesLock = new Object();
private Deque<DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>>> _promises;
Expand All @@ -290,6 +308,8 @@ private void splitReset() {
Throwable _cause;

private boolean _needCheckHash;
// whether scan operation got incomplete error
private boolean _incomplete;

private static final Logger logger = org.slf4j.LoggerFactory.getLogger(PegasusScanner.class);
}
90 changes: 47 additions & 43 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1665,49 +1665,51 @@ public PegasusScannerInterface getScanner(
options.timeoutMillis = defaultTimeout;
}

ScanOptions o = new ScanOptions(options);
ScanOptions scanOptions = new ScanOptions(options);

// generate key range by start_sort_key and stop_sort_key
byte[] start = PegasusClient.generateKey(hashKey, startSortKey);
byte[] stop;
if (stopSortKey == null || stopSortKey.length == 0) {
stop = PegasusClient.generateNextBytes(hashKey);
o.stopInclusive = false;
scanOptions.stopInclusive = false;
} else {
stop = PegasusClient.generateKey(hashKey, stopSortKey);
}

// limit key range by prefix filter
if (o.sortKeyFilterType == FilterType.FT_MATCH_PREFIX
&& o.sortKeyFilterPattern != null
&& o.sortKeyFilterPattern.length > 0) {
byte[] prefix_start = PegasusClient.generateKey(hashKey, o.sortKeyFilterPattern);
if (scanOptions.sortKeyFilterType == FilterType.FT_MATCH_PREFIX
&& scanOptions.sortKeyFilterPattern != null
&& scanOptions.sortKeyFilterPattern.length > 0) {
byte[] prefix_start = PegasusClient.generateKey(hashKey, scanOptions.sortKeyFilterPattern);
if (PegasusClient.bytesCompare(prefix_start, start) > 0) {
start = prefix_start;
o.startInclusive = true;
scanOptions.startInclusive = true;
}
byte[] prefix_stop = PegasusClient.generateNextBytes(hashKey, o.sortKeyFilterPattern);
byte[] prefix_stop =
PegasusClient.generateNextBytes(hashKey, scanOptions.sortKeyFilterPattern);
if (PegasusClient.bytesCompare(prefix_stop, stop) <= 0) {
stop = prefix_stop;
o.stopInclusive = false;
scanOptions.stopInclusive = false;
}
}

// check if range is empty
int cmp = PegasusClient.bytesCompare(start, stop);

long[] hash;
gpid[] v;
if (cmp < 0 || cmp == 0 && o.startInclusive && o.stopInclusive) {
long[] hashArray;
gpid[] gpidArray;
hycdong marked this conversation as resolved.
Show resolved Hide resolved
if (cmp < 0 || cmp == 0 && scanOptions.startInclusive && scanOptions.stopInclusive) {
long startHash = table.getHash(start);
hash = new long[] {startHash};
v = new gpid[] {table.getGpidByHash(startHash)};
hashArray = new long[] {startHash};
gpidArray = new gpid[] {table.getGpidByHash(startHash)};
} else {
hash = new long[] {0};
v = new gpid[0];
hashArray = new long[] {0};
gpidArray = new gpid[0];
}

return new PegasusScanner(table, v, o, new blob(start), new blob(stop), hash, false);
return new PegasusScanner(
table, gpidArray, scanOptions, new blob(start), new blob(stop), hashArray, false);
}

@Override
Expand All @@ -1720,33 +1722,35 @@ public List<PegasusScannerInterface> getUnorderedScanners(int maxSplitCount, Sca
options.timeoutMillis = defaultTimeout;
}

gpid[] all = table.getAllGpid();
int count = all.length;
int split = count < maxSplitCount ? count : maxSplitCount;
List<PegasusScannerInterface> ret = new ArrayList<PegasusScannerInterface>(split);

int size = count / split;
int more = count - size * split;

// use default value for other fields in scan_options
ScanOptions opt = new ScanOptions();
opt.timeoutMillis = options.timeoutMillis;
opt.batchSize = options.batchSize;
opt.noValue = options.noValue;
opt.sortKeyFilterType = options.sortKeyFilterType;
opt.sortKeyFilterPattern = options.sortKeyFilterPattern;
opt.hashKeyFilterPattern = options.hashKeyFilterPattern;
opt.hashKeyFilterType = options.hashKeyFilterType;
for (int i = 0; i < split; i++) {
int s = i < more ? size + 1 : size;
gpid[] v = new gpid[s];
long[] hash = new long[s];
for (int j = 0; j < s; j++) {
--count;
v[j] = all[count];
hash[j] = count;
gpid[] allGpid = table.getAllGpid();
int gpidCount = allGpid.length;
hycdong marked this conversation as resolved.
Show resolved Hide resolved
int scannerCount = gpidCount < maxSplitCount ? gpidCount : maxSplitCount;
List<PegasusScannerInterface> ret = new ArrayList<PegasusScannerInterface>(scannerCount);

int averageSize = gpidCount / scannerCount;
int more = gpidCount % scannerCount;
hycdong marked this conversation as resolved.
Show resolved Hide resolved

ScanOptions scanOption = new ScanOptions(options);
scanOption.startInclusive = true;
scanOption.stopInclusive = false;

/*
* For example, if gpidCount = 16, maxSplitCount = 9
* then scannerCount = 9, averageSize = 1, more = 7
* It means that we should return nine scanners
* the first seven scanners will contain two partitions' data
* and the remaining two scanners will contain only one partition's data
hycdong marked this conversation as resolved.
Show resolved Hide resolved
* */
for (int i = 0; i < scannerCount; i++) {
int size = i < more ? averageSize + 1 : averageSize;
gpid[] gpidArray = new gpid[size];
long[] hashArray = new long[size];
for (int j = 0; j < size; j++) {
--gpidCount;
gpidArray[j] = allGpid[gpidCount];
hashArray[j] = gpidCount;
}
PegasusScanner scanner = new PegasusScanner(table, v, opt, hash, true);
PegasusScanner scanner = new PegasusScanner(table, gpidArray, scanOption, hashArray, true);
ret.add(scanner);
}
return ret;
Expand Down