diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 46399f85632a..3e9ba2194aed 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -134,45 +134,40 @@ public void testLookup(LookupCacheMode cacheMode) throws Exception { iterator.close(); } - @ParameterizedTest - @EnumSource(LookupCacheMode.class) - public void testLookupIgnoreScanOptions(LookupCacheMode cacheMode) throws Exception { - initTable(cacheMode); - sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)"); - - String scanOption; - if (ThreadLocalRandom.current().nextBoolean()) { - scanOption = "'scan.mode'='latest'"; - } else { - scanOption = "'scan.snapshot-id'='2'"; - } - String query = - "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS(" - + scanOption - + ") */" - + " for system_time as of T.proctime AS D ON T.i = D.i"; - BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); - - sql("INSERT INTO T VALUES (1), (2), (3)"); - List result = iterator.collect(3); - assertThat(result) - .containsExactlyInAnyOrder( - Row.of(1, 11, 111, 1111), - Row.of(2, 22, 222, 2222), - Row.of(3, null, null, null)); - - sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)"); - Thread.sleep(2000); // wait refresh - sql("INSERT INTO T VALUES (1), (2), (3), (4)"); - result = iterator.collect(4); - assertThat(result) - .containsExactlyInAnyOrder( - Row.of(1, 11, 111, 1111), - Row.of(2, 44, 444, 4444), - Row.of(3, 33, 333, 3333), - Row.of(4, null, null, null)); - - iterator.close(); + @Test + public void testLookupIgnoreScanOptions() throws Exception { + sql( + "CREATE TABLE d (\n" + + " pt INT,\n" + + " id INT,\n" + + " data STRING,\n" + + " PRIMARY KEY (pt, id) NOT ENFORCED\n" + + ") PARTITIONED BY (pt) WITH ( 'bucket' = '1', 'continuous.discovery-interval'='1 ms' )"); + sql( + "CREATE TABLE t1 (\n" + + " pt INT,\n" + + " id INT,\n" + + " data STRING,\n" + + " `proctime` AS PROCTIME(),\n" + + " PRIMARY KEY (pt, id) NOT ENFORCED\n" + + ") PARTITIONED BY (pt) with ( 'continuous.discovery-interval'='1 ms' )"); + + sql("INSERT INTO d VALUES (1, 1, 'one'), (2, 2, 'two'), (3, 3, 'three')"); + sql("INSERT INTO t1 VALUES (1, 1, 'one'), (2, 2, 'two'), (3, 3, 'three')"); + + BlockingIterator streamIter = + streamSqlBlockIter( + "SELECT T.pt, T.id, T.data, D.pt, D.id, D.data " + + "FROM t1 AS T LEFT JOIN d /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'scan.snapshot-id'='2') */ " + + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.id = D.id"); + + assertThat(streamIter.collect(3)) + .containsExactlyInAnyOrder( + Row.of(1, 1, "one", null, null, null), + Row.of(2, 2, "two", null, null, null), + Row.of(3, 3, "three", 3, 3, "three")); + + streamIter.close(); } @ParameterizedTest