Skip to content

Commit

Permalink
fix: regenerate query plans for join-with-custom-timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Feb 26, 2020
1 parent 5b4058c commit aa40d7b
Show file tree
Hide file tree
Showing 63 changed files with 6,804 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM S1 (ROWKEY BIGINT KEY, ID BIGINT, NAME STRING, TS BIGINT) WITH (KAFKA_TOPIC='s1', KEY='ID', TIMESTAMP='TS', VALUE_FORMAT='AVRO');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "S1",
"schema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `TS` BIGINT",
"keyField" : "ID",
"timestampColumn" : {
"column" : "TS",
"format" : null
},
"topicName" : "s1",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM S2 (ROWKEY BIGINT KEY, ID BIGINT, F1 STRING, F2 STRING) WITH (KAFKA_TOPIC='s2', KEY='ID', VALUE_FORMAT='AVRO');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "S2",
"schema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `F1` STRING, `F2` STRING",
"keyField" : "ID",
"timestampColumn" : null,
"topicName" : "s2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM S1_JOIN_S2 WITH (TIMESTAMP='TS') AS SELECT\n S1.ID ID,\n S1.NAME NAME,\n S1.TS TS,\n S2.F1 F1,\n S2.F2 F2\nFROM S1 S1\nINNER JOIN S2 S2 WITHIN 11 SECONDS ON ((S1.ID = S2.ID))\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "S1_JOIN_S2",
"schema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `TS` BIGINT, `F1` STRING, `F2` STRING",
"keyField" : "ID",
"timestampColumn" : {
"column" : "TS",
"format" : null
},
"topicName" : "S1_JOIN_S2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "S1", "S2" ],
"sink" : "S1_JOIN_S2",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "S1_JOIN_S2"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamStreamJoinV1",
"properties" : {
"queryContext" : "Join"
},
"joinType" : "INNER",
"leftInternalFormats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"rightInternalFormats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"leftSource" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "PrependAliasLeft"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Left/Source"
},
"topicName" : "s1",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : {
"column" : "TS",
"format" : null
},
"sourceSchema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `TS` BIGINT"
},
"selectExpressions" : [ "ID AS S1_ID", "NAME AS S1_NAME", "TS AS S1_TS", "ROWTIME AS S1_ROWTIME", "ROWKEY AS S1_ROWKEY" ]
},
"rightSource" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "PrependAliasRight"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Right/Source"
},
"topicName" : "s2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `F1` STRING, `F2` STRING"
},
"selectExpressions" : [ "ID AS S2_ID", "F1 AS S2_F1", "F2 AS S2_F2", "ROWTIME AS S2_ROWTIME", "ROWKEY AS S2_ROWKEY" ]
},
"beforeMillis" : 11.000000000,
"afterMillis" : 11.000000000
},
"selectExpressions" : [ "S1_ID AS ID", "S1_NAME AS NAME", "S1_TS AS TS", "S2_F1 AS F1", "S2_F2 AS F2" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "S1_JOIN_S2",
"timestampColumn" : {
"column" : "TS",
"format" : null
}
},
"queryId" : "CSAS_S1_JOIN_S2_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/tmp/confluent8609378800848380873",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{
"version" : "6.0.0",
"timestamp" : 1582734336704,
"schemas" : {
"CSAS_S1_JOIN_S2_0.KafkaTopic_Left.Source" : "STRUCT<ID BIGINT, NAME VARCHAR, TS BIGINT> NOT NULL",
"CSAS_S1_JOIN_S2_0.KafkaTopic_Right.Source" : "STRUCT<ID BIGINT, F1 VARCHAR, F2 VARCHAR> NOT NULL",
"CSAS_S1_JOIN_S2_0.Join.Left" : "STRUCT<S1_ID BIGINT, S1_NAME VARCHAR, S1_TS BIGINT, S1_ROWTIME BIGINT, S1_ROWKEY BIGINT> NOT NULL",
"CSAS_S1_JOIN_S2_0.Join.Right" : "STRUCT<S2_ID BIGINT, S2_F1 VARCHAR, S2_F2 VARCHAR, S2_ROWTIME BIGINT, S2_ROWKEY BIGINT> NOT NULL",
"CSAS_S1_JOIN_S2_0.S1_JOIN_S2" : "STRUCT<ID BIGINT, NAME VARCHAR, TS BIGINT, F1 VARCHAR, F2 VARCHAR> NOT NULL"
},
"inputs" : [ {
"topic" : "s1",
"key" : 0,
"value" : {
"ID" : 0,
"NAME" : "zero",
"TS" : 0
},
"timestamp" : 0
}, {
"topic" : "s2",
"key" : 0,
"value" : {
"ID" : 0,
"F1" : "blah",
"F2" : "foo"
},
"timestamp" : 10000
}, {
"topic" : "s2",
"key" : 10,
"value" : {
"ID" : 10,
"F1" : "foo",
"F2" : "bar"
},
"timestamp" : 13000
}, {
"topic" : "s1",
"key" : 10,
"value" : {
"ID" : 10,
"NAME" : "100",
"TS" : 11000
},
"timestamp" : 22000
}, {
"topic" : "s1",
"key" : 0,
"value" : {
"ID" : 0,
"NAME" : "jan",
"TS" : 8000
},
"timestamp" : 33000
} ],
"outputs" : [ {
"topic" : "S1_JOIN_S2",
"key" : 0,
"value" : {
"ID" : 0,
"NAME" : "zero",
"TS" : 0,
"F1" : "blah",
"F2" : "foo"
},
"timestamp" : 0
}, {
"topic" : "S1_JOIN_S2",
"key" : 10,
"value" : {
"ID" : 10,
"NAME" : "100",
"TS" : 11000,
"F1" : "foo",
"F2" : "bar"
},
"timestamp" : 11000
}, {
"topic" : "S1_JOIN_S2",
"key" : 0,
"value" : {
"ID" : 0,
"NAME" : "jan",
"TS" : 8000,
"F1" : "blah",
"F2" : "foo"
},
"timestamp" : 8000
} ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [s1])
--> KSTREAM-TRANSFORMVALUES-0000000001
Source: KSTREAM-SOURCE-0000000003 (topics: [s2])
--> KSTREAM-TRANSFORMVALUES-0000000004
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> PrependAliasLeft
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-TRANSFORMVALUES-0000000004 (stores: [])
--> PrependAliasRight
<-- KSTREAM-SOURCE-0000000003
Processor: PrependAliasLeft (stores: [])
--> Join-this-windowed
<-- KSTREAM-TRANSFORMVALUES-0000000001
Processor: PrependAliasRight (stores: [])
--> Join-other-windowed
<-- KSTREAM-TRANSFORMVALUES-0000000004
Processor: Join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000009-store])
--> Join-other-join
<-- PrependAliasRight
Processor: Join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000008-store])
--> Join-this-join
<-- PrependAliasLeft
Processor: Join-other-join (stores: [KSTREAM-JOINTHIS-0000000008-store])
--> Join-merge
<-- Join-other-windowed
Processor: Join-this-join (stores: [KSTREAM-JOINOTHER-0000000009-store])
--> Join-merge
<-- Join-this-windowed
Processor: Join-merge (stores: [])
--> Project
<-- Join-this-join, Join-other-join
Processor: Project (stores: [])
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Join-merge
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000012
<-- Project
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Loading

0 comments on commit aa40d7b

Please sign in to comment.