From 400429c497161fd94be305c65c987b9e130057ee Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 25 Jan 2021 16:27:39 +0800 Subject: [PATCH 01/15] init --- .../infra/pegasus/client/PegasusClient.java | 8 +- .../infra/pegasus/client/PegasusTable.java | 85 +++++++++++++----- .../infra/pegasus/client/TestBasic.java | 87 +++++++++++++++++++ 3 files changed, 154 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index f80dc8b6..d845f8be 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -287,9 +287,7 @@ public boolean multiGet( PegasusTable tb = getTable(tableName); PegasusTableInterface.MultiGetResult res = tb.multiGet(hashKey, startSortKey, stopSortKey, options, maxFetchCount, maxFetchSize, 0); - for (Pair kv : res.values) { - values.add(kv); - } + values.addAll(res.values); return res.allFetched; } @@ -333,9 +331,7 @@ public boolean multiGetSortKeys( PegasusTable table = getTable(tableName); PegasusTableInterface.MultiGetSortKeysResult result = table.multiGetSortKeys(hashKey, maxFetchCount, maxFetchSize, 0); - for (byte[] key : result.keys) { - sortKeys.add(key); - } + sortKeys.addAll(result.keys); return result.allFetched; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 15f64e43..ecef037d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1067,17 +1067,62 @@ public MultiGetResult multiGet( int timeout /*ms*/) throws PException { if (timeout <= 0) timeout = defaultTimeout; + + String hashKeyStr = hashKey == null ? "" : new String(hashKey); + String startSortKeyStr = startSortKey == null ? "" : new String(startSortKey); + String stopSortKeyStr = stopSortKey == null ? "" : new String(stopSortKey); + + long startTime = System.currentTimeMillis(); + long lastCheckTime = startTime; + long deadlineTime = startTime + timeout; + + ScanOptions scanOptions = new ScanOptions(); + scanOptions.noValue = options.noValue; + scanOptions.startInclusive = options.startInclusive; + scanOptions.stopInclusive = options.stopInclusive; + scanOptions.sortKeyFilterType = options.sortKeyFilterType; + scanOptions.sortKeyFilterPattern = options.sortKeyFilterPattern; + + PegasusScannerInterface pegasusScanner = + getScanner(hashKey, startSortKey, stopSortKey, scanOptions); + lastCheckTime = System.currentTimeMillis(); + MultiGetResult multiGetResult = new MultiGetResult(); + multiGetResult.allFetched = false; + multiGetResult.values = new ArrayList<>(); try { - return asyncMultiGet( - hashKey, startSortKey, stopSortKey, options, maxFetchCount, maxFetchSize, timeout) - .get(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw PException.threadInterrupted(table.getTableName(), e); + if (lastCheckTime >= deadlineTime) { + throw new TimeoutException( + String.format( + "getting pegasusScanner takes too long time when multiGet hashKey=%s, startSortKey=%s, stopSortKey=%s, timeUsed=%s", + hashKeyStr, startSortKeyStr, stopSortKeyStr, lastCheckTime - startTime)); + } + + int remainingTime; + Pair, byte[]> pairs; + while ((pairs = pegasusScanner.next()) != null + && multiGetResult.values.size() < maxFetchCount) { + multiGetResult.values.add(Pair.of(pairs.getLeft().getValue(), pairs.getValue())); + lastCheckTime = System.currentTimeMillis(); + remainingTime = (int) (deadlineTime - lastCheckTime); + if (remainingTime <= 0) { + throw new TimeoutException( + String.format( + "getting pegasusScanner takes too long time when multiGet hashKey=%s, " + + "startSortKey=%s, stopSortKey=%s, timeUsed=%s, fetchCount=%d", + hashKeyStr, + startSortKeyStr, + stopSortKeyStr, + lastCheckTime - startTime, + multiGetResult.values.size())); + } + } + + if (pegasusScanner.next() == null) { + multiGetResult.allFetched = true; + } + return multiGetResult; } catch (TimeoutException e) { - throw PException.timeout( - metaList, table.getTableName(), new Request(hashKey, maxFetchCount), timeout, e); - } catch (ExecutionException e) { - throw new PException(e); + throw new PException(new ReplicationException(error_code.error_types.ERR_TIMEOUT)); } } @@ -1173,17 +1218,17 @@ public int batchMultiGet2( public MultiGetSortKeysResult multiGetSortKeys( byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; - try { - return asyncMultiGetSortKeys(hashKey, maxFetchCount, maxFetchSize, timeout) - .get(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw PException.threadInterrupted(table.getTableName(), e); - } catch (TimeoutException e) { - throw PException.timeout( - metaList, table.getTableName(), new Request(hashKey, maxFetchCount), timeout, e); - } catch (ExecutionException e) { - throw new PException(e); - } + MultiGetSortKeysResult sortKeysResult = new MultiGetSortKeysResult(); + sortKeysResult.keys = new ArrayList<>(); + MultiGetOptions options = new MultiGetOptions(); + options.noValue = true; + MultiGetResult result = + multiGet(hashKey, null, null, options, maxFetchCount, maxFetchSize, timeout); + for (Pair value : result.values) { + sortKeysResult.keys.add(value.getKey()); + } + sortKeysResult.allFetched = result.allFetched; + return sortKeysResult; } @Override diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 220060ec..d2d61d98 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -2629,4 +2629,91 @@ private void testWriteSizeLimit(PegasusClientInterface client) { e.getMessage().contains("Exceed the multi value length threshold = 1048576")); } } + + // make sure return "maxFetchCount" if has "maxFetchCount" record though some record expired + @Test + public void testMultiGetWhenValueExpired() throws PException, InterruptedException { + String tableName = "temp"; + String hashKey = "hashKey"; + generateRecordsWithExpired(tableName, hashKey, 1000, 10); + + PegasusClientInterface client = PegasusClientFactory.getSingletonClient(); + List> values = new ArrayList<>(); + // case 1: scan all record: if persistent record count >= maxFetchCount, it must return + // maxFetchCount records + boolean case1 = + client.multiGet( + tableName, hashKey.getBytes(), null, null, new MultiGetOptions(), 5, -1, values); + Assert.assertFalse(case1); + Assert.assertEquals(5, values.size()); + Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_4", new String(values.get(4).getKey())); + values.clear(); + + // case 2: scan all record: if persistent record count < maxFetchCount, it only return + // persistent count records + boolean case2 = + client.multiGet( + tableName, hashKey.getBytes(), null, null, new MultiGetOptions(), 100, -1, values); + Assert.assertTrue(case2); + Assert.assertEquals(10, values.size()); + Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_9", new String(values.get(9).getKey())); + values.clear(); + + // case 3: scan limit record by "startSortKey" and "stopSortKey": it only query from + // "startSortKey" and "stopSortKey", though it may be expired + boolean case3 = + client.multiGet( + tableName, + hashKey.getBytes(), + "expired_0".getBytes(), + "persistent_5".getBytes(), + new MultiGetOptions(), + 50, + -1, + values); + Assert.assertTrue(case3); + Assert.assertEquals( + 5, values.size()); // among "expired_0" and "persistent_4" only has 5 valid record + Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_4", new String(values.get(4).getKey())); + values.clear(); + + // case 4: use multiGetSortKeys, which actually equal with case 1 + List sortKeys = new ArrayList<>(); + boolean case4 = client.multiGetSortKeys(tableName, hashKey.getBytes(), 10, -1, sortKeys); + Assert.assertTrue(case4); + Assert.assertEquals(10, sortKeys.size()); + Assert.assertEquals("persistent_0", new String(sortKeys.get(0))); + Assert.assertEquals("persistent_9", new String(sortKeys.get(9))); + values.clear(); + } + + public void generateRecordsWithExpired( + String tableName, String hashKey, int expiredCount, int persistentCount) + throws PException, InterruptedException { + PegasusClientInterface client = PegasusClientFactory.getSingletonClient(); + // assign prefix to make sure the expire record is stored front of persistent + String expiredSortKeyPrefix = "expired_"; + String persistentSortKeyPrefix = "persistent_"; + while (expiredCount-- > 0) { + client.set( + tableName, + hashKey.getBytes(), + (expiredSortKeyPrefix + expiredCount).getBytes(), + tableName.getBytes(), + 1); + } + // sleep to make sure the record is expired + Thread.sleep(1000); + while (persistentCount-- > 0) { + client.set( + tableName, + hashKey.getBytes(), + (persistentSortKeyPrefix + persistentCount).getBytes(), + tableName.getBytes()); + } + PegasusClientFactory.closeSingletonClient(); + } } From 57c152ab97c97fa3f3abb1ef2916b99bd32de9c8 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 25 Jan 2021 16:34:24 +0800 Subject: [PATCH 02/15] init --- src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index ecef037d..ccc3bd5a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1085,11 +1085,11 @@ public MultiGetResult multiGet( PegasusScannerInterface pegasusScanner = getScanner(hashKey, startSortKey, stopSortKey, scanOptions); - lastCheckTime = System.currentTimeMillis(); MultiGetResult multiGetResult = new MultiGetResult(); multiGetResult.allFetched = false; multiGetResult.values = new ArrayList<>(); try { + lastCheckTime = System.currentTimeMillis(); if (lastCheckTime >= deadlineTime) { throw new TimeoutException( String.format( From 968a88b844b5b9b2c6754bc66979c6d6220ccfdd Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 25 Jan 2021 18:54:17 +0800 Subject: [PATCH 03/15] init --- .../infra/pegasus/client/TestBasic.java | 153 ++++++++++++++++-- 1 file changed, 136 insertions(+), 17 deletions(-) diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index d2d61d98..6a2af70e 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -2635,55 +2635,174 @@ private void testWriteSizeLimit(PegasusClientInterface client) { public void testMultiGetWhenValueExpired() throws PException, InterruptedException { String tableName = "temp"; String hashKey = "hashKey"; + // generate records whose values are all "temp" generateRecordsWithExpired(tableName, hashKey, 1000, 10); PegasusClientInterface client = PegasusClientFactory.getSingletonClient(); + // case A: scan all record List> values = new ArrayList<>(); - // case 1: scan all record: if persistent record count >= maxFetchCount, it must return + // case A1: scan all record: if persistent record count >= maxFetchCount, it must return // maxFetchCount records - boolean case1 = + boolean caseA1 = client.multiGet( tableName, hashKey.getBytes(), null, null, new MultiGetOptions(), 5, -1, values); - Assert.assertFalse(case1); + Assert.assertFalse(caseA1); Assert.assertEquals(5, values.size()); Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); Assert.assertEquals("persistent_4", new String(values.get(4).getKey())); + Assert.assertEquals("persistent_4_value", new String(values.get(4).getValue())); values.clear(); - // case 2: scan all record: if persistent record count < maxFetchCount, it only return + // case A2: scan all record: if persistent record count < maxFetchCount, it only return // persistent count records - boolean case2 = + boolean caseA2 = client.multiGet( tableName, hashKey.getBytes(), null, null, new MultiGetOptions(), 100, -1, values); - Assert.assertTrue(case2); + Assert.assertTrue(caseA2); Assert.assertEquals(10, values.size()); Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); Assert.assertEquals("persistent_9", new String(values.get(9).getKey())); + Assert.assertEquals("persistent_9_value", new String(values.get(9).getValue())); values.clear(); - // case 3: scan limit record by "startSortKey" and "stopSortKey": it only query from - // "startSortKey" and "stopSortKey", though it may be expired - boolean case3 = + // case B: scan limit record by "startSortKey" and "": + // case B1: scan limit record by "expired_0" and "", if persistent record count >= + // maxFetchCount, it must return maxFetchCount records + boolean caseB1 = client.multiGet( tableName, hashKey.getBytes(), "expired_0".getBytes(), - "persistent_5".getBytes(), + "".getBytes(), new MultiGetOptions(), - 50, + 5, -1, values); - Assert.assertTrue(case3); + Assert.assertFalse(caseB1); Assert.assertEquals( 5, values.size()); // among "expired_0" and "persistent_4" only has 5 valid record Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); Assert.assertEquals("persistent_4", new String(values.get(4).getKey())); + Assert.assertEquals("persistent_4_value", new String(values.get(4).getValue())); + values.clear(); + // case B2: scan limit record by "expired_0" and "", if persistent record count < maxFetchCount, + // it only return valid records + boolean caseB2 = + client.multiGet( + tableName, + hashKey.getBytes(), + "expired_0".getBytes(), + "".getBytes(), + new MultiGetOptions(), + 50, + -1, + values); + Assert.assertTrue(caseB2); + Assert.assertEquals(10, values.size()); // among "expired_0" and "" only has 10 valid record + Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); + Assert.assertEquals("persistent_9", new String(values.get(9).getKey())); + Assert.assertEquals("persistent_9_value", new String(values.get(9).getValue())); + values.clear(); + // case B3: scan limit record by "persistent_5" and "", if following persistent record count < + // maxFetchCount, it only return valid records + boolean caseB3 = + client.multiGet( + tableName, + hashKey.getBytes(), + "persistent_5".getBytes(), + "".getBytes(), + new MultiGetOptions(), + 50, + -1, + values); + Assert.assertTrue(caseB3); + Assert.assertEquals(5, values.size()); // among "persistent_5" and "" only has 5 valid record + Assert.assertEquals("persistent_5", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_5_value", new String(values.get(0).getValue())); + Assert.assertEquals("persistent_9", new String(values.get(4).getKey())); + Assert.assertEquals("persistent_9_value", new String(values.get(4).getValue())); + values.clear(); + // case B4: scan limit record by "persistent_5" and "", if following persistent record count > + // maxFetchCount, it only return valid records + boolean caseB4 = + client.multiGet( + tableName, + hashKey.getBytes(), + "persistent_5".getBytes(), + "".getBytes(), + new MultiGetOptions(), + 3, + -1, + values); + Assert.assertFalse(caseB4); + Assert.assertEquals(3, values.size()); // among "persistent_5" and "" only has 5 valid record + Assert.assertEquals("persistent_5", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_5_value", new String(values.get(0).getValue())); + Assert.assertEquals("persistent_7", new String(values.get(2).getKey())); + Assert.assertEquals("persistent_7_value", new String(values.get(2).getValue())); + values.clear(); + + // case C: scan limit record by "" and "stopSortKey": + // case C1: scan limit record by "" and "expired_7", if will return 0 record + boolean caseC1 = + client.multiGet( + tableName, + hashKey.getBytes(), + "".getBytes(), + "expired_7".getBytes(), + new MultiGetOptions(), + 3, + -1, + values); + Assert.assertTrue(caseC1); + Assert.assertEquals(0, values.size()); // among "" and "expired_7" has 0 valid record + // case C2: scan limit record by "" and "persistent_7", if valid record count < maxFetchCount, + // it only return valid record + boolean caseC2 = + client.multiGet( + tableName, + hashKey.getBytes(), + "".getBytes(), + "persistent_7".getBytes(), + new MultiGetOptions(), + 10, + -1, + values); + Assert.assertTrue(caseC2); + Assert.assertEquals(7, values.size()); // among "" and "persistent_7" only has 7 valid record + Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); + Assert.assertEquals("persistent_6", new String(values.get(6).getKey())); + Assert.assertEquals("persistent_6_value", new String(values.get(6).getValue())); + values.clear(); + // case C3: scan limit record by "" and "persistent_7", if valid record count > maxFetchCount, + // it only return valid record + boolean caseC3 = + client.multiGet( + tableName, + hashKey.getBytes(), + "".getBytes(), + "persistent_7".getBytes(), + new MultiGetOptions(), + 2, + -1, + values); + Assert.assertFalse(caseC3); + Assert.assertEquals(2, values.size()); + Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); + Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); + Assert.assertEquals("persistent_1", new String(values.get(1).getKey())); + Assert.assertEquals("persistent_1_value", new String(values.get(1).getValue())); values.clear(); - // case 4: use multiGetSortKeys, which actually equal with case 1 + // case D: use multiGetSortKeys, which actually equal with case 1 but no value List sortKeys = new ArrayList<>(); - boolean case4 = client.multiGetSortKeys(tableName, hashKey.getBytes(), 10, -1, sortKeys); - Assert.assertTrue(case4); + boolean caseD = client.multiGetSortKeys(tableName, hashKey.getBytes(), 10, -1, sortKeys); + Assert.assertTrue(caseD); Assert.assertEquals(10, sortKeys.size()); Assert.assertEquals("persistent_0", new String(sortKeys.get(0))); Assert.assertEquals("persistent_9", new String(sortKeys.get(9))); @@ -2702,7 +2821,7 @@ public void generateRecordsWithExpired( tableName, hashKey.getBytes(), (expiredSortKeyPrefix + expiredCount).getBytes(), - tableName.getBytes(), + (expiredSortKeyPrefix + expiredCount + "_value").getBytes(), 1); } // sleep to make sure the record is expired @@ -2712,7 +2831,7 @@ public void generateRecordsWithExpired( tableName, hashKey.getBytes(), (persistentSortKeyPrefix + persistentCount).getBytes(), - tableName.getBytes()); + (persistentSortKeyPrefix + persistentCount + "_value").getBytes()); } PegasusClientFactory.closeSingletonClient(); } From b18ff622eed77fcf8cc99f1cdb5fada15766778e Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 25 Jan 2021 18:58:53 +0800 Subject: [PATCH 04/15] init --- src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index ccc3bd5a..16221456 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1101,7 +1101,6 @@ public MultiGetResult multiGet( Pair, byte[]> pairs; while ((pairs = pegasusScanner.next()) != null && multiGetResult.values.size() < maxFetchCount) { - multiGetResult.values.add(Pair.of(pairs.getLeft().getValue(), pairs.getValue())); lastCheckTime = System.currentTimeMillis(); remainingTime = (int) (deadlineTime - lastCheckTime); if (remainingTime <= 0) { @@ -1115,6 +1114,7 @@ public MultiGetResult multiGet( lastCheckTime - startTime, multiGetResult.values.size())); } + multiGetResult.values.add(Pair.of(pairs.getLeft().getValue(), pairs.getValue())); } if (pegasusScanner.next() == null) { From e787bdcc874234f71633337b7c5e3797045ff056 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 25 Jan 2021 19:00:50 +0800 Subject: [PATCH 05/15] init --- .../xiaomi/infra/pegasus/client/PegasusTable.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 16221456..7dc8da46 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1073,7 +1073,7 @@ public MultiGetResult multiGet( String stopSortKeyStr = stopSortKey == null ? "" : new String(stopSortKey); long startTime = System.currentTimeMillis(); - long lastCheckTime = startTime; + long currentCheckTime = startTime; long deadlineTime = startTime + timeout; ScanOptions scanOptions = new ScanOptions(); @@ -1089,20 +1089,20 @@ public MultiGetResult multiGet( multiGetResult.allFetched = false; multiGetResult.values = new ArrayList<>(); try { - lastCheckTime = System.currentTimeMillis(); - if (lastCheckTime >= deadlineTime) { + currentCheckTime = System.currentTimeMillis(); + if (currentCheckTime >= deadlineTime) { throw new TimeoutException( String.format( "getting pegasusScanner takes too long time when multiGet hashKey=%s, startSortKey=%s, stopSortKey=%s, timeUsed=%s", - hashKeyStr, startSortKeyStr, stopSortKeyStr, lastCheckTime - startTime)); + hashKeyStr, startSortKeyStr, stopSortKeyStr, currentCheckTime - startTime)); } int remainingTime; Pair, byte[]> pairs; while ((pairs = pegasusScanner.next()) != null && multiGetResult.values.size() < maxFetchCount) { - lastCheckTime = System.currentTimeMillis(); - remainingTime = (int) (deadlineTime - lastCheckTime); + currentCheckTime = System.currentTimeMillis(); + remainingTime = (int) (deadlineTime - currentCheckTime); if (remainingTime <= 0) { throw new TimeoutException( String.format( @@ -1111,7 +1111,7 @@ public MultiGetResult multiGet( hashKeyStr, startSortKeyStr, stopSortKeyStr, - lastCheckTime - startTime, + currentCheckTime - startTime, multiGetResult.values.size())); } multiGetResult.values.add(Pair.of(pairs.getLeft().getValue(), pairs.getValue())); From a415f5524e277dd02bab4a131604ae66e0e0ad0b Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 25 Jan 2021 19:04:01 +0800 Subject: [PATCH 06/15] init --- src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 6a2af70e..231d9710 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -2635,12 +2635,12 @@ private void testWriteSizeLimit(PegasusClientInterface client) { public void testMultiGetWhenValueExpired() throws PException, InterruptedException { String tableName = "temp"; String hashKey = "hashKey"; - // generate records whose values are all "temp" + // generate records: sortKeys=[expired_0....expired_9999,persistent_0...persistent_9] generateRecordsWithExpired(tableName, hashKey, 1000, 10); PegasusClientInterface client = PegasusClientFactory.getSingletonClient(); - // case A: scan all record List> values = new ArrayList<>(); + // case A: scan all record // case A1: scan all record: if persistent record count >= maxFetchCount, it must return // maxFetchCount records boolean caseA1 = From ba3da50b0bf8db0ec174b6ee0b9bf41fb9114009 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 25 Jan 2021 19:07:14 +0800 Subject: [PATCH 07/15] init --- src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 231d9710..e5402b21 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -2635,7 +2635,7 @@ private void testWriteSizeLimit(PegasusClientInterface client) { public void testMultiGetWhenValueExpired() throws PException, InterruptedException { String tableName = "temp"; String hashKey = "hashKey"; - // generate records: sortKeys=[expired_0....expired_9999,persistent_0...persistent_9] + // generate records: sortKeys=[expired_0....expired_999,persistent_0...persistent_9] generateRecordsWithExpired(tableName, hashKey, 1000, 10); PegasusClientInterface client = PegasusClientFactory.getSingletonClient(); From aaee971912a19653d73d3751c40b99995c02e122 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 25 Jan 2021 19:09:29 +0800 Subject: [PATCH 08/15] init --- src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index e5402b21..311a868b 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -2630,8 +2630,7 @@ private void testWriteSizeLimit(PegasusClientInterface client) { } } - // make sure return "maxFetchCount" if has "maxFetchCount" record though some record expired - @Test + @Test // test for making sure return "maxFetchCount" if has "maxFetchCount" valid record public void testMultiGetWhenValueExpired() throws PException, InterruptedException { String tableName = "temp"; String hashKey = "hashKey"; @@ -2653,7 +2652,6 @@ public void testMultiGetWhenValueExpired() throws PException, InterruptedExcepti Assert.assertEquals("persistent_4", new String(values.get(4).getKey())); Assert.assertEquals("persistent_4_value", new String(values.get(4).getValue())); values.clear(); - // case A2: scan all record: if persistent record count < maxFetchCount, it only return // persistent count records boolean caseA2 = From 2aa7c13a106d736e0acab193f12c7fbd50a4eabe Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 25 Jan 2021 19:11:13 +0800 Subject: [PATCH 09/15] init --- src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 311a868b..5bba1aa0 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -2797,7 +2797,7 @@ public void testMultiGetWhenValueExpired() throws PException, InterruptedExcepti Assert.assertEquals("persistent_1_value", new String(values.get(1).getValue())); values.clear(); - // case D: use multiGetSortKeys, which actually equal with case 1 but no value + // case D: use multiGetSortKeys, which actually equal with case A but no value List sortKeys = new ArrayList<>(); boolean caseD = client.multiGetSortKeys(tableName, hashKey.getBytes(), 10, -1, sortKeys); Assert.assertTrue(caseD); From 249bdf91150eb4a3ac1d4794a60a561a23690c42 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Tue, 26 Jan 2021 14:40:05 +0800 Subject: [PATCH 10/15] init --- .../infra/pegasus/client/PegasusClient.java | 4 +- .../infra/pegasus/client/PegasusTable.java | 121 +++++----- .../infra/pegasus/client/TestBasic.java | 217 ++++++------------ 3 files changed, 134 insertions(+), 208 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index d845f8be..0410d02f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetSortKeysResult; import com.xiaomi.infra.pegasus.rpc.*; import com.xiaomi.infra.pegasus.tools.Tools; import java.nio.ByteBuffer; @@ -329,8 +330,7 @@ public boolean multiGetSortKeys( throw new PException("Invalid parameter: sortKeys should not be null"); } PegasusTable table = getTable(tableName); - PegasusTableInterface.MultiGetSortKeysResult result = - table.multiGetSortKeys(hashKey, maxFetchCount, maxFetchSize, 0); + MultiGetSortKeysResult result = table.multiGetSortKeys(hashKey, maxFetchCount, -1, 0); sortKeys.addAll(result.keys); return result.allFetched; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 7dc8da46..1c48539e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1066,63 +1066,17 @@ public MultiGetResult multiGet( int maxFetchSize, int timeout /*ms*/) throws PException { - if (timeout <= 0) timeout = defaultTimeout; - - String hashKeyStr = hashKey == null ? "" : new String(hashKey); - String startSortKeyStr = startSortKey == null ? "" : new String(startSortKey); - String stopSortKeyStr = stopSortKey == null ? "" : new String(stopSortKey); - - long startTime = System.currentTimeMillis(); - long currentCheckTime = startTime; - long deadlineTime = startTime + timeout; - - ScanOptions scanOptions = new ScanOptions(); - scanOptions.noValue = options.noValue; - scanOptions.startInclusive = options.startInclusive; - scanOptions.stopInclusive = options.stopInclusive; - scanOptions.sortKeyFilterType = options.sortKeyFilterType; - scanOptions.sortKeyFilterPattern = options.sortKeyFilterPattern; - - PegasusScannerInterface pegasusScanner = - getScanner(hashKey, startSortKey, stopSortKey, scanOptions); - MultiGetResult multiGetResult = new MultiGetResult(); - multiGetResult.allFetched = false; - multiGetResult.values = new ArrayList<>(); try { - currentCheckTime = System.currentTimeMillis(); - if (currentCheckTime >= deadlineTime) { - throw new TimeoutException( - String.format( - "getting pegasusScanner takes too long time when multiGet hashKey=%s, startSortKey=%s, stopSortKey=%s, timeUsed=%s", - hashKeyStr, startSortKeyStr, stopSortKeyStr, currentCheckTime - startTime)); - } - - int remainingTime; - Pair, byte[]> pairs; - while ((pairs = pegasusScanner.next()) != null - && multiGetResult.values.size() < maxFetchCount) { - currentCheckTime = System.currentTimeMillis(); - remainingTime = (int) (deadlineTime - currentCheckTime); - if (remainingTime <= 0) { - throw new TimeoutException( - String.format( - "getting pegasusScanner takes too long time when multiGet hashKey=%s, " - + "startSortKey=%s, stopSortKey=%s, timeUsed=%s, fetchCount=%d", - hashKeyStr, - startSortKeyStr, - stopSortKeyStr, - currentCheckTime - startTime, - multiGetResult.values.size())); - } - multiGetResult.values.add(Pair.of(pairs.getLeft().getValue(), pairs.getValue())); - } - - if (pegasusScanner.next() == null) { - multiGetResult.allFetched = true; - } - return multiGetResult; + return asyncMultiGet( + hashKey, startSortKey, stopSortKey, options, maxFetchCount, maxFetchSize, timeout) + .get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw new PException(new ReplicationException(error_code.error_types.ERR_TIMEOUT)); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, maxFetchCount), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); } } @@ -1220,12 +1174,11 @@ public MultiGetSortKeysResult multiGetSortKeys( if (timeout <= 0) timeout = defaultTimeout; MultiGetSortKeysResult sortKeysResult = new MultiGetSortKeysResult(); sortKeysResult.keys = new ArrayList<>(); - MultiGetOptions options = new MultiGetOptions(); + ScanOptions options = new ScanOptions(); options.noValue = true; - MultiGetResult result = - multiGet(hashKey, null, null, options, maxFetchCount, maxFetchSize, timeout); - for (Pair value : result.values) { - sortKeysResult.keys.add(value.getKey()); + ScanRangeResult result = scanRange(hashKey, null, null, options, maxFetchCount, timeout); + for (Pair, byte[]> pair : result.results) { + sortKeysResult.keys.add(pair.getLeft().getValue()); } sortKeysResult.allFetched = result.allFetched; return sortKeysResult; @@ -1873,6 +1826,54 @@ public List getUnorderedScanners( return ret; } + static class ScanRangeResult { + public List, byte[]>> results; + public boolean allFetched; + } + + ScanRangeResult scanRange( + byte[] hashKey, + byte[] startSortKey, + byte[] stopSortKey, + ScanOptions options, + int maxFetchCount, + int timeout /*ms*/) + throws PException { + if (timeout <= 0) timeout = defaultTimeout; + long startTime = System.currentTimeMillis(); + long currentCheckTime = startTime; + long deadlineTime = startTime + timeout; + + PegasusScannerInterface pegasusScanner = + getScanner(hashKey, startSortKey, stopSortKey, options); + ScanRangeResult scanRangeResult = new ScanRangeResult(); + scanRangeResult.allFetched = false; + scanRangeResult.results = new ArrayList<>(); + currentCheckTime = System.currentTimeMillis(); + if (currentCheckTime >= deadlineTime) { + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException()); + } + + int remainingTime; + Pair, byte[]> pair; + while ((pair = pegasusScanner.next()) != null + && (maxFetchCount == -1 || scanRangeResult.results.size() < maxFetchCount)) { + currentCheckTime = System.currentTimeMillis(); + remainingTime = (int) (deadlineTime - currentCheckTime); + if (remainingTime <= 0) { + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException()); + } + scanRangeResult.results.add(pair); + } + + if (pegasusScanner.next() == null) { + scanRangeResult.allFetched = true; + } + return scanRangeResult; + } + public void handleReplicaException( Request request, DefaultPromise promise, client_operator op, Table table, int timeout) { if (timeout <= 0) timeout = defaultTimeout; diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 5bba1aa0..563e7899 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -4,6 +4,8 @@ package com.xiaomi.infra.pegasus.client; /** @author qinzuoyan */ +import com.xiaomi.infra.pegasus.client.PegasusTable.ScanRangeResult; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetSortKeysResult; import io.netty.util.concurrent.Future; import java.util.ArrayList; import java.util.HashMap; @@ -2631,183 +2633,90 @@ private void testWriteSizeLimit(PegasusClientInterface client) { } @Test // test for making sure return "maxFetchCount" if has "maxFetchCount" valid record - public void testMultiGetWhenValueExpired() throws PException, InterruptedException { + public void testScanRangeWithValueExpired() throws PException, InterruptedException { String tableName = "temp"; String hashKey = "hashKey"; // generate records: sortKeys=[expired_0....expired_999,persistent_0...persistent_9] generateRecordsWithExpired(tableName, hashKey, 1000, 10); - PegasusClientInterface client = PegasusClientFactory.getSingletonClient(); - List> values = new ArrayList<>(); + PegasusTable table = + (PegasusTable) PegasusClientFactory.getSingletonClient().openTable(tableName); // case A: scan all record // case A1: scan all record: if persistent record count >= maxFetchCount, it must return // maxFetchCount records - boolean caseA1 = - client.multiGet( - tableName, hashKey.getBytes(), null, null, new MultiGetOptions(), 5, -1, values); - Assert.assertFalse(caseA1); - Assert.assertEquals(5, values.size()); - Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); - Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); - Assert.assertEquals("persistent_4", new String(values.get(4).getKey())); - Assert.assertEquals("persistent_4_value", new String(values.get(4).getValue())); - values.clear(); + ScanRangeResult caseA1 = + table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 5, 0); + assertScanResult(0, 4, false, caseA1); // case A2: scan all record: if persistent record count < maxFetchCount, it only return // persistent count records - boolean caseA2 = - client.multiGet( - tableName, hashKey.getBytes(), null, null, new MultiGetOptions(), 100, -1, values); - Assert.assertTrue(caseA2); - Assert.assertEquals(10, values.size()); - Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); - Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); - Assert.assertEquals("persistent_9", new String(values.get(9).getKey())); - Assert.assertEquals("persistent_9_value", new String(values.get(9).getValue())); - values.clear(); + ScanRangeResult caseA2 = + table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 100, 0); + assertScanResult(0, 9, true, caseA2); // case B: scan limit record by "startSortKey" and "": // case B1: scan limit record by "expired_0" and "", if persistent record count >= // maxFetchCount, it must return maxFetchCount records - boolean caseB1 = - client.multiGet( - tableName, - hashKey.getBytes(), - "expired_0".getBytes(), - "".getBytes(), - new MultiGetOptions(), - 5, - -1, - values); - Assert.assertFalse(caseB1); - Assert.assertEquals( - 5, values.size()); // among "expired_0" and "persistent_4" only has 5 valid record - Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); - Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); - Assert.assertEquals("persistent_4", new String(values.get(4).getKey())); - Assert.assertEquals("persistent_4_value", new String(values.get(4).getValue())); - values.clear(); + ScanRangeResult caseB1 = + table.scanRange( + hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new ScanOptions(), 5, 0); + assertScanResult(0, 4, false, caseB1); // case B2: scan limit record by "expired_0" and "", if persistent record count < maxFetchCount, // it only return valid records - boolean caseB2 = - client.multiGet( - tableName, - hashKey.getBytes(), - "expired_0".getBytes(), - "".getBytes(), - new MultiGetOptions(), - 50, - -1, - values); - Assert.assertTrue(caseB2); - Assert.assertEquals(10, values.size()); // among "expired_0" and "" only has 10 valid record - Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); - Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); - Assert.assertEquals("persistent_9", new String(values.get(9).getKey())); - Assert.assertEquals("persistent_9_value", new String(values.get(9).getValue())); - values.clear(); + ScanRangeResult caseB2 = + table.scanRange( + hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new ScanOptions(), 50, 0); + assertScanResult(0, 9, true, caseB2); // case B3: scan limit record by "persistent_5" and "", if following persistent record count < // maxFetchCount, it only return valid records - boolean caseB3 = - client.multiGet( - tableName, - hashKey.getBytes(), - "persistent_5".getBytes(), - "".getBytes(), - new MultiGetOptions(), - 50, - -1, - values); - Assert.assertTrue(caseB3); - Assert.assertEquals(5, values.size()); // among "persistent_5" and "" only has 5 valid record - Assert.assertEquals("persistent_5", new String(values.get(0).getKey())); - Assert.assertEquals("persistent_5_value", new String(values.get(0).getValue())); - Assert.assertEquals("persistent_9", new String(values.get(4).getKey())); - Assert.assertEquals("persistent_9_value", new String(values.get(4).getValue())); - values.clear(); + ScanRangeResult caseB3 = + table.scanRange( + hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new ScanOptions(), 50, 0); + assertScanResult(5, 9, true, caseB3); // case B4: scan limit record by "persistent_5" and "", if following persistent record count > // maxFetchCount, it only return valid records - boolean caseB4 = - client.multiGet( - tableName, - hashKey.getBytes(), - "persistent_5".getBytes(), - "".getBytes(), - new MultiGetOptions(), - 3, - -1, - values); - Assert.assertFalse(caseB4); - Assert.assertEquals(3, values.size()); // among "persistent_5" and "" only has 5 valid record - Assert.assertEquals("persistent_5", new String(values.get(0).getKey())); - Assert.assertEquals("persistent_5_value", new String(values.get(0).getValue())); - Assert.assertEquals("persistent_7", new String(values.get(2).getKey())); - Assert.assertEquals("persistent_7_value", new String(values.get(2).getValue())); - values.clear(); + ScanRangeResult caseB4 = + table.scanRange( + hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new ScanOptions(), 3, 0); + assertScanResult(5, 7, false, caseB4); // case C: scan limit record by "" and "stopSortKey": // case C1: scan limit record by "" and "expired_7", if will return 0 record - boolean caseC1 = - client.multiGet( - tableName, - hashKey.getBytes(), - "".getBytes(), - "expired_7".getBytes(), - new MultiGetOptions(), - 3, - -1, - values); - Assert.assertTrue(caseC1); - Assert.assertEquals(0, values.size()); // among "" and "expired_7" has 0 valid record + ScanRangeResult caseC1 = + table.scanRange( + hashKey.getBytes(), "".getBytes(), "expired_7".getBytes(), new ScanOptions(), 3, 0); + Assert.assertTrue(caseC1.allFetched); + Assert.assertEquals(0, caseC1.results.size()); // among "" and "expired_7" has 0 valid record // case C2: scan limit record by "" and "persistent_7", if valid record count < maxFetchCount, // it only return valid record - boolean caseC2 = - client.multiGet( - tableName, - hashKey.getBytes(), - "".getBytes(), - "persistent_7".getBytes(), - new MultiGetOptions(), - 10, - -1, - values); - Assert.assertTrue(caseC2); - Assert.assertEquals(7, values.size()); // among "" and "persistent_7" only has 7 valid record - Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); - Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); - Assert.assertEquals("persistent_6", new String(values.get(6).getKey())); - Assert.assertEquals("persistent_6_value", new String(values.get(6).getValue())); - values.clear(); + ScanRangeResult caseC2 = + table.scanRange( + hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new ScanOptions(), 10, 0); + assertScanResult(0, 6, true, caseC2); // case C3: scan limit record by "" and "persistent_7", if valid record count > maxFetchCount, // it only return valid record - boolean caseC3 = - client.multiGet( - tableName, - hashKey.getBytes(), - "".getBytes(), - "persistent_7".getBytes(), - new MultiGetOptions(), - 2, - -1, - values); - Assert.assertFalse(caseC3); - Assert.assertEquals(2, values.size()); - Assert.assertEquals("persistent_0", new String(values.get(0).getKey())); - Assert.assertEquals("persistent_0_value", new String(values.get(0).getValue())); - Assert.assertEquals("persistent_1", new String(values.get(1).getKey())); - Assert.assertEquals("persistent_1_value", new String(values.get(1).getValue())); - values.clear(); + ScanRangeResult caseC3 = + table.scanRange( + hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new ScanOptions(), 2, 0); + assertScanResult(0, 1, false, caseC3); // case D: use multiGetSortKeys, which actually equal with case A but no value - List sortKeys = new ArrayList<>(); - boolean caseD = client.multiGetSortKeys(tableName, hashKey.getBytes(), 10, -1, sortKeys); - Assert.assertTrue(caseD); - Assert.assertEquals(10, sortKeys.size()); - Assert.assertEquals("persistent_0", new String(sortKeys.get(0))); - Assert.assertEquals("persistent_9", new String(sortKeys.get(9))); - values.clear(); + // case D1: maxFetchCount > 0, return maxFetchCount valid record + MultiGetSortKeysResult caseD1 = table.multiGetSortKeys(hashKey.getBytes(), 5, -1, 0); + Assert.assertFalse(caseD1.allFetched); + Assert.assertEquals(5, caseD1.keys.size()); + for (int i = 0; i <= 4; i++) { + Assertions.assertEquals("persistent_" + i, new String(caseD1.keys.get(i))); + } + // case D1: maxFetchCount < 0, return all valid record + MultiGetSortKeysResult caseD2 = table.multiGetSortKeys(hashKey.getBytes(), 10, -1, 0); + Assert.assertTrue(caseD2.allFetched); + Assert.assertEquals(10, caseD2.keys.size()); + for (int i = 0; i <= 9; i++) { + Assertions.assertEquals("persistent_" + i, new String(caseD2.keys.get(i))); + } } - public void generateRecordsWithExpired( + private void generateRecordsWithExpired( String tableName, String hashKey, int expiredCount, int persistentCount) throws PException, InterruptedException { PegasusClientInterface client = PegasusClientFactory.getSingletonClient(); @@ -2833,4 +2742,20 @@ public void generateRecordsWithExpired( } PegasusClientFactory.closeSingletonClient(); } + + private void assertScanResult( + int startIndex, int stopIndex, boolean expectAllFetched, ScanRangeResult actuallyRes) { + Assertions.assertEquals(expectAllFetched, actuallyRes.allFetched); + Assertions.assertEquals(stopIndex - startIndex + 1, actuallyRes.results.size()); + for (int i = startIndex; i <= stopIndex; i++) { + Assertions.assertEquals( + "hashKey", new String(actuallyRes.results.get(i - startIndex).getLeft().getKey())); + Assertions.assertEquals( + "persistent_" + i, + new String(actuallyRes.results.get(i - startIndex).getLeft().getValue())); + Assertions.assertEquals( + "persistent_" + i + "_value", + new String(actuallyRes.results.get(i - startIndex).getRight())); + } + } } From b020733d54dca34439b5257186d1ae26f1209d81 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Tue, 26 Jan 2021 15:06:49 +0800 Subject: [PATCH 11/15] init --- .../infra/pegasus/client/PegasusClient.java | 2 +- .../pegasus/client/PegasusClientInterface.java | 5 ++--- .../xiaomi/infra/pegasus/client/PegasusTable.java | 13 ++----------- .../pegasus/client/PegasusTableInterface.java | 15 ++++++++++++--- 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 0410d02f..9852114c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -330,7 +330,7 @@ public boolean multiGetSortKeys( throw new PException("Invalid parameter: sortKeys should not be null"); } PegasusTable table = getTable(tableName); - MultiGetSortKeysResult result = table.multiGetSortKeys(hashKey, maxFetchCount, -1, 0); + MultiGetSortKeysResult result = table.multiGetSortKeys(hashKey, maxFetchCount, maxFetchSize, 0); sortKeys.addAll(result.keys); return result.allFetched; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index 6b03cfed..444681be 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -268,10 +268,9 @@ public int batchMultiGet2( * * @param tableName table name * @param hashKey used to decide which partition to put this k-v, should not be null or empty. - * @param maxFetchCount max count of k-v pairs to be fetched. max_fetch_count <= 0 means no limit. + * @param maxFetchCount max count of sortKeys to be fetched. max_fetch_count <= 0 means no limit. * default value is 100. - * @param maxFetchSize max size of k-v pairs to be fetched. max_fetch_size <= 0 means no limit. - * default value is 1000000. + * @param maxFetchSize deprecated argument, can be ignored * @param sortKeys output sort keys. * @return true if all data is fetched; false if only partial data is fetched. * @throws PException throws exception if any error occurs. diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 1c48539e..8a022253 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1186,16 +1186,7 @@ public MultiGetSortKeysResult multiGetSortKeys( @Override public MultiGetSortKeysResult multiGetSortKeys(byte[] hashKey, int timeout) throws PException { - if (timeout <= 0) timeout = defaultTimeout; - try { - return asyncMultiGetSortKeys(hashKey, timeout).get(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw PException.threadInterrupted(table.getTableName(), e); - } catch (TimeoutException e) { - throw PException.timeout(metaList, table.getTableName(), new Request(hashKey), timeout, e); - } catch (ExecutionException e) { - throw new PException(e); - } + return multiGetSortKeys(hashKey, 100, -1, timeout); } @Override @@ -1858,7 +1849,7 @@ ScanRangeResult scanRange( int remainingTime; Pair, byte[]> pair; while ((pair = pegasusScanner.next()) != null - && (maxFetchCount == -1 || scanRangeResult.results.size() < maxFetchCount)) { + && (maxFetchCount <= 0 || scanRangeResult.results.size() < maxFetchCount)) { currentCheckTime = System.currentTimeMillis(); remainingTime = (int) (deadlineTime - currentCheckTime); if (remainingTime <= 0) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java index 71f80ba8..f54fbec7 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java @@ -858,11 +858,20 @@ public int batchMultiGet2( throws PException; /** - * sync version of MultiGetSortKeys, please refer to the async version {@link - * #asyncMultiGetSortKeys(byte[], int, int, int)} and {@link #asyncMultiGetSortKeys(byte[], int)} + * sync fetch sortKeys under the same hash key, which is wrapper of {@linkplain + * #getScanner(byte[], byte[], byte[], ScanOptions)} + * + * @param hashKey hashKey, should not be null or empty + * @param maxFetchCount max count of sortKeys to be fetched. max_fetch_count <= 0 means no limit. + * default value is 100. + * @param maxFetchSize deprecated argument, can be ignored + * @param timeout timeout + * @return MultiGetSortKeysResult + * @throws PException if exceed timeout will throw exception */ public MultiGetSortKeysResult multiGetSortKeys( - byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout /*ms*/) throws PException; + byte[] hashKey, int maxFetchCount, int maxFetchSize /*Deprecated*/, int timeout /*ms*/) + throws PException; public MultiGetSortKeysResult multiGetSortKeys(byte[] hashKey, int timeout /*ms*/) throws PException; From 99c94d4c6e4c4f9b997a1c8ee30228a593aecb70 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Tue, 26 Jan 2021 15:14:42 +0800 Subject: [PATCH 12/15] init --- .../xiaomi/infra/pegasus/client/PegasusClientInterface.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index 444681be..3af2de70 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -276,7 +276,11 @@ public int batchMultiGet2( * @throws PException throws exception if any error occurs. */ public boolean multiGetSortKeys( - String tableName, byte[] hashKey, int maxFetchCount, int maxFetchSize, List sortKeys) + String tableName, + byte[] hashKey, + int maxFetchCount, + int maxFetchSize /*Deprecated*/, + List sortKeys) throws PException; public boolean multiGetSortKeys(String tableName, byte[] hashKey, List sortKeys) From 5ab7eaf8b1bcdd7606f46e538c2432f6b6ef5079 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 27 Jan 2021 10:15:29 +0800 Subject: [PATCH 13/15] update --- src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 8a022253..652a157a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1066,6 +1066,7 @@ public MultiGetResult multiGet( int maxFetchSize, int timeout /*ms*/) throws PException { + if (timeout <= 0) timeout = defaultTimeout; try { return asyncMultiGet( hashKey, startSortKey, stopSortKey, options, maxFetchCount, maxFetchSize, timeout) From db52a039109ed7c7d43fcec615edbe64400e4901 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 27 Jan 2021 14:39:30 +0800 Subject: [PATCH 14/15] update comment --- .../infra/pegasus/client/PegasusTable.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 652a157a..909a47de 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1818,11 +1818,30 @@ public List getUnorderedScanners( return ret; } + /** + * {@linkplain #scanRange(byte[], byte[], byte[], ScanOptions, int, int)} result, if fetch all + * data for {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true + */ static class ScanRangeResult { public List, byte[]>> results; public boolean allFetched; } + /** + * get scan result for {startSortKey, stopSortKey} within hashKey + * + * @param hashKey used to decide which partition to put this k-v, + * @param startSortKey start sort key scan from if null or length == 0, means start from begin + * @param stopSortKey stop sort key scan to if null or length == 0, means stop to end + * @param options scan options like endpoint inclusive/exclusive + * @param maxFetchCount max count of k-v pairs to be fetched. if <=0 means fetch all data for + * {startSortKey, stopSortKey} + * @param timeout if exceed the timeout will throw timeout exception, if <=0, it is equal with + * "timeout" of config + * @return ScanRangeResult, if fetch all data for {startSortKey, stopSortKey}, + * ScanRangeResult.allFetched=true + * @throws PException + */ ScanRangeResult scanRange( byte[] hashKey, byte[] startSortKey, From 9f0cd17490af7c8b9307a983621267b7348d2597 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 27 Jan 2021 14:48:36 +0800 Subject: [PATCH 15/15] update comment --- .../infra/pegasus/client/PegasusTable.java | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 909a47de..f2f69654 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1838,8 +1838,8 @@ static class ScanRangeResult { * {startSortKey, stopSortKey} * @param timeout if exceed the timeout will throw timeout exception, if <=0, it is equal with * "timeout" of config - * @return ScanRangeResult, if fetch all data for {startSortKey, stopSortKey}, - * ScanRangeResult.allFetched=true + * @return ScanRangeResult result{pair((hashKey, sortKey), value}, if fetch all data for + * {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true * @throws PException */ ScanRangeResult scanRange( @@ -1851,28 +1851,22 @@ ScanRangeResult scanRange( int timeout /*ms*/) throws PException { if (timeout <= 0) timeout = defaultTimeout; - long startTime = System.currentTimeMillis(); - long currentCheckTime = startTime; - long deadlineTime = startTime + timeout; + long deadlineTime = System.currentTimeMillis() + timeout; PegasusScannerInterface pegasusScanner = getScanner(hashKey, startSortKey, stopSortKey, options); ScanRangeResult scanRangeResult = new ScanRangeResult(); scanRangeResult.allFetched = false; scanRangeResult.results = new ArrayList<>(); - currentCheckTime = System.currentTimeMillis(); - if (currentCheckTime >= deadlineTime) { + if (System.currentTimeMillis() >= deadlineTime) { throw PException.timeout( metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException()); } - int remainingTime; Pair, byte[]> pair; while ((pair = pegasusScanner.next()) != null && (maxFetchCount <= 0 || scanRangeResult.results.size() < maxFetchCount)) { - currentCheckTime = System.currentTimeMillis(); - remainingTime = (int) (deadlineTime - currentCheckTime); - if (remainingTime <= 0) { + if (System.currentTimeMillis() >= deadlineTime) { throw PException.timeout( metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException()); }