Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: create schemas at topic creation #4717

Merged
merged 3 commits into from
Mar 10, 2020

Conversation

agavra
Copy link
Contributor

@agavra agavra commented Mar 5, 2020

fixes #4219

Description

To be honest, I don't love the way this problem is solved in this PR but I can't come up with anything better. Basically, when we create a new topic in the TopicCreateInjector we also register the schema with schema registry. This is easy for C* statements but is much more difficult for C*AS statements as we don't actually know the schema until we build the logical plan.

It is important, however, to create the schema before we enqueue onto the command topic so that we don't get into the situation where we re-create schemas while replaying the command topic.

Testing done

Unit test and local testing (note no data running through):

ksql> CREATE STREAM foo (id VARCHAR) WITH (value_format='avro', partitions=1, kafka_topic='foo');

 Message
----------------
 Stream created
----------------
ksql> CREATE STREAM bar WITH (value_format='avro', kafka_topic='foo');

 Message
----------------
 Stream created
----------------
ksql> DESCRIBE bar;

Name                 : BAR
 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 ID      | VARCHAR(STRING)
-------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@agavra agavra force-pushed the schema_at_create branch from e12981b to fe23730 Compare March 6, 2020 01:12
@agavra agavra force-pushed the schema_at_create branch from fe23730 to 0f0b1f5 Compare March 6, 2020 18:04
@agavra agavra marked this pull request as ready for review March 6, 2020 22:42
@agavra agavra requested a review from a team as a code owner March 6, 2020 22:42
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @agavra

Some thoughts...

I'm not sure this works in a reliable way as it stands, as there is a race condition.

At the moment, the rest-server will create the schema when it ensures the topics exist. This is done by building the logical plan from for the query using the snapshot of the metastore. However, it's possible that when the command is actually executed that one or more of the sources in the metastore have changed. This could result in a different schema - one out of whack with the schema in the SR!

I think you can get around this by ensuring the building of the logical plan + registering of the schema is done within the transaction used to produce to the command topic. However, I think even then we've a potential edge case:

If one node starts the transaction and registers the schema, but another node then init's transactions before the first node produces and commits, then (I believe) the first node's transaction is aborted. So we have a schema registered, but not data source. When the data-source creation is retried, sources in the metastore may have changed, meaning the source now has a different, potentially incompatible schema.

I think the fix here would be to delete the schema from SR if the transaction is aborted. Likewise, have we thought about deleting any created topics if we fail to produce to the command topic? Maybe injectors need an undo method?. Same undo should be called if a later injector fails...

Switching to a metastore db model would potentially help here, as the single-writer pattern it would utilise would give us a single node responsible for performing such one-of tasks, without any race conditions.

Finally, as @rayokota stated, schema creation in the SR is idempotent. So... maybe a better design, (at least until we move to a metastore db), would be to register the schema when executing the C*AS statements after reading them from the command topic? This would ensure we're only ever registering a schema with the correct columns. Or course, we'd need to handle the registration failure scenario there, but we already check evolvability on the rest-server side, so chance of this are slim. If it does fail, I think it would be ok to put the query into a failed state and log an error message. User could then fix the issue, e.g. delete an old / bad schema, and restart to retry.

Comment on lines 443 to 455
@Test
public void shouldNotRegisterSchemaIfSchemaRegistryIsDisabled() {
// Given:
config = new KsqlConfig(ImmutableMap.of());
givenStatement("CREATE STREAM x (f1 VARCHAR) WITH(kafka_topic='expectedName', value_format='AVRO', partitions=1);");
when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10));

// When:
injector.inject(statement, builder);

// Then:
verifyNoMoreInteractions(schemaRegistryClient);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is missing some tests around what happens if schema registration fails...

@agavra
Copy link
Contributor Author

agavra commented Mar 9, 2020

@big-andy-coates - thanks for the review! You bring up a good point about the race conditions. Some comments inline (not necessarily in the order you typed):

If one node starts the transaction and registers the schema, but another node then init's transactions before the first node produces and commits, then (I believe) the first node's transaction is aborted. So we have a schema registered, but not data source. When the data-source creation is retried, sources in the metastore may have changed, meaning the source now has a different, potentially incompatible schema.

Since the likelihood of this happening is very small (multiple users trying to register the same source with different schemas on different nodes), I'm trying to see if it's worth fixing as part of this PR (I don't think it's trivial at all, see below). That basically means we need to trade off between the existing behavior and this bug. IMO, this fix is worth having even with the knowledge that the race condition exists.

I think the fix here would be to delete the schema from SR if the transaction is aborted. ... Maybe injectors need an undo method?.

I don't think this is sufficient. Imagine the scenario where the race condition actually registers a compatible schema. In this case, the second register call would succeed but the transaction fails. Then we delete the valid schema as part of the undo.

Also as an implementation, it's not trivial as it would require to make injectors stateful.

Finally, as @rayokota stated, schema creation in the SR is idempotent. So... maybe a better design, (at least until we move to a metastore db), would be to register the schema when executing the C*AS statements after reading them from the command topic? This would ensure we're only ever registering a schema with the correct columns. Or course, we'd need to handle the registration failure scenario there, but we already check evolvability on the rest-server side, so chance of this are slim. If it does fail, I think it would be ok to put the query into a failed state and log an error message. User could then fix the issue, e.g. delete an old / bad schema, and restart to retry.

Addressing this first as this could have otherwise have solved our problem. As mentioned in the description: "It is important to create the schema before we enqueue onto the command topic so that we don't get into the situation where we re-create schemas while replaying the command topic." We'd run into the same issues we used to get with topic creation during replay.

Switching to a metastore db model would potentially help here, as the single-writer pattern it would utilise would give us a single node responsible for performing such one-of tasks, without any race conditions.

I've been advocating for a single-writer independent of swithcing the to metastore db model 😉 and I think that's the best fix for lots of these types of race conditions. With the work done to route pull queries to other nodes, I imagine this may not even be that difficult.

@agavra agavra requested a review from big-andy-coates March 9, 2020 16:39
@agavra agavra requested a review from a team March 9, 2020 19:15
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressing this first as this could have otherwise have solved our problem. As mentioned in the description: "It is important to create the schema before we enqueue onto the command topic so that we don't get into the situation where we re-create schemas while replaying the command topic." We'd run into the same issues we used to get with topic creation during replay.

What's wrong with publishing the schema to the SR from each node and on each restart? As long as the logic is correct, i.e. that we only do this for C*AS commands that will be started, then this would work as far as I can tell.

This would also avoid the race condition.

If there is some issue with this approach, can you explain it?

Sure, there's much more scope for bugs with this approach, (the whole recreation of schemas for terminated queries, etc). However, I'm sure you can avoid those when implementing ;). Plus, when we switch to a single writer pattern we can move away from this approach.

This seems like the better short term approach to me.

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offline chat - happy to proceed with this, knowing we've got the edge case we can fix later. Please raise a github issue to track the known edge case.

@agavra
Copy link
Contributor Author

agavra commented Mar 10, 2020

Opened #4751

@agavra agavra merged commit 514025d into confluentinc:master Mar 10, 2020
@agavra agavra deleted the schema_at_create branch March 10, 2020 23:42
@apurvam
Copy link
Contributor

apurvam commented Mar 11, 2020

Awesome to see this get in. Thanks @agavra for the initiative and @big-andy-coates for the quick review turnaround!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Create Avro sink schema at time of stream/table creation
3 participants