Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaV2SinkConnector #38973

Merged
merged 15 commits into from
Mar 14, 2024
Merged

KafkaV2SinkConnector #38973

merged 15 commits into from
Mar 14, 2024

Conversation

xinlian12
Copy link
Member

@xinlian12 xinlian12 commented Feb 27, 2024

Feature request #38769

In this PR, we added the kafka CosmosDB Sink Connector V2 version.

Config

  • kafka.connect.cosmos.accountEndpoint -> No default. Cosmos DB Account Endpoint Uri
  • kafka.connect.cosmos.accountKey -> No default. Cosmos DB Account Key
  • kafka.connect.cosmos.useGatewayMode -> Default false. Flag to indicate whether to use gateway mode. By default it is false.
  • kafka.connect.cosmos.preferredRegionsList -> Default empty list. Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., [East US, West US] or East US, West US) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions here
  • kafka.connect.cosmos.applicationName -> Default empty string. Will be added as the userAgent suffix.
  • kafka.connect.cosmos.sink.database.name -> No Default. CosmosDb database name.
  • kafka.connect.cosmos.sink.containers.topicMap -> No Default. A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2.
  • kafka.connect.cosmos.sink.errors.tolerance -> Default None. Error tolerance level after exhausting all retries. None for fail on error. All for log and continue
  • kafka.connect.cosmos.sink.bulk.enabled -> Default true. Flag to indicate whether Cosmos DB bulk mode is enabled for Sink connector.
  • kafka.connect.cosmos.sink.bulk.maxConcurrentCosmosPartitions -> Default -1. Usually this is only required to be tuned for large containers. Cosmos DB Item Write Max Concurrent Cosmos Partitions. If not specified it will be determined based on the number of the container's physical partitions which would indicate every batch is expected to have data from all Cosmos physical partitions. If specified it indicates from at most how many Cosmos Physical Partitions each batch contains data. So this config can be used to make bulk processing more efficient when input data in each batch has been repartitioned to balance to how many Cosmos partitions each batch needs to write. This is mainly useful for very large containers (with hundreds of physical partitions.
  • kafka.connect.cosmos.sink.bulk.initialBatchSize -> Default 1. Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 1. Reduce this when you want to avoid that the first few requests consume too many RUs.
  • kafka.connect.cosmos.sink.write.strategy -> Default ItemOverwrite. Cosmos DB Item write Strategy: ItemOverwrite (using upsert), ItemAppend (using create, ignore pre-existing items i.e., Conflicts), ItemDelete (deletes based on id/pk of data frame), ItemDeleteIfNotModified (deletes based on id/pk of data frame if etag hasn't changed since collecting id/pk), ItemOverwriteIfNotModified (using create if etag is empty, update/replace with etag pre-condition otherwise, if document was updated the pre-condition failure is ignored)
  • kafka.connect.cosmos.sink.maxRetryCount -> Default 10. Cosmos DB max retry attempts on write failures for Sink connector. By default, the connector will retry on transient write errors for up to 10 times.
  • kafka.connect.cosmos.sink.id.strategy -> Default ProvidedInValueStrategy. A strategy used to populate the document with an id. Valid strategies are: TemplateStrategy, FullKeyStrategy, KafkaMetadataStrategy, ProvidedInKeyStrategy, ProvidedInValueStrategy. Configuration properties prefixed withid.strategy are passed through to the strategy. For example, when using id.strategy=TemplateStrategy , the property id.strategy.template is passed through to the template strategy and used to specify the template string to be used in constructing the id.

@xinlian12 xinlian12 force-pushed the kafkaV2SinkConnector-2 branch from 20587ec to ac1a96f Compare February 27, 2024 23:16
@azure-sdk
Copy link
Collaborator

azure-sdk commented Feb 27, 2024

API change check

APIView has identified API level changes in this PR and created following API reviews.

com.azure.cosmos.kafka:azure-cosmos-kafka-connect

@xinlian12 xinlian12 force-pushed the kafkaV2SinkConnector-2 branch from 83ff738 to 223d6d7 Compare March 1, 2024 01:56
@xinlian12 xinlian12 requested a review from mssfang as a code owner March 1, 2024 16:45
@xinlian12
Copy link
Member Author

/azp run java - cosmos - tests

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

.getCosmosAsyncContainerAccessor()
.getLinkWithoutTrailingSlash(container),
null,
new DocumentCollection())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why new DocumentCollection and not null? I think his param is meant to be used when asynccache has an instance believed to be stale?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both works. The value used here just mean if the cached value is not the same as the staled value, then use the cached value directly. So null or new DocumentCollection() would have the same effect

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BUt null is cheaper (no new garbage instantiation)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will change in next PR

Copy link
Member

@FabianMeiswinkel FabianMeiswinkel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@xinlian12
Copy link
Member Author

/azp run java - cosmos - tests

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12
Copy link
Member Author

/azp run java - cosmos - tests

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12
Copy link
Member Author

/azp run java - cosmos - tests

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12 xinlian12 merged commit 6c983ab into Azure:main Mar 14, 2024
73 checks passed
xinlian12 pushed a commit to xinlian12/azure-sdk-for-java that referenced this pull request Mar 25, 2024
kushagraThapar pushed a commit that referenced this pull request Mar 26, 2024
* Revert "KafkaV2SinkConnector (#38973)"

This reverts commit 6c983ab.

* Revert "UsingTestContainerForKafkaIntegrationTests (#38884)"

This reverts commit 12bec49.

* Revert "KafkaV2SourceConnector (#38748)"

This reverts commit 30835d9.

* revert one more change

* revert change

---------

Co-authored-by: annie-mac <[email protected]>
@xinlian12 xinlian12 mentioned this pull request Mar 27, 2024
drielenr pushed a commit that referenced this pull request Apr 2, 2024
* add sink connector v2 implementation


---------

Co-authored-by: annie-mac <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants