
Concurrent mod #4718

Closed

Conversation

big-andy-coates
Contributor

Description

fixes: #4639

Until the Streams bug https://issues.apache.org/jira/browse/KAFKA-9668 is fixed, ksql needs to protect itself from `ConcurrentModificationException`s when accessing `KafkaStreams.allMetadata`.

This change accesses the internals of KafkaStreams to acquire a reference to the field that needs to be synchronised to protect against the concurrent modification.
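The approach can be sketched roughly as follows. This is an illustrative stand-in, not ksql's actual code: `Demo` and `mutex` are hypothetical names, standing in for `KafkaStreams` and whatever internal field it guards its metadata with. The idea is to reflectively grab the same object the library mutates under its own lock, then synchronize on that monitor before reading `allMetadata`.

```java
import java.lang.reflect.Field;

public final class InternalLock {

    // Reflectively fetch a private field's value so callers can synchronize on
    // the same monitor the owning class uses internally. Brittle by nature:
    // it breaks if the field is renamed, which is why this kind of workaround
    // is only a stopgap until KAFKA-9668 is fixed upstream.
    public static Object fieldValue(final Object target, final String fieldName) {
        try {
            final Field f = target.getClass().getDeclaredField(fieldName);
            f.setAccessible(true);
            return f.get(target);
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to access field: " + fieldName, e);
        }
    }

    // Tiny demo target with a private monitor field, standing in for the
    // internal KafkaStreams state this PR synchronizes on.
    public static final class Demo {
        private final Object mutex = new Object();
    }
}
```

A caller would then do something like `synchronized (InternalLock.fieldValue(streams, "someField")) { streams.allMetadata(); }` — the field name here is a placeholder, not the actual `KafkaStreams` field.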

Testing done

Added unit test to check logic and ran manual testing.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

agavra and others added 30 commits February 11, 2020 14:10
* feat: enhance `PRINT TOPIC`'s format detection

fixes: confluentinc#4258

With this change `PRINT TOPIC` has enhanced detection for the key and value formats of a topic.
The command starts with a list of known formats for both the key and the value and refines this
list as it sees more data.

As the list of possible formats is refined over time, the command outputs the reduced list.
For example, you may see output such as:

```
ksql> PRINT some_topic FROM BEGINNING;
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 12/21/18 23:58:42 PM PSD, key: stream/CLICKSTREAM/create, value: {"statement":"CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) with (kafka_topic = 'clickstream', value_format = 'json');","streamsProperties":{}}
rowtime: 12/21/18 23:58:42 PM PSD, key: table/EVENTS_PER_MIN/create, value: {"statement":"create table events_per_min as select userid, count(*) as events from clickstream window  TUMBLING (size 10 second) group by userid EMIT CHANGES;","streamsProperties":{}}
Key format: KAFKA_STRING
...
```

In the last line of the above output, the command has narrowed the key format down as it has processed more data.
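The narrowing loop described above can be sketched roughly like this. This is a simplified stand-in, not ksql's real detection code: `looksLikeJson` is a toy check, where the real command attempts actual deserialization per format, and the candidate list here is trimmed to two formats.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public final class FormatDetector {

    // Map each candidate format name to a parse check (toy stand-ins here).
    private final Map<String, Predicate<byte[]>> candidates = new LinkedHashMap<>();

    public FormatDetector() {
        candidates.put("JSON", FormatDetector::looksLikeJson);
        candidates.put("KAFKA_STRING", bytes -> true); // any bytes decode as a string in this sketch
    }

    // Drop every format that fails to parse this record; the surviving set is
    // what gets printed, e.g. "Key format: JSON or KAFKA_STRING".
    public Set<String> refine(final byte[] record) {
        candidates.entrySet().removeIf(e -> !e.getValue().test(record));
        return candidates.keySet();
    }

    private static boolean looksLikeJson(final byte[] bytes) {
        final String s = new String(bytes, StandardCharsets.UTF_8).trim();
        return s.startsWith("{") || s.startsWith("[") || s.startsWith("\"");
    }
}
```

Because candidates are only ever removed, the printed list shrinks monotonically as more records are seen, matching the sample output above.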

The command has also been updated to only detect valid UTF-8 encoded text as type `JSON` or `KAFKA_STRING`.
This is in line with how KSQL would later deserialize the data.

If no known format can successfully deserialize the data it is printed as a combination of ASCII characters and hex encoded bytes.
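Both behaviours can be sketched as follows (illustrative only; ksql's actual implementation differs). A strict `CharsetDecoder` rejects malformed UTF-8, unlike `new String(bytes, UTF_8)`, which silently substitutes replacement characters; the fallback keeps printable ASCII as-is and hex-escapes everything else.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

public final class BytesPrinter {

    // Strict UTF-8 validation: a fresh CharsetDecoder reports malformed input
    // by throwing, rather than replacing bad bytes with U+FFFD.
    public static boolean isValidUtf8(final byte[] bytes) {
        try {
            StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(bytes));
            return true;
        } catch (final CharacterCodingException e) {
            return false;
        }
    }

    // Fallback rendering: printable ASCII stays as-is, other bytes become \xNN.
    public static String toMixedHex(final byte[] bytes) {
        final StringBuilder sb = new StringBuilder();
        for (final byte b : bytes) {
            if (b >= 0x20 && b < 0x7f) {
                sb.append((char) b);
            } else {
                sb.append(String.format("\\x%02X", b & 0xFF));
            }
        }
        return sb.toString();
    }
}
```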
big-andy-coates and others added 29 commits February 27, 2020 14:49
move the validation of the client auth config into the config class.
* chore: remove Connect schema from Format interface

As this interface is moving towards being a pluggable public interface.

Now, the `Format` interface deals with converting SchemaRegistry `ParsedSchema`s into the list of `SimpleColumn`s ksql should use. Each `SimpleColumn` defines the name and type of the column.

Conversion between Connect and ksql types is now handled by the `ConnectFormat` base class. This base class currently converts all field names to uppercase. However, it is now possible for a format to return names in any case, and those names will be respected.

* chore: better error when missing DESCRIBE_CONFIGS acl

Logged error was:

```
...
Caused by: io.confluent.ksql.util.KsqlServerException: Could not get Kafka cluster configuration!
	at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:77)
	at io.confluent.ksql.services.KafkaTopicClientImpl.getConfig(KafkaTopicClientImpl.java:309)
	at io.confluent.ksql.services.KafkaTopicClientImpl.isTopicDeleteEnabled(KafkaTopicClientImpl.java:298)
	... 21 more
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
```

and is now:

```
...
Caused by: io.confluent.ksql.util.KsqlServerException: Could not get Kafka cluster configuration. Please ensure the ksql principal has DESCRIBE_CONFIGS rights on the Kafka cluster.
See https://docs.ksqldb.io/en/latest/operate-and-deploy/installation/server-config/security/#required-acls for more info.
	at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:77)
	at io.confluent.ksql.services.KafkaTopicClientImpl.getConfig(KafkaTopicClientImpl.java:309)
	at io.confluent.ksql.services.KafkaTopicClientImpl.isTopicDeleteEnabled(KafkaTopicClientImpl.java:298)
	... 21 more
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
```

I've also switched our doc links over to the new micro site.

* fix: change configOverrides back to streamsProperties

Revert the breaking api change

* fix: missed annotation
This change improves the error message returned by the testing tool & QTT when a test case is expected to throw an exception but doesn't.

Old message was:

```
java.lang.AssertionError: test did not generate any queries.
expected: non-empty collection
was: Collection[]
failed test: key-schemas - KEY key field name
in file: query-validation-tests/key-schemas.json

```

New message is:

```
java.lang.AssertionError: Expected test to throw(an instance of io.confluent.ksql.util.KsqlStatementException and statement text a string containing "CREATE STREAM INPUT (KEY STRING KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');" and exception with message a string containing "'KEY' is an invalid KEY column name. KSQL currently only supports KEY columns named ROWKEY.")
failed test: key-schemas - KEY key field name
in file: query-validation-tests/key-schemas.json
```

Plus this change suppresses the logging of `ProtobufDataConfig` creation in the tests, which is just noise and fills up the logs on the build servers.
Fix up historic plans merged from 5.5.x by splitting plan.json out of spec.json and adding any missing plans
RQTT test cases can now have a properties section, just like QTT test cases, e.g.

```json
"properties": {
    "ksql.some.thing": "whateva"
}
```
Our use of query plans means we no longer need this...
As we move towards allowing key names other than `ROWKEY` and switching `ROWTIME` to a pseudo column, having implicitly added columns in `LogicalSchema` is actually making the code less readable, not more.

This change removes the implicit columns from `LogicalSchema`. Instead, key columns are now added _explicitly_ via `keyColumn()` and `ROWTIME` is now added _explicitly_ via `withRowTime()`. There is little actual change to production code; the main change is in `TableElements.toLogicalSchema()`. Some places now need a `withRowTime()` call, but even these will go once `ROWTIME` is a pseudo column.

Most of the changes are in the test code base.
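As a rough illustration of the explicit style this commit moves to (a toy stand-in, not ksql's real `LogicalSchema` API), the caller now states every column, including `ROWTIME` and the key, instead of having them implied:

```java
import java.util.ArrayList;
import java.util.List;

// Toy builder: every column is added explicitly by the caller; nothing is
// implicitly injected the way the old LogicalSchema behaved.
public final class SchemaSketch {

    private final List<String> columns = new ArrayList<>();

    public SchemaSketch withRowTime() {
        columns.add("ROWTIME BIGINT");
        return this;
    }

    public SchemaSketch keyColumn(final String name, final String type) {
        columns.add(name + " " + type + " KEY");
        return this;
    }

    public SchemaSketch valueColumn(final String name, final String type) {
        columns.add(name + " " + type);
        return this;
    }

    public List<String> columns() {
        return columns;
    }
}
```

With explicit construction, a schema that omits `withRowTime()` simply has no `ROWTIME` column — there is no hidden injection for readers of the code to keep in mind.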
fixes: confluentinc#4639

Until the Streams bug https://issues.apache.org/jira/browse/KAFKA-9668 is fixed, ksql needs to protect itself from `ConcurrentModificationException`s when accessing `KafkaStreams.allMetadata`.

This change accesses the internals of `KafkaStreams` to acquire a reference to the field that needs to be synchronised to protect against the concurrent modification.