Merge branch 'apache:master' into binlogClient-exception
whhe authored Nov 14, 2024
2 parents 6cb9331 + 51c679a commit 4d932eb
Showing 9 changed files with 550 additions and 165 deletions.
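The added tests construct TransformDef rules with seven positional arguments, but only the first three (table matcher, projection, filter) vary; the trailing nulls are the rule's optional fields. A minimal annotated sketch of one such rule, with the names of the four null-valued parameters assumed for illustration rather than taken from this diff:

    // Sketch of a single transform rule as used in the tests below.
    // Only the first three arguments are exercised; the last four are
    // assumed here to be the optional primary-key, partition-key,
    // table-options and description fields, left as null.
    TransformDef juvenileRule =
            new TransformDef(
                    "default_namespace.default_schema.\\.*", // table name pattern
                    "id,age,'Juvenile' AS roleName",         // projection
                    "age < 18",                              // filter
                    null,                                    // primary keys (assumed)
                    null,                                    // partition keys (assumed)
                    null,                                    // table options (assumed)
                    null);                                   // description (assumed)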
@@ -255,6 +255,105 @@ void testMultipleDispatchTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, OLD], after=[], op=DELETE, meta=()}"));
}

@ParameterizedTest
@EnumSource
void testMultipleTransformWithDiffRefColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
runGenericTransformTest(
sinkApi,
Arrays.asList(
new TransformDef(
"default_namespace.default_schema.\\.*",
"id,age,'Juvenile' AS roleName",
"age < 18",
null,
null,
null,
null),
new TransformDef(
"default_namespace.default_schema.\\.*",
"id,age,name AS roleName",
"age >= 18",
null,
null,
null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`age` INT,`roleName` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 18, Alice], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, 20, Bob], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 20, Bob], after=[2, 30, Bob], op=UPDATE, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`age` TINYINT,`roleName` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, 15, Juvenile], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, 25, Derrida], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 25, Derrida], after=[], op=DELETE, meta=()}"));
}

@ParameterizedTest
@EnumSource
void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws Exception {
runGenericTransformTest(
sinkApi,
Arrays.asList(
new TransformDef(
"default_namespace.default_schema.mytable2",
"*,'Juvenile' AS roleName",
"age < 18",
null,
null,
null,
null),
new TransformDef(
"default_namespace.default_schema.mytable2",
"id,name,age,description,name AS roleName",
"age >= 18",
null,
null,
null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Juvenile], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Derrida], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Derrida], after=[], op=DELETE, meta=()}"));
}

@ParameterizedTest
@EnumSource
void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws Exception {
runGenericTransformTest(
sinkApi,
Arrays.asList(
new TransformDef(
"default_namespace.default_schema.mytable2",
null,
"age < 18",
null,
null,
null,
null),
new TransformDef(
"default_namespace.default_schema.mytable2",
"id,UPPER(name) AS name,age,description",
"age >= 18",
null,
null,
null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
}

/** This tests if the transform generates metadata info correctly. */
@ParameterizedTest
@EnumSource
@@ -1227,6 +1326,100 @@ void testPostAsteriskWithSchemaEvolution() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
}

@Test
void testTransformWithFilterButNoProjection() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();

sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
List<Event> events = generateSchemaEvolutionEvents(tableId);

ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
"default_namespace.default_schema.\\.*",
null,
"id > 1",
null,
null,
null,
null)),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");

assertThat(outputEvents)
.containsExactly(
// Initial stage
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",

// Add column stage
"AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",

// Alter column type stage
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",

// Rename column stage
"RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",

// Drop column stage
"DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
}

@Test
void testTransformUnmatchedSchemaEvolution() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();