-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: hide internal/system topics from SHOW TOPICS #4322
Conversation
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @spena
Overall functionality looks good: it's hiding internal topics and not allowing anyone to insert into them, right? i.e. it's treating them as read-only.
As I've said below, I think we should avoid hardcoded lists of internal topics. Instead, we should make this configurable via KsqlConfig
.
I see three different types of topics you're hiding / making read-only:
- Topics that are specific to KSQL, e.g. command topic, processing log topic, and configs topic (which is missing BTW).
- Topics that are internal details of systems KSQL connects to, e.g. Kafka's
__consumer_offsets topic
, or Connect'sconnect-configs
. - Topics relating to CP, but not related to KSQL OSS.
My thoughts would be:
- These should be hard coded within an instance so that it can't corrupt itself, i.e. use the
KsqlConfig
instance to determine the names of these topics and ensure they are internal / read-only. - We should set up the default config so that these are treated as internal / read-only, but make it configurable. This will be more flexible, e.g. supporting users that aren't running Connect, but have a topic named
connect-offsets
from some other system. - I don't think these one should be mentioned in any OSS config, unless it's just a total PITA to do it any other way. Ideally, this would be CP specific config for KSQL.
Please add me back in as a reviewer when you want me to take another look.
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java
Outdated
Show resolved
Hide resolved
ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListTopicsTest.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java
Outdated
Show resolved
Hide resolved
ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java
Outdated
Show resolved
Hide resolved
dab9bd6
to
284ace5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, with a couple of suggestions.
@@ -89,6 +95,14 @@ public DistributingExecutor( | |||
.apply(executionContext, securityContext.getServiceContext()) | |||
.inject(statement); | |||
|
|||
if (injected.getStatement() instanceof InsertInto) { | |||
throwIfInsertOnInternalTopic( | |||
statement.getConfig(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ignores property overrides associated with the request, right? I'm wondering about the situation where a user has a valid reason to INSERT INTO
an topic marked as internal by default, issues the request and sees an error, and now wants to override the SYSTEM_INTERNAL_TOPICS_CONFIG
config to enable their use case. If this code ignores the statement override (which I believe it does), then they can't achieve the override without setting the config at a server level and restarting the KSQL server, which feels unnecessarily heavy-weight.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should allow users to override this setting at the console.
- If it is an actual KSQL topic, e.g. the command topic, then we really don't want to allow them to produce to it!
- If it is a topic the admin of the system has configured, via the new config, to be internal/read-only, then again, we should IMHO allow the CLI user to override this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with @big-andy-coates, we should not let users override the setting. If the KSQL admin decided to hide and make those topics read-only, then is up to the admin to change the setting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine, but can we be more explicit about it and add the property to https://github.com/confluentinc/ksql/blob/master/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java in that case? The current implementation of this PR is inconsistent in that it does not allow overrides for INSERT INTO
but it does for INSERT VALUES
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @spena for adding the config, I think this is much more flexible. However, it's also made it clear to me that we're actually trying to control two things with one config! I've called this out below. There's also some code duplication that I think we can clean up, and a change in the way the ProcessingLogConfig
is being created that I think can be avoided.
Aside from that it's looking good! There's a few suggestions and nits below.
This will cover internal changelog and repartition topics as well, right?
I've not approved yet as I'd appreciate another look after you've read through my comments and made any changes. However, if I'm holding you up then feel free to merge after addressing my concerns. We can always pick up anything I feel strongly about later.
ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Outdated
Show resolved
Hide resolved
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
final String message = "Cannot get a list of system internal topics due to an invalid " + | ||
"configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'"; | ||
|
||
LOG.error(message + ": " + e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, avoid "log and throw". It's an anti-pattern, which results in the same error resulting in multiple errors being logged for the same cause. Leave it up to the calling code to determine if it wants to log the error, ignore it or throw it up the stack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that initially, but while I was running some tests, I found the exception was only displayed on the client side. The server log didn't have any error. Being this a server configuration, I wanted to warn the KSQL server admin about this error too so they can fix it instead of relying on the client to send the error to the server admin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't change this in the updated PR because of above reasons. Let me know what you think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log and throw really is a anti pattern. Better to work out why the exception was not being logged on startup, as that omission could be causing other failures to not be logged, i.e. its a bug that needs fixing.
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java
Outdated
Show resolved
Hide resolved
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java
Outdated
Show resolved
Hide resolved
284ace5
to
11f97a1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @spena - LGTM!
Couple of suggestions below to give us cleaner code, but nothing major except that I'm still concerned about the change in how we construct ProcessingLogConfig
.
ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Outdated
Show resolved
Hide resolved
ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Outdated
Show resolved
Hide resolved
ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Outdated
Show resolved
Hide resolved
final String message = "Cannot get a list of system internal topics due to an invalid " + | ||
"configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'"; | ||
|
||
LOG.error(message + ": " + e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log and throw really is a anti pattern. Better to work out why the exception was not being logged on startup, as that omission could be causing other failures to not be logged, i.e. its a bug that needs fixing.
ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
Outdated
Show resolved
Hide resolved
@@ -192,11 +189,41 @@ public void execute( | |||
throw new KsqlException("Cannot insert values into windowed stream/table!"); | |||
} | |||
|
|||
final ReservedInternalTopics internalTopics = new ReservedInternalTopics(ksqlConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't mind. I don't think this PR is too large yet; adding a few more files where a param is injected and passed through won't, IMHO, make this PR unwieldy. Up to you.
On the flip side, I don't think we should merge this with the change in how you create the ProcessingLoggerConfig
, which may necessitate passing more things around anyway.
IMHO, wiring this up is part of this change. But I'll leave it up to you.
@@ -43,20 +48,24 @@ | |||
* {@code distributedCmdResponseTimeout}. | |||
*/ | |||
public class DistributingExecutor { | |||
private final KsqlConfig ksqlConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed.
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java
Show resolved
Hide resolved
2dc3c0e
to
a3a1a18
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still LGTM; but spotted a few minors.
@@ -532,6 +564,20 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { | |||
KSQL_AUTH_CACHE_MAX_ENTRIES_DEFAULT, | |||
Importance.LOW, | |||
KSQL_AUTH_CACHE_MAX_ENTRIES_DOC | |||
).define( | |||
KSQL_HIDDEN_TOPICS_CONFIG, | |||
Type.STRING, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be Type.LIST`, which will be a list of strings.
Of course, you'll need to update the validator as well. The most future proof would be to have a validator that validates each element, and pass the regex one to that. But a short cut, for now, would just be to have the regex one validate a List.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I was having some issues with the casting initially, so I decided to convert it to a Type.STRING. But I put more logic in the validator to verify each list item is a String too and used a StringBuilder to build the final String now. That allowed me to use a Type.LIST this time.
`SHOW TOPICS` does not display hidden topics by default, such as: | ||
* KSQL internal topics, like the KSQL command topic or changelog & repartition topics. | ||
* Topics that match any pattern in the `ksql.hidden.topics` configuration. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last sentence doesn't read right. Did you mean to have it end with:
like the KSQL command topic or changelog & repartition topics, or topics that match any pattern in the
ksql.hidden.topics
configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see both formats correctly. I used 2 bullets to mention both types of hidden topics, but your sentence is also correct. I'm not inclined to any sentence, though.
I'll make the change if I find other changes in the code to make after the rebase.
final String regex = Arrays.stream(((String) val).split(",")) | ||
.collect(Collectors.joining("|")); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The regex should be a list, not a comma separated string. So this logic will need to change.
- added configs to immutable properties - removed 'internal' word from config names - modified show-topics.md - list changelog/partition topics with ALL syntax - add regex validator to ConfigValidators and KsqlConfig
a3a1a18
to
84ebde3
Compare
Description
Fixes #2952
Fixes #3134
Hide all KSQL, Confluent, Kafka, SR and Connect internal or system topics from the
SHOW TOPICS
command and prevents writing on streams/tables that are based on an internal/system topic.Also, it adds the
SHOW ALL TOPICS
syntax in case users want to list all topics including internal/system topics.Testing done
Uni tests
Verified manually
Reviewer checklist