Skip to content

Commit

Permalink
[bugfix][connector-v2] fix cdc mysql reader err (#3465)
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y authored Nov 18, 2022
1 parent 7b92d2a commit 1b406b5
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public MySqlBinlogFetchTask(IncrementalSplit split) {
}

@Override
public void execute(Context context) throws Exception {
public void execute(FetchTask.Context context) throws Exception {
MySqlSourceFetchTaskContext sourceFetchContext = (MySqlSourceFetchTaskContext) context;
taskRunning = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public MySqlSnapshotFetchTask(SnapshotSplit split) {
}

@Override
public void execute(Context context) throws Exception {
public void execute(FetchTask.Context context) throws Exception {
MySqlSourceFetchTaskContext sourceFetchContext = (MySqlSourceFetchTaskContext) context;
taskRunning = true;
snapshotSplitReadTask =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
Expand Down Expand Up @@ -97,7 +98,7 @@ public MySqlSnapshotSplitReadTask(

@Override
public SnapshotResult execute(
ChangeEventSourceContext context, OffsetContext previousOffset)
ChangeEventSource.ChangeEventSourceContext context, OffsetContext previousOffset)
throws InterruptedException {
SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);
final SnapshotContext ctx;
Expand All @@ -119,10 +120,10 @@ public SnapshotResult execute(

@Override
protected SnapshotResult doExecute(
ChangeEventSourceContext context,
ChangeEventSource.ChangeEventSourceContext context,
OffsetContext previousOffset,
SnapshotContext snapshotContext,
SnapshottingTask snapshottingTask)
AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext,
AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask)
throws Exception {
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
Expand Down Expand Up @@ -152,12 +153,12 @@ protected SnapshotResult doExecute(
}

@Override
protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
return new SnapshottingTask(false, true);
}

@Override
protected SnapshotContext prepare(ChangeEventSourceContext changeEventSourceContext)
protected AbstractSnapshotChangeEventSource.SnapshotContext prepare(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext)
throws Exception {
return new MySqlSnapshotContext();
}
Expand Down Expand Up @@ -249,7 +250,7 @@ private void createDataEventsForTable(
}

protected ChangeRecordEmitter getChangeRecordEmitter(
SnapshotContext snapshotContext, TableId tableId, Object[] row) {
AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext, TableId tableId, Object[] row) {
snapshotContext.offset.event(tableId, clock.currentTime());
return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, clock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.debezium.relational.Column;
import lombok.extern.slf4j.Slf4j;

/** Utilities for converting from MySQL types to Flink types. */
/** Utilities for converting from MySQL types to SeaTunnel types. */

@Slf4j
public class MySqlTypeUtils {
Expand Down

0 comments on commit 1b406b5

Please sign in to comment.