Skip to content

Commit

Permalink
[Enhancement] enable incremental scan range delivery by default (#52907)
Browse files (browse the repository at this point in the history)
Signed-off-by: yanz <[email protected]>
  • Loading branch information
dirtysalt authored Nov 18, 2024
1 parent 79fa702 commit f50d5c5
Show file tree
Hide file tree
Showing 12 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2245,7 +2245,7 @@ public SessionVariable setHiveTempStagingDir(String hiveTempStagingDir) {
private int connectorRemoteFileAsyncTaskSize = 4;

@VarAttr(name = ENABLE_CONNECTOR_INCREMENTAL_SCAN_RANGES)
private boolean enableConnectorIncrementalScanRanges = false;
private boolean enableConnectorIncrementalScanRanges = true;

@VarAttr(name = CONNECTOR_INCREMENTAL_SCAN_RANGE_SIZE)
private int connectorIncrementalScanRangeSize = 500;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ public void prepareCaptureVersion(boolean isPhasedSchedule) {
if (build != null) {
captureVersionFragment = build;
fragments.add(build);
idToFragment.put(captureVersionFragment.getFragmentId(), captureVersionFragment);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class PlanTestBase extends PlanTestNoneDBBase {

Expand Down Expand Up @@ -999,6 +1000,7 @@ public static List<TScanRangeParams> collectAllScanRangeParams(DefaultCoordinato
scanRangeParams.addAll(x);
}
}
return scanRangeParams;
// Remove the empty scan ranges introduced by the incremental scan ranges feature.
return scanRangeParams.stream().filter(x -> !(x.isSetEmpty() && x.isEmpty())).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,42 @@ PLAN FRAGMENT 0(F02)
DOP: 16
INSTANCES
INSTANCE(0-F02#0)
BE: 10002
BE: 10001

PLAN FRAGMENT 1(F01)
DOP: 16
INSTANCES
INSTANCE(1-F01#0)
DESTINATIONS: 0-F02#0
BE: 10003
INSTANCE(2-F01#1)
DESTINATIONS: 0-F02#0
BE: 10002
INSTANCE(3-F01#2)
DESTINATIONS: 0-F02#0
BE: 10001

PLAN FRAGMENT 2(F00)
DOP: 16
INSTANCES
INSTANCE(2-F00#0)
DESTINATIONS: 1-F01#0
INSTANCE(4-F00#0)
DESTINATIONS: 1-F01#0,2-F01#1,3-F01#2
BE: 10001
SCAN RANGES
0:HdfsScanNode
1. <PLACEHOLDER>
INSTANCE(5-F00#1)
DESTINATIONS: 1-F01#0,2-F01#1,3-F01#2
BE: 10002
SCAN RANGES
0:HdfsScanNode
1. <PLACEHOLDER>
INSTANCE(6-F00#2)
DESTINATIONS: 1-F01#0,2-F01#1,3-F01#2
BE: 10003
SCAN RANGES
0:HdfsScanNode
1. <PLACEHOLDER>

[fragment]
PLAN FRAGMENT 0
Expand Down
3 changes: 3 additions & 0 deletions test/sql/test_deltalake/R/test_deltalake_trace
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ create external catalog delta_test_${uuid0} PROPERTIES (
);
-- result:
-- !result
set enable_connector_incremental_scan_ranges = false;
-- result:
-- !result
set enable_profile=true;
-- result:
-- !result
Expand Down
1 change: 1 addition & 0 deletions test/sql/test_deltalake/T/test_deltalake_trace
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ create external catalog delta_test_${uuid0} PROPERTIES (
"aws.s3.endpoint"="${oss_endpoint}"
);

set enable_connector_incremental_scan_ranges = false;
set enable_profile=true;

function: assert_trace_times_contains("select * from delta_test_${uuid0}.delta_oss_db.delta_lake_par_col_timestamp;", "TraceDefaultJsonHandler.ReadJsonFile")
Expand Down
3 changes: 3 additions & 0 deletions test/sql/test_iceberg/R/test_iceberg_distributed_plan
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ create external catalog iceberg_sql_test_${uuid0}
PROPERTIES ("type"="iceberg", "iceberg.catalog.type"="hive", "iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}","enable_iceberg_metadata_cache"="false","aws.s3.access_key" = "${oss_ak}","aws.s3.secret_key" = "${oss_sk}","aws.s3.endpoint" = "${oss_endpoint}");
-- result:
-- !result
set enable_connector_incremental_scan_ranges=false;
-- result:
-- !result
set enable_profile=true;
-- result:
-- !result
Expand Down
3 changes: 3 additions & 0 deletions test/sql/test_iceberg/R/test_iceberg_identifier_metrics_cache
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
create external catalog iceberg_sql_test_${uuid0} PROPERTIES ("type"="iceberg", "iceberg.catalog.type"="hive", "iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}","enable_iceberg_metadata_cache"="true","aws.s3.access_key" = "${oss_ak}","aws.s3.secret_key" = "${oss_sk}","aws.s3.endpoint" = "${oss_endpoint}");
-- result:
-- !result
set enable_connector_incremental_scan_ranges=false;
-- result:
-- !result
set enable_profile=true;
-- result:
-- !result
Expand Down
3 changes: 3 additions & 0 deletions test/sql/test_iceberg/R/test_iceberg_logical_metadata_table
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ select count(1) from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uui
-- result:
0
-- !result
set enable_connector_incremental_scan_ranges=false;
-- result:
-- !result
set enable_profile=true;
-- result:
-- !result
Expand Down
1 change: 1 addition & 0 deletions test/sql/test_iceberg/T/test_iceberg_distributed_plan
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
create external catalog iceberg_sql_test_${uuid0}
PROPERTIES ("type"="iceberg", "iceberg.catalog.type"="hive", "iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}","enable_iceberg_metadata_cache"="false","aws.s3.access_key" = "${oss_ak}","aws.s3.secret_key" = "${oss_sk}","aws.s3.endpoint" = "${oss_endpoint}");

set enable_connector_incremental_scan_ranges=false;
set enable_profile=true;
set plan_mode=distributed;
set new_planner_optimize_timeout=30000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

create external catalog iceberg_sql_test_${uuid0} PROPERTIES ("type"="iceberg", "iceberg.catalog.type"="hive", "iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}","enable_iceberg_metadata_cache"="true","aws.s3.access_key" = "${oss_ak}","aws.s3.secret_key" = "${oss_sk}","aws.s3.endpoint" = "${oss_endpoint}");

set enable_connector_incremental_scan_ranges=false;
set enable_profile=true;

function: assert_trace_values_contains("select * from iceberg_sql_test_${uuid0}.iceberg_ci_db.test_iceberg_identifier_cache where pk=1;","resultDataFiles=CounterResult{unit=COUNT, value=2}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ select count(1) from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uui
set enable_iceberg_column_statistics=true;
select count(1) from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}$logical_iceberg_metadata where column_stats is null;

set enable_connector_incremental_scan_ranges=false;
set enable_profile=true;
set plan_mode=local;
function: assert_trace_values_contains("select * from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} where id=1;", "resultDataFiles=CounterResult{unit=COUNT, value=1}")
Expand Down

0 comments on commit f50d5c5

Please sign in to comment.