Skip to content

Commit

Permalink
feat: better error message when transaction to command topic fails to…
Browse files Browse the repository at this point in the history
… initialize by timeout
  • Loading branch information
stevenpyzhang committed Feb 7, 2020
1 parent 1934d18 commit 72ecd00
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
Expand Down Expand Up @@ -56,14 +57,16 @@ public class DistributingExecutor {
private final ValidatedCommandFactory validatedCommandFactory;
private final CommandIdAssigner commandIdAssigner;
private final ReservedInternalTopics internalTopics;
private final Errors errorHandler;

public DistributingExecutor(
final KsqlConfig ksqlConfig,
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final ValidatedCommandFactory validatedCommandFactory
final ValidatedCommandFactory validatedCommandFactory,
final Errors errorHandler
) {
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
this.distributedCmdResponseTimeout =
Expand All @@ -78,6 +81,7 @@ public DistributingExecutor(
this.commandIdAssigner = new CommandIdAssigner();
this.internalTopics =
new ReservedInternalTopics(Objects.requireNonNull(ksqlConfig, "ksqlConfig"));
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
}

/**
Expand Down Expand Up @@ -113,18 +117,12 @@ public Optional<KsqlEntity> execute(

try {
transactionalProducer.initTransactions();
} catch (final TimeoutException e) {
throw new KsqlServerException(errorHandler.transactionInitTimeoutErrorMessage(e), e);
} catch (final Exception e) {
throw new KsqlServerException(String.format(
"Failed to initialize transactional producer to the KSQL command topic. "
+ "This may be a result of having misconfigured Kafka."
+ System.lineSeparator()
+ "If you're running a single Kafka broker, ensure that the following Kafka configs are set to 1:"
+ System.lineSeparator()
+ "- transaction.state.log.replication.factor"
+ System.lineSeparator()
+ "- transaction.state.log.min.isr"
+ System.lineSeparator()
+ "- offsets.topic.replication.factor", e));
"Could not write the statement '%s' into the command topic: " + e.getMessage(),
statement.getStatementText()), e);
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void configure(final KsqlConfig config) {
distributedCmdResponseTimeout,
injectorFactory,
authorizationValidator,
new ValidatedCommandFactory(config)
new ValidatedCommandFactory(config),
errorHandler
),
ksqlEngine,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandId.Action;
import io.confluent.ksql.rest.entity.CommandId.Type;
Expand Down Expand Up @@ -130,6 +131,8 @@ CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json")
private Producer<CommandId, Command> transactionalProducer;
@Mock
private Command command;
@Mock
private Errors errorHandler;

private DistributingExecutor distributor;
private AtomicLong scnCounter;
Expand Down Expand Up @@ -159,7 +162,8 @@ public void setUp() throws InterruptedException {
DURATION_10_MS,
(ec, sc) -> InjectorChain.of(schemaInjector, topicInjector),
Optional.of(authorizationValidator),
validatedCommandFactory
validatedCommandFactory,
errorHandler
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ public String kafkaAuthorizationErrorMessage(final Exception e) {
return ErrorMessageUtil.buildErrorMessage(e);
}

@Override
public String transactionInitTimeoutErrorMessage(final Exception e) {
return "Timeout while initializing transaction to the KSQL command topic."
+ System.lineSeparator()
+ "If you're running a single Kafka broker, "
+ "ensure that the following Kafka configs are set to 1:"
+ System.lineSeparator()
+ "- transaction.state.log.replication.factor"
+ System.lineSeparator()
+ "- transaction.state.log.min.isr"
+ System.lineSeparator()
+ "- offsets.topic.replication.factor";
}

@Override
public String schemaRegistryUnconfiguredErrorMessage(final Exception e) {
return ErrorMessageUtil.buildErrorMessage(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ public interface ErrorMessages {

String kafkaAuthorizationErrorMessage(Exception e);

String transactionInitTimeoutErrorMessage(Exception e);

String schemaRegistryUnconfiguredErrorMessage(Exception e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ public String kafkaAuthorizationErrorMessage(final Exception e) {
return errorMessages.kafkaAuthorizationErrorMessage(e);
}

public String transactionInitTimeoutErrorMessage(final Exception e) {
return errorMessages.transactionInitTimeoutErrorMessage(e);
}

public Response generateResponse(
final Exception e,
final Response defaultResponse
Expand Down

0 comments on commit 72ecd00

Please sign in to comment.