Skip to content

Commit

Permalink
Repro for multiple captures in debezium for db2
Browse files Browse the repository at this point in the history
  • Loading branch information
Tamas Kiss committed Oct 8, 2021
1 parent 1ae87a6 commit bf7fe76
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -44,6 +46,7 @@

/** Integration tests for DB2 CDC source. */
public class Db2ConnectorITCase extends Db2TestBase {
private static final Logger LOG = LoggerFactory.getLogger(Db2ConnectorITCase.class);

private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -94,36 +97,39 @@ public void testConsumingAllEvents()
+ " PRIMARY KEY (name) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);

// async submit job
// TableResult result =
// tEnv.executeSql(
// "INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");

TableResult result =
tEnv.executeSql(
"INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
"INSERT INTO sink SELECT NAME, WEIGHT FROM debezium_source");

waitForSnapshotStarted("sink");
// waitForSnapshotStarted("sink");

try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {

statement.execute(
"UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;");
statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.1' WHERE ID=107;");
// statement.execute(
// "UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;");
// statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.1' WHERE ID=107;");
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;");
statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.17' WHERE ID=111;");
statement.execute("DELETE FROM DB2INST1.PRODUCTS WHERE ID=111;");
// statement.execute(
// "INSERT INTO DB2INST1.PRODUCTS VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
// statement.execute(
// "UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;");
// statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.17' WHERE ID=111;");
// statement.execute("DELETE FROM DB2INST1.PRODUCTS WHERE ID=111;");
}

waitForSinkSize("sink", 20);
waitForSinkSize("sink", 2);

/*
* <pre>
Expand All @@ -149,16 +155,12 @@ public void testConsumingAllEvents()

String[] expected =
new String[] {
"scooter,3.140",
"car battery,8.100",
"12-pack drill bits,0.800",
"hammer,2.625",
"rocks,5.100",
"jacket,0.600",
"spare tire,22.200"
"+I(jacket,0.200)",
"+I(jacket,0.200)"
};

List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
LOG.info("actual: {}", actual);
assertThat(actual, containsInAnyOrder(expected));

result.getJobClient().get().cancel().get();
Expand Down
20 changes: 10 additions & 10 deletions flink-connector-db2-cdc/src/test/resources/db2_server/inventory.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ CREATE TABLE DB2INST1.PRODUCTS2 (
WEIGHT FLOAT
);

INSERT INTO DB2INST1.PRODUCTS(NAME,DESCRIPTION,WEIGHT)
VALUES ('scooter','Small 2-wheel scooter',3.14),
('car battery','12V car battery',8.1),
('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
('hammer','12oz carpenter''s hammer',0.75),
('hammer','14oz carpenter''s hammer',0.875),
('hammer','16oz carpenter''s hammer',1.0),
('rocks','box of assorted rocks',5.3),
('jacket','water resistent black wind breaker',0.1),
('spare tire','24 inch spare tire',22.2);
--INSERT INTO DB2INST1.PRODUCTS(NAME,DESCRIPTION,WEIGHT)
--VALUES ('scooter','Small 2-wheel scooter',3.14),
-- ('car battery','12V car battery',8.1),
-- ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
-- ('hammer','12oz carpenter''s hammer',0.75),
-- ('hammer','14oz carpenter''s hammer',0.875),
-- ('hammer','16oz carpenter''s hammer',1.0),
-- ('rocks','box of assorted rocks',5.3),
-- ('jacket','water resistent black wind breaker',0.1),
-- ('spare tire','24 inch spare tire',22.2);

INSERT INTO DB2INST1.PRODUCTS1(NAME,DESCRIPTION,WEIGHT)
VALUES ('scooter','Small 2-wheel scooter',3.14),
Expand Down

0 comments on commit bf7fe76

Please sign in to comment.