Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Oct 28, 2024
1 parent 1f344c6 commit 981ad67
Showing 1 changed file with 34 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,45 +134,40 @@ public void testLookup(LookupCacheMode cacheMode) throws Exception {
iterator.close();
}

@ParameterizedTest
@EnumSource(LookupCacheMode.class)
public void testLookupIgnoreScanOptions(LookupCacheMode cacheMode) throws Exception {
initTable(cacheMode);
sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");

String scanOption;
if (ThreadLocalRandom.current().nextBoolean()) {
scanOption = "'scan.mode'='latest'";
} else {
scanOption = "'scan.snapshot-id'='2'";
}
String query =
"SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS("
+ scanOption
+ ") */"
+ " for system_time as of T.proctime AS D ON T.i = D.i";
BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, 11, 111, 1111),
Row.of(2, 22, 222, 2222),
Row.of(3, null, null, null));

sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, 11, 111, 1111),
Row.of(2, 44, 444, 4444),
Row.of(3, 33, 333, 3333),
Row.of(4, null, null, null));

iterator.close();
@Test
public void testLookupIgnoreScanOptions() throws Exception {
sql(
"CREATE TABLE d (\n"
+ " pt INT,\n"
+ " id INT,\n"
+ " data STRING,\n"
+ " PRIMARY KEY (pt, id) NOT ENFORCED\n"
+ ") PARTITIONED BY (pt) WITH ( 'bucket' = '1', 'continuous.discovery-interval'='1 ms' )");
sql(
"CREATE TABLE t1 (\n"
+ " pt INT,\n"
+ " id INT,\n"
+ " data STRING,\n"
+ " `proctime` AS PROCTIME(),\n"
+ " PRIMARY KEY (pt, id) NOT ENFORCED\n"
+ ") PARTITIONED BY (pt) with ( 'continuous.discovery-interval'='1 ms' )");

sql("INSERT INTO d VALUES (1, 1, 'one'), (2, 2, 'two'), (3, 3, 'three')");
sql("INSERT INTO t1 VALUES (1, 1, 'one'), (2, 2, 'two'), (3, 3, 'three')");

BlockingIterator<Row, Row> streamIter =
streamSqlBlockIter(
"SELECT T.pt, T.id, T.data, D.pt, D.id, D.data "
+ "FROM t1 AS T LEFT JOIN d /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'scan.snapshot-id'='2') */ "
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.id = D.id");

assertThat(streamIter.collect(3))
.containsExactlyInAnyOrder(
Row.of(1, 1, "one", null, null, null),
Row.of(2, 2, "two", null, null, null),
Row.of(3, 3, "three", 3, 3, "three"));

streamIter.close();
}

@ParameterizedTest
Expand Down

0 comments on commit 981ad67

Please sign in to comment.