diff --git a/docs/content/connectors/mongodb-cdc(ZH).md b/docs/content/connectors/mongodb-cdc(ZH).md index 38c8bf6c343..93f35b2f297 100644 --- a/docs/content/connectors/mongodb-cdc(ZH).md +++ b/docs/content/connectors/mongodb-cdc(ZH).md @@ -201,30 +201,32 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为 optional (none) String - 电流和分离 连接选项 of MongoDB. eg.
+ MongoDB 连接选项。例如:
replicaSet=test&connectTimeoutMS=300000 - copy.existing - optional - true - Boolean - 是否从源集合复制现有数据。 + scan.startup.mode + optional + initial + String + MongoDB CDC 消费者可选的启动模式, + 合法的模式为 "initial","latest-offset" 和 "timestamp"。 + 请查阅 启动模式 章节了解更多详细信息。 - copy.existing.queue.size - optional - 10240 - Integer - 复制数据时要使用的队列的最大大小。 + scan.startup.timestamp-millis + optional + (none) + Long + 起始毫秒数, 仅适用于 'timestamp' 启动模式. batch.size optional 1024 Integer - 光标批次大小。 + Cursor 批次大小。 poll.max.batch.size @@ -245,7 +247,7 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为 optional 0 Integer - 发送检测信号消息之间的时间长度(以毫秒为单位)。使用 0 禁用。 + 心跳间隔(毫秒)。使用 0 禁用。 scan.incremental.snapshot.enabled @@ -337,20 +339,38 @@ CREATE TABLE products ( MongoDB CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后在处理**甚至失败时继续读取带有**的更改流事件。 -### Snapshot When Startup Or Not +### 启动模式 -配置选项 `copy.existing` 指定在 MongoDB CDC 消费者启动时是否执行快照。
默认是 `true`. +配置选项 `scan.startup.mode` 指定 MongoDB CDC 消费者的启动模式。有效枚举包括: -### 快照数据筛选器 +- `initial` (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 oplog。 +- `latest-offset`:首次启动时不对受监视的数据库表执行快照,连接器仅从 oplog 的结尾处开始读取,这意味着连接器只能读取连接器启动之后的数据更改。 +- `timestamp`:跳过快照阶段,从指定的时间戳开始读取 oplog 事件。 -配置选项 `copy.existing.pipeline` 描述复制现有数据时的筛选器。
-这可以只过滤所需的数据,并改进复制管理器对索引的使用。 +例如使用 DataStream API: +```java +MongoDBSource.builder() + .startupOptions(StartupOptions.latest()) // Start from latest offset + .startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from timestamp + .build() +``` -在下面的示例中,`$match` 聚合运算符确保只复制关闭字段设置为 false 的文档。 +使用 SQL: +```SQL +CREATE TABLE mongodb_source (...) WITH ( + 'connector' = 'mongodb-cdc', + 'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动 + ... + 'scan.incremental.snapshot.enabled' = 'true', -- 指定时间戳启动,需要开启增量快照读 + 'scan.startup.mode' = 'timestamp', -- 指定时间戳启动模式 + 'scan.startup.timestamp-millis' = '1667232000000' -- 启动毫秒时间 + ... +) ``` -'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]' -``` + +**注意:** +- 使用 'timestamp' 启动模式时,需要开启增量快照读。
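As an editorial illustration (not part of this patch): a self-contained DataStream sketch of the timestamp startup mode, using the builder method and `StartupOptions` introduced in this diff. The hosts, credentials, database/collection names and the JSON Debezium deserializer are placeholder assumptions.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MongoDBTimestampStartupExample {
    public static void main(String[] args) throws Exception {
        MongoDBSource<String> source =
                MongoDBSource.<String>builder()
                        .hosts("localhost:27017")              // placeholder host
                        .databaseList("inventory")             // placeholder database
                        .collectionList("inventory.products")  // placeholder collection
                        .username("flinkuser")                 // placeholder credentials
                        .password("flinkpw")
                        // Skip the snapshot phase and read oplog events from this epoch-millis point.
                        .startupOptions(StartupOptions.timestamp(1667232000000L))
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source").print();
        env.execute("mongodb-cdc-timestamp-startup");
    }
}
```

`StartupOptions.initial()` or `StartupOptions.latest()` can be swapped in for the other modes described above.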
### 更改流 diff --git a/docs/content/connectors/mongodb-cdc.md b/docs/content/connectors/mongodb-cdc.md index 9cf60a90afe..490cebff3a6 100644 --- a/docs/content/connectors/mongodb-cdc.md +++ b/docs/content/connectors/mongodb-cdc.md @@ -206,11 +206,19 @@ Connector Options - copy.existing - optional - true - Boolean - Whether copy existing data from source collections. + scan.startup.mode + optional + initial + String + Optional startup mode for MongoDB CDC consumer, valid enumerations are "initial", "latest-offset" and "timestamp". + Please see Startup Reading Position section for more detailed information. + + + scan.startup.timestamp-millis + optional + (none) + Long + Timestamp in millis of the start point, only used for 'timestamp' startup mode. copy.existing.queue.size @@ -337,20 +345,40 @@ Features The MongoDB CDC connector is a Flink Source connector which will read database snapshot first and then continues to read change stream events with **exactly-once processing** even failures happen. -### Snapshot When Startup Or Not +### Startup Reading Position -The config option `copy.existing` specifies whether do snapshot when MongoDB CDC consumer startup. Defaults to `true`. +The config option `scan.startup.mode` specifies the startup mode for the MongoDB CDC consumer. The valid enumerations are: -### Snapshot Data Filters +- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest oplog. +- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup, and just reads from + the end of the oplog, which means it only captures changes made after the connector was started. +- `timestamp`: Skips the snapshot phase and starts reading oplog events from a specific timestamp. -The config option `copy.existing.pipeline` describing the filters when copying existing data.
-This can filter only required data and improve the use of indexes by the copying manager. +For example in DataStream API: +```java +MongoDBSource.builder() + .startupOptions(StartupOptions.latest()) // Start from latest offset + .startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from timestamp + .build() +``` -In the following example, the `$match` aggregation operator ensures that only documents in which the closed field is set to false are copied. +and with SQL: +```SQL +CREATE TABLE mongodb_source (...) WITH ( + 'connector' = 'mongodb-cdc', + 'scan.startup.mode' = 'latest-offset', -- Start from latest offset + ... + 'scan.incremental.snapshot.enabled' = 'true', -- To use timestamp startup mode, incremental snapshot must be enabled. + 'scan.startup.mode' = 'timestamp', -- Start from timestamp + 'scan.startup.timestamp-millis' = '1667232000000' -- Timestamp under timestamp startup mode + ... +) ``` -'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]' -``` + +**Notes:** +- 'timestamp' startup mode is not supported by the legacy source. To use timestamp startup mode, you need to enable incremental snapshot. + ### Change Streams diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java index 7cec9ee6b25..22d0daac967 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java @@ -18,6 +18,7 @@ import com.ververica.cdc.connectors.base.config.SourceConfig; import com.ververica.cdc.connectors.base.dialect.DataSourceDialect; +import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState; import com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState; import com.ververica.cdc.connectors.base.source.meta.offset.Offset; @@ -128,10 +129,34 @@ public void close() {} // ------------------------------------------------------------------------------------------ public StreamSplit createStreamSplit() { + StartupOptions startupOptions = sourceConfig.getStartupOptions(); + + Offset startingOffset; + switch (startupOptions.startupMode) { + case LATEST_OFFSET: + startingOffset = dialect.displayCurrentOffset(sourceConfig); + break; + case EARLIEST_OFFSET: + startingOffset = offsetFactory.createInitialOffset(); + break; + case TIMESTAMP: + startingOffset = + offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis); + break; + case SPECIFIC_OFFSETS: + startingOffset = + offsetFactory.newOffset( + startupOptions.specificOffsetFile, + startupOptions.specificOffsetPos.longValue()); + break; + default: + throw new IllegalStateException( + "Unsupported startup mode " + startupOptions.startupMode); + } return new StreamSplit( STREAM_SPLIT_ID, - dialect.displayCurrentOffset(sourceConfig), + startingOffset, offsetFactory.createNoStoppingOffset(), new ArrayList<>(), new HashMap<>(), diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/OffsetFactory.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/OffsetFactory.java index 0b83362e4b7..c9b9cd95440 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/OffsetFactory.java +++
b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/OffsetFactory.java @@ -31,6 +31,8 @@ public OffsetFactory() {} public abstract Offset newOffset(Long position); + public abstract Offset createTimestampOffset(long timestampMillis); + public abstract Offset createInitialOffset(); public abstract Offset createNoStoppingOffset(); diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/offset/BinlogOffset.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/offset/BinlogOffset.java index e87470b43f6..e92c516c1f9 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/offset/BinlogOffset.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/offset/BinlogOffset.java @@ -57,6 +57,10 @@ public BinlogOffset(String filename, long position) { this(filename, position, 0L, 0L, 0L, null, null); } + public BinlogOffset(long binlogEpochSecs) { + this(null, 0L, 0L, 0L, binlogEpochSecs, null, null); + } + public BinlogOffset( String filename, long position, diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/offset/BinlogOffsetFactory.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/offset/BinlogOffsetFactory.java index b4b335f15bb..dcbd570b012 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/offset/BinlogOffsetFactory.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/offset/BinlogOffsetFactory.java @@ -43,6 +43,11 @@ public Offset newOffset(Long position) { throw new FlinkRuntimeException("not supported create new Offset by Long position."); } + @Override + public Offset createTimestampOffset(long timestampMillis) { + return new BinlogOffset(timestampMillis / 1000); + } + @Override public Offset createInitialOffset() { return BinlogOffset.INITIAL_OFFSET; diff --git a/flink-connector-mongodb-cdc/pom.xml b/flink-connector-mongodb-cdc/pom.xml index 5a43d245038..b1b1732492e 100644 --- a/flink-connector-mongodb-cdc/pom.xml +++ b/flink-connector-mongodb-cdc/pom.xml @@ -144,6 +144,12 @@ under the License. 
testcontainers ${testcontainers.version} test + + + junit + junit + + diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java index a98bd3b1c57..6c5d7302bc4 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java @@ -40,7 +40,6 @@ import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; -import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE; @@ -77,7 +76,7 @@ public static class Builder { private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue(); private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue(); private Boolean updateLookup = true; - private Boolean copyExisting = COPY_EXISTING.defaultValue(); + private Boolean copyExisting = true; private Integer copyExistingMaxThreads; private Integer copyExistingQueueSize; private String copyExistingPipeline; diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java index d0bda19c20b..224a6d626ad 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java @@ -136,18 +136,13 @@ public MongoDBSourceBuilder pollMaxBatchSize(int pollMaxBatchSize) { } /** - * copy.existing + * scan.startup.mode * - *

Copy existing data from source collections and convert them to Change Stream events on - * their respective topics. Any changes to the data that occur during the copy process are - * applied once the copy is completed. + *

Optional startup mode for MongoDB CDC consumer, valid enumerations are initial, + * latest-offset, timestamp. Default: initial */ - public MongoDBSourceBuilder copyExisting(boolean copyExisting) { - if (copyExisting) { - this.configFactory.startupOptions(StartupOptions.initial()); - } else { - this.configFactory.startupOptions(StartupOptions.latest()); - } + public MongoDBSourceBuilder startupOptions(StartupOptions startupOptions) { + this.configFactory.startupOptions(startupOptions); return this; } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java index 9032a6f71d8..d53f0c2dfa4 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java @@ -78,15 +78,6 @@ public class MongoDBSourceOptions { "The ampersand-separated MongoDB connection options. " + "eg. replicaSet=test&connectTimeoutMS=300000"); - public static final ConfigOption COPY_EXISTING = - ConfigOptions.key("copy.existing") - .booleanType() - .defaultValue(Boolean.TRUE) - .withDescription( - "Copy existing data from source collections and convert them " - + "to Change Stream events on their respective topics. Any changes to the data " - + "that occur during the copy process are applied once the copy is completed."); - public static final ConfigOption COPY_EXISTING_QUEUE_SIZE = ConfigOptions.key("copy.existing.queue.size") .intType() diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/offset/ChangeStreamOffsetFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/offset/ChangeStreamOffsetFactory.java index e848ed9c666..d25b6452ed3 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/offset/ChangeStreamOffsetFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/offset/ChangeStreamOffsetFactory.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.mongodb.source.offset; +import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory; import java.util.Map; @@ -39,7 +40,12 @@ public ChangeStreamOffset newOffset(String filename, Long position) { @Override public ChangeStreamOffset newOffset(Long position) { - return new ChangeStreamOffset(bsonTimestampFromEpochMillis(position)); + throw new UnsupportedOperationException("not supported create new Offset by position."); + } + + @Override + public Offset createTimestampOffset(long timestampMillis) { + return new ChangeStreamOffset(bsonTimestampFromEpochMillis(timestampMillis)); } @Override diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java index a29a3a384b4..197e69c72e9 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java @@ -17,6 +17,7 @@ package 
com.ververica.cdc.connectors.mongodb.table; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -29,6 +30,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; @@ -69,7 +71,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad private final String password; private final String database; private final String collection; - private final Boolean copyExisting; + private final StartupOptions startupOptions; private final Integer copyExistingQueueSize; private final Integer batchSize; private final Integer pollMaxBatchSize; @@ -100,7 +102,7 @@ public MongoDBTableSource( @Nullable String database, @Nullable String collection, @Nullable String connectionOptions, - @Nullable Boolean copyExisting, + StartupOptions startupOptions, @Nullable Integer copyExistingQueueSize, @Nullable Integer batchSize, @Nullable Integer pollMaxBatchSize, @@ -119,7 +121,7 @@ public MongoDBTableSource( this.database = database; this.collection = collection; this.connectionOptions = connectionOptions; - this.copyExisting = copyExisting; + this.startupOptions = checkNotNull(startupOptions); this.copyExistingQueueSize = copyExistingQueueSize; this.batchSize = batchSize; this.pollMaxBatchSize = pollMaxBatchSize; @@ -182,6 +184,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .scheme(scheme) .hosts(hosts) .closeIdleReaders(closeIdlerReaders) + .startupOptions(startupOptions) .deserializer(deserializer); Optional.ofNullable(databaseList).ifPresent(builder::databaseList); @@ -189,7 +192,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { Optional.ofNullable(username).ifPresent(builder::username); Optional.ofNullable(password).ifPresent(builder::password); Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions); - Optional.ofNullable(copyExisting).ifPresent(builder::copyExisting); Optional.ofNullable(batchSize).ifPresent(builder::batchSize); Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize); Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis); @@ -206,12 +208,24 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .hosts(hosts) .deserializer(deserializer); + switch (startupOptions.startupMode) { + case INITIAL: + builder.copyExisting(true); + break; + case LATEST_OFFSET: + builder.copyExisting(false); + break; + default: + throw new ValidationException( + startupOptions.startupMode + + " is not supported by legacy source. 
To use this feature, 'scan.incremental.snapshot.enabled' needs to be set to true."); + } + Optional.ofNullable(databaseList).ifPresent(builder::databaseList); Optional.ofNullable(collectionList).ifPresent(builder::collectionList); Optional.ofNullable(username).ifPresent(builder::username); Optional.ofNullable(password).ifPresent(builder::password); Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions); - Optional.ofNullable(copyExisting).ifPresent(builder::copyExisting); Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize); Optional.ofNullable(batchSize).ifPresent(builder::batchSize); Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize); @@ -266,7 +280,7 @@ public DynamicTableSource copy() { database, collection, connectionOptions, - copyExisting, + startupOptions, copyExistingQueueSize, batchSize, pollMaxBatchSize, @@ -299,7 +313,7 @@ public boolean equals(Object o) { && Objects.equals(database, that.database) && Objects.equals(collection, that.collection) && Objects.equals(connectionOptions, that.connectionOptions) - && Objects.equals(copyExisting, that.copyExisting) + && Objects.equals(startupOptions, that.startupOptions) && Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize) && Objects.equals(batchSize, that.batchSize) && Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize) @@ -325,7 +339,7 @@ public int hashCode() { database, collection, connectionOptions, - copyExisting, + startupOptions, copyExistingQueueSize, batchSize, pollMaxBatchSize, diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java index f3180f573f5..960253faa87 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java @@ -19,6 +19,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; @@ -26,6 +27,7 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.base.utils.OptionUtils; import java.time.ZoneId; @@ -34,10 +36,11 @@ import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COLLECTION; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.CONNECTION_OPTIONS; -import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING_QUEUE_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.DATABASE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; @@ -51,6 +54,7 @@ import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME; import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** Factory for creating configured instance of {@link MongoDBTableSource}. */ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { @@ -83,7 +87,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { Integer heartbeatIntervalMillis = config.get(HEARTBEAT_INTERVAL_MILLIS); - Boolean copyExisting = config.get(COPY_EXISTING); + StartupOptions startupOptions = getStartupOptions(config); Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null); String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE); @@ -114,7 +118,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { database, collection, connectionOptions, - copyExisting, + startupOptions, copyExistingQueueSize, batchSize, pollMaxBatchSize, @@ -133,6 +137,37 @@ private void checkPrimaryKey(UniqueConstraint pk, String message) { message); } + private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; + private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp"; + + private static StartupOptions getStartupOptions(ReadableConfig config) { + String modeString = config.get(SCAN_STARTUP_MODE); + + switch (modeString.toLowerCase()) { + case SCAN_STARTUP_MODE_VALUE_INITIAL: + return StartupOptions.initial(); + case SCAN_STARTUP_MODE_VALUE_LATEST: + return StartupOptions.latest(); + case SCAN_STARTUP_MODE_VALUE_TIMESTAMP: + return StartupOptions.timestamp( + checkNotNull( + config.get(SCAN_STARTUP_TIMESTAMP_MILLIS), + String.format( + "To use timestamp startup mode, the startup timestamp millis '%s' must be set.", + SCAN_STARTUP_TIMESTAMP_MILLIS.key()))); + default: + throw new ValidationException( + String.format( + "Invalid value for option '%s'. 
Supported values are [%s, %s, %s], but was: %s", + SCAN_STARTUP_MODE.key(), + SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_LATEST, + SCAN_STARTUP_MODE_VALUE_TIMESTAMP, + modeString)); + } + } + @Override public String factoryIdentifier() { return IDENTIFIER; @@ -154,7 +189,8 @@ public Set> optionalOptions() { options.add(CONNECTION_OPTIONS); options.add(DATABASE); options.add(COLLECTION); - options.add(COPY_EXISTING); + options.add(SCAN_STARTUP_MODE); + options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); options.add(COPY_EXISTING_QUEUE_SIZE); options.add(BATCH_SIZE); options.add(POLL_MAX_BATCH_SIZE); diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java index f91f6222f05..5df135b52c8 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java @@ -19,6 +19,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.table.utils.LegacyRowResource; @@ -52,6 +53,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; /** Integration tests for MongoDB change stream event SQL source. 
*/ @RunWith(Parameterized.class) @@ -220,6 +222,94 @@ public void testConsumingAllEvents() throws ExecutionException, InterruptedExcep result.getJobClient().get().cancel().get(); } + @Test + public void testStartupFromTimestamp() throws Exception { + String database = ROUTER.executeCommandFileInSeparateDatabase("inventory"); + + // Unfortunately we have to sleep here to differ initial and later-generating changes in + // oplog by timestamp + Thread.sleep(5000L); + + String sourceDDL = + String.format( + "CREATE TABLE mongodb_source (" + + " _id STRING NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " PRIMARY KEY (_id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'mongodb-cdc'," + + " 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000'," + + " 'hosts' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database' = '%s'," + + " 'collection' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'scan.startup.mode' = 'timestamp'," + + " 'scan.startup.timestamp-millis' = '" + + System.currentTimeMillis() + + "'," + + " 'heartbeat.interval.ms' = '1000'" + + ")", + ROUTER.getHostAndPort(), + FLINK_USER, + FLINK_USER_PASSWORD, + database, + "products", + parallelismSnapshot); + + String sinkDDL = + "CREATE TABLE sink (" + + " name STRING," + + " weightSum DECIMAL(10,3)," + + " PRIMARY KEY (name) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"; + + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + if (!parallelismSnapshot) { + assertThrows( + ValidationException.class, + () -> + tEnv.executeSql( + "INSERT INTO sink SELECT name, SUM(weight) FROM mongodb_source GROUP BY name")); + return; + } + + // async submit job + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT name, SUM(weight) FROM mongodb_source GROUP BY name"); + + MongoCollection products = + mongodbClient.getDatabase(database).getCollection("products"); + + products.insertOne( + productDocOf( + "100000000000000000000110", + "jacket", + "water resistent white wind breaker", + 0.2)); + + products.insertOne( + productDocOf("100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18)); + + waitForSinkSize("sink", 2); + + String[] expected = new String[] {"jacket,0.200", "scooter,5.180"}; + + List actual = TestValuesTableFactory.getResults("sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + @Test public void testAllTypes() throws Throwable { String database = ROUTER.executeCommandFileInSeparateDatabase("column_type_test"); diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java index 9c3d49c00dc..2a55231fef3 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java @@ -33,6 +33,7 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.util.ExceptionUtils; +import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils; import org.junit.Test; @@ -48,7 +49,6 @@ 
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; -import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE; @@ -93,7 +93,6 @@ public class MongoDBTableFactoryTest { private static final String MY_DATABASE = "myDB"; private static final String MY_TABLE = "myTable"; private static final ZoneId LOCAL_TIME_ZONE = ZoneId.systemDefault(); - private static final Boolean COPY_EXISTING_DEFAULT = COPY_EXISTING.defaultValue(); private static final int BATCH_SIZE_DEFAULT = BATCH_SIZE.defaultValue(); private static final int POLL_MAX_BATCH_SIZE_DEFAULT = POLL_MAX_BATCH_SIZE.defaultValue(); private static final int POLL_AWAIT_TIME_MILLIS_DEFAULT = POLL_AWAIT_TIME_MILLIS.defaultValue(); @@ -123,7 +122,7 @@ public void testCommonProperties() { MY_DATABASE, MY_TABLE, null, - COPY_EXISTING_DEFAULT, + StartupOptions.initial(), null, BATCH_SIZE_DEFAULT, POLL_MAX_BATCH_SIZE_DEFAULT, @@ -142,7 +141,8 @@ public void testOptionalProperties() { Map options = getAllOptions(); options.put("scheme", MONGODB_SRV_SCHEME); options.put("connection.options", "replicaSet=test&connectTimeoutMS=300000"); - options.put("copy.existing", "false"); + options.put("scan.startup.mode", "timestamp"); + options.put("scan.startup.timestamp-millis", "1667232000000"); options.put("copy.existing.queue.size", "100"); options.put("batch.size", "101"); options.put("poll.max.batch.size", "102"); @@ -164,7 +164,7 @@ public void testOptionalProperties() { MY_DATABASE, MY_TABLE, "replicaSet=test&connectTimeoutMS=300000", - false, + StartupOptions.timestamp(1667232000000L), 100, 101, 102, @@ -200,7 +200,7 @@ public void testMetadataColumns() { MY_DATABASE, MY_TABLE, null, - COPY_EXISTING_DEFAULT, + StartupOptions.initial(), null, BATCH_SIZE_DEFAULT, POLL_MAX_BATCH_SIZE_DEFAULT, diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/offset/RedoLogOffsetFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/offset/RedoLogOffsetFactory.java index 3b3ab233300..142d2fe4ef8 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/offset/RedoLogOffsetFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/offset/RedoLogOffsetFactory.java @@ -45,6 +45,12 @@ public Offset newOffset(Long position) { "Do not support to create RedoLogOffset by position."); } + @Override + public Offset createTimestampOffset(long timestampMillis) { + throw new UnsupportedOperationException( + "Do not support to create RedoLogOffset by timestamp."); + } + @Override public Offset createInitialOffset() { return RedoLogOffset.INITIAL_OFFSET; diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnFactory.java 
b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnFactory.java index d711fae0972..9523ec172f4 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnFactory.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnFactory.java @@ -44,6 +44,11 @@ public Offset newOffset(Long position) { "not supported create new Offset by filename and position."); } + @Override + public Offset createTimestampOffset(long timestampMillis) { + throw new UnsupportedOperationException("not supported create new Offset by timestamp."); + } + @Override public Offset createInitialOffset() { return LsnOffset.INITIAL_OFFSET;
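For completeness, a hedged Table API sketch of the SQL path that the new integration test exercises; the connection details and schema are placeholders, while the WITH options mirror the `scan.startup.mode` / `scan.startup.timestamp-millis` settings added by this change (timestamp mode requires the incremental source, i.e. 'scan.incremental.snapshot.enabled' = 'true').

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MongoDBTimestampStartupSqlExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hosts, credentials, database and collection below are placeholder values.
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  _id STRING NOT NULL,"
                        + "  name STRING,"
                        + "  weight DECIMAL(10,3),"
                        + "  PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mongodb-cdc',"
                        + "  'hosts' = 'localhost:27017',"
                        + "  'username' = 'flinkuser',"
                        + "  'password' = 'flinkpw',"
                        + "  'database' = 'inventory',"
                        + "  'collection' = 'products',"
                        // Timestamp startup is only supported by the incremental source.
                        + "  'scan.incremental.snapshot.enabled' = 'true',"
                        + "  'scan.startup.mode' = 'timestamp',"
                        + "  'scan.startup.timestamp-millis' = '1667232000000'"
                        + ")");

        // Emits only changes written to the collection after the given timestamp.
        tEnv.executeSql("SELECT * FROM products").print();
    }
}
```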