Skip to content

Commit

Permalink
Fixed incorrect key schema.
Browse files Browse the repository at this point in the history
  • Loading branch information
thake committed Sep 28, 2020
1 parent 9c2cc89 commit bc58532
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ object SourceRecordFields {
private const val RECORD_TIMESTAMP = "ts_ms"
private const val TRANSACTION = "txId"
private const val SCN = "scn"
const val NAME = "name"
private const val OWNER = "schema"
private const val TABLE = "table"
private const val CHANGE_USER = "user"
Expand Down Expand Up @@ -59,12 +58,12 @@ class ConnectSchemaFactory(private val nameService: ConnectNameService) {
private fun createKeyStruct(cdcRecord: CdcRecord): Struct {
val schema = cdcRecord.dataSchema.keySchema
val struct = Struct(schema)
val sourceMap = when (cdcRecord.operation) {
Operation.READ, Operation.INSERT ->
cdcRecord.after
Operation.DELETE, Operation.UPDATE -> cdcRecord.before
}!!
schema.fields().forEach {
val sourceMap = when (cdcRecord.operation) {
Operation.READ, Operation.INSERT ->
cdcRecord.after
Operation.DELETE, Operation.UPDATE -> cdcRecord.before
}!!
struct.put(it.name(), sourceMap[it.name()])
}
return struct
Expand All @@ -73,6 +72,7 @@ class ConnectSchemaFactory(private val nameService: ConnectNameService) {
private fun createValue(record: CdcRecord): Pair<Schema, Struct> {
val name = nameService.getValueRecordName(record.table)
val recordConnectSchema = record.dataSchema.valueSchema

val valueSchema = SchemaBuilder.struct()
.name(name)
.field(CdcRecordFields.OPERATION, Schema.STRING_SCHEMA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class SchemaService(
return SchemaDefinition(
table,
valueSchemaBuilder.optional().build(),
keySchemaBuilder.optional().build(),
keySchemaBuilder.required().build(),
columnTypes
)
}
Expand Down

0 comments on commit bc58532

Please sign in to comment.