From c1164084d4e508d1a7c507058cdc26b671c3c6f0 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 30 May 2018 22:39:42 -0700 Subject: [PATCH] KAFKA-6054: Add 'version probing' to Kafka Streams rebalance (#4636) implements KIP-268 Reviewers: Bill Bejeck , John Roesler , Guozhang Wang --- .../apache/kafka/streams/StreamsConfig.java | 108 ++++- .../processor/internals/AssignedTasks.java | 28 +- .../processor/internals/StreamThread.java | 36 +- .../internals/StreamsPartitionAssignor.java | 229 ++++++++-- .../processor/internals/TaskManager.java | 18 +- .../internals/assignment/AssignmentInfo.java | 12 +- .../internals/assignment/ClientState.java | 32 +- .../assignment/SubscriptionInfo.java | 65 ++- .../processor/internals/StreamThreadTest.java | 27 +- .../StreamsPartitionAssignorTest.java | 393 ++++++++++++------ .../assignment/SubscriptionInfoTest.java | 16 + .../kafka/streams/tests/StreamsSmokeTest.java | 2 +- .../streams/tests/StreamsUpgradeTest.java | 284 ++++++++++++- tests/kafkatest/services/streams.py | 44 +- .../tests/streams/streams_upgrade_test.py | 207 ++++++++- tests/kafkatest/version.py | 2 +- 16 files changed, 1219 insertions(+), 284 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 18dc891682d87..bc549960de45f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -145,6 +145,7 @@ public class StreamsConfig extends AbstractConfig { */ // TODO: currently we cannot get the full topic configurations and hence cannot allow topic configs without the prefix, // this can be lifted once kafka.log.LogConfig is completely deprecated by org.apache.kafka.common.config.TopicConfig + @SuppressWarnings("WeakerAccess") public static final String TOPIC_PREFIX = "topic."; /** @@ -152,6 +153,7 @@ public class StreamsConfig extends AbstractConfig { * It is recommended to use {@link #consumerPrefix(String)} to add this prefix to {@link ConsumerConfig consumer * properties}. */ + @SuppressWarnings("WeakerAccess") public static final String CONSUMER_PREFIX = "consumer."; /** @@ -161,6 +163,7 @@ public class StreamsConfig extends AbstractConfig { * 2. consumer.[config-name] * 3. [config-name] */ + @SuppressWarnings("WeakerAccess") public static final String MAIN_CONSUMER_PREFIX = "main.consumer."; /** @@ -170,6 +173,7 @@ public class StreamsConfig extends AbstractConfig { * 2. consumer.[config-name] * 3. [config-name] */ + @SuppressWarnings("WeakerAccess") public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer."; /** @@ -179,6 +183,7 @@ public class StreamsConfig extends AbstractConfig { * 2. consumer.[config-name] * 3. [config-name] */ + @SuppressWarnings("WeakerAccess") public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer."; /** @@ -186,6 +191,7 @@ public class StreamsConfig extends AbstractConfig { * It is recommended to use {@link #producerPrefix(String)} to add this prefix to {@link ProducerConfig producer * properties}. */ + @SuppressWarnings("WeakerAccess") public static final String PRODUCER_PREFIX = "producer."; /** @@ -193,202 +199,250 @@ public class StreamsConfig extends AbstractConfig { * It is recommended to use {@link #adminClientPrefix(String)} to add this prefix to {@link ProducerConfig producer * properties}. */ + @SuppressWarnings("WeakerAccess") public static final String ADMIN_CLIENT_PREFIX = "admin."; /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}. */ + @SuppressWarnings("WeakerAccess") public static final String UPGRADE_FROM_0100 = "0.10.0"; /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}. */ + @SuppressWarnings("WeakerAccess") public static final String UPGRADE_FROM_0101 = "0.10.1"; /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}. */ + @SuppressWarnings("WeakerAccess") public static final String UPGRADE_FROM_0102 = "0.10.2"; /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}. */ + @SuppressWarnings("WeakerAccess") public static final String UPGRADE_FROM_0110 = "0.11.0"; /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}. */ + @SuppressWarnings("WeakerAccess") public static final String UPGRADE_FROM_10 = "1.0"; /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}. */ + @SuppressWarnings("WeakerAccess") public static final String UPGRADE_FROM_11 = "1.1"; /** * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees. */ + @SuppressWarnings("WeakerAccess") public static final String AT_LEAST_ONCE = "at_least_once"; /** * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees. */ + @SuppressWarnings("WeakerAccess") public static final String EXACTLY_ONCE = "exactly_once"; /** {@code application.id} */ + @SuppressWarnings("WeakerAccess") public static final String APPLICATION_ID_CONFIG = "application.id"; private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; /**{@code user.endpoint} */ + @SuppressWarnings("WeakerAccess") public static final String APPLICATION_SERVER_CONFIG = "application.server"; private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application"; /** {@code bootstrap.servers} */ + @SuppressWarnings("WeakerAccess") public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; /** {@code buffered.records.per.partition} */ + @SuppressWarnings("WeakerAccess") public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition."; /** {@code cache.max.bytes.buffering} */ + @SuppressWarnings("WeakerAccess") public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering"; private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads"; /** {@code client.id} */ + @SuppressWarnings("WeakerAccess") public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; private static final String CLIENT_ID_DOC = "An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer," + " with pattern '-StreamThread--'."; /** {@code commit.interval.ms} */ + @SuppressWarnings("WeakerAccess") public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor." + " (Note, if 'processing.guarantee' is set to '" + EXACTLY_ONCE + "', the default value is " + EOS_DEFAULT_COMMIT_INTERVAL_MS + "," + " otherwise the default value is " + DEFAULT_COMMIT_INTERVAL_MS + "."; /** {@code connections.max.idle.ms} */ + @SuppressWarnings("WeakerAccess") public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; /** * {@code default.deserialization.exception.handler} */ + @SuppressWarnings("WeakerAccess") public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface."; /** * {@code default.production.exception.handler} */ + @SuppressWarnings("WeakerAccess") public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler"; private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface."; /** * {@code default.windowed.key.serde.inner} */ + @SuppressWarnings("WeakerAccess") public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner"; /** * {@code default.windowed.value.serde.inner} */ + @SuppressWarnings("WeakerAccess") public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner"; /** {@code default key.serde} */ + @SuppressWarnings("WeakerAccess") public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde"; private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the org.apache.kafka.common.serialization.Serde interface. " + "Note when windowed serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via '" + DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well"; /** {@code default value.serde} */ + @SuppressWarnings("WeakerAccess") public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde"; private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the org.apache.kafka.common.serialization.Serde interface. " + "Note when windowed serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via '" + DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well"; /** {@code default.timestamp.extractor} */ + @SuppressWarnings("WeakerAccess") public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor"; private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface."; /** {@code metadata.max.age.ms} */ + @SuppressWarnings("WeakerAccess") public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; /** {@code metrics.num.samples} */ + @SuppressWarnings("WeakerAccess") public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; /** {@code metrics.record.level} */ + @SuppressWarnings("WeakerAccess") public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; /** {@code metric.reporters} */ + @SuppressWarnings("WeakerAccess") public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; /** {@code metrics.sample.window.ms} */ + @SuppressWarnings("WeakerAccess") public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; /** {@code num.standby.replicas} */ + @SuppressWarnings("WeakerAccess") public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas"; private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task."; /** {@code num.stream.threads} */ + @SuppressWarnings("WeakerAccess") public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; /** {@code partition.grouper} */ + @SuppressWarnings("WeakerAccess") public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the org.apache.kafka.streams.processor.PartitionGrouper interface."; /** {@code poll.ms} */ + @SuppressWarnings("WeakerAccess") public static final String POLL_MS_CONFIG = "poll.ms"; private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input."; /** {@code processing.guarantee} */ + @SuppressWarnings("WeakerAccess") public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee"; private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are " + AT_LEAST_ONCE + " (default) and " + EXACTLY_ONCE + ". " + "Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting `transaction.state.log.replication.factor`."; /** {@code receive.buffer.bytes} */ + @SuppressWarnings("WeakerAccess") public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG; /** {@code reconnect.backoff.ms} */ + @SuppressWarnings("WeakerAccess") public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG; /** {@code reconnect.backoff.max} */ + @SuppressWarnings("WeakerAccess") public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG; /** {@code replication.factor} */ + @SuppressWarnings("WeakerAccess") public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application."; /** {@code request.timeout.ms} */ + @SuppressWarnings("WeakerAccess") public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; /** {@code retries} */ + @SuppressWarnings("WeakerAccess") public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG; /** {@code retry.backoff.ms} */ + @SuppressWarnings("WeakerAccess") public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; /** {@code rocksdb.config.setter} */ + @SuppressWarnings("WeakerAccess") public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter"; private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class or class name that implements the org.apache.kafka.streams.state.RocksDBConfigSetter interface"; /** {@code security.protocol} */ + @SuppressWarnings("WeakerAccess") public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; /** {@code send.buffer.bytes} */ + @SuppressWarnings("WeakerAccess") public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG; /** {@code state.cleanup.delay} */ + @SuppressWarnings("WeakerAccess") public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed"; /** {@code state.dir} */ + @SuppressWarnings("WeakerAccess") public static final String STATE_DIR_CONFIG = "state.dir"; private static final String STATE_DIR_DOC = "Directory location for state store."; /** {@code upgrade.from} */ + @SuppressWarnings("WeakerAccess") public static final String UPGRADE_FROM_CONFIG = "upgrade.from"; - public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " + + private static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " + "When upgrading from 1.2 to a newer version it is not required to specify this config." + "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version)."; /** {@code windowstore.changelog.additional.retention.ms} */ + @SuppressWarnings("WeakerAccess") public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day"; @@ -653,6 +707,7 @@ public class StreamsConfig extends AbstractConfig { public static class InternalConfig { public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__"; + public static final String VERSION_PROBING_FLAG = "__version.probing.flag__"; } /** @@ -662,6 +717,7 @@ public static class InternalConfig { * @param consumerProp the consumer property to be masked * @return {@link #CONSUMER_PREFIX} + {@code consumerProp} */ + @SuppressWarnings("WeakerAccess") public static String consumerPrefix(final String consumerProp) { return CONSUMER_PREFIX + consumerProp; } @@ -673,6 +729,7 @@ public static String consumerPrefix(final String consumerProp) { * @param consumerProp the consumer property to be masked * @return {@link #MAIN_CONSUMER_PREFIX} + {@code consumerProp} */ + @SuppressWarnings("WeakerAccess") public static String mainConsumerPrefix(final String consumerProp) { return MAIN_CONSUMER_PREFIX + consumerProp; } @@ -684,6 +741,7 @@ public static String mainConsumerPrefix(final String consumerProp) { * @param consumerProp the consumer property to be masked * @return {@link #RESTORE_CONSUMER_PREFIX} + {@code consumerProp} */ + @SuppressWarnings("WeakerAccess") public static String restoreConsumerPrefix(final String consumerProp) { return RESTORE_CONSUMER_PREFIX + consumerProp; } @@ -695,6 +753,7 @@ public static String restoreConsumerPrefix(final String consumerProp) { * @param consumerProp the consumer property to be masked * @return {@link #GLOBAL_CONSUMER_PREFIX} + {@code consumerProp} */ + @SuppressWarnings("WeakerAccess") public static String globalConsumerPrefix(final String consumerProp) { return GLOBAL_CONSUMER_PREFIX + consumerProp; } @@ -706,6 +765,7 @@ public static String globalConsumerPrefix(final String consumerProp) { * @param producerProp the producer property to be masked * @return PRODUCER_PREFIX + {@code producerProp} */ + @SuppressWarnings("WeakerAccess") public static String producerPrefix(final String producerProp) { return PRODUCER_PREFIX + producerProp; } @@ -717,6 +777,7 @@ public static String producerPrefix(final String producerProp) { * @param adminClientProp the admin client property to be masked * @return ADMIN_CLIENT_PREFIX + {@code adminClientProp} */ + @SuppressWarnings("WeakerAccess") public static String adminClientPrefix(final String adminClientProp) { return ADMIN_CLIENT_PREFIX + adminClientProp; } @@ -728,6 +789,7 @@ public static String adminClientPrefix(final String adminClientProp) { * @param topicProp the topic property to be masked * @return TOPIC_PREFIX + {@code topicProp} */ + @SuppressWarnings("WeakerAccess") public static String topicPrefix(final String topicProp) { return TOPIC_PREFIX + topicProp; } @@ -737,6 +799,7 @@ public static String topicPrefix(final String topicProp) { * * @return a copy of the config definition */ + @SuppressWarnings("unused") public static ConfigDef configDef() { return new ConfigDef(CONFIG); } @@ -788,8 +851,8 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map getConsumerConfigs(final String groupId, final String clientId) { @@ -853,13 +917,14 @@ public Map getConsumerConfigs(final String groupId, * @param clientId clientId * @return Map of the consumer configuration. */ + @SuppressWarnings("WeakerAccess") public Map getMainConsumerConfigs(final String groupId, final String clientId) { - Map consumerProps = getCommonConsumerConfigs(); + final Map consumerProps = getCommonConsumerConfigs(); // Get main consumer override configs - Map mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX); - for (Map.Entry entry: mainConsumerProps.entrySet()) { + final Map mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX); + for (final Map.Entry entry: mainConsumerProps.entrySet()) { consumerProps.put(entry.getKey(), entry.getValue()); } @@ -919,12 +984,13 @@ public Map getMainConsumerConfigs(final String groupId, * @param clientId clientId * @return Map of the restore consumer configuration. */ + @SuppressWarnings("WeakerAccess") public Map getRestoreConsumerConfigs(final String clientId) { - Map baseConsumerProps = getCommonConsumerConfigs(); + final Map baseConsumerProps = getCommonConsumerConfigs(); // Get restore consumer override configs - Map restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX); - for (Map.Entry entry: restoreConsumerProps.entrySet()) { + final Map restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX); + for (final Map.Entry entry: restoreConsumerProps.entrySet()) { baseConsumerProps.put(entry.getKey(), entry.getValue()); } @@ -950,12 +1016,13 @@ public Map getRestoreConsumerConfigs(final String clientId) { * @param clientId clientId * @return Map of the global consumer configuration. */ + @SuppressWarnings("WeakerAccess") public Map getGlobalConsumerConfigs(final String clientId) { - Map baseConsumerProps = getCommonConsumerConfigs(); + final Map baseConsumerProps = getCommonConsumerConfigs(); // Get global consumer override configs - Map globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX); - for (Map.Entry entry: globalConsumerProps.entrySet()) { + final Map globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX); + for (final Map.Entry entry: globalConsumerProps.entrySet()) { baseConsumerProps.put(entry.getKey(), entry.getValue()); } @@ -977,6 +1044,7 @@ public Map getGlobalConsumerConfigs(final String clientId) { * @param clientId clientId * @return Map of the producer configuration. */ + @SuppressWarnings("WeakerAccess") public Map getProducerConfigs(final String clientId) { final Map clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); @@ -999,6 +1067,7 @@ public Map getProducerConfigs(final String clientId) { * @param clientId clientId * @return Map of the admin client configuration. */ + @SuppressWarnings("WeakerAccess") public Map getAdminConfigs(final String clientId) { final Map clientProvidedProps = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()); @@ -1045,10 +1114,11 @@ private Map getClientCustomProps() { * * @return an configured instance of key Serde class */ + @SuppressWarnings("WeakerAccess") public Serde defaultKeySerde() { - Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG); + final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG); try { - Serde serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class); + final Serde serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class); serde.configure(originals(), true); return serde; } catch (final Exception e) { @@ -1063,10 +1133,11 @@ public Serde defaultKeySerde() { * * @return an configured instance of value Serde class */ + @SuppressWarnings("WeakerAccess") public Serde defaultValueSerde() { - Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG); + final Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG); try { - Serde serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class); + final Serde serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class); serde.configure(originals(), false); return serde; } catch (final Exception e) { @@ -1075,14 +1146,17 @@ public Serde defaultValueSerde() { } } + @SuppressWarnings("WeakerAccess") public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + @SuppressWarnings("WeakerAccess") public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); } + @SuppressWarnings("WeakerAccess") public ProductionExceptionHandler defaultProductionExceptionHandler() { return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 6f4f454bfc418..079d405cb5030 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -40,15 +40,15 @@ abstract class AssignedTasks { private final Logger log; private final String taskTypeName; private final TaskAction commitAction; - private Map created = new HashMap<>(); - private Map suspended = new HashMap<>(); - private Map restoring = new HashMap<>(); - private Set restoredPartitions = new HashSet<>(); - private Set previousActiveTasks = new HashSet<>(); + private final Map created = new HashMap<>(); + private final Map suspended = new HashMap<>(); + private final Map restoring = new HashMap<>(); + private final Set restoredPartitions = new HashSet<>(); + private final Set previousActiveTasks = new HashSet<>(); // IQ may access this map. - Map running = new ConcurrentHashMap<>(); - private Map runningByPartition = new HashMap<>(); - Map restoringByPartition = new HashMap<>(); + final Map running = new ConcurrentHashMap<>(); + private final Map runningByPartition = new HashMap<>(); + final Map restoringByPartition = new HashMap<>(); AssignedTasks(final LogContext logContext, final String taskTypeName) { @@ -176,7 +176,7 @@ private RuntimeException closeNonRunningTasks(final Collection tasks) { private RuntimeException suspendTasks(final Collection tasks) { final AtomicReference firstException = new AtomicReference<>(null); - for (Iterator it = tasks.iterator(); it.hasNext(); ) { + for (final Iterator it = tasks.iterator(); it.hasNext(); ) { final T task = it.next(); try { task.suspend(); @@ -249,10 +249,10 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set private void addToRestoring(final T task) { restoring.put(task.id(), task); - for (TopicPartition topicPartition : task.partitions()) { + for (final TopicPartition topicPartition : task.partitions()) { restoringByPartition.put(topicPartition, task); } - for (TopicPartition topicPartition : task.changelogPartitions()) { + for (final TopicPartition topicPartition : task.changelogPartitions()) { restoringByPartition.put(topicPartition, task); } } @@ -264,10 +264,10 @@ private void transitionToRunning(final T task) { log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); task.initializeTopology(); - for (TopicPartition topicPartition : task.partitions()) { + for (final TopicPartition topicPartition : task.partitions()) { runningByPartition.put(topicPartition, task); } - for (TopicPartition topicPartition : task.changelogPartitions()) { + for (final TopicPartition topicPartition : task.changelogPartitions()) { runningByPartition.put(topicPartition, task); } } @@ -356,7 +356,7 @@ int commit() { void applyToRunningTasks(final TaskAction action) { RuntimeException firstException = null; - for (Iterator it = running().iterator(); it.hasNext(); ) { + for (final Iterator it = running().iterator(); it.hasNext(); ) { final T task = it.next(); try { action.apply(task); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3080d2e1583a0..e72c4a5de9405 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -62,6 +62,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singleton; @@ -69,7 +70,7 @@ public class StreamThread extends Thread { private final static int UNLIMITED_RECORDS = -1; - private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1); + private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1); /** * Stream thread states are the possible states that a stream thread can be in. @@ -264,7 +265,9 @@ public void onPartitionsAssigned(final Collection assignment) { if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) { return; } - taskManager.createTasks(assignment); + if (!streamThread.versionProbingFlag.get()) { + taskManager.createTasks(assignment); + } } catch (final Throwable t) { log.error( "Error caught during partition assignment, " + @@ -298,7 +301,11 @@ public void onPartitionsRevoked(final Collection assignment) { final long start = time.milliseconds(); try { // suspend active tasks - taskManager.suspendTasksAndState(); + if (streamThread.versionProbingFlag.get()) { + streamThread.versionProbingFlag.set(false); + } else { + taskManager.suspendTasksAndState(); + } } catch (final Throwable t) { log.error( "Error caught during partition revocation, " + @@ -555,6 +562,7 @@ static class StreamsMetricsThreadImpl extends StreamsMetricsImpl { private final String logPrefix; private final TaskManager taskManager; private final StreamsMetricsThreadImpl streamsMetrics; + private final AtomicBoolean versionProbingFlag; private long lastCommitMs; private long timerStartedMs; @@ -647,6 +655,8 @@ public static StreamThread create(final InternalTopologyBuilder builder, final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); final Map consumerConfigs = config.getMainConsumerConfigs(applicationId, threadClientId); consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager); + final AtomicBoolean versionProbingFlag = new AtomicBoolean(); + consumerConfigs.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, versionProbingFlag); String originalReset = null; if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) { originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); @@ -666,7 +676,8 @@ public static StreamThread create(final InternalTopologyBuilder builder, streamsMetrics, builder, threadClientId, - logContext); + logContext, + versionProbingFlag); } public StreamThread(final Time time, @@ -679,7 +690,8 @@ public StreamThread(final Time time, final StreamsMetricsThreadImpl streamsMetrics, final InternalTopologyBuilder builder, final String threadClientId, - final LogContext logContext) { + final LogContext logContext, + final AtomicBoolean versionProbingFlag) { super(threadClientId); this.stateLock = new Object(); @@ -696,6 +708,7 @@ public StreamThread(final Time time, this.restoreConsumer = restoreConsumer; this.consumer = consumer; this.originalReset = originalReset; + this.versionProbingFlag = versionProbingFlag; this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG); this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); @@ -750,19 +763,26 @@ private void runLoop() { while (isRunning()) { try { recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); + if (versionProbingFlag.get()) { + log.info("Version probing detected. Triggering new rebalance."); + enforceRebalance(); + } } catch (final TaskMigratedException ignoreAndRejoinGroup) { log.warn("Detected task {} that got migrated to another thread. " + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}", ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">")); - // re-subscribe to enforce a rebalance in the next poll call - consumer.unsubscribe(); - consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + enforceRebalance(); } } } + private void enforceRebalance() { + consumer.unsubscribe(); + consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + } + /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException If the store's change log does not contain the partition diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index e1464e6b72c11..db94ac0c852b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -51,6 +51,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; @@ -59,6 +60,12 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable private final static int UNKNOWN = -1; public final static int NOT_AVAILABLE = -2; + private final static int VERSION_ONE = 1; + private final static int VERSION_TWO = 2; + private final static int VERSION_THREE = 3; + private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE; + private int minReceivedMetadataVersion = UNKNOWN; + protected Set supportedVersions = new HashSet<>(); private Logger log; private String logPrefix; @@ -159,7 +166,7 @@ public String toString() { } } - private static final Comparator PARTITION_COMPARATOR = new Comparator() { + protected static final Comparator PARTITION_COMPARATOR = new Comparator() { @Override public int compare(final TopicPartition p1, final TopicPartition p2) { @@ -178,12 +185,21 @@ public int compare(final TopicPartition p1, private TaskManager taskManager; private PartitionGrouper partitionGrouper; + private AtomicBoolean versionProbingFlag; - private int userMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; + protected int usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; private InternalTopicManager internalTopicManager; private CopartitionedTopicsValidator copartitionedTopicsValidator; + protected String userEndPoint() { + return userEndPoint; + } + + protected TaskManager taskManger() { + return taskManager; + } + /** * We need to have the PartitionAssignor and its StreamThread to be mutually accessible * since the former needs later's cached metadata while sending subscriptions, @@ -204,15 +220,15 @@ public void configure(final Map configs) { switch (upgradeFrom) { case StreamsConfig.UPGRADE_FROM_0100: log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION); - userMetadataVersion = 1; + usedSubscriptionMetadataVersion = VERSION_ONE; break; case StreamsConfig.UPGRADE_FROM_0101: case StreamsConfig.UPGRADE_FROM_0102: case StreamsConfig.UPGRADE_FROM_0110: case StreamsConfig.UPGRADE_FROM_10: case StreamsConfig.UPGRADE_FROM_11: - log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION); - userMetadataVersion = 2; + log.info("Downgrading metadata version from {} to 2 for upgrade from {}.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION, upgradeFrom); + usedSubscriptionMetadataVersion = VERSION_TWO; break; default: throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom); @@ -234,6 +250,21 @@ public void configure(final Map configs) { taskManager = (TaskManager) o; + final Object o2 = configs.get(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG); + if (o2 == null) { + final KafkaException fatalException = new KafkaException("VersionProbingFlag is not specified"); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; + } + + if (!(o2 instanceof AtomicBoolean)) { + final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o2.getClass().getName(), AtomicBoolean.class.getName())); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; + } + + versionProbingFlag = (AtomicBoolean) o2; + numStandbyReplicas = streamsConfig.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); partitionGrouper = streamsConfig.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); @@ -277,7 +308,7 @@ public Subscription subscription(final Set topics) { final Set standbyTasks = taskManager.cachedTasksIds(); standbyTasks.removeAll(previousActiveTasks); final SubscriptionInfo data = new SubscriptionInfo( - userMetadataVersion, + usedSubscriptionMetadataVersion, taskManager.processId(), previousActiveTasks, standbyTasks, @@ -313,20 +344,25 @@ public Map assign(final Cluster metadata, final Map subscriptions) { // construct the client metadata from the decoded subscription info final Map clientsMetadata = new HashMap<>(); + final Set futureConsumers = new HashSet<>(); - int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; + minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; + supportedVersions.clear(); + int futureMetadataVersion = UNKNOWN; for (final Map.Entry entry : subscriptions.entrySet()) { final String consumerId = entry.getKey(); final Subscription subscription = entry.getValue(); final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); final int usedVersion = info.version(); + supportedVersions.add(info.latestSupportedVersion()); if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) { - throw new IllegalStateException("Unknown metadata version: " + usedVersion - + "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION); + futureMetadataVersion = usedVersion; + futureConsumers.add(consumerId); + continue; } - if (usedVersion < minUserMetadataVersion) { - minUserMetadataVersion = usedVersion; + if (usedVersion < minReceivedMetadataVersion) { + minReceivedMetadataVersion = usedVersion; } // create the new client metadata if necessary @@ -341,6 +377,27 @@ public Map assign(final Cluster metadata, clientMetadata.addConsumer(consumerId, info); } + final boolean versionProbing; + if (futureMetadataVersion != UNKNOWN) { + if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) { + log.info("Received a future (version probing) subscription (version: {}). Sending empty assignment back (with supported version {}).", + futureMetadataVersion, + SubscriptionInfo.LATEST_SUPPORTED_VERSION); + versionProbing = true; + } else { + throw new IllegalStateException("Received a future (version probing) subscription (version: " + futureMetadataVersion + + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion + ") at the same time."); + } + } else { + versionProbing = false; + } + + if (minReceivedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) { + log.info("Downgrading metadata to version {}. Latest supported version is {}.", + minReceivedMetadataVersion, + SubscriptionInfo.LATEST_SUPPORTED_VERSION); + } + log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata); // ---------------- Step Zero ---------------- // @@ -457,12 +514,7 @@ public Map assign(final Cluster metadata, allAssignedPartitions.addAll(partitions); final TaskId id = entry.getKey(); - Set ids = tasksByTopicGroup.get(id.topicGroupId); - if (ids == null) { - ids = new HashSet<>(); - tasksByTopicGroup.put(id.topicGroupId, ids); - } - ids.add(id); + tasksByTopicGroup.computeIfAbsent(id.topicGroupId, k -> new HashSet<>()).add(id); } for (final String topic : allSourceTopics) { final List partitionInfoList = fullMetadata.partitionsForTopic(topic); @@ -530,7 +582,7 @@ public Map assign(final Cluster metadata, // construct the global partition assignment per host map final Map> partitionsByHostState = new HashMap<>(); - if (minUserMetadataVersion == 2 || minUserMetadataVersion == 3) { + if (minReceivedMetadataVersion == 2 || minReceivedMetadataVersion == 3) { for (final Map.Entry entry : clientsMetadata.entrySet()) { final HostInfo hostInfo = entry.getValue().hostInfo; @@ -548,8 +600,23 @@ public Map assign(final Cluster metadata, } taskManager.setPartitionsByHostState(partitionsByHostState); - // within the client, distribute tasks to its owned consumers + final Map assignment; + if (versionProbing) { + assignment = versionProbingAssignment(clientsMetadata, partitionsForTask, partitionsByHostState, futureConsumers, minReceivedMetadataVersion); + } else { + assignment = computeNewAssignment(clientsMetadata, partitionsForTask, partitionsByHostState, minReceivedMetadataVersion); + } + + return assignment; + } + + private Map computeNewAssignment(final Map clientsMetadata, + final Map> partitionsForTask, + final Map> partitionsByHostState, + final int minUserMetadataVersion) { final Map assignment = new HashMap<>(); + + // within the client, distribute tasks to its owned consumers for (final Map.Entry entry : clientsMetadata.entrySet()) { final Set consumers = entry.getValue().consumers; final ClientState state = entry.getValue().state; @@ -574,12 +641,7 @@ public Map assign(final Cluster metadata, if (!state.standbyTasks().isEmpty()) { final List assignedStandbyList = interleavedStandby.get(consumerTaskIndex); for (final TaskId taskId : assignedStandbyList) { - Set standbyPartitions = standby.get(taskId); - if (standbyPartitions == null) { - standbyPartitions = new HashSet<>(); - standby.put(taskId, standbyPartitions); - } - standbyPartitions.addAll(partitionsForTask.get(taskId)); + standby.computeIfAbsent(taskId, k -> new HashSet<>()).addAll(partitionsForTask.get(taskId)); } } @@ -603,13 +665,63 @@ public Map assign(final Cluster metadata, return assignment; } + private Map versionProbingAssignment(final Map clientsMetadata, + final Map> partitionsForTask, + final Map> partitionsByHostState, + final Set futureConsumers, + final int minUserMetadataVersion) { + final Map assignment = new HashMap<>(); + + // assign previously assigned tasks to "old consumers" + for (final ClientMetadata clientMetadata : clientsMetadata.values()) { + for (final String consumerId : clientMetadata.consumers) { + + if (futureConsumers.contains(consumerId)) { + continue; + } + + final List activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasks()); + + final List assignedPartitions = new ArrayList<>(); + for (final TaskId taskId : activeTasks) { + assignedPartitions.addAll(partitionsForTask.get(taskId)); + } + + final Map> standbyTasks = new HashMap<>(); + for (final TaskId taskId : clientMetadata.state.prevStandbyTasks()) { + standbyTasks.put(taskId, partitionsForTask.get(taskId)); + } + + assignment.put(consumerId, new Assignment( + assignedPartitions, + new AssignmentInfo( + minUserMetadataVersion, + activeTasks, + standbyTasks, + partitionsByHostState) + .encode() + )); + } + } + + // add empty assignment for "future version" clients (ie, empty version probing response) + for (final String consumerId : futureConsumers) { + assignment.put(consumerId, new Assignment( + Collections.emptyList(), + new AssignmentInfo().encode() + )); + } + + return assignment; + } + // visible for testing List> interleaveTasksByGroupId(final Collection taskIds, final int numberThreads) { final LinkedList sortedTasks = new LinkedList<>(taskIds); Collections.sort(sortedTasks); final List> taskIdsForConsumerAssignment = new ArrayList<>(numberThreads); for (int i = 0; i < numberThreads; i++) { - taskIdsForConsumerAssignment.add(new ArrayList()); + taskIdsForConsumerAssignment.add(new ArrayList<>()); } while (!sortedTasks.isEmpty()) { for (final List taskIdList : taskIdsForConsumerAssignment) { @@ -632,7 +744,35 @@ public void onAssignment(final Assignment assignment) { Collections.sort(partitions, PARTITION_COMPARATOR); final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); - final int usedVersion = info.version(); + final int receivedAssignmentMetadataVersion = info.version(); + final int leaderSupportedVersion = info.latestSupportedVersion(); + + if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) { + throw new IllegalStateException("Sent a version " + usedSubscriptionMetadataVersion + + " subscription but got an assignment with higher version " + receivedAssignmentMetadataVersion + "."); + } + + if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion + && receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) { + + if (receivedAssignmentMetadataVersion == leaderSupportedVersion) { + log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). " + + "Downgrading subscription metadata to received version and trigger new rebalance.", + usedSubscriptionMetadataVersion, + receivedAssignmentMetadataVersion); + usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion; + } else { + log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). " + + "Setting subscription metadata to leaders supported version {} and trigger new rebalance.", + usedSubscriptionMetadataVersion, + receivedAssignmentMetadataVersion, + leaderSupportedVersion); + usedSubscriptionMetadataVersion = leaderSupportedVersion; + } + + versionProbingFlag.set(true); + return; + } // version 1 field final Map> activeTasks = new HashMap<>(); @@ -640,22 +780,29 @@ public void onAssignment(final Assignment assignment) { final Map topicToPartitionInfo = new HashMap<>(); final Map> partitionsByHost; - switch (usedVersion) { - case 1: + switch (receivedAssignmentMetadataVersion) { + case VERSION_ONE: processVersionOneAssignment(info, partitions, activeTasks); partitionsByHost = Collections.emptyMap(); break; - case 2: + case VERSION_TWO: processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); partitionsByHost = info.partitionsByHost(); break; - case 3: + case VERSION_THREE: + if (leaderSupportedVersion > usedSubscriptionMetadataVersion) { + log.info("Sent a version {} subscription and group leader's latest supported version is {}. " + + "Upgrading subscription metadata version to {} for next rebalance.", + usedSubscriptionMetadataVersion, + leaderSupportedVersion, + leaderSupportedVersion); + usedSubscriptionMetadataVersion = leaderSupportedVersion; + } processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); partitionsByHost = info.partitionsByHost(); break; default: - throw new IllegalStateException("Unknown metadata version: " + usedVersion - + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION); + throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/"); } taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); @@ -679,13 +826,7 @@ private void processVersionOneAssignment(final AssignmentInfo info, for (int i = 0; i < partitions.size(); i++) { final TopicPartition partition = partitions.get(i); final TaskId id = info.activeTasks().get(i); - - Set assignedPartitions = activeTasks.get(id); - if (assignedPartitions == null) { - assignedPartitions = new HashSet<>(); - activeTasks.put(id, assignedPartitions); - } - assignedPartitions.add(partition); + activeTasks.computeIfAbsent(id, k -> new HashSet<>()).add(partition); } } @@ -713,6 +854,14 @@ private void processVersionThreeAssignment(final AssignmentInfo info, processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); } + // for testing + protected void processLatestVersionAssignment(final AssignmentInfo info, + final List partitions, + final Map> activeTasks, + final Map topicToPartitionInfo) { + processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); + } + /** * Internal helper function that creates a Kafka topic * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 63224dbcde743..6e6e4ca7c5c44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -42,7 +42,7 @@ import static java.util.Collections.singleton; -class TaskManager { +public class TaskManager { // initialize the task list // activeTasks needs to be concurrent as it can be accessed // by QueryableState @@ -187,14 +187,14 @@ Set standbyTaskIds() { return standby.allAssignedTaskIds(); } - Set prevActiveTaskIds() { + public Set prevActiveTaskIds() { return active.previousTaskIds(); } /** * Returns ids of tasks whose states are kept on the local storage. */ - Set cachedTasksIds() { + public Set cachedTasksIds() { // A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios: // 1) the client is actively maintaining standby tasks by maintaining their states from the change log. // 2) the client has just got some tasks migrated out of itself to other clients while these task states @@ -221,7 +221,7 @@ Set cachedTasksIds() { return tasks; } - UUID processId() { + public UUID processId() { return processId; } @@ -356,21 +356,21 @@ private void assignStandbyPartitions() { } } - void setClusterMetadata(final Cluster cluster) { + public void setClusterMetadata(final Cluster cluster) { this.cluster = cluster; } - void setPartitionsByHostState(final Map> partitionsByHostState) { + public void setPartitionsByHostState(final Map> partitionsByHostState) { this.streamsMetadataState.onChange(partitionsByHostState, cluster); } - void setAssignmentMetadata(final Map> activeTasks, + public void setAssignmentMetadata(final Map> activeTasks, final Map> standbyTasks) { this.assignedActiveTasks = activeTasks; this.assignedStandbyTasks = standbyTasks; } - void updateSubscriptionsFromAssignment(List partitions) { + public void updateSubscriptionsFromAssignment(List partitions) { if (builder().sourceTopicPattern() != null) { final Set assignedTopics = new HashSet<>(); for (final TopicPartition topicPartition : partitions) { @@ -385,7 +385,7 @@ void updateSubscriptionsFromAssignment(List partitions) { } } - void updateSubscriptionsFromMetadata(Set topics) { + public void updateSubscriptionsFromMetadata(Set topics) { if (builder().sourceTopicPattern() != null) { final Collection existingTopics = builder().subscriptionUpdates().getUpdates(); if (!existingTopics.equals(topics)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 3c5cee2bfc387..c577830e3e2ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -42,7 +42,7 @@ public class AssignmentInfo { private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class); public static final int LATEST_SUPPORTED_VERSION = 3; - public static final int UNKNOWN = -1; + static final int UNKNOWN = -1; private final int usedVersion; private final int latestSupportedVersion; @@ -65,9 +65,9 @@ public AssignmentInfo(final List activeTasks, public AssignmentInfo() { this(LATEST_SUPPORTED_VERSION, - Collections.emptyList(), - Collections.>emptyMap(), - Collections.>emptyMap()); + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap()); } public AssignmentInfo(final int version, @@ -229,7 +229,7 @@ public static AssignmentInfo decode(final ByteBuffer data) { decodeVersionThreeData(assignmentInfo, in); break; default: - TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " + + final TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); log.error(fatalException.getMessage(), fatalException); throw fatalException; @@ -262,7 +262,7 @@ private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo, final int count = in.readInt(); assignmentInfo.standbyTasks = new HashMap<>(count); for (int i = 0; i < count; i++) { - TaskId id = TaskId.readFrom(in); + final TaskId id = TaskId.readFrom(in); assignmentInfo.standbyTasks.put(id, readTopicPartitions(in)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 15ee849bffc33..66e655fa8372b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -26,6 +26,7 @@ public class ClientState { private final Set standbyTasks; private final Set assignedTasks; private final Set prevActiveTasks; + private final Set prevStandbyTasks; private final Set prevAssignedTasks; private int capacity; @@ -36,21 +37,34 @@ public ClientState() { } ClientState(final int capacity) { - this(new HashSet(), new HashSet(), new HashSet(), new HashSet(), new HashSet(), capacity); + this(new HashSet(), new HashSet(), new HashSet(), new HashSet(), new HashSet(), new HashSet(), capacity); } - private ClientState(Set activeTasks, Set standbyTasks, Set assignedTasks, Set prevActiveTasks, Set prevAssignedTasks, int capacity) { + private ClientState(final Set activeTasks, + final Set standbyTasks, + final Set assignedTasks, + final Set prevActiveTasks, + final Set prevStandbyTasks, + final Set prevAssignedTasks, + final int capacity) { this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; this.assignedTasks = assignedTasks; this.prevActiveTasks = prevActiveTasks; + this.prevStandbyTasks = prevStandbyTasks; this.prevAssignedTasks = prevAssignedTasks; this.capacity = capacity; } public ClientState copy() { - return new ClientState(new HashSet<>(activeTasks), new HashSet<>(standbyTasks), new HashSet<>(assignedTasks), - new HashSet<>(prevActiveTasks), new HashSet<>(prevAssignedTasks), capacity); + return new ClientState( + new HashSet<>(activeTasks), + new HashSet<>(standbyTasks), + new HashSet<>(assignedTasks), + new HashSet<>(prevActiveTasks), + new HashSet<>(prevStandbyTasks), + new HashSet<>(prevAssignedTasks), + capacity); } public void assign(final TaskId taskId, final boolean active) { @@ -71,6 +85,14 @@ public Set standbyTasks() { return standbyTasks; } + public Set prevActiveTasks() { + return prevActiveTasks; + } + + public Set prevStandbyTasks() { + return prevStandbyTasks; + } + public int assignedTaskCount() { return assignedTasks.size(); } @@ -89,6 +111,7 @@ public void addPreviousActiveTasks(final Set prevTasks) { } public void addPreviousStandbyTasks(final Set standbyTasks) { + prevStandbyTasks.addAll(standbyTasks); prevAssignedTasks.addAll(standbyTasks); } @@ -98,6 +121,7 @@ public String toString() { ") standbyTasks: (" + standbyTasks + ") assignedTasks: (" + assignedTasks + ") prevActiveTasks: (" + prevActiveTasks + + ") prevStandbyTasks: (" + prevStandbyTasks + ") prevAssignedTasks: (" + prevAssignedTasks + ") capacity: " + capacity + "]"; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index be709472441d8..4ebc95674b0a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -33,7 +33,7 @@ public class SubscriptionInfo { private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class); public static final int LATEST_SUPPORTED_VERSION = 3; - public static final int UNKNOWN = -1; + static final int UNKNOWN = -1; private final int usedVersion; private final int latestSupportedVersion; @@ -151,20 +151,20 @@ private int getVersionOneByteLength() { 4 + standbyTasks.size() * 8; // length + standby tasks } - private void encodeClientUUID(final ByteBuffer buf) { + protected void encodeClientUUID(final ByteBuffer buf) { buf.putLong(processId.getMostSignificantBits()); buf.putLong(processId.getLeastSignificantBits()); } - private void encodeTasks(final ByteBuffer buf, - final Collection taskIds) { + protected void encodeTasks(final ByteBuffer buf, + final Collection taskIds) { buf.putInt(taskIds.size()); - for (TaskId id : taskIds) { + for (final TaskId id : taskIds) { id.writeTo(buf); } } - private byte[] prepareUserEndPoint() { + protected byte[] prepareUserEndPoint() { if (userEndPoint == null) { return new byte[0]; } else { @@ -194,8 +194,8 @@ private int getVersionTwoByteLength(final byte[] endPointBytes) { 4 + endPointBytes.length; // length + userEndPoint } - private void encodeUserEndPoint(final ByteBuffer buf, - final byte[] endPointBytes) { + protected void encodeUserEndPoint(final ByteBuffer buf, + final byte[] endPointBytes) { if (endPointBytes != null) { buf.putInt(endPointBytes.length); buf.put(endPointBytes); @@ -217,7 +217,7 @@ private ByteBuffer encodeVersionThree() { return buf; } - private int getVersionThreeByteLength(final byte[] endPointBytes) { + protected int getVersionThreeByteLength(final byte[] endPointBytes) { return 4 + // used version 4 + // latest supported version version 16 + // client ID @@ -236,6 +236,7 @@ public static SubscriptionInfo decode(final ByteBuffer data) { data.rewind(); final int usedVersion = data.getInt(); + final int latestSupportedVersion; switch (usedVersion) { case 1: subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN); @@ -246,12 +247,13 @@ public static SubscriptionInfo decode(final ByteBuffer data) { decodeVersionTwoData(subscriptionInfo, data); break; case 3: - final int latestSupportedVersion = data.getInt(); + latestSupportedVersion = data.getInt(); subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion); decodeVersionThreeData(subscriptionInfo, data); break; default: - subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN); + latestSupportedVersion = data.getInt(); + subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion); log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION); } @@ -261,12 +263,7 @@ public static SubscriptionInfo decode(final ByteBuffer data) { private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo, final ByteBuffer data) { decodeClientUUID(subscriptionInfo, data); - - subscriptionInfo.prevTasks = new HashSet<>(); - decodeTasks(subscriptionInfo.prevTasks, data); - - subscriptionInfo.standbyTasks = new HashSet<>(); - decodeTasks(subscriptionInfo.standbyTasks, data); + decodeTasks(subscriptionInfo, data); } private static void decodeClientUUID(final SubscriptionInfo subscriptionInfo, @@ -274,30 +271,31 @@ private static void decodeClientUUID(final SubscriptionInfo subscriptionInfo, subscriptionInfo.processId = new UUID(data.getLong(), data.getLong()); } - private static void decodeTasks(final Collection taskIds, + private static void decodeTasks(final SubscriptionInfo subscriptionInfo, final ByteBuffer data) { - final int numPrevs = data.getInt(); - for (int i = 0; i < numPrevs; i++) { - taskIds.add(TaskId.readFrom(data)); + subscriptionInfo.prevTasks = new HashSet<>(); + final int numPrevTasks = data.getInt(); + for (int i = 0; i < numPrevTasks; i++) { + subscriptionInfo.prevTasks.add(TaskId.readFrom(data)); + } + + subscriptionInfo.standbyTasks = new HashSet<>(); + final int numStandbyTasks = data.getInt(); + for (int i = 0; i < numStandbyTasks; i++) { + subscriptionInfo.standbyTasks.add(TaskId.readFrom(data)); } } private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo, final ByteBuffer data) { decodeClientUUID(subscriptionInfo, data); - - subscriptionInfo.prevTasks = new HashSet<>(); - decodeTasks(subscriptionInfo.prevTasks, data); - - subscriptionInfo.standbyTasks = new HashSet<>(); - decodeTasks(subscriptionInfo.standbyTasks, data); - + decodeTasks(subscriptionInfo, data); decodeUserEndPoint(subscriptionInfo, data); } private static void decodeUserEndPoint(final SubscriptionInfo subscriptionInfo, final ByteBuffer data) { - int bytesLength = data.getInt(); + final int bytesLength = data.getInt(); if (bytesLength != 0) { final byte[] bytes = new byte[bytesLength]; data.get(bytes); @@ -308,16 +306,11 @@ private static void decodeUserEndPoint(final SubscriptionInfo subscriptionInfo, private static void decodeVersionThreeData(final SubscriptionInfo subscriptionInfo, final ByteBuffer data) { decodeClientUUID(subscriptionInfo, data); - - subscriptionInfo.prevTasks = new HashSet<>(); - decodeTasks(subscriptionInfo.prevTasks, data); - - subscriptionInfo.standbyTasks = new HashSet<>(); - decodeTasks(subscriptionInfo.standbyTasks, data); - + decodeTasks(subscriptionInfo, data); decodeUserEndPoint(subscriptionInfo, data); } + @Override public int hashCode() { final int hashCode = usedVersion ^ latestSupportedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); if (userEndPoint == null) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 749d618bac57a..f3dce52199836 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -78,6 +78,7 @@ import java.util.Properties; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -298,7 +299,8 @@ public void shouldNotCommitBeforeTheCommitInterval() { streamsMetrics, internalTopologyBuilder, clientId, - new LogContext("") + new LogContext(""), + new AtomicBoolean() ); thread.maybeCommit(mockTime.milliseconds()); mockTime.sleep(commitInterval - 10L); @@ -331,7 +333,9 @@ public void shouldNotCauseExceptionIfNothingCommitted() { streamsMetrics, internalTopologyBuilder, clientId, - new LogContext("")); + new LogContext(""), + new AtomicBoolean() + ); thread.maybeCommit(mockTime.milliseconds()); mockTime.sleep(commitInterval - 10L); thread.maybeCommit(mockTime.milliseconds()); @@ -364,7 +368,9 @@ public void shouldCommitAfterTheCommitInterval() { streamsMetrics, internalTopologyBuilder, clientId, - new LogContext("")); + new LogContext(""), + new AtomicBoolean() + ); thread.maybeCommit(mockTime.milliseconds()); mockTime.sleep(commitInterval + 1); thread.maybeCommit(mockTime.milliseconds()); @@ -511,7 +517,8 @@ public void shouldShutdownTaskManagerOnClose() { streamsMetrics, internalTopologyBuilder, clientId, - new LogContext("") + new LogContext(""), + new AtomicBoolean() ); thread.setStateListener( new StreamThread.StateListener() { @@ -547,7 +554,8 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() { streamsMetrics, internalTopologyBuilder, clientId, - new LogContext("") + new LogContext(""), + new AtomicBoolean() ); thread.shutdown(); EasyMock.verify(taskManager); @@ -574,7 +582,9 @@ public void shouldOnlyShutdownOnce() { streamsMetrics, internalTopologyBuilder, clientId, - new LogContext("")); + new LogContext(""), + new AtomicBoolean() + ); thread.shutdown(); // Execute the run method. Verification of the mock will check that shutdown was only done once thread.run(); @@ -1255,7 +1265,7 @@ private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata met @Test // TODO: Need to add a test case covering EOS when we create a mock taskManager class public void producerMetricsVerificationWithoutEOS() { - final MockProducer producer = new MockProducer(); + final MockProducer producer = new MockProducer<>(); final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0); @@ -1271,7 +1281,8 @@ public void producerMetricsVerificationWithoutEOS() { streamsMetrics, internalTopologyBuilder, clientId, - new LogContext("")); + new LogContext(""), + new AtomicBoolean()); final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap()); final Metric testMetric = new KafkaMetric( new Object(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 37b03fa3418af..a32d193a171c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -48,6 +48,7 @@ import org.easymock.EasyMock; import org.junit.Test; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -57,6 +58,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; @@ -98,7 +100,8 @@ public class StreamsPartitionAssignorTest { private final Cluster metadata = new Cluster( "cluster", Collections.singletonList(Node.noNode()), - infos, Collections.emptySet(), + infos, + Collections.emptySet(), Collections.emptySet()); private final TaskId task0 = new TaskId(0, 0); @@ -115,15 +118,16 @@ public class StreamsPartitionAssignorTest { private final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); private Map configProps() { - Map configurationMap = new HashMap<>(); + final Map configurationMap = new HashMap<>(); configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint); configurationMap.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager); + configurationMap.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, new AtomicBoolean()); return configurationMap; } private void configurePartitionAssignor(final Map props) { - Map configurationMap = configProps(); + final Map configurationMap = configProps(); configurationMap.putAll(props); partitionAssignor.configure(configurationMap); } @@ -158,7 +162,7 @@ public void shouldInterleaveTasksByGroupId() { final List expectedSubList3 = Arrays.asList(taskIdA2, taskIdB1, taskIdC1); final List> embeddedList = Arrays.asList(expectedSubList1, expectedSubList2, expectedSubList3); - List tasks = Arrays.asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3); + final List tasks = Arrays.asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3); Collections.shuffle(tasks); final List> interleavedTaskIds = partitionAssignor.interleaveTasksByGroupId(tasks, 3); @@ -182,15 +186,15 @@ public void testSubscription() { mockTaskManager(prevTasks, cachedTasks, processId, builder); configurePartitionAssignor(Collections.emptyMap()); - PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); + final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); Collections.sort(subscription.topics()); assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics()); - Set standbyTasks = new HashSet<>(cachedTasks); + final Set standbyTasks = new HashSet<>(cachedTasks); standbyTasks.removeAll(prevTasks); - SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null); + final SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null); assertEquals(info.encode(), subscription.userData()); } @@ -199,8 +203,8 @@ public void testAssignBasic() { builder.addSource(null, "source1", null, null, null, "topic1"); builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); - List topics = Utils.mkList("topic1", "topic2"); - Set allTasks = Utils.mkSet(task0, task1, task2); + final List topics = Utils.mkList("topic1", "topic2"); + final Set allTasks = Utils.mkSet(task0, task1, task2); final Set prevTasks10 = Utils.mkSet(task0); final Set prevTasks11 = Utils.mkSet(task1); @@ -209,15 +213,15 @@ public void testAssignBasic() { final Set standbyTasks11 = Utils.mkSet(task2); final Set standbyTasks20 = Utils.mkSet(task0); - UUID uuid1 = UUID.randomUUID(); - UUID uuid2 = UUID.randomUUID(); + final UUID uuid1 = UUID.randomUUID(); + final UUID uuid2 = UUID.randomUUID(); mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder); configurePartitionAssignor(Collections.emptyMap()); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); - Map subscriptions = new HashMap<>(); + final Map subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode())); subscriptions.put("consumer11", @@ -226,7 +230,7 @@ public void testAssignBasic() { new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode())); - Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, subscriptions); // check assigned partitions assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)), @@ -236,17 +240,17 @@ public void testAssignBasic() { // check assignment info // the first consumer - AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); - Set allActiveTasks = new HashSet<>(info10.activeTasks()); + final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); + final Set allActiveTasks = new HashSet<>(info10.activeTasks()); // the second consumer - AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); + final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); allActiveTasks.addAll(info11.activeTasks()); assertEquals(Utils.mkSet(task0, task1), allActiveTasks); // the third consumer - AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); + final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); allActiveTasks.addAll(info20.activeTasks()); assertEquals(3, allActiveTasks.size()); @@ -277,7 +281,8 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() { final Cluster localMetadata = new Cluster( "cluster", Collections.singletonList(Node.noNode()), - localInfos, Collections.emptySet(), + localInfos, + Collections.emptySet(), Collections.emptySet()); final List topics = Utils.mkList("topic1", "topic2"); @@ -332,26 +337,26 @@ public void testAssignWithPartialTopology() { builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); builder.addStateStore(new MockStoreBuilder("store2", false), "processor2"); - List topics = Utils.mkList("topic1", "topic2"); - Set allTasks = Utils.mkSet(task0, task1, task2); + final List topics = Utils.mkList("topic1", "topic2"); + final Set allTasks = Utils.mkSet(task0, task1, task2); - UUID uuid1 = UUID.randomUUID(); + final UUID uuid1 = UUID.randomUUID(); mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder); configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, (Object) SingleGroupPartitionGrouperStub.class)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); - Map subscriptions = new HashMap<>(); + final Map subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode())); // will throw exception if it fails - Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, subscriptions); // check assignment info - AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")); - Set allActiveTasks = new HashSet<>(info10.activeTasks()); + final AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")); + final Set allActiveTasks = new HashSet<>(info10.activeTasks()); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); @@ -363,8 +368,8 @@ public void testAssignEmptyMetadata() { builder.addSource(null, "source1", null, null, null, "topic1"); builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); - List topics = Utils.mkList("topic1", "topic2"); - Set allTasks = Utils.mkSet(task0, task1, task2); + final List topics = Utils.mkList("topic1", "topic2"); + final Set allTasks = Utils.mkSet(task0, task1, task2); final Set prevTasks10 = Utils.mkSet(task0); final Set standbyTasks10 = Utils.mkSet(task1); @@ -372,12 +377,12 @@ public void testAssignEmptyMetadata() { Collections.emptySet(), Collections.emptySet(), Collections.emptySet()); - UUID uuid1 = UUID.randomUUID(); + final UUID uuid1 = UUID.randomUUID(); mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder); configurePartitionAssignor(Collections.emptyMap()); - Map subscriptions = new HashMap<>(); + final Map subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode())); @@ -390,7 +395,7 @@ public void testAssignEmptyMetadata() { // check assignment info AssignmentInfo info10 = checkAssignment(Collections.emptySet(), assignments.get("consumer10")); - Set allActiveTasks = new HashSet<>(info10.activeTasks()); + final Set allActiveTasks = new HashSet<>(info10.activeTasks()); assertEquals(0, allActiveTasks.size()); assertEquals(Collections.emptySet(), new HashSet<>(allActiveTasks)); @@ -418,22 +423,22 @@ public void testAssignWithNewTasks() { builder.addSource(null, "source2", null, null, null, "topic2"); builder.addSource(null, "source3", null, null, null, "topic3"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3"); - List topics = Utils.mkList("topic1", "topic2", "topic3"); - Set allTasks = Utils.mkSet(task0, task1, task2, task3); + final List topics = Utils.mkList("topic1", "topic2", "topic3"); + final Set allTasks = Utils.mkSet(task0, task1, task2, task3); // assuming that previous tasks do not have topic3 final Set prevTasks10 = Utils.mkSet(task0); final Set prevTasks11 = Utils.mkSet(task1); final Set prevTasks20 = Utils.mkSet(task2); - UUID uuid1 = UUID.randomUUID(); - UUID uuid2 = UUID.randomUUID(); + final UUID uuid1 = UUID.randomUUID(); + final UUID uuid2 = UUID.randomUUID(); mockTaskManager(prevTasks10, Collections.emptySet(), uuid1, builder); configurePartitionAssignor(Collections.emptyMap()); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); - Map subscriptions = new HashMap<>(); + final Map subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.emptySet(), userEndPoint).encode())); subscriptions.put("consumer11", @@ -441,14 +446,14 @@ public void testAssignWithNewTasks() { subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.emptySet(), userEndPoint).encode())); - Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, subscriptions); // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and // then later ones will be re-assigned to other hosts due to load balancing AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer10").userData()); - Set allActiveTasks = new HashSet<>(info.activeTasks()); - Set allPartitions = new HashSet<>(assignments.get("consumer10").partitions()); + final Set allActiveTasks = new HashSet<>(info.activeTasks()); + final Set allPartitions = new HashSet<>(assignments.get("consumer10").partitions()); info = AssignmentInfo.decode(assignments.get("consumer11").userData()); allActiveTasks.addAll(info.activeTasks()); @@ -475,18 +480,18 @@ public void testAssignWithStates() { builder.addStateStore(new MockStoreBuilder("store2", false), "processor-2"); builder.addStateStore(new MockStoreBuilder("store3", false), "processor-2"); - List topics = Utils.mkList("topic1", "topic2"); + final List topics = Utils.mkList("topic1", "topic2"); - TaskId task00 = new TaskId(0, 0); - TaskId task01 = new TaskId(0, 1); - TaskId task02 = new TaskId(0, 2); - TaskId task10 = new TaskId(1, 0); - TaskId task11 = new TaskId(1, 1); - TaskId task12 = new TaskId(1, 2); - List tasks = Utils.mkList(task00, task01, task02, task10, task11, task12); + final TaskId task00 = new TaskId(0, 0); + final TaskId task01 = new TaskId(0, 1); + final TaskId task02 = new TaskId(0, 2); + final TaskId task10 = new TaskId(1, 0); + final TaskId task11 = new TaskId(1, 1); + final TaskId task12 = new TaskId(1, 2); + final List tasks = Utils.mkList(task00, task01, task02, task10, task11, task12); - UUID uuid1 = UUID.randomUUID(); - UUID uuid2 = UUID.randomUUID(); + final UUID uuid1 = UUID.randomUUID(); + final UUID uuid2 = UUID.randomUUID(); mockTaskManager( Collections.emptySet(), @@ -497,7 +502,7 @@ public void testAssignWithStates() { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); - Map subscriptions = new HashMap<>(); + final Map subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode())); subscriptions.put("consumer11", @@ -505,47 +510,46 @@ public void testAssignWithStates() { subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode())); - Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, subscriptions); // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match assertEquals(2, assignments.get("consumer10").partitions().size()); assertEquals(2, assignments.get("consumer11").partitions().size()); assertEquals(2, assignments.get("consumer20").partitions().size()); - AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); - AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); - AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); + final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); + final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); + final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); assertEquals(2, info10.activeTasks().size()); assertEquals(2, info11.activeTasks().size()); assertEquals(2, info20.activeTasks().size()); - Set allTasks = new HashSet<>(); + final Set allTasks = new HashSet<>(); allTasks.addAll(info10.activeTasks()); allTasks.addAll(info11.activeTasks()); allTasks.addAll(info20.activeTasks()); assertEquals(new HashSet<>(tasks), allTasks); // check tasks for state topics - Map topicGroups = builder.topicGroups(); + final Map topicGroups = builder.topicGroups(); - assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups)); - assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups)); - assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups)); + assertEquals(Utils.mkSet(task00, task01, task02), tasksForState("store1", tasks, topicGroups)); + assertEquals(Utils.mkSet(task10, task11, task12), tasksForState("store2", tasks, topicGroups)); + assertEquals(Utils.mkSet(task10, task11, task12), tasksForState("store3", tasks, topicGroups)); } - private Set tasksForState(final String applicationId, - final String storeName, + private Set tasksForState(final String storeName, final List tasks, final Map topicGroups) { final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName); - Set ids = new HashSet<>(); - for (Map.Entry entry : topicGroups.entrySet()) { - Set stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet(); + final Set ids = new HashSet<>(); + for (final Map.Entry entry : topicGroups.entrySet()) { + final Set stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet(); if (stateChangelogTopics.contains(changelogTopic)) { - for (TaskId id : tasks) { + for (final TaskId id : tasks) { if (id.topicGroupId == entry.getKey()) ids.add(id); } @@ -556,15 +560,15 @@ private Set tasksForState(final String applicationId, @Test public void testAssignWithStandbyReplicas() { - Map props = configProps(); + final Map props = configProps(); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); - StreamsConfig streamsConfig = new StreamsConfig(props); + final StreamsConfig streamsConfig = new StreamsConfig(props); builder.addSource(null, "source1", null, null, null, "topic1"); builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); - List topics = Utils.mkList("topic1", "topic2"); - Set allTasks = Utils.mkSet(task0, task1, task2); + final List topics = Utils.mkList("topic1", "topic2"); + final Set allTasks = Utils.mkSet(task0, task1, task2); final Set prevTasks00 = Utils.mkSet(task0); @@ -574,8 +578,8 @@ public void testAssignWithStandbyReplicas() { final Set standbyTasks02 = Utils.mkSet(task2); final Set standbyTasks00 = Utils.mkSet(task0); - UUID uuid1 = UUID.randomUUID(); - UUID uuid2 = UUID.randomUUID(); + final UUID uuid1 = UUID.randomUUID(); + final UUID uuid2 = UUID.randomUUID(); mockTaskManager(prevTasks00, standbyTasks01, uuid1, builder); @@ -583,7 +587,7 @@ public void testAssignWithStandbyReplicas() { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); - Map subscriptions = new HashMap<>(); + final Map subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode())); subscriptions.put("consumer11", @@ -591,15 +595,15 @@ public void testAssignWithStandbyReplicas() { subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode())); - Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, subscriptions); // the first consumer - AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); - Set allActiveTasks = new HashSet<>(info10.activeTasks()); - Set allStandbyTasks = new HashSet<>(info10.standbyTasks().keySet()); + final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); + final Set allActiveTasks = new HashSet<>(info10.activeTasks()); + final Set allStandbyTasks = new HashSet<>(info10.standbyTasks().keySet()); // the second consumer - AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); + final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); allActiveTasks.addAll(info11.activeTasks()); allStandbyTasks.addAll(info11.standbyTasks().keySet()); @@ -610,7 +614,7 @@ public void testAssignWithStandbyReplicas() { assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks)); // the third consumer - AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); + final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); allActiveTasks.addAll(info20.activeTasks()); allStandbyTasks.addAll(info20.standbyTasks().keySet()); @@ -641,7 +645,7 @@ public void testOnAssignment() { final AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState); final PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t3p0, t3p3), info.encode()); - Capture capturedCluster = EasyMock.newCapture(); + final Capture capturedCluster = EasyMock.newCapture(); taskManager.setPartitionsByHostState(hostState); EasyMock.expectLastCall(); taskManager.setAssignmentMetadata(activeTasks, standbyTasks); @@ -667,17 +671,17 @@ public void testAssignWithInternalTopics() { builder.addSink("sink1", "topicX", null, null, null, "processor1"); builder.addSource(null, "source2", null, null, null, "topicX"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); - List topics = Utils.mkList("topic1", applicationId + "-topicX"); - Set allTasks = Utils.mkSet(task0, task1, task2); + final List topics = Utils.mkList("topic1", applicationId + "-topicX"); + final Set allTasks = Utils.mkSet(task0, task1, task2); - UUID uuid1 = UUID.randomUUID(); + final UUID uuid1 = UUID.randomUUID(); mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder); configurePartitionAssignor(Collections.emptyMap()); - MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); + final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(internalTopicManager); - Map subscriptions = new HashMap<>(); - Set emptyTasks = Collections.emptySet(); + final Map subscriptions = new HashMap<>(); + final Set emptyTasks = Collections.emptySet(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); @@ -690,7 +694,7 @@ public void testAssignWithInternalTopics() { @Test public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { - String applicationId = "test"; + final String applicationId = "test"; builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); builder.addSource(null, "source1", null, null, null, "topic1"); @@ -701,18 +705,18 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); builder.addSink("sink2", "topicZ", null, null, null, "processor2"); builder.addSource(null, "source3", null, null, null, "topicZ"); - List topics = Utils.mkList("topic1", "test-topicX", "test-topicZ"); - Set allTasks = Utils.mkSet(task0, task1, task2); + final List topics = Utils.mkList("topic1", "test-topicX", "test-topicZ"); + final Set allTasks = Utils.mkSet(task0, task1, task2); - UUID uuid1 = UUID.randomUUID(); + final UUID uuid1 = UUID.randomUUID(); mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder); configurePartitionAssignor(Collections.emptyMap()); - MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); + final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(internalTopicManager); - Map subscriptions = new HashMap<>(); - Set emptyTasks = Collections.emptySet(); + final Map subscriptions = new HashMap<>(); + final Set emptyTasks = Collections.emptySet(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); @@ -731,7 +735,7 @@ public void shouldGenerateTasksForAllCreatedPartitions() { internalTopologyBuilder.setApplicationId(applicationId); // KStream with 3 partitions - KStream stream1 = builder + final KStream stream1 = builder .stream("topic1") // force creation of internal repartition topic .map(new KeyValueMapper>() { @@ -742,7 +746,7 @@ public KeyValue apply(final Object key, final Object value) { }); // KTable with 4 partitions - KTable table1 = builder + final KTable table1 = builder .table("topic3") // force creation of internal repartition topic .groupBy(new KeyValueMapper>() { @@ -884,7 +888,7 @@ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() { try { configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost")); fail("expected to an exception due to invalid config"); - } catch (ConfigException e) { + } catch (final ConfigException e) { // pass } } @@ -896,7 +900,7 @@ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() { try { configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost:j87yhk")); fail("expected to an exception due to invalid config"); - } catch (ConfigException e) { + } catch (final ConfigException e) { // pass } } @@ -908,7 +912,7 @@ public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTas final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); internalTopologyBuilder.setApplicationId(applicationId); - KStream stream1 = builder + final KStream stream1 = builder // Task 1 (should get created): .stream("topic1") @@ -964,10 +968,11 @@ public Object apply(final Object value1, final Object value2) { final UUID uuid = UUID.randomUUID(); final String client = "client1"; - mockTaskManager(Collections.emptySet(), - Collections.emptySet(), - UUID.randomUUID(), - internalTopologyBuilder); + mockTaskManager( + Collections.emptySet(), + Collections.emptySet(), + UUID.randomUUID(), + internalTopologyBuilder); configurePartitionAssignor(Collections.emptyMap()); final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( @@ -1037,7 +1042,7 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { uuid, internalTopologyBuilder); - Map props = new HashMap<>(); + final Map props = new HashMap<>(); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint); configurePartitionAssignor(props); @@ -1075,18 +1080,58 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { assertThat(allAssignedPartitions, equalTo(allPartitions)); } - @Test(expected = KafkaException.class) - public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() { - partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + @Test + public void shouldThrowKafkaExceptionIfTaskMangerNotConfigured() { + final Map config = configProps(); + config.remove(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR); + + try { + partitionAssignor.configure(config); + fail("Should have thrown KafkaException"); + } catch (final KafkaException expected) { + assertThat(expected.getMessage(), equalTo("TaskManager is not specified")); + } } - @Test(expected = KafkaException.class) - public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance() { - final Map config = new HashMap<>(); - config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); - config.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, "i am not a stream thread"); + @Test + public void shouldThrowKafkaExceptionIfTaskMangerConfigIsNotTaskManagerInstance() { + final Map config = configProps(); + config.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, "i am not a task manager"); - partitionAssignor.configure(config); + try { + partitionAssignor.configure(config); + fail("Should have thrown KafkaException"); + } catch (final KafkaException expected) { + assertThat(expected.getMessage(), + equalTo("java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.TaskManager")); + } + } + + @Test + public void shouldThrowKafkaExceptionVersionProbingFlagNotConfigured() { + final Map config = configProps(); + config.remove(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG); + + try { + partitionAssignor.configure(config); + fail("Should have thrown KafkaException"); + } catch (final KafkaException expected) { + assertThat(expected.getMessage(), equalTo("VersionProbingFlag is not specified")); + } + } + + @Test + public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicBoolean() { + final Map config = configProps(); + config.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, "i am not an AtomicBoolean"); + + try { + partitionAssignor.configure(config); + fail("Should have thrown KafkaException"); + } catch (final KafkaException expected) { + assertThat(expected.getMessage(), + equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicBoolean")); + } } @Test @@ -1147,7 +1192,7 @@ public void shouldDownGradeSubscriptionToVersion1() { builder); configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, (Object) StreamsConfig.UPGRADE_FROM_0100)); - PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1)); } @@ -1187,11 +1232,114 @@ private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue builder); configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue)); - PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2)); } + @Test + public void shouldReturnUnchangedAssignmentForOldInstancesAndEmptyAssignmentForFutureInstances() { + builder.addSource(null, "source1", null, null, null, "topic1"); + + final Map subscriptions = new HashMap<>(); + final Set allTasks = Utils.mkSet(task0, task1, task2); + + final Set activeTasks = Utils.mkSet(task0, task1); + final Set standbyTasks = Utils.mkSet(task2); + final Map> standbyTaskMap = new HashMap>() { + { + put(task2, Collections.singleton(t1p2)); + } + }; + + subscriptions.put( + "consumer1", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode() + ) + ); + subscriptions.put( + "future-consumer", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + encodeFutureSubscription() + ) + ); + + mockTaskManager( + allTasks, + allTasks, + UUID.randomUUID(), + builder); + partitionAssignor.configure(configProps()); + final Map assignment = partitionAssignor.assign(metadata, subscriptions); + + assertThat(assignment.size(), equalTo(2)); + assertThat( + AssignmentInfo.decode(assignment.get("consumer1").userData()), + equalTo(new AssignmentInfo( + new ArrayList<>(activeTasks), + standbyTaskMap, + Collections.>emptyMap() + ))); + assertThat(assignment.get("consumer1").partitions(), equalTo(Utils.mkList(t1p0, t1p1))); + + assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo())); + assertThat(assignment.get("future-consumer").partitions().size(), equalTo(0)); + } + + @Test + public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() { + shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1); + } + + @Test + public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() { + shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2); + } + + private ByteBuffer encodeFutureSubscription() { + final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + + 4 /* supported version */); + buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); + buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); + return buf; + } + + private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) { + final Map subscriptions = new HashMap<>(); + final Set emptyTasks = Collections.emptySet(); + subscriptions.put( + "consumer1", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + subscriptions.put( + "future-consumer", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + encodeFutureSubscription() + ) + ); + + mockTaskManager( + emptyTasks, + emptyTasks, + UUID.randomUUID(), + builder); + partitionAssignor.configure(configProps()); + + try { + partitionAssignor.assign(metadata, subscriptions); + fail("Should have thrown IllegalStateException"); + } catch (final IllegalStateException expected) { + // pass + } + } + private PartitionAssignor.Assignment createAssignment(final Map> firstHostState) { final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), Collections.>emptyMap(), @@ -1201,19 +1349,20 @@ private PartitionAssignor.Assignment createAssignment(final MapemptyList(), info.encode()); } - private AssignmentInfo checkAssignment(Set expectedTopics, PartitionAssignor.Assignment assignment) { + private AssignmentInfo checkAssignment(final Set expectedTopics, + final PartitionAssignor.Assignment assignment) { // This assumed 1) DefaultPartitionGrouper is used, and 2) there is an only one topic group. - AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); + final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); // check if the number of assigned partitions == the size of active task id list assertEquals(assignment.partitions().size(), info.activeTasks().size()); // check if active tasks are consistent - List activeTasks = new ArrayList<>(); - Set activeTopics = new HashSet<>(); - for (TopicPartition partition : assignment.partitions()) { + final List activeTasks = new ArrayList<>(); + final Set activeTopics = new HashSet<>(); + for (final TopicPartition partition : assignment.partitions()) { // since default grouper, taskid.partition == partition.partition() activeTasks.add(new TaskId(0, partition.partition())); activeTopics.add(partition.topic()); @@ -1224,11 +1373,11 @@ private AssignmentInfo checkAssignment(Set expectedTopics, PartitionAssi assertEquals(expectedTopics, activeTopics); // check if standby tasks are consistent - Set standbyTopics = new HashSet<>(); - for (Map.Entry> entry : info.standbyTasks().entrySet()) { - TaskId id = entry.getKey(); - Set partitions = entry.getValue(); - for (TopicPartition partition : partitions) { + final Set standbyTopics = new HashSet<>(); + for (final Map.Entry> entry : info.standbyTasks().entrySet()) { + final TaskId id = entry.getKey(); + final Set partitions = entry.getValue(); + for (final TopicPartition partition : partitions) { // since default grouper, taskid.partition == partition.partition() assertEquals(id.partition, partition.partition()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java index e98b8ce072705..0611bfc4d5c6b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.junit.Test; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -75,4 +76,19 @@ public void shouldEncodeAndDecodeVersion3() { assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); } + @Test + public void shouldAllowToDecodeFutureSupportedVersion() { + final SubscriptionInfo info = SubscriptionInfo.decode(encodeFutureVersion()); + assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.version()); + assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.latestSupportedVersion()); + } + + private ByteBuffer encodeFutureVersion() { + final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + + 4 /* supported version */); + buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); + buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); + return buf; + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java index 2409bd59643da..8c807800869e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java @@ -40,7 +40,7 @@ public static void main(final String[] args) throws InterruptedException, IOExce final String propFileName = args[0]; final String command = args[1]; - final boolean disableAutoTerminate = args.length > 3; + final boolean disableAutoTerminate = args.length > 2; final Properties streamsProperties = Utils.loadProps(propFileName); final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 69eea0b37c0ef..1b01a7300a1bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,29 +16,57 @@ */ package org.apache.kafka.streams.tests; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; +import org.apache.kafka.streams.processor.internals.TaskManager; +import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; +import org.apache.kafka.streams.state.HostInfo; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.UUID; public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but no provided: "); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args.length > 0 ? args[0] : null; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -48,11 +76,18 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + + final KafkaClientSupplier kafkaClientSupplier; + if (streamsProperties.containsKey("test.future.metadata")) { + streamsProperties.remove("test.future.metadata"); + kafkaClientSupplier = new FutureKafkaClientSupplier(); + } else { + kafkaClientSupplier = new DefaultKafkaClientSupplier(); + } config.putAll(streamsProperties); - final KafkaStreams streams = new KafkaStreams(builder.build(), config); + final KafkaStreams streams = new KafkaStreams(builder.build(), config, kafkaClientSupplier); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread() { @@ -66,4 +101,237 @@ public void run() { } }); } + + private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier { + @Override + public Consumer getConsumer(final Map config) { + config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, FutureStreamsPartitionAssignor.class.getName()); + return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + } + } + + public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor { + + public FutureStreamsPartitionAssignor() { + usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1; + } + + @Override + public Subscription subscription(final Set topics) { + // Adds the following information to subscription + // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) + // 2. Task ids of previously running tasks + // 3. Task ids of valid local states on the client's state directory. + + final TaskManager taskManager = taskManger(); + final Set previousActiveTasks = taskManager.prevActiveTaskIds(); + final Set standbyTasks = taskManager.cachedTasksIds(); + standbyTasks.removeAll(previousActiveTasks); + final FutureSubscriptionInfo data = new FutureSubscriptionInfo( + usedSubscriptionMetadataVersion, + SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, + taskManager.processId(), + previousActiveTasks, + standbyTasks, + userEndPoint()); + + taskManager.updateSubscriptionsFromMetadata(topics); + + return new Subscription(new ArrayList<>(topics), data.encode()); + } + + @Override + public void onAssignment(final PartitionAssignor.Assignment assignment) { + try { + super.onAssignment(assignment); + return; + } catch (final TaskAssignmentException cannotProcessFutureVersion) { + // continue + } + + final ByteBuffer data = assignment.userData(); + data.rewind(); + + final int usedVersion; + try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) { + usedVersion = in.readInt(); + } catch (final IOException ex) { + throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex); + } + + if (usedVersion > AssignmentInfo.LATEST_SUPPORTED_VERSION + 1) { + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION + 1); + } + + final AssignmentInfo info = AssignmentInfo.decode( + assignment.userData().putInt(0, AssignmentInfo.LATEST_SUPPORTED_VERSION)); + + final List partitions = new ArrayList<>(assignment.partitions()); + Collections.sort(partitions, PARTITION_COMPARATOR); + + // version 1 field + final Map> activeTasks = new HashMap<>(); + // version 2 fields + final Map topicToPartitionInfo = new HashMap<>(); + final Map> partitionsByHost; + + processLatestVersionAssignment(info, partitions, activeTasks, topicToPartitionInfo); + partitionsByHost = info.partitionsByHost(); + + final TaskManager taskManager = taskManger(); + taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); + taskManager.setPartitionsByHostState(partitionsByHost); + taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks()); + taskManager.updateSubscriptionsFromAssignment(partitions); + } + + @Override + public Map assign(final Cluster metadata, + final Map subscriptions) { + Map assignment = null; + + final Map downgradedSubscriptions = new HashMap<>(); + for (final Subscription subscription : subscriptions.values()) { + final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); + if (info.version() < SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1) { + assignment = super.assign(metadata, subscriptions); + break; + } + } + + boolean bumpUsedVersion = false; + final boolean bumpSupportedVersion; + if (assignment != null) { + bumpSupportedVersion = supportedVersions.size() == 1 && supportedVersions.iterator().next() == SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1; + } else { + for (final Map.Entry entry : subscriptions.entrySet()) { + final Subscription subscription = entry.getValue(); + + final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData() + .putInt(0, SubscriptionInfo.LATEST_SUPPORTED_VERSION) + .putInt(4, SubscriptionInfo.LATEST_SUPPORTED_VERSION)); + + downgradedSubscriptions.put( + entry.getKey(), + new Subscription( + subscription.topics(), + new SubscriptionInfo( + info.processId(), + info.prevTasks(), + info.standbyTasks(), + info.userEndPoint()) + .encode())); + } + assignment = super.assign(metadata, downgradedSubscriptions); + bumpUsedVersion = true; + bumpSupportedVersion = true; + } + + final Map newAssignment = new HashMap<>(); + for (final Map.Entry entry : assignment.entrySet()) { + final Assignment singleAssignment = entry.getValue(); + newAssignment.put( + entry.getKey(), + new Assignment( + singleAssignment.partitions(), + new FutureAssignmentInfo( + bumpUsedVersion, + bumpSupportedVersion, + singleAssignment.userData()) + .encode())); + } + + return newAssignment; + } + } + + private static class FutureSubscriptionInfo extends SubscriptionInfo { + // for testing only; don't apply version checks + FutureSubscriptionInfo(final int version, + final int latestSupportedVersion, + final UUID processId, + final Set prevTasks, + final Set standbyTasks, + final String userEndPoint) { + super(version, latestSupportedVersion, processId, prevTasks, standbyTasks, userEndPoint); + } + + public ByteBuffer encode() { + if (version() <= SubscriptionInfo.LATEST_SUPPORTED_VERSION) { + final ByteBuffer buf = super.encode(); + // super.encode() always encodes `LATEST_SUPPORTED_VERSION` as "latest supported version" + // need to update to future version + buf.putInt(4, latestSupportedVersion()); + return buf; + } + + final ByteBuffer buf = encodeFutureVersion(); + buf.rewind(); + return buf; + } + + private ByteBuffer encodeFutureVersion() { + final byte[] endPointBytes = prepareUserEndPoint(); + + final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes)); + + buf.putInt(LATEST_SUPPORTED_VERSION + 1); // used version + buf.putInt(LATEST_SUPPORTED_VERSION + 1); // supported version + encodeClientUUID(buf); + encodeTasks(buf, prevTasks()); + encodeTasks(buf, standbyTasks()); + encodeUserEndPoint(buf, endPointBytes); + + return buf; + } + + } + + private static class FutureAssignmentInfo extends AssignmentInfo { + private final boolean bumpUsedVersion; + private final boolean bumpSupportedVersion; + final ByteBuffer originalUserMetadata; + + private FutureAssignmentInfo(final boolean bumpUsedVersion, + final boolean bumpSupportedVersion, + final ByteBuffer bytes) { + this.bumpUsedVersion = bumpUsedVersion; + this.bumpSupportedVersion = bumpSupportedVersion; + originalUserMetadata = bytes; + } + + @Override + public ByteBuffer encode() { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + originalUserMetadata.rewind(); + + try (final DataOutputStream out = new DataOutputStream(baos)) { + if (bumpUsedVersion) { + originalUserMetadata.getInt(); // discard original used version + out.writeInt(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1); + } else { + out.writeInt(originalUserMetadata.getInt()); + } + if (bumpSupportedVersion) { + originalUserMetadata.getInt(); // discard original supported version + out.writeInt(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1); + } + + try { + while (true) { + out.write(originalUserMetadata.get()); + } + } catch (final BufferUnderflowException expectedWhenAllDataCopied) { } + + out.flush(); + out.close(); + + return ByteBuffer.wrap(baos.toByteArray()); + } catch (final IOException ex) { + throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex); + } + } + } } diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index f268ab8de598b..1d8ed270cc5eb 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -21,7 +21,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.kafka import KafkaConfig from kafkatest.services.monitor.jmx import JmxMixin -from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1 +from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1 STATE_DIR = "state.dir" @@ -52,6 +52,33 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service): "streams_stderr": { "path": STDERR_FILE, "collect_default": True}, + "streams_log.1": { + "path": LOG_FILE + ".1", + "collect_default": True}, + "streams_stdout.1": { + "path": STDOUT_FILE + ".1", + "collect_default": True}, + "streams_stderr.1": { + "path": STDERR_FILE + ".1", + "collect_default": True}, + "streams_log.2": { + "path": LOG_FILE + ".2", + "collect_default": True}, + "streams_stdout.2": { + "path": STDOUT_FILE + ".2", + "collect_default": True}, + "streams_stderr.2": { + "path": STDERR_FILE + ".2", + "collect_default": True}, + "streams_log.3": { + "path": LOG_FILE + ".3", + "collect_default": True}, + "streams_stdout.3": { + "path": STDOUT_FILE + ".3", + "collect_default": True}, + "streams_stderr.3": { + "path": STDERR_FILE + ".3", + "collect_default": True}, "streams_log.0-1": { "path": LOG_FILE + ".0-1", "collect_default": True}, @@ -412,17 +439,26 @@ def set_version(self, kafka_streams_version): def set_upgrade_from(self, upgrade_from): self.UPGRADE_FROM = upgrade_from + def set_upgrade_to(self, upgrade_to): + self.UPGRADE_TO = upgrade_to + def prop_file(self): - properties = {STATE_DIR: self.PERSISTENT_ROOT} + properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT, + streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()} if self.UPGRADE_FROM is not None: properties['upgrade.from'] = self.UPGRADE_FROM + if self.UPGRADE_TO == "future_version": + properties['test.future.metadata'] = "any_value" cfg = KafkaConfig(**properties) return cfg.render() def start_cmd(self, node): args = self.args.copy() - args['kafka'] = self.kafka.bootstrap_servers() + if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]: + args['kafka'] = self.kafka.bootstrap_servers() + else: + args['kafka'] = "" if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1): args['zk'] = self.kafka.zk.connect_setting() else: @@ -437,7 +473,7 @@ def start_cmd(self, node): cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \ - " %(kafka_run_class)s %(streams_class_name)s %(kafka)s %(zk)s %(config_file)s " \ + " %(kafka_run_class)s %(streams_class_name)s %(kafka)s %(zk)s %(config_file)s " \ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args self.logger.info("Executing: " + cmd) diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index debe85fd7e271..41134672e9871 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -48,6 +48,7 @@ def __init__(self, test_context): 'data' : { 'partitions': 5 }, } self.leader = None + self.leader_counter = {} def perform_broker_upgrade(self, to_version): self.logger.info("First pass bounce - rolling broker upgrade") @@ -158,7 +159,7 @@ def test_simple_upgrade_downgrade(self, from_version, to_version): random.shuffle(self.processors) for p in self.processors: p.CLEAN_NODE_ENABLED = False - self.do_rolling_bounce(p, None, to_version, counter) + self.do_stop_start_bounce(p, None, to_version, counter) counter = counter + 1 # shutdown @@ -176,8 +177,7 @@ def test_simple_upgrade_downgrade(self, from_version, to_version): self.driver.stop() - #@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions) - @ignore + @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions) @matrix(from_version=metadata_1_versions, to_version=metadata_3_versions) @matrix(from_version=metadata_2_versions, to_version=metadata_3_versions) def test_metadata_upgrade(self, from_version, to_version): @@ -209,13 +209,70 @@ def test_metadata_upgrade(self, from_version, to_version): random.shuffle(self.processors) for p in self.processors: p.CLEAN_NODE_ENABLED = False - self.do_rolling_bounce(p, from_version[:-2], to_version, counter) + self.do_stop_start_bounce(p, from_version[:-2], to_version, counter) counter = counter + 1 # second rolling bounce random.shuffle(self.processors) for p in self.processors: - self.do_rolling_bounce(p, None, to_version, counter) + self.do_stop_start_bounce(p, None, to_version, counter) + counter = counter + 1 + + # shutdown + self.driver.stop() + self.driver.wait() + + random.shuffle(self.processors) + for p in self.processors: + node = p.node + with node.account.monitor_log(p.STDOUT_FILE) as monitor: + p.stop() + monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED", + timeout_sec=60, + err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) + + self.driver.stop() + + def test_version_probing_upgrade(self): + """ + Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version" + """ + + self.zk = ZookeeperService(self.test_context, num_nodes=1) + self.zk.start() + + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics) + self.kafka.start() + + self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) + self.driver.disable_auto_terminate() + self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) + self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) + self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) + + self.driver.start() + self.start_all_nodes_with("") # run with TRUNK + + self.processors = [self.processor1, self.processor2, self.processor3] + self.old_processors = [self.processor1, self.processor2, self.processor3] + self.upgraded_processors = [] + for p in self.processors: + self.leader_counter[p] = 2 + + self.update_leader() + for p in self.processors: + self.leader_counter[p] = 0 + self.leader_counter[self.leader] = 3 + + counter = 1 + current_generation = 3 + + random.seed() + random.shuffle(self.processors) + + for p in self.processors: + p.CLEAN_NODE_ENABLED = False + current_generation = self.do_rolling_bounce(p, counter, current_generation) counter = counter + 1 # shutdown @@ -233,6 +290,27 @@ def test_metadata_upgrade(self, from_version, to_version): self.driver.stop() + def update_leader(self): + self.leader = None + retries = 10 + while retries > 0: + for p in self.processors: + found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True)) + if len(found) == self.leader_counter[p] + 1: + if self.leader is not None: + raise Exception("Could not uniquely identify leader") + self.leader = p + self.leader_counter[p] = self.leader_counter[p] + 1 + + if self.leader is None: + retries = retries - 1 + time.sleep(5) + else: + break + + if self.leader is None: + raise Exception("Could not identify leader") + def start_all_nodes_with(self, version): # start first with self.prepare_for(self.processor1, version) @@ -293,7 +371,7 @@ def prepare_for(processor, version): else: processor.set_version(version) - def do_rolling_bounce(self, processor, upgrade_from, new_version, counter): + def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter): first_other_processor = None second_other_processor = None for p in self.processors: @@ -361,3 +439,120 @@ def do_rolling_bounce(self, processor, upgrade_from, new_version, counter): monitor.wait_until("processed 100 records from topic", timeout_sec=60, err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account)) + + def do_rolling_bounce(self, processor, counter, current_generation): + first_other_processor = None + second_other_processor = None + for p in self.processors: + if p != processor: + if first_other_processor is None: + first_other_processor = p + else: + second_other_processor = p + + node = processor.node + first_other_node = first_other_processor.node + second_other_node = second_other_processor.node + + with first_other_node.account.monitor_log(first_other_processor.LOG_FILE) as first_other_monitor: + with second_other_node.account.monitor_log(second_other_processor.LOG_FILE) as second_other_monitor: + # stop processor + processor.stop() + node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) + + node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + "." + str(counter), allow_fail=False) + node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + "." + str(counter), allow_fail=False) + node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + "." + str(counter), allow_fail=False) + self.leader_counter[processor] = 0 + + with node.account.monitor_log(processor.LOG_FILE) as log_monitor: + processor.set_upgrade_to("future_version") + processor.start() + self.old_processors.remove(processor) + self.upgraded_processors.append(processor) + + current_generation = current_generation + 1 + + log_monitor.wait_until("Kafka version : " + str(DEV_VERSION), + timeout_sec=60, + err_msg="Could not detect Kafka Streams version " + str(DEV_VERSION) + " in " + str(node.account)) + log_monitor.offset = 5 + log_monitor.wait_until("partition\.assignment\.strategy = \[org\.apache\.kafka\.streams\.tests\.StreamsUpgradeTest$FutureStreamsPartitionAssignor\]", + timeout_sec=60, + err_msg="Could not detect FutureStreamsPartitionAssignor in " + str(node.account)) + + log_monitor.wait_until("Successfully joined group with generation " + str(current_generation), + timeout_sec=60, + err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(node.account)) + first_other_monitor.wait_until("Successfully joined group with generation " + str(current_generation), + timeout_sec=60, + err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(first_other_node.account)) + second_other_monitor.wait_until("Successfully joined group with generation " + str(current_generation), + timeout_sec=60, + err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(second_other_node.account)) + + if processor == self.leader: + self.update_leader() + else: + self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1 + + if processor == self.leader: + leader_monitor = log_monitor + elif first_other_processor == self.leader: + leader_monitor = first_other_monitor + elif second_other_processor == self.leader: + leader_monitor = second_other_monitor + else: + raise Exception("Could not identify leader.") + + monitors = {} + monitors[processor] = log_monitor + monitors[first_other_processor] = first_other_monitor + monitors[second_other_processor] = second_other_monitor + + leader_monitor.wait_until("Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).", + timeout_sec=60, + err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account)) + + if len(self.old_processors) > 0: + log_monitor.wait_until("Sent a version 4 subscription and got version 3 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.", + timeout_sec=60, + err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) + else: + log_monitor.wait_until("Sent a version 4 subscription and got version 3 assignment back (successful version probing). Setting subscription metadata to leaders supported version 4 and trigger new rebalance.", + timeout_sec=60, + err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account)) + first_other_monitor.wait_until("Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.", + timeout_sec=60, + err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(first_other_node.account)) + second_other_monitor.wait_until("Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.", + timeout_sec=60, + err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(second_other_node.account)) + + log_monitor.wait_until("Version probing detected. Triggering new rebalance.", + timeout_sec=60, + err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) + + # version probing should trigger second rebalance + current_generation = current_generation + 1 + + for p in self.processors: + monitors[p].wait_until("Successfully joined group with generation " + str(current_generation), + timeout_sec=60, + err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(p.node.account)) + + if processor == self.leader: + self.update_leader() + else: + self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1 + + if self.leader in self.old_processors or len(self.old_processors) > 0: + self.verify_metadata_no_upgraded_yet() + + return current_generation + + def verify_metadata_no_upgraded_yet(self): + for p in self.processors: + found = list(p.node.account.ssh_capture("grep \"Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.\" " + p.LOG_FILE, allow_fail=True)) + if len(found) > 0: + raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'") diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 7823efac1d4b1..0ed29a34968a0 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -61,7 +61,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("1.2.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("2.0.0-SNAPSHOT") # 0.8.2.x versions V_0_8_2_1 = KafkaVersion("0.8.2.1")