
Support exchange partition in TiCDC #639

Closed · amyangfei opened this issue Jun 10, 2020 · 24 comments
Labels: component/ddl (DDL component), difficulty/medium (Medium task), help wanted

amyangfei (Contributor) commented Jun 10, 2020

Feature Request

Is your feature request related to a problem? Please describe:

pingcap/tidb#17149 introduces a new DDL

ALTER TABLE target_table 
  EXCHANGE PARTITION target_partition 
  WITH TABLE source_table 

The ID of the target partition (inside the target table) and the ID of the source table will be exchanged. The TableInfo in job.BinlogInfo records the new table info of target_table.

Describe the feature you'd like:

  • Support exchange partition in TiCDC

Teachability, Documentation, Adoption, Migration Strategy:

TiCDC should compare the old table info of target_table with the new one, and from the difference find the exchanged target partition ID and source table ID.
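
Below is a rough Go sketch of that comparison. The types are simplified stand-ins for the TiDB model structs, and findExchangedIDs is an illustrative helper, not the actual TiCDC code:

package main

import "fmt"

// Simplified stand-ins for the TiDB model structs; they carry only what this sketch needs.
type PartitionDefinition struct{ ID int64 }

type TableInfo struct {
    ID         int64
    Partitions []PartitionDefinition
}

// findExchangedIDs compares the target table's partition IDs before and after the DDL.
// The ID present only in the new info is the old source table ID (now a partition of the
// target table); the ID present only in the old info is the original target partition ID
// (which now identifies the standalone source table).
func findExchangedIDs(oldInfo, newInfo *TableInfo) (sourceTableID, targetPartitionID int64) {
    oldIDs := make(map[int64]struct{}, len(oldInfo.Partitions))
    for _, def := range oldInfo.Partitions {
        oldIDs[def.ID] = struct{}{}
    }
    newIDs := make(map[int64]struct{}, len(newInfo.Partitions))
    for _, def := range newInfo.Partitions {
        newIDs[def.ID] = struct{}{}
    }
    for id := range newIDs {
        if _, ok := oldIDs[id]; !ok {
            sourceTableID = id // present only in the new partition list
        }
    }
    for id := range oldIDs {
        if _, ok := newIDs[id]; !ok {
            targetPartitionID = id // present only in the old partition list
        }
    }
    return sourceTableID, targetPartitionID
}

func main() {
    oldInfo := &TableInfo{ID: 100, Partitions: []PartitionDefinition{{ID: 101}, {ID: 102}, {ID: 103}}}
    newInfo := &TableInfo{ID: 100, Partitions: []PartitionDefinition{{ID: 101}, {ID: 102}, {ID: 200}}}
    src, part := findExchangedIDs(oldInfo, newInfo)
    fmt.Println("source table ID:", src, "original target partition ID:", part) // 200 and 103
}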

amyangfei added the component/ddl and help wanted labels on Jun 10, 2020
amyangfei changed the title from "Support exchange partition in CDC" to "Support exchange partition in TiCDC" on Jun 28, 2020
BowenXiao1999 commented:

How to test our program?

BowenXiao1999 commented Jun 29, 2020

Another question. Is it a good way to solve it like this:

  1. The DDL job is already packed for us, so we do not need to care about feed or job initialization for exchange partition. For example, job.Type is encoded into JSON and will be unmarshaled into the job automatically.

  2. Add a support function in schema_storage.go to maintain the schema snapshot for the new exchange partition grammar. It seems we also need to add support in applyJob, which updates the status of the changefeed.

  3. Pass the Job to sink.EmitDDLJob() without modification, which is already implemented, so nothing needs to change here.

I have only just started looking at CDC (about an hour so far). If I have misunderstood the workflow, please let me know.

amyangfei commented Jun 30, 2020

How to test our program?

We could add this DDL to the integration tests, including the partition_table and ddl_reentrant cases. If we want to simulate the scenario where the source table and the target table are located in different captures, feel free to add more test cases.

  • The integration tests can be run with make integration_test_build and make integration_test CASE=xxx.
  • The integration tests use the master TiDB binary, which means we can execute ALTER TABLE target_table EXCHANGE PARTITION in the upstream TiDB.

https://github.com/pingcap/ticdc/blob/3bac0045aac5adcf9eabfd09d822490ae132b754/scripts/jenkins_ci/integration_test_common.groovy#L3-L5

amyangfei commented Jun 30, 2020

  • Add a support function in schema_storage.go to maintain the schema snapshot for the new exchange partition grammar. It seems we also need to add support in applyJob, which updates the status of the changefeed.

Yep, we should update the schema storage. Besides, since the source table and target table could be located in different captures, the owner should re-schedule the table tasks if needed.

BowenXiao1999 commented Jun 30, 2020

  • Add a support function in schema_storage.go to maintain the schema snapshot for the new exchange partition grammar. It seems we also need to add support in applyJob, which updates the status of the changefeed.

Yep, we should update the schema storage. Besides, since the source table and target table could be located in different captures, the owner should re-schedule the table tasks if needed.

So the logic is:

  1. If the source table and target table are in the same capture, the processor on that capture exchanges the partition by manipulating its own schema storage.
  2. Otherwise, the owner detects it and should re-schedule by adding the source table to the target partition's capture to make it work?

BTW, how do we implement the exchange? Swap the partition ID with the source table ID? (Here the ID works like a pointer.)

We just need to update the schema in the CDC cluster, right? The Job will be generated for us, and the sink will be responsible for downstream consistency.

BowenXiao1999 commented Jul 1, 2020

Hi @amyangfei, I have started coding the simplest case.
This is my approach:

  1. Get the partition IDs and table IDs of the source table and the target table.

  2. Update schemaStorage.partitionTable so that the mapping from partition ID to tableInfo is correct.

  3. Done.

But I realize it may not work out that way: I cannot get the partition ID of the source table if it is not a partitioned table. So should I somehow change it into a partitioned table? If so, how?

BowenXiao1999 commented:

The ID of the target partition (inside the target table) and the ID of the source table will be exchanged. The TableInfo in job.BinlogInfo records the new table info of target_table.

  1. I know that we can get all partitions of the target table, but how do we find the target partition?

  2. How do we get the source table ID?

amyangfei commented Jul 2, 2020

The ID of the target partition (inside the target table) and the ID of the source table will be exchanged. The TableInfo in job.BinlogInfo records the new table info of target_table.

  1. First, we can get the target table ID from job.BinlogInfo, and all partition IDs at the same time; call this partition ID list A.
  2. We can query the TableInfo from the schemaSnapshot (before this DDL), and the old partition IDs can be found in that TableInfo; call this partition ID list B.
  3. Comparing list A with list B, the one ID that is in list A but not in list B is the source table ID, and the one ID that is in list B but not in list A is the target partition ID.
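
A concrete example with made-up IDs: if list A taken from job.BinlogInfo is {101, 102, 200} and list B taken from the pre-DDL snapshot is {101, 102, 103}, then 200 (in A but not in B) is the source table ID that has just become a partition of the target table, and 103 (in B but not in A) is the original target partition ID that now identifies the standalone source table.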

amyangfei commented:

  • Add a support function in schema_storage.go to maintain the schema snapshot for the new exchange partition grammar. It seems we also need to add support in applyJob, which updates the status of the changefeed.

Yep, we should update the schema storage. Besides, since the source table and target table could be located in different captures, the owner should re-schedule the table tasks if needed.

So the logic is:

  1. If the source table and target table are in the same capture, the processor on that capture exchanges the partition by manipulating its own schema storage.
  2. Otherwise, the owner detects it and should re-schedule by adding the source table to the target partition's capture to make it work?

BTW, how do we implement the exchange? Swap the partition ID with the source table ID? (Here the ID works like a pointer.)

We just need to update the schema in the CDC cluster, right? The Job will be generated for us, and the sink will be responsible for downstream consistency.

Case 2 (target table and source table in different captures) is a little complicated; it seems MoveTableJob could be used, as we need to move the source table to the capture where the target table is located.

BowenXiao1999 commented:

Thanks for answering.
I think I get your point. The job provides the target table ID and the target partition list A (the partitions after the DDL). Then we query the schemaStorage with the target table ID and get a target partition list B, which is the partition list before the DDL. Comparing them, we can find which ID is in list A but not in list B, and which is in list B but not in list A: these are the source table ID and the target partition ID.

But I still have questions:

  1. The source table is not guaranteed to be a partition. When the source table ID becomes a partition, is there something we need to transform? Or is the source table ID simply equal to the partition ID? What if the source table has multiple partitions?

  2. Just like the other model partition action types, is the exchange implemented by setting s.partitionTable[source table id] = target tableInfo and s.partitionTable[target partition id] = source tableInfo? Is that correct?

amyangfei commented Jul 2, 2020

  • The source table is not guaranteed to be a partition. When the source table ID becomes a partition, is there something we need to transform? Or is the source table ID simply equal to the partition ID? What if the source table has multiple partitions?

  1. The source table is not a partitioned table.
  2. Basically as you said. After the exchange, the original source table becomes a partition of the target table, and the original target partition becomes the new source table. We should construct the right table info.
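
A very rough sketch of that bookkeeping, assuming (as in this thread) a snapshot that keeps a partitionTable map from physical partition ID to the owning table info and a tables map from table ID to table info; the type, field, and function names below are illustrative, not the actual schema_storage.go code:

package main

// Minimal stand-in types; the real TiCDC schema storage and TiDB TableInfo are richer.
type TableInfo struct {
    ID         int64
    Partitions []int64 // physical IDs of the table's partitions, if any
}

type schemaSnapshot struct {
    tables         map[int64]*TableInfo // table ID -> table info
    partitionTable map[int64]*TableInfo // partition (physical) ID -> owning table info
}

// exchangePartition applies ALTER TABLE ... EXCHANGE PARTITION to the snapshot:
// the old source table ID becomes a partition of the target table, and the old
// target partition ID now identifies the (still non-partitioned) source table.
func (s *schemaSnapshot) exchangePartition(newTarget *TableInfo, sourceTableID, targetPartitionID int64) {
    // The exchanged physical ID now routes to the target (partitioned) table.
    s.partitionTable[sourceTableID] = newTarget
    delete(s.partitionTable, targetPartitionID)

    // The source table keeps its schema but is re-keyed by the old partition ID.
    if oldSource, ok := s.tables[sourceTableID]; ok {
        moved := *oldSource
        moved.ID = targetPartitionID
        delete(s.tables, sourceTableID)
        s.tables[targetPartitionID] = &moved
    }

    // Store the post-DDL target table info taken from job.BinlogInfo.
    s.tables[newTarget.ID] = newTarget
}

func main() {
    s := &schemaSnapshot{
        tables: map[int64]*TableInfo{
            100: {ID: 100, Partitions: []int64{101, 102, 103}}, // target table
            200: {ID: 200},                                     // source table
        },
        partitionTable: map[int64]*TableInfo{},
    }
    for _, pid := range s.tables[100].Partitions {
        s.partitionTable[pid] = s.tables[100]
    }
    // After the DDL, partition 103 has been exchanged with table 200.
    s.exchangePartition(&TableInfo{ID: 100, Partitions: []int64{101, 102, 200}}, 200, 103)
}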

BowenXiao1999 commented Jul 2, 2020

The source table is not a partitioned table.

So the source table will not have multiple partitions; that restriction is enforced by the upstream logic. But we should transform it into a partition, right? Because after the DDL it becomes a partition of the target table. Or is the table ID also the partition ID?

I'm not so sure about table IDs and partition IDs. For example, could a partitioned table have ID 3 while its partitions have IDs [1, 2, 4, 5, 6]? Is that OK?

@BowenXiao1999
Copy link

BowenXiao1999 commented Jul 2, 2020

While writing the code for reconstructing the tableInfo, I realized that the source tableInfo doesn't need to be reconstructed. It is not a partitioned table (so we don't need to care about partition changes?) and it has already been wrapped with the DB info. Is that the right way?

amyangfei commented:

While writing the code for reconstructing the tableInfo, I realized that the source tableInfo doesn't need to be reconstructed. It is not a partitioned table (so we don't need to care about partition changes?) and it has already been wrapped with the DB info. Is that the right way?

Yes, the source table remains a normal table.

BowenXiao1999 commented Jul 4, 2020

How to test our program?

I have some questions about this post.

We could add this DDL to the integration tests, including the partition_table and ddl_reentrant cases. If we want to simulate the scenario where the source table and the target table are located in different captures, feel free to add more test cases.

  1. This is a little ambiguous, so let me make sure I understand: if make integration_test CASE=xxx passes, where CASE is partition_table and ddl_reentrant, can we say the simple case for exchange partition is covered?
    For the more complicated case, do you mean we should write a test script for it (under tests/), or would testing more of the existing case scripts be helpful?
  • The integration tests can be run with make integration_test_build and make integration_test CASE=xxx.
  • The integration tests use the master TiDB binary, which means we can execute ALTER TABLE target_table EXCHANGE PARTITION in the upstream TiDB.
  2. Should I set up a TiDB server by myself? The test instructions told me to run tests with make. And I ran into some problems: Still get problems on test #716

amyangfei commented:

How to test our program?

I have some questions about this post.

We could add this DDL to the integration tests, including the partition_table and ddl_reentrant cases. If we want to simulate the scenario where the source table and the target table are located in different captures, feel free to add more test cases.

  1. This is a little ambiguous, so let me make sure I understand: if make integration_test CASE=xxx passes, where CASE is partition_table and ddl_reentrant, can we say the simple case for exchange partition is covered?
    For the more complicated case, do you mean we should write a test script for it (under tests/), or would testing more of the existing case scripts be helpful?
  • The integration tests can be run with make integration_test_build and make integration_test CASE=xxx.
  • The integration tests use the master TiDB binary, which means we can execute ALTER TABLE target_table EXCHANGE PARTITION in the upstream TiDB.
  2. Should I set up a TiDB server by myself? The test instructions told me to run tests with make. And I ran into some problems: Still get problems on test #716

You can refer to this document on how to run tests in TiCDC.

BowenXiao1999 commented Jul 6, 2020

Now I am working on the applyJob logic for this feature.
This is my approach:

  1. The previous logic is similar to updatePartition: we should update changefeed.partitions[target_table_id] = newPartitions (taken from job.TableInfo).

  2. But as for the leftover old ID, we are not going to drop it (it is target_partition). Instead, we should delete (?) it from changefeed.partitions, because our source table is not a partitioned table.

This is different from schemaStorage: we don't need to get the source table ID and target table ID, because the source table is not a partitioned table.

pr update: #718
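
A minimal sketch of that step, assuming (as described in this comment) the changefeed keeps a partitions map from table ID to the physical IDs of its partitions; the struct and field names are illustrative, not the actual TiCDC code:

package main

// Illustrative stand-in for the owner-side changefeed bookkeeping.
type changeFeed struct {
    partitions map[int64][]int64 // table ID -> physical IDs of its partitions
}

// applyExchangePartition handles ALTER TABLE ... EXCHANGE PARTITION on the owner side:
// the target table's partition list is replaced with the post-DDL list from job.TableInfo.
// The old target partition ID simply drops out of the map, and no entry is added for the
// source table, since it remains a normal, non-partitioned table.
func (c *changeFeed) applyExchangePartition(targetTableID int64, newPartitionIDs []int64) {
    c.partitions[targetTableID] = newPartitionIDs
}

func main() {
    c := &changeFeed{partitions: map[int64][]int64{100: {101, 102, 103}}}
    // Partition 103 of table 100 was exchanged with table 200.
    c.applyExchangePartition(100, []int64{101, 102, 200})
}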

BowenXiao1999 commented:

For the complicated case, since the logic for this feature mainly happens in the changefeed (though I'm confused about why the processor does nothing), I'd like to handle it in its upstream function, handleDDL of the owner.

If we can somehow detect an inconsistency in the table location info, we can move the source table to the target table's capture.
[screenshot]

amyangfei commented Jul 7, 2020

This is different from schemaStorage: we don't need to get the source table ID and target table ID, because the source table is not a partitioned table.

Yes, both step 1 and step 2 are correct.

applyJob mainly focuses on table scheduling, and the scheduling has two aspects:

  • the owner should have the right information about the whole cluster
  • each table is replicated on the right capture

amyangfei commented:

For the complicated case, since the logic for this feature mainly happens in the changefeed (though I'm confused about why the processor does nothing), I'd like to handle it in its upstream function, handleDDL of the owner.

If we can somehow detect an inconsistency in the table location info, we can move the source table to the target table's capture.
[screenshot]

The defensive detection approach may not be safe. Even if we don't need to move tables from one capture to another, we can still update some necessary information in applyJob.

BowenXiao1999 commented:

This is different from schemaStorage: we don't need to get the source table ID and target table ID, because the source table is not a partitioned table.

Yes, both step 1 and step 2 are correct.

applyJob mainly focuses on table scheduling, and the scheduling has two aspects:

  • the owner should have the right information about the whole cluster
  • each table is replicated on the right capture

I don't understand. applyJob is not a method of the owner. Are you talking about the complicated case? I think it should be done somewhere before changefeed.applyJob(), for example:
[screenshot]
(before or in owner.handleDDL)

I wrote a draft and can commit it in the next few days.

BowenXiao1999 commented Jul 7, 2020

For the complicated case, since the logic for this feature mainly happens in the changefeed (though I'm confused about why the processor does nothing), I'd like to handle it in its upstream function, handleDDL of the owner.
If we can somehow detect an inconsistency in the table location info, we can move the source table to the target table's capture.
[screenshot]

The defensive detection approach may not be safe. Even if we don't need to move tables from one capture to another, we can still update some necessary information in applyJob.

Yes, I get your point. Just like createPartition and similar cases, we should drop the old partitions even if they are empty. BTW, I think you are talking about handleDDL of the owner instead of applyDDL.

BowenXiao1999 commented:

New PR: #718.
My reasoning:
We should try to replicate the table to the right capture before o.balanceTables(), so that the moveTable job in o.manualSchedule can be executed.
That's why I added o.rebalanceTableIfNeed() between o.loadChangeFeed() and o.balanceTables().

I wrote it quickly, so it may be buggy, but it should express my idea to some extent. (Maybe I should change the function name, because I found a rebalanceTables() method in changefeed.)
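
A rough sketch of the proposed ordering, with the method names taken from this thread; the stub bodies and the exact shape of the owner loop are assumptions, not the real owner code:

package main

import "fmt"

// Stub owner; only the call ordering matters in this sketch.
type Owner struct{}

func (o *Owner) loadChangeFeed() error { fmt.Println("loadChangeFeed"); return nil }

// rebalanceTableIfNeed is the step proposed in #718: if an exchanged source table
// ended up on the wrong capture, schedule a moveTable job for it.
func (o *Owner) rebalanceTableIfNeed() { fmt.Println("rebalanceTableIfNeed") }

func (o *Owner) manualSchedule() { fmt.Println("manualSchedule (executes moveTable jobs)") }
func (o *Owner) balanceTables()  { fmt.Println("balanceTables") }

// runOnce shows only the ordering: rebalanceTableIfNeed runs after loadChangeFeed and
// before balanceTables, so the moveTable job it creates can be picked up by manualSchedule.
func (o *Owner) runOnce() error {
    if err := o.loadChangeFeed(); err != nil {
        return err
    }
    o.rebalanceTableIfNeed()
    o.manualSchedule()
    o.balanceTables()
    return nil
}

func main() { _ = (&Owner{}).runOnce() }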

asddongmen (Contributor) commented:

Closed by #7118.
