Skip to content

Commit

Permalink
feat: Transactional Produces to Command Topic (#3660)
Browse files Browse the repository at this point in the history
* feat: implement transactional produces to KSQL command topic

* rename the transaction manager to TransactionalProducer

* add new configs for internal topic

* get rid of injecting TransactionalProducer into DistributingExecutor

* some refactors

* better error handling of transactions and redid waiting for commandRunner consumer

* process statements one at a time in rest server

* more tests

* address Almog comments

* remove dependency on CommandRunner from TransactionalProducer

* documentation

* move transactional producer to distributing executor and change interface to default kafka producer one

* improved error handling and changed condition for sending processing log stream statement

* rohan's comments

* revert to using vanilla Kafka producer

* update acls documentation
  • Loading branch information
stevenpyzhang authored Nov 16, 2019
1 parent deb8b20 commit cba2877
Show file tree
Hide file tree
Showing 21 changed files with 580 additions and 253 deletions.
6 changes: 4 additions & 2 deletions config/ksql-production-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ ksql.streams.producer.delivery.timeout.ms=2147483647
# Kafka cluster is unavailable.
ksql.streams.producer.max.block.ms=9223372036854775807

# For better fault tolerance and durability, set the replication factor for the KSQL
# Server's internal topics. Note: the value 3 requires at least 3 brokers in your Kafka cluster.
# For better fault tolerance and durability, set the replication factor and minimum in-sync replicas
# for the KSQL Server's internal topics.
# Note: the value 3 requires at least 3 brokers in your Kafka cluster.
ksql.internal.topic.replicas=3
ksql.internal.topic.min.insync.replicas=2

# Configure underlying Kafka Streams internal topics in order to achieve better fault tolerance and
# durability, even in the face of Kafka broker failures. Highly recommended for mission critical applications.
Expand Down
3 changes: 3 additions & 0 deletions docs/installation/server-config/security.rst
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,9 @@ The ACLs required are the same for both :ref:`Interactive and non-interactive (h
KSQL always requires the following ACLs for its internal operations and data management:

- The ``DESCRIBE_CONFIGS`` operation on the ``CLUSTER`` resource type.
- The ``DESCRIBE`` operation on the ``TOPIC`` with ``LITERAL`` name ``__consumer_offsets``.
- The ``DESCRIBE`` operation on the ``TOPIC`` with ``LITERAL`` name ``__transaction_state``.
- The ``DESCRIBE`` and ``WRITE`` operations on the ``TRANSACTIONAL_ID`` with ``LITERAL`` name ``<ksql.service.id>``.
- The ``ALL`` operation on all internal ``TOPICS`` that are ``PREFIXED`` with ``_confluent-ksql-<ksql.service.id>``.
- The ``ALL`` operation on all internal ``GROUPS`` that are ``PREFIXED`` with ``_confluent-ksql-<ksql.service.id>``.

Expand Down
11 changes: 10 additions & 1 deletion ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class KsqlConfig extends AbstractConfig {

public static final String KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY = "ksql.internal.topic.replicas";

public static final String KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY =
"ksql.internal.topic.min.insync.replicas";

public static final String KSQL_SCHEMA_REGISTRY_PREFIX = "ksql.schema.registry.";

public static final String SCHEMA_REGISTRY_URL_PROPERTY = "ksql.schema.registry.url";
Expand Down Expand Up @@ -496,8 +499,14 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY,
Type.SHORT,
(short) 1,
ConfigDef.Importance.LOW,
ConfigDef.Importance.MEDIUM,
"The replication factor for the internal topics of KSQL server."
).define(
KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY,
Type.SHORT,
(short) 1,
ConfigDef.Importance.MEDIUM,
"The minimum number of insync replicas for the internal topics of KSQL server."
).define(
KSQL_UDF_SECURITY_MANAGER_ENABLED,
ConfigDef.Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,10 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,36 +40,28 @@ public class CommandTopic {
private final TopicPartition commandTopicPartition;

private Consumer<CommandId, Command> commandConsumer = null;
private Producer<CommandId, Command> commandProducer = null;
private final String commandTopicName;

public CommandTopic(
final String commandTopicName,
final Map<String, Object> kafkaConsumerProperties,
final Map<String, Object> kafkaProducerProperties
final Map<String, Object> kafkaConsumerProperties
) {
this(
commandTopicName,
new KafkaConsumer<>(
Objects.requireNonNull(kafkaConsumerProperties, "kafkaClientProperties"),
InternalTopicJsonSerdeUtil.getJsonDeserializer(CommandId.class, true),
InternalTopicJsonSerdeUtil.getJsonDeserializer(Command.class, false)
),
new KafkaProducer<>(
Objects.requireNonNull(kafkaProducerProperties, "kafkaClientProperties"),
InternalTopicJsonSerdeUtil.getJsonSerializer(true),
InternalTopicJsonSerdeUtil.getJsonSerializer(false)
));
)
);
}

CommandTopic(
final String commandTopicName,
final Consumer<CommandId, Command> commandConsumer,
final Producer<CommandId, Command> commandProducer
final Consumer<CommandId, Command> commandConsumer
) {
this.commandTopicPartition = new TopicPartition(commandTopicName, 0);
this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer");
this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer");
this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
}

Expand All @@ -86,24 +73,6 @@ public void start() {
commandConsumer.assign(Collections.singleton(commandTopicPartition));
}

public RecordMetadata send(final CommandId commandId, final Command command) {
final ProducerRecord<CommandId, Command> producerRecord = new ProducerRecord<>(
commandTopicName,
0,
Objects.requireNonNull(commandId, "commandId"),
Objects.requireNonNull(command, "command"));
try {
return commandProducer.send(producerRecord).get();
} catch (final ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException)e.getCause();
}
throw new RuntimeException(e.getCause());
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}

public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
return commandConsumer.poll(timeout);
}
Expand Down Expand Up @@ -150,6 +119,5 @@ public void wakeup() {

public void close() {
commandConsumer.close();
commandProducer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.query.id.HybridQueryIdGenerator;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStore;
Expand Down Expand Up @@ -72,7 +74,6 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.RetryUtil;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
Expand Down Expand Up @@ -108,6 +109,10 @@
import javax.websocket.server.ServerEndpointConfig.Configurator;
import javax.ws.rs.core.Configurable;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.log4j.LogManager;
import org.eclipse.jetty.server.ServerConnector;
Expand Down Expand Up @@ -467,13 +472,16 @@ static KsqlRestApplication buildApplication(

UserFunctionLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load();

final String commandTopic = KsqlInternalTopicUtils.getTopicName(
final String commandTopicName = KsqlInternalTopicUtils.getTopicName(
ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX);

final CommandStore commandStore = CommandStore.Factory.create(
commandTopic,
commandTopicName,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
restConfig.getCommandConsumerProperties(),
restConfig.getCommandProducerProperties());
restConfig.getCommandProducerProperties()
);

final InteractiveStatementExecutor statementExecutor =
new InteractiveStatementExecutor(serviceContext, ksqlEngine, hybridQueryIdGenerator);
Expand Down Expand Up @@ -501,19 +509,12 @@ static KsqlRestApplication buildApplication(
authorizationValidator
);

final KsqlResource ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator
);

final List<String> managedTopics = new LinkedList<>();
managedTopics.add(commandTopic);
managedTopics.add(commandTopicName);
if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE)) {
managedTopics.add(ProcessingLogServerUtils.getTopicName(processingLogConfig, ksqlConfig));
}

final CommandRunner commandRunner = new CommandRunner(
statementExecutor,
commandStore,
Expand All @@ -522,6 +523,14 @@ static KsqlRestApplication buildApplication(
serverState
);

final KsqlResource ksqlResource = new KsqlResource(
ksqlEngine,
commandStore,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator
);

final List<KsqlServerPrecondition> preconditions = restConfig.getConfiguredInstances(
KsqlRestConfig.KSQL_SERVER_PRECONDITIONS,
KsqlServerPrecondition.class
Expand Down Expand Up @@ -623,27 +632,47 @@ private static void maybeCreateProcessingLogStream(
final KsqlEngine ksqlEngine,
final CommandQueue commandQueue
) {
if (!config.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)
|| !commandQueue.isEmpty()) {
if (!config.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)) {
return;
}

final PreparedStatement<?> statement = ProcessingLogServerUtils
.processingLogStreamCreateStatement(config, ksqlConfig);
final Supplier<ConfiguredStatement<?>> configured = () -> ConfiguredStatement.of(
statement, Collections.emptyMap(), ksqlConfig);

final Producer<CommandId, Command> transactionalProducer =
commandQueue.createTransactionalProducer();
try {
transactionalProducer.initTransactions();
transactionalProducer.beginTransaction();

if (!commandQueue.isEmpty()) {
return;
}

// We don't wait for the commandRunner in this case since it hasn't been started yet.

final PreparedStatement<?> statement = ProcessingLogServerUtils
.processingLogStreamCreateStatement(config, ksqlConfig);
final Supplier<ConfiguredStatement<?>> configured = () -> ConfiguredStatement.of(
statement, Collections.emptyMap(), ksqlConfig);

ksqlEngine.createSandbox(ksqlEngine.getServiceContext()).execute(
ksqlEngine.getServiceContext(),
configured.get()
);
} catch (final KsqlException e) {

commandQueue.enqueueCommand(configured.get(), transactionalProducer);
transactionalProducer.commitTransaction();
} catch (final ProducerFencedException
| OutOfOrderSequenceException
| AuthorizationException e
) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
// This catch doesn't abortTransaction() since doing that would throw another exception.
log.warn("Failed to create processing log stream", e);
return;
} catch (final Exception e) {
transactionalProducer.abortTransaction();
log.warn("Failed to create processing log stream", e);
} finally {
transactionalProducer.close();
}

commandQueue.enqueueCommand(configured.get());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

package io.confluent.ksql.rest.server.computation;

import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;

/**
* Represents a queue of {@link Command}s that must be distributed to all
Expand All @@ -34,13 +36,16 @@ public interface CommandQueue extends Closeable {
* it is guaranteed that the command has been persisted, without regard
* for the {@link io.confluent.ksql.rest.entity.CommandStatus CommandStatus}.
*
* @param statement The statement to be distributed
*
* @param statement The statement to be distributed
* @param transactionalProducer The transactional producer used to enqueue the command
* @return an asynchronous tracker that can be used to determine the current
* state of the command
*/
QueuedCommandStatus enqueueCommand(ConfiguredStatement<?> statement);

QueuedCommandStatus enqueueCommand(
ConfiguredStatement<?> statement,
Producer<CommandId, Command> transactionalProducer
);

/**
* Polls the Queue for any commands that have been enqueued since the last
* invocation to this method.
Expand Down Expand Up @@ -74,6 +79,20 @@ public interface CommandQueue extends Closeable {
void ensureConsumedPast(long seqNum, Duration timeout)
throws InterruptedException, TimeoutException;

/**
* Creates a transactional producer for producing to the command topic.
*
* @return a TransactionalProducer
*/
Producer<CommandId, Command> createTransactionalProducer();

/**
* Blocks until the command topic consumer has processed all records up to
* the current offset when this method is called.
*
*/
void waitForCommandConsumer();

/**
* @return whether or not there are any enqueued commands
*/
Expand Down
Loading

0 comments on commit cba2877

Please sign in to comment.