Skip to content

Commit

Permalink
[Flink] Fix 1.14 configuration cast
Browse files Browse the repository at this point in the history
  • Loading branch information
schnappi17 committed Dec 27, 2023
1 parent f729599 commit bb1663c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableConfig;
Expand All @@ -60,6 +61,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
Expand Down Expand Up @@ -136,12 +138,17 @@ public DataTableSource(
this.tableIdentifier = tableIdentifier;
this.streaming = streaming;
this.context = context;
Map<String, String> mapConf;
ReadableConfig config = context.getConfiguration();
if (config instanceof Configuration) { // It can't be cast to TableConfig for flink 1.14
mapConf = ((Configuration) config).toMap();
} else if (config instanceof TableConfig) {
mapConf = ((TableConfig) config).getConfiguration().toMap();
} else {
throw new IllegalArgumentException("Unexpected config: " + config.getClass());
}
this.valueStatisticsDisable =
Options.fromMap(
((TableConfig) context.getConfiguration())
.getConfiguration()
.toMap())
.get(SOURCE_VALUE_FILTER_STATISTICS_DISABLE);
Options.fromMap(mapConf).get(SOURCE_VALUE_FILTER_STATISTICS_DISABLE);
this.logStoreTableFactory = logStoreTableFactory;
this.predicate = predicate;
this.projectFields = projectFields;
Expand Down Expand Up @@ -175,7 +182,8 @@ public ChangelogMode getChangelogMode() {
return ChangelogMode.all();
}

// optimization: transaction consistency and all changelog mode avoid the generation of
// optimization: transaction consistency andls - all changelog mode avoid the generation
// of
// normalized nodes. See FlinkTableSink.getChangelogMode validation.
return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
&& options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL
Expand Down Expand Up @@ -326,11 +334,11 @@ public TableStats reportStatistics() {
return TableStats.UNKNOWN;
}

// if (valueStatisticsDisable
// && checkAllValuePredication(
// table.partitionKeys(), table.primaryKeys(), predicate)) {
// return TableStats.UNKNOWN;
// }
if (valueStatisticsDisable
&& checkAllValuePredication(
table.partitionKeys(), table.primaryKeys(), predicate)) {
return TableStats.UNKNOWN;
}

scanSplitsForInference();
return new TableStats(splitStatistics.totalRowCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.plan.stats.TableStats;
import org.assertj.core.api.Assertions;
import org.junit.Ignore;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -148,7 +147,6 @@ public void testTableFilterValueStatistics() throws Exception {
}

@Test
@Ignore
public void testTableFilterValueDisableStatistics() throws Exception {
FileStoreTable table = writeData();
PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
Expand Down

0 comments on commit bb1663c

Please sign in to comment.