Skip to content

Commit

Permalink
Fix the default flow of cdc cursor (#46961)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Oct 17, 2024
1 parent 9016ce9 commit 0d3bb81
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ interface MetadataQuerier : AutoCloseable {
/** An implementation might open a connection to build a [MetadataQuerier] instance. */
fun session(config: T): MetadataQuerier
}

fun commonCursorOrNull(cursorColumnID: String): FieldOrMetaField?
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import io.airbyte.cdk.command.StreamInputState
import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.ArrayAirbyteSchemaType
import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.discover.CommonMetaField
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import io.airbyte.cdk.discover.MetaField
Expand Down Expand Up @@ -198,12 +197,13 @@ class StateManagerFactory(
if (cursorColumnIDComponents.isEmpty()) {
return null
}

val cursorColumnID: String = cursorColumnIDComponents.joinToString(separator = ".")
if (cursorColumnID == CommonMetaField.CDC_LSN.id) {
return CommonMetaField.CDC_LSN
}
return dataColumnOrNull(cursorColumnID)
val maybeCursorField: FieldOrMetaField? =
metadataQuerier.commonCursorOrNull(cursorColumnID)
return maybeCursorField ?: dataColumnOrNull(cursorColumnID)
}

val configuredPrimaryKey: List<Field>? =
configuredStream.primaryKey?.asSequence()?.let { pkOrNull(it.toList()) }
val configuredCursor: FieldOrMetaField? =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ class ResourceDrivenMetadataQuerierFactory(
}

override fun extraChecks() {}
override fun commonCursorOrNull(cursorColumnID: String): FieldOrMetaField? {
return when (cursorColumnID) {
CommonMetaField.CDC_LSN.id -> CommonMetaField.CDC_LSN
else -> null
}
}

override fun close() {
isClosed = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,13 @@ class JdbcMetadataQuerier(
checkQueries.executeAll(conn)
}

override fun commonCursorOrNull(cursorColumnID: String): FieldOrMetaField? {
return when (cursorColumnID) {
CommonMetaField.CDC_LSN.id -> CommonMetaField.CDC_LSN
else -> null
}
}

override fun close() {
log.info { "Closing JDBC connection." }
conn.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c
dockerImageTag: 0.0.24
dockerImageTag: 0.0.25
dockerRepository: airbyte/source-mysql-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.check.JdbcCheckQueries
import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import io.airbyte.cdk.discover.JdbcMetadataQuerier
import io.airbyte.cdk.discover.MetadataQuerier
import io.airbyte.cdk.discover.TableName
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
import io.airbyte.cdk.jdbc.JdbcConnectionFactory
import io.airbyte.cdk.read.SelectQueryGenerator
import io.airbyte.integrations.source.mysql.MysqlJdbcStreamFactory.MysqlCDCMetaFields
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Primary
Expand Down Expand Up @@ -175,6 +177,13 @@ class MysqlSourceMetadataQuerier(
return memoizedPrimaryKeys[table] ?: listOf()
}

override fun commonCursorOrNull(cursorColumnID: String): FieldOrMetaField? {
return when (cursorColumnID) {
MysqlCDCMetaFields.CDC_CURSOR.id -> MysqlCDCMetaFields.CDC_CURSOR
else -> null
}
}

private data class AllPrimaryKeysRow(
val tableSchema: String,
val tableName: String,
Expand Down

0 comments on commit 0d3bb81

Please sign in to comment.