[Master]Add Change Data Capture (CDC) APIs to stream data out of YugabyteDB #9019

Open
suranjan opened this issue Jun 22, 2021 · 2 comments

@suranjan
Contributor

suranjan commented Jun 22, 2021

Motivation

  • Without Change Data Capture (CDC), database extraction is a cumbersome process in which you move the entire contents of tables into flat files, and then load the files into the data warehouse. This ad hoc approach is expensive in several ways.
  • Without CDC, staging means moving the entire contents of tables into flat files, and the resulting interfaces become error-prone and manpower-intensive to administer.
  • Without CDC, it becomes expensive because you must write and maintain the capture software yourself or purchase it from a third-party vendor.
  • So, we need an efficient, distributed, row-level change data capture (CDC) feed into a configurable sink for downstream processing such as reporting, full-text indexes, analytics engines, or big data pipelines.
  • Applications can use change streams to subscribe to all data changes on a single table, a database, or an entire deployment, and immediately react to them.

Phase 1

| Status | Subtask | GitHub Issue | Estimated Time |
|---|---|---|---|
|  | Implement the CDC Lifecycle API |  |  |
|  | Implement the GetChanges method of CDC API | #9022 |  |
|  | Define the CDCEvent Structure | #9020 |  |
|  | Develop Simple Console Client | #9021 |  |
|  | Support Snapshot of the table before the start of the CDC |  |  |
|  | Allow DDL changes to be propagated |  |  |
| 🛠 | Build a Kafka Source Connector (Debezium) | #11855 |  |
| ⬜️ | Support reading the 'before image' of a change |  |  |

Phase 2

| Status | Subtask | GitHub Issue |
|---|---|---|
| ⬜️ | Remove dependency on 'Kafka' |  |
| ⬜️ | Support UDT datatype for CDC |  |
| ⬜️ | Support Row Level Security |  |
| ⬜️ | Support Metrics for tracking CDC state |  |

The following issues are also being tracked and are under our plan for future releases:

@suranjan suranjan added the area/cdcsdk CDC SDK label Jun 22, 2021
@ymahajan ymahajan changed the title Define the CDC API Add Change Data Capture (CDC) APIs to stream data out of YugabyteDB Jul 21, 2021
@ymahajan ymahajan changed the title Add Change Data Capture (CDC) APIs to stream data out of YugabyteDB [Master]Add Change Data Capture (CDC) APIs to stream data out of YugabyteDB Jul 21, 2021
suranjan added a commit that referenced this issue Jan 28, 2022
Summary:
Added the following CDC admin APIs. The CDC stream is created at the namespace level and covers all the tables present in the namespace at that point in time.
The CDC client can use this stream ID to get the data of those tables. Currently, a table created in this namespace after this command is executed does not get the stream automatically; the user needs to alter the stream to add or remove the table ID.
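A minimal C++ sketch of the stream-membership rule described above. It assumes a hypothetical `CdcStream` type and helper functions, which are illustrative and not YugabyteDB's internal classes. The table set is captured when the stream is created, and tables created later must be added explicitly.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical model of a namespace-level CDC stream (illustrative only).
struct CdcStream {
  std::string stream_id;
  std::string namespace_id;
  std::set<std::string> table_ids;  // tables covered by this stream
};

// The table set is captured once: only tables that exist at creation time.
CdcStream CreateNamespaceStream(const std::string& stream_id,
                                const std::string& namespace_id,
                                const std::vector<std::string>& existing_tables) {
  return CdcStream{stream_id, namespace_id,
                   std::set<std::string>(existing_tables.begin(), existing_tables.end())};
}

// Tables created later are not added automatically; the user alters the
// stream explicitly to add or remove a table ID.
void AddTable(CdcStream& stream, const std::string& table_id) {
  stream.table_ids.insert(table_id);
}

void RemoveTable(CdcStream& stream, const std::string& table_id) {
  stream.table_ids.erase(table_id);
}

bool Covers(const CdcStream& stream, const std::string& table_id) {
  return stream.table_ids.count(table_id) > 0;
}
```

Using a `std::set` keeps add and remove idempotent, which matches the "alter the stream to add/remove the table id" workflow.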

**Case 1: No tables are present in the database**
```
$ ./yb-admin create_change_data_stream demo
CDC Stream ID: 3ea999960f5f4fe49d5526079cd0eec4

$ ./yb-admin list_change_data_streams
CDC Streams:
streams {
  stream_id: "3ea999960f5f4fe49d5526079cd0eec4"
  options {
    key: "id_type"
    value: "NAMESPACEID"
  }
  options {
    key: "checkpoint_type"
    value: "EXPLICIT"
  }
  options {
    key: "source_type"
    value: "CDCSDK"
  }
  options {
    key: "record_format"
    value: "PROTO"
  }
  options {
    key: "record_type"
    value: "CHANGE"
  }
  options {
    key: "state"
    value: "ACTIVE"
  }
}

$ ./yb-admin get_change_data_stream_info 3ea999960f5f4fe49d5526079cd0eec4
CDC DB Stream Info:
namespace_id: "000033e1000030008000000000000000"

$ ./yb-admin delete_change_data_stream 3ea999960f5f4fe49d5526079cd0eec4
Successfully deleted Change Data Stream ID: 3ea999960f5f4fe49d5526079cd0eec4
```

**Case 2: Some tables are present in the DB**
In this case, the relevant responses also include the table IDs of the respective tables.
```
$ ./yb-admin create_change_data_stream demo
CDC Stream ID: a6e987dbc2af4516b02ab53dfd01cf56

$ ./yb-admin list_change_data_streams
CDC Streams:
streams {
  stream_id: "a6e987dbc2af4516b02ab53dfd01cf56"
  table_id: "000033e1000030008000000000004000"
  table_id: "000033e1000030008000000000004005"
  options {
    key: "id_type"
    value: "NAMESPACEID"
  }
  options {
    key: "checkpoint_type"
    value: "EXPLICIT"
  }
  options {
    key: "source_type"
    value: "CDCSDK"
  }
  options {
    key: "record_format"
    value: "PROTO"
  }
  options {
    key: "record_type"
    value: "CHANGE"
  }
  options {
    key: "state"
    value: "ACTIVE"
  }
}

$ ./yb-admin get_change_data_stream_info a6e987dbc2af4516b02ab53dfd01cf56
CDC DB Stream Info:
table_info {
  stream_id: "a6e987dbc2af4516b02ab53dfd01cf56"
  table_id: "000033e1000030008000000000004000"
}
table_info {
  stream_id: "a6e987dbc2af4516b02ab53dfd01cf56"
  table_id: "000033e1000030008000000000004005"
}
namespace_id: "000033e1000030008000000000000000"

$ ./yb-admin delete_change_data_stream a6e987dbc2af4516b02ab53dfd01cf56
Successfully deleted Change Data Stream ID: a6e987dbc2af4516b02ab53dfd01cf56
```
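A sketch of reading the `list_change_data_streams` output shown above into a struct, for scripts that post-process yb-admin output. It assumes the exact text layout in these examples: one field per line, with values in double quotes. `StreamInfo` and `ParseListStreamsOutput` are hypothetical names, not an official client. A robust tool would instead parse the protobuf text format with a proper parser such as `google::protobuf::TextFormat`.

```cpp
#include <map>
#include <sstream>
#include <string>
#include <vector>

struct StreamInfo {
  std::string stream_id;
  std::vector<std::string> table_ids;
  std::map<std::string, std::string> options;  // e.g. "checkpoint_type" -> "EXPLICIT"
};

// Returns the text between the first and last double quote on the line.
static std::string QuotedValue(const std::string& line) {
  auto first = line.find('"');
  auto last = line.rfind('"');
  if (first == std::string::npos || last == first) return "";
  return line.substr(first + 1, last - first - 1);
}

// True if the line, ignoring leading spaces, starts with `field`.
static bool StartsWithField(const std::string& line, const std::string& field) {
  auto pos = line.find_first_not_of(' ');
  return pos != std::string::npos && line.compare(pos, field.size(), field) == 0;
}

std::vector<StreamInfo> ParseListStreamsOutput(const std::string& text) {
  std::vector<StreamInfo> streams;
  std::istringstream in(text);
  std::string line, pending_key;
  while (std::getline(in, line)) {
    if (StartsWithField(line, "streams {")) {
      streams.emplace_back();  // a new stream block begins
    } else if (streams.empty()) {
      continue;  // skip the "CDC Streams:" header
    } else if (StartsWithField(line, "stream_id:")) {
      streams.back().stream_id = QuotedValue(line);
    } else if (StartsWithField(line, "table_id:")) {
      streams.back().table_ids.push_back(QuotedValue(line));
    } else if (StartsWithField(line, "key:")) {
      pending_key = QuotedValue(line);
    } else if (StartsWithField(line, "value:")) {
      streams.back().options[pending_key] = QuotedValue(line);
    }
  }
  return streams;
}
```

For Case 1 the parsed `table_ids` list is empty. For Case 2 it holds the two table IDs.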

Test Plan: Added unit tests. Have tested it manually.

Reviewers: rahuldesirazu, nicolas, sergei, vkushwaha

Reviewed By: sergei, vkushwaha

Subscribers: zyu, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D14605
@ma-hartma

I'm really looking forward to this, especially the possibility of supporting different 'sinks' or 'connectors'!

As already stated in #2513, a connector for NATS Jetstream would be awesome.

suranjan added a commit that referenced this issue Mar 3, 2022
Summary:
Github Master Ticket: #9019
Design Document: https://docs.google.com/document/d/1_xZqU5UgzCu1W--kci3ajU7_iYXXHMQvudmybDI-Xsk/edit
Functional Spec: https://docs.google.com/document/u/2/d/1nHuzHQ-qYVPbKi2dqo_drzSXMq00h7w5oi0JDf0GD1U/edit#heading=h.jmqfs7jgvvg8

  - Added a new CDC Type EXTERNAL
  - Added CDCSDK naming conventions to avoid confusion with code shared with cluster replication
  - Read the changes from IntentDB for the UPDATE_TRANSACTION_OP op type
  - Batch the changes from IntentDB according to the maximum batch size defined by `cdc_max_stream_intent_records`
  - Send a CDCSDKCheckpoint with every record
  - CDCSDKCheckpoint has term, index, reverse_index_key, and write_id
  - Mark each record as INSERT/UPDATE/DELETE depending on the type of operation performed
  - An update of the primary key generates two events: a DELETE and an INSERT
  - In multi-shard transactions, an UPDATE of multiple columns is split into multiple single-column UPDATE records
  - Send the DDL events found in the WAL to the subscriber
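The primary-key rule, the multi-column UPDATE rule, and the batching step above can be illustrated with a small C++ sketch. `OpType`, `Record`, `EmitUpdate`, and `Batch` are hypothetical names. `max_batch_size` stands in for the `cdc_max_stream_intent_records` flag, and the real implementation reads intents from IntentDB rather than taking in-memory values.

```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

enum class OpType { kInsert, kUpdate, kDelete };

// Hypothetical change record: op type, primary key, and column -> new value.
struct Record {
  OpType op;
  std::string primary_key;
  std::map<std::string, std::string> columns;
};

std::vector<Record> EmitUpdate(const std::string& old_pk, const std::string& new_pk,
                               const std::map<std::string, std::string>& columns,
                               bool multi_shard_txn) {
  // Primary-key change: the old row disappears and a new row appears.
  if (old_pk != new_pk) {
    return {Record{OpType::kDelete, old_pk, {}},
            Record{OpType::kInsert, new_pk, columns}};
  }
  // Not a multi-shard transaction: one UPDATE record with every changed column.
  if (!multi_shard_txn) {
    return {Record{OpType::kUpdate, new_pk, columns}};
  }
  // Multi-shard transaction: one single-column UPDATE record per column.
  std::vector<Record> out;
  for (const auto& [column, value] : columns) {
    out.push_back(Record{OpType::kUpdate, new_pk, {{column, value}}});
  }
  return out;
}

// Splits records into consecutive batches of at most max_batch_size records.
std::vector<std::vector<Record>> Batch(const std::vector<Record>& records,
                                       std::size_t max_batch_size) {
  if (max_batch_size == 0) max_batch_size = 1;  // avoid an infinite loop
  std::vector<std::vector<Record>> batches;
  for (std::size_t start = 0; start < records.size(); start += max_batch_size) {
    std::size_t end = std::min(records.size(), start + max_batch_size);
    batches.emplace_back(records.begin() + start, records.begin() + end);
  }
  return batches;
}
```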

Test Plan:
We have unit tests as well as tests using the ConsoleSubscriber.

  - Added C++ unit tests to verify the INSERT/UPDATE/DELETE op types
  - Verified the ordering of events
  - Added Java unit tests using the CDC Java Console Client, verifying:
    - Multiple data types (to be enhanced)
    - Large SQL scripts with varying DMLs against the expected output
    - Correctness in the case of a composite primary key

We have also run long-running tests with failover to verify that the number of streamed records is as expected.
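The event-ordering check in this test plan could look roughly like the following sketch. It models the CDCSDKCheckpoint fields (term, index, reverse_index_key, write_id) in a hypothetical `Checkpoint` struct. The ordering by `(term, index, write_id)`, and leaving `reverse_index_key` out of the comparison, are assumptions for illustration rather than the documented semantics.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical mirror of the CDCSDKCheckpoint fields.
struct Checkpoint {
  std::int64_t term = 0;
  std::int64_t index = 0;
  std::string reverse_index_key;  // not used in the assumed ordering below
  std::int32_t write_id = 0;
};

// Assumed ordering: Raft term, then WAL index, then write_id within an entry.
bool Precedes(const Checkpoint& a, const Checkpoint& b) {
  return std::tie(a.term, a.index, a.write_id) <
         std::tie(b.term, b.index, b.write_id);
}

// True if no event's checkpoint goes backwards relative to the previous one.
bool IsOrdered(const std::vector<Checkpoint>& events) {
  for (std::size_t i = 1; i < events.size(); ++i) {
    if (Precedes(events[i], events[i - 1])) return false;
  }
  return true;
}
```

A check like this catches re-ordering after a failover without depending on the record payloads.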

Reviewers: bogdan, nicolas, rahuldesirazu, sergei

Reviewed By: rahuldesirazu, sergei

Subscribers: amartsinchyk, kannan, kgupta, mihnea, iamoncar, sdash, jhe, ybase, vkushwaha

Differential Revision: https://phabricator.dev.yugabyte.com/D13838
suranjan added a commit that referenced this issue Mar 4, 2022
Summary:

Github Master Ticket: #9019
Design Document: https://docs.google.com/document/d/1_xZqU5UgzCu1W--kci3ajU7_iYXXHMQvudmybDI-Xsk/edit
Functional Spec: https://docs.google.com/document/u/2/d/1nHuzHQ-qYVPbKi2dqo_drzSXMq00h7w5oi0JDf0GD1U/edit#heading=h.jmqfs7jgvvg8

This is the client-side change that exposes some APIs to be consumed by CDC consumers. Currently, these APIs are not public and are meant to be consumed by our Debezium connector. For testing, we have written a console subscriber.
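A consumer such as the Debezium connector or the console subscriber typically runs a poll loop against these APIs. The C++ sketch below shows that pattern with a hypothetical in-memory `FakeStream`. Its method names echo the GetChanges, GetCheckpoint, and SetCheckpoint requests, but the real client is written in Java and its types and signatures differ.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for a CDC stream and its stored checkpoint.
class FakeStream {
 public:
  explicit FakeStream(std::vector<std::string> events) : events_(std::move(events)) {}

  // Returns up to max_records events, starting at position `from`.
  std::vector<std::string> GetChanges(std::int64_t from, std::size_t max_records) const {
    std::vector<std::string> out;
    for (std::size_t i = static_cast<std::size_t>(from);
         i < events_.size() && out.size() < max_records; ++i) {
      out.push_back(events_[i]);
    }
    return out;
  }
  std::int64_t GetCheckpoint() const { return checkpoint_; }
  void SetCheckpoint(std::int64_t cp) { checkpoint_ = cp; }

 private:
  std::vector<std::string> events_;
  std::int64_t checkpoint_ = 0;  // position of the next unconsumed event
};

// Poll loop: resume from the stored checkpoint, handle a batch, then commit the
// new checkpoint explicitly. Events handled but not yet committed would be
// delivered again after a restart (at-least-once delivery).
std::size_t ConsumeAll(FakeStream& stream, std::size_t batch_size,
                       const std::function<void(const std::string&)>& handle) {
  std::size_t processed = 0;
  while (true) {
    std::int64_t cp = stream.GetCheckpoint();
    auto batch = stream.GetChanges(cp, batch_size);
    if (batch.empty()) break;
    for (const auto& event : batch) handle(event);
    processed += batch.size();
    stream.SetCheckpoint(cp + static_cast<std::int64_t>(batch.size()));
  }
  return processed;
}
```

Committing only after the batch is handled matches the `EXPLICIT` checkpoint type shown in the stream options.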

Test Plan:
Java unit tests for the APIs and CDC behavior.
We have done some long-running testing with applications.
We have also run the YB sample apps with CDC enabled on the table and verified that all the events are received.

Reviewers: nicolas, bogdan, ybase, ashetkar, nkumar, nikhil, rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: vkushwaha, srangavajjula

Differential Revision: https://phabricator.dev.yugabyte.com/D13836
jayant07-yb pushed a commit to jayant07-yb/yugabyte-db that referenced this issue Mar 8, 2022
jayant07-yb pushed a commit to jayant07-yb/yugabyte-db that referenced this issue Mar 8, 2022
suranjan added a commit that referenced this issue Mar 28, 2022
Summary:
Original commits:
- b04b58a / D14605
- 9ac94af / D14990

[#11205] Trivial RPC methods

Currently, most of our RPC endpoints have a trivial implementation: they take the request, process it synchronously, and send the response.
But our generated service methods allow asynchronous processing, which complicates writing every method, even one that doesn't require asynchronous processing.
This diff adds the ability to specify that a method is trivial.
Its virtual service method then has the signature:
```Result<ResponsePB> Method(const RequestPB& req, CoarseTimePoint deadline);```
instead of
```void Method(const RequestPB req, ResponsePB* resp, RpcContext context);```
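A rough C++ sketch of how a trivial, synchronous handler can be adapted to the asynchronous shape. `Status`, `Result`, `RpcContext`, `CallTrivial`, and the `Echo*` types are simplified, hypothetical stand-ins, not YugabyteDB's generated code. `Result` is modeled here as a `std::variant`, and the context is passed by reference so the outcome can be inspected.

```cpp
#include <chrono>
#include <string>
#include <utility>
#include <variant>

// Hypothetical stand-ins for the real Status/Result/RpcContext types.
struct Status { std::string message; };
template <class T> using Result = std::variant<T, Status>;
using CoarseTimePoint = std::chrono::steady_clock::time_point;

struct RpcContext {
  bool responded = false;
  std::string error;
  void RespondSuccess() { responded = true; }
  void RespondFailure(const Status& s) { responded = true; error = s.message; }
};

// Adapter: runs the synchronous handler, then fills the response and answers
// the RPC, so the handler itself never touches the context.
template <class Req, class Resp, class Handler>
void CallTrivial(Handler&& handler, const Req* req, Resp* resp,
                 RpcContext& context, CoarseTimePoint deadline) {
  Result<Resp> result = handler(*req, deadline);
  if (auto* ok = std::get_if<Resp>(&result)) {
    *resp = std::move(*ok);
    context.RespondSuccess();
  } else {
    context.RespondFailure(std::get<Status>(result));
  }
}

// Example trivial method: plain input and output, no RpcContext plumbing.
struct EchoRequestPB { std::string text; };
struct EchoResponsePB { std::string text; };

Result<EchoResponsePB> Echo(const EchoRequestPB& req, CoarseTimePoint /*deadline*/) {
  if (req.text.empty()) return Status{"empty request"};
  return EchoResponsePB{req.text};
}
```

With this split, each endpoint body reduces to a function from request to `Result`, and the response plumbing lives in one place.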

Using std instead of boost lib for declval

Test Plan:
Jenkins: skip
Added unit tests. Have tested it manually.

`ybd --gtest_filter RpcStubTest.Trivial`

Reviewers: rahuldesirazu, nicolas, vkushwaha, dmitry, sergei

Reviewed By: sergei

Subscribers: bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D16228
suranjan added a commit that referenced this issue Mar 29, 2022
Summary:
Original commit: 1371944 / D13838
Github Master Ticket: #9019
Design Document: https://docs.google.com/document/d/1_xZqU5UgzCu1W--kci3ajU7_iYXXHMQvudmybDI-Xsk/edit
Functional Spec: https://docs.google.com/document/u/2/d/1nHuzHQ-qYVPbKi2dqo_drzSXMq00h7w5oi0JDf0GD1U/edit#heading=h.jmqfs7jgvvg8

  - Added a new CDC Type EXTERNAL
  - Added CDCSDK naming conventions to avoid confusion with code shared with cluster replication
  - Read the changes from IntentDB for the UPDATE_TRANSACTION_OP op type
  - Batch the changes from IntentDB according to the maximum batch size defined by `cdc_max_stream_intent_records`
  - Send a CDCSDKCheckpoint with every record
  - CDCSDKCheckpoint has term, index, reverse_index_key, and write_id
  - Mark each record as INSERT/UPDATE/DELETE depending on the type of operation performed
  - An update of the primary key generates two events: a DELETE and an INSERT
  - In multi-shard transactions, an UPDATE of multiple columns is split into multiple single-column UPDATE records
  - Send the DDL events found in the WAL to the subscriber

Fix compilation error

Test Plan:
We have unit tests as well as tests using the ConsoleSubscriber.

  - Added C++ unit tests to verify the INSERT/UPDATE/DELETE op types
  - Verified the ordering of events
  - Added Java unit tests using the CDC Java Console Client, verifying:
    - Multiple data types (to be enhanced)
    - Large SQL scripts with varying DMLs against the expected output
    - Correctness in the case of a composite primary key

We have also run long-running tests with failover to verify that the number of streamed records is as expected.

Reviewers: bogdan, nicolas, rahuldesirazu, sergei

Reviewed By: sergei

Differential Revision: https://phabricator.dev.yugabyte.com/D16235
suranjan pushed a commit that referenced this issue Mar 29, 2022
…Subscriber

Summary:
Original commits:
- d294abf / D13836
- 6b15b16 / D15860
- cf5fead / D16057
- 5408e30 / D15989
- 53afee99e7cfe912fa7ec579ce470e999701a074 / D16176

Github Master Ticket: #9019

Design Document: https://docs.google.com/document/d/1_xZqU5UgzCu1W--kci3ajU7_iYXXHMQvudmybDI-Xsk/edit
Functional Spec: https://docs.google.com/document/u/2/d/1nHuzHQ-qYVPbKi2dqo_drzSXMq00h7w5oi0JDf0GD1U/edit#heading=h.jmqfs7jgvvg8

This is the client-side change that exposes some APIs to be consumed by CDC consumers. Currently, these APIs are not public and are meant to be consumed by our Debezium connector. For testing, we have written a console subscriber.

[CDCSDK][#11679] Add missing license headers to Java files

The following files have missing license headers:
1. `CreateCDCStreamRequest.java`
2. `GetCheckpointRequest.java`
3. `GetCheckpointResponse.java`
4. `GetDBStreamInfoRequest.java`
5. `GetDBStreamInfoResponse.java`
6. `SetCheckpointRequest.java`
7. `SetCheckpointResponse.java`

This diff adds these missing headers.

[#11779][CDCSDK] Add option to send a DDL record based on a flag value in GetChangesRequest

Previously, if some data had already been consumed for a stream ID and a client then came up with the same stream ID and requested changes, it received only the changes.

This was a problem for `Debezium`: when the connector was restarted, it received the changes directly, without any DDL record. Debezium needs this DDL record to process the schema information for the columns; without it, the client side threw a `NullPointerException`, effectively crashing the connector.
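A server-side sketch of the behavior this change introduces. It assumes the request flag is called `need_schema_info`, a name inferred from the test `TestNeedSchemaInfoFlag` rather than taken from the proto. `ChangesRequest`, `RecordKind`, `CdcRecord`, and `GetChanges` are simplified, hypothetical types and functions.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical request: resume position plus the (assumed) schema flag.
struct ChangesRequest {
  std::int64_t from_index = 0;
  bool need_schema_info = false;
};

enum class RecordKind { kDdl, kChange };

struct CdcRecord {
  RecordKind kind;
  std::string payload;
};

// When the client asks for schema info (e.g. a restarted connector that has no
// in-memory schema), emit a DDL record for the current schema before any
// change records, so the client can decode the columns that follow.
std::vector<CdcRecord> GetChanges(const ChangesRequest& req,
                                  const std::string& current_schema,
                                  const std::vector<std::string>& changes) {
  std::vector<CdcRecord> out;
  if (req.need_schema_info) {
    out.push_back({RecordKind::kDdl, current_schema});
  }
  for (std::size_t i = static_cast<std::size_t>(req.from_index); i < changes.size(); ++i) {
    out.push_back({RecordKind::kChange, changes[i]});
  }
  return out;
}
```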

[#11729][DocDB][xCluster] Fix for replication not working if user upgrades to a branch with CDCSDK code changes

With the changes for CDCSDK, there are separate `source_type` values: `XCLUSTER` for xCluster replication and `CDCSDK` for the new changes. Similarly, another option, `checkpoint_type`, can have the values `IMPLICIT` and `EXPLICIT`.

A replication stream created before the upgrade could not continue replicating after upgrading to the latest version, because it lacked the `source_type` and `checkpoint_type` options, which were introduced only with the CDCSDK changes.
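One plausible shape for such a fix is to fill in defaults for the missing options when an old stream is read. This is a hedged sketch, not the actual patch. It assumes pre-upgrade streams should be treated as `XCLUSTER` with `IMPLICIT` checkpointing, and `StreamOptions` and `ApplyLegacyDefaults` are hypothetical names.

```cpp
#include <map>
#include <string>

using StreamOptions = std::map<std::string, std::string>;

// Assumption: a stream without these options predates CDCSDK and was created
// for xCluster replication. std::map::emplace does not overwrite an existing
// key, so options that are already set are preserved.
void ApplyLegacyDefaults(StreamOptions& options) {
  options.emplace("source_type", "XCLUSTER");
  options.emplace("checkpoint_type", "IMPLICIT");
}
```

Defaulting only the missing keys leaves new CDCSDK streams untouched.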

Test Fixes for 2.12

Test Plan:
Jenkins: skip
Java unit tests for the APIs and CDC behavior.
We have done some long-running testing with applications.
We have also run the YB sample apps with CDC enabled on the table and verified that all the events are received.

Tested the complete CDC with Debezium pipeline with the specified change.

Command to run test:
`ybd --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNeedSchemaInfoFlag`

* Manually tested with a custom build on dev portal

Reviewers: nicolas, bogdan, ybase, rahuldesirazu, sdash, iamoncar, zyu, jhe, mkantimath, sergei

Reviewed By: sergei

Differential Revision: https://phabricator.dev.yugabyte.com/D16251
@gedw99

gedw99 commented Jun 7, 2023

Going to add my vote for NATS. It’s just much more flexible than Kafka, and way easier to manage.

3 participants