diff --git a/src/main/java/io/streamnative/kop/coordinator/group/GroupMetadataConstants.java b/src/main/java/io/streamnative/kop/coordinator/group/GroupMetadataConstants.java index bb0cd875f0c67..756cf5f22763e 100644 --- a/src/main/java/io/streamnative/kop/coordinator/group/GroupMetadataConstants.java +++ b/src/main/java/io/streamnative/kop/coordinator/group/GroupMetadataConstants.java @@ -23,12 +23,16 @@ import com.google.common.collect.Lists; import io.streamnative.kop.coordinator.group.GroupMetadataManager.BaseKey; import io.streamnative.kop.coordinator.group.GroupMetadataManager.GroupMetadataKey; +import io.streamnative.kop.coordinator.group.GroupMetadataManager.GroupTopicPartition; +import io.streamnative.kop.coordinator.group.GroupMetadataManager.OffsetKey; +import io.streamnative.kop.offset.OffsetAndMetadata; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.BoundField; import org.apache.kafka.common.protocol.types.Field; @@ -199,6 +203,30 @@ private static Map asMap(KeyValue ...kvs) { )); } + /** + * Generates the key for offset commit message for given (group, topic, partition). + * + * @return key for offset commit message + */ + static byte[] offsetCommitKey(String group, + TopicPartition topicPartition) { + return offsetCommitKey(group, topicPartition, (short) 0); + } + + static byte[] offsetCommitKey(String group, + TopicPartition topicPartition, + short versionId) { + Struct key = new Struct(CURRENT_OFFSET_KEY_SCHEMA); + key.set(OFFSET_KEY_GROUP_FIELD, group); + key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic()); + key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition()); + + ByteBuffer byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf()); + byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION); + key.writeTo(byteBuffer); + return byteBuffer.array(); + } + /** * Generates the key for group metadata message for given group. * @@ -213,6 +241,30 @@ static byte[] groupMetadataKey(String group) { return byteBuffer.array(); } + /** + * Generates the payload for offset commit message from given offset and metadata. + * + * @param offsetAndMetadata consumer's current offset and metadata + * @return payload for offset commit message + */ + static byte[] offsetCommitValue(OffsetAndMetadata offsetAndMetadata) { + Struct value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA); + value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset()); + value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata()); + value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp()); + value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp()); + ByteBuffer byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf()); + byteBuffer.putShort(CURRENT_GROUP_VALUE_SCHEMA_VERSION); + value.writeTo(byteBuffer); + return byteBuffer.array(); + } + + + static byte[] groupMetadataValue(GroupMetadata groupMetadata, + Map assignment) { + return groupMetadataValue(groupMetadata, assignment, (short) 0); + } + /** * Generates the payload for group metadata message from given offset and metadata * assuming the generation id, selected protocol, leader and member assignment are all available. 
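The offsetCommitKey/offsetCommitValue helpers introduced above frame each entry as a 2-byte schema version followed by a Kafka Struct payload, mirroring the existing groupMetadataKey encoding; the matching readMessageKey/readOffsetMessageValue decoders appear in the next hunk. Below is a minimal round-trip sketch of these helpers (illustrative only, not part of the patch): it assumes a caller in the same io.streamnative.kop.coordinator.group package because the helpers are package-private, and the group, topic, offset, and timestamp literals are made up.

package io.streamnative.kop.coordinator.group;

import io.streamnative.kop.offset.OffsetAndMetadata;
import java.nio.ByteBuffer;
import org.apache.kafka.common.TopicPartition;

class OffsetCommitRoundTripSketch {
    static void roundTrip() {
        // Encode the key and value the same way the coordinator does before publishing.
        byte[] key = GroupMetadataConstants.offsetCommitKey(
            "my-group", new TopicPartition("my-topic", 0));
        byte[] value = GroupMetadataConstants.offsetCommitValue(
            OffsetAndMetadata.apply(42L, "some metadata", 1000L, 2000L));

        // Decode them back with the readers added in the following hunk.
        GroupMetadataManager.BaseKey parsedKey =
            GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key));
        // parsedKey is an OffsetKey wrapping GroupTopicPartition("my-group", "my-topic", 0).
        OffsetAndMetadata parsedValue =
            GroupMetadataConstants.readOffsetMessageValue(ByteBuffer.wrap(value));
        // parsedValue.offset() == 42L and parsedValue.metadata().equals("some metadata").
    }
}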
@@ -285,7 +337,17 @@ static BaseKey readMessageKey(ByteBuffer buffer) { if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) { // version 0 and 1 refer to offset - throw new UnsupportedOperationException(); + String group = key.getString(OFFSET_KEY_GROUP_FIELD); + String topic = key.getString(OFFSET_KEY_TOPIC_FIELD); + int partition = key.getInt(OFFSET_KEY_PARTITION_FIELD); + return new OffsetKey( + version, + new GroupTopicPartition( + group, + topic, + partition + ) + ); } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) { // version 2 refers to group String group = key.getString(GROUP_KEY_GROUP_FIELD); @@ -296,6 +358,33 @@ static BaseKey readMessageKey(ByteBuffer buffer) { } } + static OffsetAndMetadata readOffsetMessageValue(ByteBuffer buffer) { + if (null == buffer) { + return null; + } + + short version = buffer.getShort(); + Schema valueSchema = schemaForOffset(version); + Struct value = valueSchema.read(buffer); + + if (version == 0) { + long offset = value.getLong(OFFSET_VALUE_OFFSET_FIELD_V0); + String metadata = value.getString(OFFSET_VALUE_METADATA_FIELD_V0); + long timestamp = value.getLong(OFFSET_VALUE_TIMESTAMP_FIELD_V0); + + return OffsetAndMetadata.apply(offset, metadata, timestamp); + } else if (version == 1){ + long offset = value.getLong(OFFSET_VALUE_OFFSET_FIELD_V1); + String metadata = value.getString(OFFSET_VALUE_METADATA_FIELD_V1); + long commitTimestamp = value.getLong(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1); + long expireTimestamp = value.getLong(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1); + + return OffsetAndMetadata.apply(offset, metadata, commitTimestamp, expireTimestamp); + } else { + throw new IllegalStateException("Unknown offset message version " + version); + } + } + static GroupMetadata readGroupMessageValue(String groupId, ByteBuffer buffer) { if (null == buffer) { // tombstone diff --git a/src/main/java/io/streamnative/kop/coordinator/group/GroupMetadataManager.java b/src/main/java/io/streamnative/kop/coordinator/group/GroupMetadataManager.java index 2eedbda527964..1adb1b68bb420 100644 --- a/src/main/java/io/streamnative/kop/coordinator/group/GroupMetadataManager.java +++ b/src/main/java/io/streamnative/kop/coordinator/group/GroupMetadataManager.java @@ -13,41 +13,75 @@ */ package io.streamnative.kop.coordinator.group; +import static com.google.common.base.Preconditions.checkArgument; import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.CURRENT_GROUP_VALUE_SCHEMA_VERSION; import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.groupMetadataKey; import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.groupMetadataValue; +import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.offsetCommitKey; +import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.offsetCommitValue; import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.readGroupMessageValue; import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.readMessageKey; +import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.readOffsetMessageValue; import static io.streamnative.kop.utils.CoreUtils.inLock; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.streamnative.kop.coordinator.group.GroupMetadata.CommitRecordMetadataAndOffset; import 
io.streamnative.kop.offset.OffsetAndMetadata; +import io.streamnative.kop.utils.CoreUtils; +import io.streamnative.kop.utils.MessageIdUtils; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.common.hash.Murmur3; import org.apache.bookkeeper.common.util.MathUtils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.AbstractRecords; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData; import org.apache.kafka.common.utils.Time; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.impl.MessageIdImpl; /** * Manager to manage a coordination group. @@ -80,6 +114,23 @@ public String toString() { } + /** + * Key to index offset. + */ + @Data + @Accessors(fluent = true) + static class OffsetKey implements BaseKey { + + private final short version; + private final GroupTopicPartition key; + + @Override + public String toString() { + return key.toString(); + } + + } + /** * The group on a topic partition. */ @@ -107,6 +158,8 @@ public String toString() { } + private final byte magicValue = RecordBatch.CURRENT_MAGIC_VALUE; + private final CompressionType compressionType; private final OffsetConfig config; private final ConcurrentMap groupMetadataCache; /* lock protecting access to loading and owned partition sets */ @@ -122,6 +175,9 @@ public String toString() { private final AtomicBoolean shuttingDown = new AtomicBoolean(false); private final int groupMetadataTopicPartitionCount; + + /* single-thread scheduler to handle offset/group metadata cache loading and unloading */ + private final ScheduledExecutorService scheduler; /** * The groups with open transactional offsets commits per producer. 
We need this because when the commit or abort * marker comes in for a transaction, it is for a particular partition on the offsets topic and a particular @@ -130,23 +186,42 @@ public String toString() { */ private final Map> openGroupsForProducer = new HashMap<>(); - private final Producer metadataTopicProducer; - private final Reader metadataTopicReader; + private final Producer metadataTopicProducer; + private final Reader metadataTopicReader; private final Time time; GroupMetadataManager(int groupMetadataTopicPartitionCount, OffsetConfig config, - Producer metadataTopicProducer, - Reader metadataTopicConsumer, + Producer metadataTopicProducer, + Reader metadataTopicConsumer, + ScheduledExecutorService scheduler, Time time) { this.config = config; + this.compressionType = config.offsetsTopicCompressionType(); this.groupMetadataCache = new ConcurrentHashMap<>(); this.groupMetadataTopicPartitionCount = groupMetadataTopicPartitionCount; this.metadataTopicProducer = metadataTopicProducer; this.metadataTopicReader = metadataTopicConsumer; + this.scheduler = scheduler; this.time = time; } + public void startup(boolean enableMetadataExpiration) { + if (enableMetadataExpiration) { + scheduler.scheduleAtFixedRate( + this::cleanupGroupMetadata, + config.offsetsRetentionCheckIntervalMs(), + config.offsetsRetentionCheckIntervalMs(), + TimeUnit.MILLISECONDS + ); + } + } + + public void shutdown() { + shuttingDown.set(true); + scheduler.shutdown(); + } + public Iterable currentGroups() { return groupMetadataCache.values(); } @@ -220,20 +295,51 @@ public GroupMetadata addGroup(GroupMetadata group) { public CompletableFuture storeGroup(GroupMetadata group, Map groupAssignment) { + + TimestampType timestampType = TimestampType.CREATE_TIME; long timestamp = time.milliseconds(); byte[] key = groupMetadataKey(group.groupId()); byte[] value = groupMetadataValue( group, groupAssignment, CURRENT_GROUP_VALUE_SCHEMA_VERSION); - return metadataTopicProducer.newMessage() + // construct the record + ByteBuffer buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes( + magicValue, + compressionType, + Lists.newArrayList(new SimpleRecord(timestamp, key, value)) + )); + MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder( + buffer, + magicValue, + compressionType, + timestampType, + 0L + ); + recordsBuilder.append(timestamp, key, value); + MemoryRecords records = recordsBuilder.build(); + + return metadataTopicProducer + .newMessage() .keyBytes(key) - .value(value) + .value(records.buffer()) .eventTime(timestamp) .sendAsync() .thenApply(msgId -> Errors.NONE) .exceptionally(cause -> Errors.COORDINATOR_NOT_AVAILABLE); } + // visible for mock + CompletableFuture storeOffsetMessage(byte[] key, + ByteBuffer buffer, + long timestamp) { + return metadataTopicProducer + .newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(timestamp) + .sendAsync(); + } + public CompletableFuture> storeOffsets( GroupMetadata group, String consumerId, @@ -256,15 +362,255 @@ public CompletableFuture> storeOffsets( short producerEpoch ) { // first filter out partitions with offset metadata size exceeding limit - // Map filteredOffsetMetadata = - // offsetMetadata.entrySet().stream() - // .filter(entry -> validateOffsetMetadataLength(entry.getValue().metadata())) - // .collect(Collectors.toMap( - // e -> e.getKey(), - // e -> e.getValue() - // )); + Map filteredOffsetMetadata = + offsetMetadata.entrySet().stream() + .filter(entry -> validateOffsetMetadataLength(entry.getValue().metadata())) + .collect(Collectors.toMap( + e 
-> e.getKey(), + e -> e.getValue() + )); + + group.inLock(() -> { + if (!group.hasReceivedConsistentOffsetCommits()) { + log.warn("group: {} with leader: {} has received offset commits from consumers as well " + + "as transactional producers. Mixing both types of offset commits will generally" + + " result in surprises and should be avoided.", + group.groupId(), group.leaderOrNull()); + } + return null; + }); + + boolean isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID; + // construct the message set to append + if (filteredOffsetMetadata.isEmpty()) { + // compute the final error codes for the commit response + Map commitStatus = offsetMetadata.entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> Errors.OFFSET_METADATA_TOO_LARGE + )); + return CompletableFuture.completedFuture(commitStatus); + } + + TimestampType timestampType = TimestampType.CREATE_TIME; + long timestamp = time.milliseconds(); + List records = filteredOffsetMetadata.entrySet().stream() + .map(e -> { + byte[] key = offsetCommitKey(group.groupId(), e.getKey()); + byte[] value = offsetCommitValue(e.getValue()); + return new SimpleRecord(timestamp, key, value); + }) + .collect(Collectors.toList()); + + ByteBuffer buffer = ByteBuffer.allocate( + AbstractRecords.estimateSizeInBytes( + magicValue, compressionType, records + ) + ); + + MemoryRecordsBuilder builder = MemoryRecords.builder( + buffer, magicValue, compressionType, + timestampType, 0L, timestamp, + producerId, + producerEpoch, + 0, + isTxnOffsetCommit, + RecordBatch.NO_PARTITION_LEADER_EPOCH + ); + records.forEach(builder::append); + + MemoryRecords entries = builder.build(); + + if (isTxnOffsetCommit) { + group.inLock(() -> { + addProducerGroup(producerId, group.groupId()); + group.prepareTxnOffsetCommit(producerId, offsetMetadata); + return null; + }); + } else { + group.inLock(() -> { + group.prepareOffsetCommit(offsetMetadata); + return null; + }); + } + + // dummy offset commit key + byte[] key = offsetCommitKey(group.groupId(), new TopicPartition("", -1)); + return storeOffsetMessage(key, entries.buffer(), timestamp) + .thenApply(messageId -> { + if (!group.is(GroupState.Dead)) { + MessageIdImpl lastMessageId = (MessageIdImpl) messageId; + long baseOffset = MessageIdUtils.getOffset( + lastMessageId.getLedgerId(), + lastMessageId.getEntryId() + ); + filteredOffsetMetadata.forEach((tp, offsetAndMetadata) -> { + CommitRecordMetadataAndOffset commitRecordMetadataAndOffset = + new CommitRecordMetadataAndOffset( + Optional.of(baseOffset), + offsetAndMetadata + ); + if (isTxnOffsetCommit) { + group.onTxnOffsetCommitAppend(producerId, tp, commitRecordMetadataAndOffset); + } else { + group.onOffsetCommitAppend(tp, commitRecordMetadataAndOffset); + } + }); + } + return Errors.NONE; + }) + .exceptionally(cause -> { + if (!group.is(GroupState.Dead)) { + if (!group.hasPendingOffsetCommitsFromProducer(producerId)) { + removeProducerGroup(producerId, group.groupId()); + } + filteredOffsetMetadata.forEach((tp, offsetAndMetadata) -> { + if (isTxnOffsetCommit) { + group.failPendingTxnOffsetCommit(producerId, tp); + } else { + group.failPendingOffsetWrite(tp, offsetAndMetadata); + } + }); + } + + if (log.isDebugEnabled()) { + log.debug("Offset commit {} from group {}, consumer {} with generation {} failed" + + " when appending to log due to ", + filteredOffsetMetadata, group.groupId(), consumerId, group.generationId(), cause); + } + + return Errors.UNKNOWN_SERVER_ERROR; + }) + .thenApply(errors -> offsetMetadata.entrySet() + .stream() + 
.collect(Collectors.toMap( + e -> e.getKey(), + e -> { + if (validateOffsetMetadataLength(e.getValue().metadata())) { + return errors; + } else { + return Errors.OFFSET_METADATA_TOO_LARGE; + } + } + )) + ); + } + + /** + * The most important guarantee that this API provides is that it should never return a stale offset. + * i.e., it either returns the current offset or it begins to sync the cache from the log + * (and returns an error code). + */ + public Map getOffsets( + String groupId, Optional> topicPartitionsOpt + ) { + if (log.isTraceEnabled()) { + log.trace("Getting offsets of {} for group {}.", + topicPartitionsOpt.map(List::toString).orElse("all partitions"), + groupId); + } + + GroupMetadata group = groupMetadataCache.get(groupId); + if (null == group) { + return topicPartitionsOpt.orElse(Collections.emptyList()) + .stream() + .collect(Collectors.toMap( + tp -> tp, + tp -> new PartitionData( + OffsetFetchResponse.INVALID_OFFSET, + "", + Errors.NONE + ) + )); + } + + return group.inLock(() -> { + if (group.is(GroupState.Dead)) { + return topicPartitionsOpt.orElse(Collections.emptyList()) + .stream() + .collect(Collectors.toMap( + tp -> tp, + tp -> new PartitionData( + OffsetFetchResponse.INVALID_OFFSET, + "", + Errors.NONE + ) + )); + } + + return topicPartitionsOpt.map(topicPartitions -> + topicPartitions.stream() + .collect(Collectors.toMap( + tp -> tp, + topicPartition -> + group.offset(topicPartition) + .map(offsetAndMetadata -> new PartitionData( + offsetAndMetadata.offset(), + offsetAndMetadata.metadata(), + Errors.NONE + )) + .orElseGet(() -> new PartitionData( + OffsetFetchResponse.INVALID_OFFSET, + "", + Errors.NONE + )))) + ).orElseGet(() -> + group.allOffsets().entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> { + OffsetAndMetadata oam = e.getValue(); + return new PartitionData( + oam.offset(), + oam.metadata(), + Errors.NONE + ); + } + )) + ); + }); + } + - throw new UnsupportedOperationException(); + private void addProducerGroup(long producerId, + String groupId) { + synchronized (openGroupsForProducer) { + openGroupsForProducer.computeIfAbsent(producerId, (pid) -> new HashSet<>()) + .add(groupId); + } + } + + private void removeProducerGroup(long producerId, + String groupId) { + synchronized (openGroupsForProducer) { + Set groups = openGroupsForProducer.get(producerId); + if (null != groups) { + groups.remove(groupId); + if (groups.isEmpty()) { + openGroupsForProducer.remove(producerId); + } + } + openGroupsForProducer.computeIfAbsent(producerId, (pid) -> new HashSet<>()) + .remove(producerId); + } + } + + private Set groupsBelongingToPartitions(long producerId, + Set partitions) { + synchronized (openGroupsForProducer) { + return openGroupsForProducer.computeIfAbsent(producerId, (pid) -> new HashSet<>()) + .stream() + .filter(group -> partitions.contains(partitionFor(group))) + .collect(Collectors.toSet()); + } + } + + private void removeGroupFromAllProducers(String groupId) { + synchronized (openGroupsForProducer) { + openGroupsForProducer.forEach((pid, groups) -> groups.remove(groupId)); + } } /* @@ -283,11 +629,16 @@ public CompletableFuture scheduleLoadGroupAndOffsets(int offsetsPartition, log.info("Scheduling loading of offsets and group metadata from {}", topicPartition); long startMs = time.milliseconds(); return metadataTopicProducer.newMessage() - .value(new byte[0]) + .value(ByteBuffer.allocate(0)) .eventTime(time.milliseconds()) .sendAsync() - .thenCompose(lastMessageId -> - doLoadGroupsAndOffsets(metadataTopicReader, 
lastMessageId, onGroupLoaded)) + .thenCompose(lastMessageId -> { + if (log.isTraceEnabled()) { + log.trace("Successfully write a placeholder record into {} @ {}", + topicPartition, lastMessageId); + } + return doLoadGroupsAndOffsets(metadataTopicReader, lastMessageId, onGroupLoaded); + }) .whenComplete((ignored, cause) -> { if (null == cause) { log.info("Finished loading offsets and group metadata from {} in {} milliseconds", @@ -308,10 +659,12 @@ public CompletableFuture scheduleLoadGroupAndOffsets(int offsetsPartition, } private CompletableFuture doLoadGroupsAndOffsets( - Reader metadataConsumer, + Reader metadataConsumer, MessageId endMessageId, Consumer onGroupLoaded ) { + final Map loadedOffsets = new HashMap<>(); + final Map> pendingOffsets = new HashMap<>(); final Map loadedGroups = new HashMap<>(); final Set removedGroups = new HashSet<>(); final CompletableFuture resultFuture = new CompletableFuture<>(); @@ -321,19 +674,67 @@ private CompletableFuture doLoadGroupsAndOffsets( endMessageId, resultFuture, onGroupLoaded, + loadedOffsets, + pendingOffsets, loadedGroups, removedGroups); return resultFuture; } - private void loadNextMetadataMessage(Reader metadataConsumer, + private void loadNextMetadataMessage(Reader metadataConsumer, MessageId endMessageId, CompletableFuture resultFuture, Consumer onGroupLoaded, + Map loadedOffsets, + Map> + pendingOffsets, Map loadedGroups, Set removedGroups) { - metadataConsumer.readNextAsync().whenComplete((message, cause) -> { + try { + unsafeLoadNextMetadataMessage( + metadataConsumer, + endMessageId, + resultFuture, + onGroupLoaded, + loadedOffsets, + pendingOffsets, + loadedGroups, + removedGroups + ); + } catch (Throwable cause) { + log.error("Unknown exception caught when loading group and offsets from topic {}", + metadataConsumer.getTopic(), cause); + resultFuture.completeExceptionally(cause); + } + } + + private void unsafeLoadNextMetadataMessage(Reader metadataConsumer, + MessageId endMessageId, + CompletableFuture resultFuture, + Consumer onGroupLoaded, + Map loadedOffsets, + Map> + pendingOffsets, + Map loadedGroups, + Set removedGroups) { + if (shuttingDown.get()) { + resultFuture.completeExceptionally( + new Exception("Group metadata manager is shutting down")); + return; + } + + if (log.isTraceEnabled()) { + log.trace("Reading the next metadata message from topic {}", + metadataConsumer.getTopic()); + } + + BiConsumer, Throwable> readNextComplete = (message, cause) -> { + if (log.isTraceEnabled()) { + log.trace("Metadata consumer received a metadata message from {} @ {}", + metadataConsumer.getTopic(), message.getMessageId()); + } + if (null != cause) { resultFuture.completeExceptionally(cause); return; @@ -344,6 +745,8 @@ private void loadNextMetadataMessage(Reader metadataConsumer, processLoadedAndRemovedGroups( resultFuture, onGroupLoaded, + loadedOffsets, + pendingOffsets, loadedGroups, removedGroups ); @@ -357,65 +760,237 @@ private void loadNextMetadataMessage(Reader metadataConsumer, endMessageId, resultFuture, onGroupLoaded, + loadedOffsets, + pendingOffsets, loadedGroups, removedGroups ); return; } - BaseKey baseKey = readMessageKey(ByteBuffer.wrap(message.getKeyBytes())); - if (baseKey instanceof GroupMetadataKey) { - // load group metadata - GroupMetadataKey gmKey = (GroupMetadataKey) baseKey; - String groupId = gmKey.key(); - byte[] data = message.getValue(); - if (null == data || data.length == 0) { - // null value - loadedGroups.remove(groupId); - removedGroups.add(groupId); + ByteBuffer buffer = message.getValue(); + 
MemoryRecords memRecords = MemoryRecords.readableRecords(buffer); + + memRecords.batches().forEach(batch -> { + boolean isTxnOffsetCommit = batch.isTransactional(); + if (batch.isControlBatch()) { + Iterator recordIterator = batch.iterator(); + if (recordIterator.hasNext()) { + Record record = recordIterator.next(); + ControlRecordType controlRecord = ControlRecordType.parse(record.key()); + if (controlRecord == ControlRecordType.COMMIT) { + pendingOffsets.getOrDefault(batch.producerId(), Collections.emptyMap()) + .forEach((groupTopicPartition, commitRecordMetadataAndOffset) -> { + if (!loadedOffsets.containsKey(groupTopicPartition) + || loadedOffsets.get(groupTopicPartition) + .olderThan(commitRecordMetadataAndOffset)) { + loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset); + } + }); + } + pendingOffsets.remove(batch.producerId()); + } } else { - GroupMetadata groupMetadata = readGroupMessageValue( - groupId, - ByteBuffer.wrap(message.getValue()) - ); - if (null != groupMetadata) { - removedGroups.remove(groupId); - loadedGroups.put(groupId, groupMetadata); - } else { - loadedGroups.remove(groupId); - removedGroups.add(groupId); + Optional batchBaseOffset = Optional.empty(); + for (Record record : batch) { + checkArgument(record.hasKey(), "Group metadata/offset entry key should not be null"); + if (!batchBaseOffset.isPresent()) { + batchBaseOffset = Optional.of(record.offset()); + } + BaseKey bk = readMessageKey(record.key()); + + if (log.isTraceEnabled()) { + log.trace("Applying metadata record {} received from {}", + bk, metadataConsumer.getTopic()); + } + + if (bk instanceof OffsetKey) { + OffsetKey offsetKey = (OffsetKey) bk; + if (isTxnOffsetCommit && !pendingOffsets.containsKey(batch.producerId())) { + pendingOffsets.put( + batch.producerId(), + new HashMap<>() + ); + } + // load offset + GroupTopicPartition groupTopicPartition = offsetKey.key(); + if (!record.hasValue()) { + if (isTxnOffsetCommit) { + pendingOffsets.get(batch.producerId()).remove(groupTopicPartition); + } else { + loadedOffsets.remove(groupTopicPartition); + } + } else { + OffsetAndMetadata offsetAndMetadata = readOffsetMessageValue(record.value()); + CommitRecordMetadataAndOffset commitRecordMetadataAndOffset = + new CommitRecordMetadataAndOffset( + batchBaseOffset, + offsetAndMetadata + ); + if (isTxnOffsetCommit) { + pendingOffsets.get(batch.producerId()).put( + groupTopicPartition, + commitRecordMetadataAndOffset); + } else { + loadedOffsets.put( + groupTopicPartition, + commitRecordMetadataAndOffset + ); + } + } + } else if (bk instanceof GroupMetadataKey) { + GroupMetadataKey groupMetadataKey = (GroupMetadataKey) bk; + String gid = groupMetadataKey.key(); + GroupMetadata gm = readGroupMessageValue(gid, record.value()); + if (gm != null) { + removedGroups.remove(gid); + loadedGroups.put(gid, gm); + } else { + loadedGroups.remove(gid); + removedGroups.add(gid); + } + } else { + resultFuture.completeExceptionally( + new IllegalStateException( + "Unexpected message key " + bk + " while loading offsets and group metadata")); + return; + } } } - loadNextMetadataMessage( - metadataConsumer, - endMessageId, - resultFuture, - onGroupLoaded, - loadedGroups, - removedGroups - ); - } else { - resultFuture.completeExceptionally( - new IllegalStateException("Unexpected message key " - + baseKey + " while loading offsets and group metadata")); - } + }); + loadNextMetadataMessage( + metadataConsumer, + endMessageId, + resultFuture, + onGroupLoaded, + loadedOffsets, + pendingOffsets, + loadedGroups, + 
removedGroups + ); + }; + + metadataConsumer.readNextAsync().whenComplete((message, cause) -> { + try { + readNextComplete.accept(message, cause); + } catch (Throwable completeCause) { + log.error("Unknown exception caught when processing the received metadata message from topic {}", + metadataConsumer.getTopic(), completeCause); + resultFuture.completeExceptionally(completeCause); + } }); } private void processLoadedAndRemovedGroups(CompletableFuture resultFuture, Consumer onGroupLoaded, + Map loadedOffsets, + Map> + pendingOffsets, Map loadedGroups, Set removedGroups) { + if (log.isTraceEnabled()) { + log.trace("Completing loading : {} loaded groups, {} removed groups, {} loaded offsets, {} pending offsets", + loadedGroups.size(), removedGroups.size(), loadedOffsets.size(), pendingOffsets.size()); + } try { + Map> groupLoadedOffsets = + loadedOffsets.entrySet().stream() + .collect(Collectors.groupingBy( + e -> e.getKey().group(), + Collectors.toMap( + f -> f.getKey().topicPartition(), + f -> f.getValue() + )) + ); + Map>> partitionedLoadedOffsets = + CoreUtils.partition(groupLoadedOffsets, group -> loadedGroups.containsKey(group)); + Map> groupOffsets = + partitionedLoadedOffsets.get(true); + Map> emptyGroupOffsets = + partitionedLoadedOffsets.get(false); + + Map>> pendingOffsetsByGroup = + new HashMap<>(); + pendingOffsets.forEach((producerId, producerOffsets) -> { + producerOffsets.keySet().stream() + .map(GroupTopicPartition::group) + .forEach(group -> addProducerGroup(producerId, group)); + producerOffsets + .entrySet() + .stream() + .collect(Collectors.groupingBy( + e -> e.getKey().group, + Collectors.toMap( + f -> f.getKey().topicPartition(), + f -> f.getValue() + ) + )) + .forEach((group, offsets) -> { + Map> groupPendingOffsets = + pendingOffsetsByGroup.computeIfAbsent( + group, + g -> new HashMap<>()); + Map groupProducerOffsets = + groupPendingOffsets.computeIfAbsent( + producerId, + p -> new HashMap<>()); + groupProducerOffsets.putAll(offsets); + }); + }); + + Map>>> + partitionedPendingOffsetsByGroup = CoreUtils.partition( + pendingOffsetsByGroup, + group -> loadedGroups.containsKey(group) + ); + Map>> pendingGroupOffsets = + partitionedPendingOffsetsByGroup.get(true); + Map>> pendingEmptyGroupOffsets = + partitionedPendingOffsetsByGroup.get(false); + loadedGroups.values().forEach(group -> { - loadGroup(group); + Map offsets = + groupOffsets.getOrDefault(group.groupId(), Collections.emptyMap()); + Map> pOffsets = + pendingGroupOffsets.getOrDefault(group.groupId(), Collections.emptyMap()); + + if (log.isDebugEnabled()) { + log.debug("Loaded group metadata {} with offsets {} and pending offsets {}", + group, offsets, pOffsets); + } + + loadGroup(group, offsets, pOffsets); + onGroupLoaded.accept(group); + }); + + Sets.union( + emptyGroupOffsets.keySet(), + pendingEmptyGroupOffsets.keySet() + ).forEach(groupId -> { + GroupMetadata group = new GroupMetadata(groupId, GroupState.Empty); + Map offsets = + emptyGroupOffsets.getOrDefault(groupId, Collections.emptyMap()); + Map> pOffsets = + pendingEmptyGroupOffsets.getOrDefault(groupId, Collections.emptyMap()); + if (log.isDebugEnabled()) { + log.debug("Loaded group metadata {} with offsets {} and pending offsets {}", + group, offsets, pOffsets); + } + loadGroup(group, offsets, pOffsets); onGroupLoaded.accept(group); }); removedGroups.forEach(groupId -> { - // TODO: add offsets later + // if the cache already contains a group which should be removed, raise an error. 
Note that it + // is possible (however unlikely) for a consumer group to be removed, and then to be used only for + // offset storage (i.e. by "simple" consumers) + if (groupMetadataCache.containsKey(groupId) + && !emptyGroupOffsets.containsKey(groupId)) { + throw new IllegalStateException("Unexpected unload of active group " + groupId + + " while loading partition"); + } }); resultFuture.complete(null); } catch (RuntimeException re) { @@ -423,7 +998,42 @@ private void processLoadedAndRemovedGroups(CompletableFuture resultFuture, } } - private void loadGroup(GroupMetadata group) { + private void loadGroup(GroupMetadata group, + Map offsets, + Map> pendingTransactionalOffsets) { + // offsets are initialized prior to loading the group into the cache to ensure that clients see a consistent + // view of the group's offsets + Map loadedOffsets = CoreUtils.mapValue( + offsets, + commitRecordMetadataAndOffset -> { + OffsetAndMetadata offsetAndMetadata = commitRecordMetadataAndOffset.offsetAndMetadata(); + OffsetAndMetadata updatedOffsetAndMetadata; + if (offsetAndMetadata.expireTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP) { + long expireTimestamp = offsetAndMetadata.commitTimestamp() + config.offsetsRetentionMs(); + updatedOffsetAndMetadata = OffsetAndMetadata.apply( + offsetAndMetadata.offset(), + offsetAndMetadata.metadata(), + offsetAndMetadata.commitTimestamp(), + expireTimestamp + ); + } else { + updatedOffsetAndMetadata = offsetAndMetadata; + } + return new CommitRecordMetadataAndOffset( + commitRecordMetadataAndOffset.appendedBatchOffset(), + updatedOffsetAndMetadata + ); + } + ); + if (log.isTraceEnabled()) { + log.trace("Initialized offsets {} from group {}", + loadedOffsets, group.groupId()); + } + group.initializeOffsets( + loadedOffsets, + pendingTransactionalOffsets + ); + GroupMetadata currentGroup = addGroup(group); if (group != currentGroup) { log.debug("Attempt to load group {} from log with generation {} failed " @@ -432,6 +1042,164 @@ private void loadGroup(GroupMetadata group) { } } + + /** + * When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to + * that partition. + * + * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache. + */ + public void removeGroupsForPartition(int offsetsPartition, + Consumer onGroupUnloaded) { + TopicPartition topicPartition = new TopicPartition( + GROUP_METADATA_TOPIC_NAME, offsetsPartition + ); + log.info("Scheduling unloading of offsets and group metadata from {}", topicPartition); + scheduler.submit(() -> { + AtomicInteger numOffsetsRemoved = new AtomicInteger(); + AtomicInteger numGroupsRemoved = new AtomicInteger(); + inLock(partitionLock, () -> { + // we need to guard the group removal in cache in the loading partition lock + // to prevent coordinator's check-and-get-group race condition + ownedPartitions.remove(offsetsPartition); + + for (GroupMetadata group : groupMetadataCache.values()) { + if (partitionFor(group.groupId()) == offsetsPartition) { + onGroupUnloaded.accept(group); + groupMetadataCache.remove(group.groupId(), group); + removeGroupFromAllProducers(group.groupId()); + numGroupsRemoved.incrementAndGet(); + numOffsetsRemoved.addAndGet(group.numOffsets()); + } + } + + return null; + }); + + log.info("Finished unloading {}. 
Removed {} cached offsets and {} cached groups.", + topicPartition, numOffsetsRemoved, numGroupsRemoved); + }); + } + + CompletableFuture cleanupGroupMetadata() { + final long startMs = time.milliseconds(); + return cleanGroupMetadata(groupMetadataCache.values().stream(), + group -> group.removeExpiredOffsets(time.milliseconds()) + ).thenAccept(offsetsRemoved -> + log.info("Removed {} expired offsets in {} milliseconds.", + offsetsRemoved, time.milliseconds() - startMs) + ); + } + + CompletableFuture cleanGroupMetadata(Stream groups, + Function> + selector) { + List> cleanFutures = groups.map(group -> { + String groupId = group.groupId(); + Triple, Boolean, Integer> result = group.inLock(() -> { + Map removedOffsets = selector.apply(group); + if (group.is(GroupState.Empty) && !group.hasOffsets()) { + log.info("Group {} transitioned to Dead in generation {}", + groupId, group.generationId()); + group.transitionTo(GroupState.Dead); + } + + return Triple.of( + removedOffsets, + group.is(GroupState.Dead), + group.generationId() + ); + }); + Map removedOffsets = result.getLeft(); + boolean groupIsDead = result.getMiddle(); + int generation = result.getRight(); + + TimestampType timestampType = TimestampType.CREATE_TIME; + long timestamp = time.milliseconds(); + List tombstones = new ArrayList<>(); + removedOffsets.forEach((topicPartition, offsetAndMetadata) -> { + byte[] commitKey = offsetCommitKey( + groupId, topicPartition + ); + tombstones.add(new SimpleRecord(timestamp, commitKey, null)); + }); + + // We avoid writing the tombstone when the generationId is 0, since this group is only using + // Kafka for offset storage. + if (groupIsDead && groupMetadataCache.remove(groupId, group) && generation > 0) { + // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say, + // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and + // retry removing this group. + byte[] groupMetadataKey = groupMetadataKey(group.groupId()); + tombstones.add(new SimpleRecord(timestamp, groupMetadataKey, null)); + } + + if (!tombstones.isEmpty()) { + MemoryRecords records = MemoryRecords.withRecords( + magicValue, 0L, compressionType, + timestampType, + tombstones.toArray(new SimpleRecord[tombstones.size()]) + ); + byte[] groupKey = groupMetadataKey( + group.groupId() + ); + return metadataTopicProducer.newMessage() + .keyBytes(groupKey) + .value(records.buffer()) + .eventTime(timestamp) + .sendAsync() + .thenApply(ignored -> removedOffsets.size()) + .exceptionally(cause -> { + log.error("Failed to append {} tombstones to {} for expired/deleted " + + "offsets and/or metadata for group {}", + tombstones.size(), metadataTopicProducer.getTopic(), + group.groupId(), cause); + // ignore and continue + return 0; + }); + } else { + return CompletableFuture.completedFuture(0); + } + }).collect(Collectors.toList()); + return FutureUtils.collect(cleanFutures) + .thenApply(removedList -> removedList.stream().mapToInt(Integer::intValue).sum()); + } + + /** + * Complete pending transactional offset commits of the groups of `producerId` from the provided + * `completedPartitions`. This method is invoked when a commit or abort marker is fully written + * to the log. It may be invoked when a group lock is held by the caller, for instance when delayed + * operations are completed while appending offsets for a group. 
Since we need to acquire one or + * more group metadata locks to handle transaction completion, this operation is scheduled on + * the scheduler thread to avoid deadlocks. + */ + public void scheduleHandleTxnCompletion(long producerId, + Set completedPartitions, + boolean isCommit) { + scheduler.submit(() -> handleTxnCompletion(producerId, completedPartitions, isCommit)); + } + + void handleTxnCompletion(long producerId, Set completedPartitions, boolean isCommit) { + Set pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions); + pendingGroups.forEach(groupId -> { + getGroup(groupId).map(group -> { + return group.inLock(() -> { + if (!group.is(GroupState.Dead)) { + group.completePendingTxnOffsetCommit(producerId, isCommit); + removeProducerGroup(producerId, groupId); + } + return null; + }); + }).orElseGet(() -> { + log.info("Group {} has moved away from this coordinator after transaction marker was written" + + " but before the cache was updated. The cache on the new group owner will be updated" + + " instead.", + groupId); + return null; + }); + }); + } + /** * Add the partition into the owned list. * diff --git a/src/main/java/io/streamnative/kop/coordinator/group/OffsetConfig.java b/src/main/java/io/streamnative/kop/coordinator/group/OffsetConfig.java index aaf4ba4c9419e..b1819db5996d3 100644 --- a/src/main/java/io/streamnative/kop/coordinator/group/OffsetConfig.java +++ b/src/main/java/io/streamnative/kop/coordinator/group/OffsetConfig.java @@ -28,10 +28,16 @@ public class OffsetConfig { private static final int DefaultMaxMetadataSize = 4096; + private static final long DefaultOffsetsRetentionMs = 24 * 60 * 60 * 1000L; + private static final long DefaultOffsetsRetentionCheckIntervalMs = 600000L; @Default private int maxMetadataSize = DefaultMaxMetadataSize; @Default private CompressionType offsetsTopicCompressionType = CompressionType.NONE; + @Default + private long offsetsRetentionMs = DefaultOffsetsRetentionMs; + @Default + private long offsetsRetentionCheckIntervalMs = DefaultOffsetsRetentionCheckIntervalMs; } diff --git a/src/main/java/io/streamnative/kop/utils/CoreUtils.java b/src/main/java/io/streamnative/kop/utils/CoreUtils.java index 3987086b3163c..fc42e011a85c7 100644 --- a/src/main/java/io/streamnative/kop/utils/CoreUtils.java +++ b/src/main/java/io/streamnative/kop/utils/CoreUtils.java @@ -13,9 +13,14 @@ */ package io.streamnative.kop.utils; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import lombok.experimental.UtilityClass; /** @@ -41,4 +46,24 @@ public static T inWriteLock(ReadWriteLock lock, Supplier supplier) { return inLock(lock.writeLock(), supplier); } + public static Map> partition(Map map, + Predicate predicate) { + return map.entrySet() + .stream() + .collect(Collectors.partitioningBy( + e -> predicate.test(e.getKey()), + Collectors.toMap(Entry::getKey, Entry::getValue) + )); + } + + public static Map mapValue(Map map, + Function func) { + return map.entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> func.apply(e.getValue()) + )); + } + } diff --git a/src/test/java/io/streamnative/kop/coordinator/group/GroupMetadataManagerTest.java b/src/test/java/io/streamnative/kop/coordinator/group/GroupMetadataManagerTest.java index 4880ed272b5ae..e7674e917d8e3 100644 --- 
a/src/test/java/io/streamnative/kop/coordinator/group/GroupMetadataManagerTest.java +++ b/src/test/java/io/streamnative/kop/coordinator/group/GroupMetadataManagerTest.java @@ -13,31 +13,72 @@ */ package io.streamnative.kop.coordinator.group; +import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.groupMetadataKey; +import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.groupMetadataValue; +import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.offsetCommitKey; +import static io.streamnative.kop.coordinator.group.GroupMetadataConstants.offsetCommitValue; +import static io.streamnative.kop.coordinator.group.GroupState.Empty; +import static io.streamnative.kop.coordinator.group.GroupState.PreparingRebalance; +import static io.streamnative.kop.coordinator.group.GroupState.Stable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.streamnative.kop.MockKafkaServiceBaseTest; +import io.streamnative.kop.coordinator.group.GroupMetadata.CommitRecordMetadataAndOffset; import io.streamnative.kop.coordinator.group.GroupMetadataManager.BaseKey; import io.streamnative.kop.coordinator.group.GroupMetadataManager.GroupMetadataKey; +import io.streamnative.kop.coordinator.group.GroupMetadataManager.GroupTopicPartition; +import io.streamnative.kop.coordinator.group.GroupMetadataManager.OffsetKey; +import io.streamnative.kop.offset.OffsetAndMetadata; import io.streamnative.kop.utils.MockTime; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.AbstractRecords; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import 
org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -62,15 +103,21 @@ public class GroupMetadataManagerTest extends MockKafkaServiceBaseTest { MockTime time = null; GroupMetadataManager groupMetadataManager = null; - Producer producer = null; - Reader consumer = null; + Producer producer = null; + Reader consumer = null; OffsetConfig offsetConfig = OffsetConfig.builder().build(); + OrderedScheduler scheduler; @Before @Override public void setup() throws Exception { super.internalSetup(); - log.info("Admin : {}", admin); + + scheduler = OrderedScheduler.newSchedulerBuilder() + .name("test-scheduler") + .numThreads(1) + .build(); + admin.clusters().createCluster("test", new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); @@ -87,6 +134,7 @@ public void setup() throws Exception { offsetConfig, producer, consumer, + scheduler, time ); } @@ -97,6 +145,697 @@ public void cleanup() throws Exception { super.internalCleanup(); } + private List createCommittedOffsetRecords(Map committedOffsets, + String groupId) { + return committedOffsets.entrySet().stream().map(e -> { + OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.apply(e.getValue()); + byte[] offsetCommitKey = offsetCommitKey(groupId, e.getKey()); + byte[] offsetCommitValue = offsetCommitValue(offsetAndMetadata); + return new SimpleRecord(offsetCommitKey, offsetCommitValue); + }).collect(Collectors.toList()); + } + + private SimpleRecord buildStableGroupRecordWithMember(int generation, + String protocolType, + String protocol, + String memberId) { + return buildStableGroupRecordWithMember( + generation, + protocolType, + protocol, + memberId, + 0 + ); + } + + private SimpleRecord buildStableGroupRecordWithMember(int generation, + String protocolType, + String protocol, + String memberId, + int assignmentSize) { + Map memberProtocols = new HashMap<>(); + memberProtocols.put(protocol, new byte[0]); + MemberMetadata member = new MemberMetadata( + memberId, + groupId, + "clientId", + "clientHost", + 30000, + 10000, + protocolType, + memberProtocols); + + GroupMetadata group = GroupMetadata.loadGroup( + groupId, + Stable, + generation, + protocolType, + protocol, + memberId, + Lists.newArrayList(member) + ); + byte[] groupMetadataKey = groupMetadataKey(groupId); + Map assignments = new HashMap<>(); + assignments.put(memberId, new byte[0]); + byte[] groupMetadataValue = groupMetadataValue(group, assignments); + return new SimpleRecord(groupMetadataKey, groupMetadataValue); + } + + private SimpleRecord buildEmptyGroupRecord(int generation, + String protocolType) { + GroupMetadata group = GroupMetadata.loadGroup( + groupId, + Empty, + generation, + protocolType, + null, + null, + Collections.emptyList() + ); + byte[] groupMetadataKey = groupMetadataKey(groupId); + byte[] groupMetadataValue = groupMetadataValue( + group, Collections.emptyMap()); + return new SimpleRecord(groupMetadataKey, groupMetadataValue); + } + + private ByteBuffer newMemoryRecordsBuffer(List records) { + return newMemoryRecordsBuffer( + records, + -1L, + (short) -1, + false + ); + } + + private ByteBuffer newMemoryRecordsBuffer(List records, + long producerId, + short producerEpoch, + boolean isTxnOffsetCommit) { + TimestampType timestampType = TimestampType.CREATE_TIME; + long timestamp = 
time.milliseconds(); + + ByteBuffer buffer = ByteBuffer.allocate( + AbstractRecords.estimateSizeInBytes( + RecordBatch.CURRENT_MAGIC_VALUE, offsetConfig.offsetsTopicCompressionType(), records + ) + ); + + MemoryRecordsBuilder builder = MemoryRecords.builder( + buffer, RecordBatch.CURRENT_MAGIC_VALUE, offsetConfig.offsetsTopicCompressionType(), + timestampType, 0L, timestamp, + producerId, + producerEpoch, + 0, + isTxnOffsetCommit, + RecordBatch.NO_PARTITION_LEADER_EPOCH + ); + records.forEach(builder::append); + return builder.build().buffer(); + } + + private int appendConsumerOffsetCommit(ByteBuffer buffer, + long baseOffset, + Map offsets) { + MemoryRecordsBuilder builder = + MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset); + List commitRecords = createCommittedOffsetRecords(offsets, groupId); + commitRecords.forEach(builder::append); + builder.build(); + return offsets.size(); + } + + private int appendTransactionalOffsetCommits(ByteBuffer buffer, + long producerId, + short producerEpoch, + long baseOffset, + Map offsets) { + MemoryRecordsBuilder builder = + MemoryRecords.builder(buffer, CompressionType.NONE, baseOffset, producerId, producerEpoch, 0, true); + List commitRecords = createCommittedOffsetRecords(offsets, groupId); + commitRecords.forEach(builder::append); + builder.build(); + return offsets.size(); + } + + private int completeTransactionalOffsetCommit(ByteBuffer buffer, + long producerId, + short producerEpoch, + long baseOffset, + boolean isCommit) { + MemoryRecordsBuilder builder = MemoryRecords.builder( + buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, + TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), + producerId, producerEpoch, 0, true, true, + RecordBatch.NO_PARTITION_LEADER_EPOCH); + ControlRecordType controlRecordType; + if (isCommit) { + controlRecordType = ControlRecordType.COMMIT; + } else { + controlRecordType = ControlRecordType.ABORT; + } + builder.appendEndTxnMarker(time.milliseconds(), new EndTransactionMarker(controlRecordType, 0)); + builder.build(); + return 1; + } + + @Test + public void testLoadOffsetsWithoutGroup() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition("foo", 0), 23L); + committedOffsets.put( + new TopicPartition("foo", 1), 455L); + committedOffsets.put( + new TopicPartition("bar", 0), 8992L); + + List offsetCommitRecords = createCommittedOffsetRecords( + committedOffsets, + groupId + ); + ByteBuffer buffer = newMemoryRecordsBuffer(offsetCommitRecords); + byte[] key = groupMetadataKey(groupId); + + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata group = onLoadedFuture.get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + assertSame(group, groupInCache); + assertEquals(groupId, group.groupId()); + assertEquals(Empty, group.currentState()); + assertEquals(committedOffsets.size(), group.allOffsets().size()); + 
committedOffsets.forEach((tp, offset) -> + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset))); + }); + + } + + @Test + public void testLoadEmptyGroupWithOffsets() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + int generation = 15; + String protocolType = "consumer"; + + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition("foo", 0), 23L); + committedOffsets.put( + new TopicPartition("foo", 1), 455L); + committedOffsets.put( + new TopicPartition("bar", 0), 8992L); + + List offsetCommitRecords = createCommittedOffsetRecords( + committedOffsets, + groupId + ); + offsetCommitRecords.add( + buildEmptyGroupRecord(generation, protocolType)); + + ByteBuffer buffer = newMemoryRecordsBuffer(offsetCommitRecords); + byte[] key = groupMetadataKey(groupId); + + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata group = onLoadedFuture.get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + assertSame(group, groupInCache); + + assertEquals(groupId, group.groupId()); + assertEquals(Empty, group.currentState()); + assertEquals(generation, group.generationId()); + assertEquals(Optional.of(protocolType), group.protocolType()); + assertEquals(committedOffsets.size(), group.allOffsets().size()); + assertNull(group.leaderOrNull()); + assertNull(group.protocolOrNull()); + committedOffsets.forEach((tp, offset) -> + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset))); + }); + } + + @Test + public void testLoadTransactionalOffsetsWithoutGroup() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + long producerId = 1000L; + short producerEpoch = 2; + + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition("foo", 0), 23L); + committedOffsets.put( + new TopicPartition("foo", 1), 455L); + committedOffsets.put( + new TopicPartition("bar", 0), 8992L); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + int nextOffset = 0; + nextOffset += appendTransactionalOffsetCommits( + buffer, producerId, producerEpoch, nextOffset, committedOffsets + ); + completeTransactionalOffsetCommit( + buffer, producerId, producerEpoch, nextOffset, true + ); + buffer.flip(); + + byte[] key = groupMetadataKey(groupId); + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata group = onLoadedFuture.get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + assertSame(group, groupInCache); + + assertEquals(groupId, group.groupId()); 
+ assertEquals(Empty, group.currentState()); + assertEquals(committedOffsets.size(), group.allOffsets().size()); + committedOffsets.forEach((tp, offset) -> + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset))); + }); + } + + @Test + public void testDoNotLoadAbortedTransactionalOffsetCommits() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + long producerId = 1000L; + short producerEpoch = 2; + + Map abortedOffsets = new HashMap<>(); + abortedOffsets.put( + new TopicPartition("foo", 0), 23L); + abortedOffsets.put( + new TopicPartition("foo", 1), 455L); + abortedOffsets.put( + new TopicPartition("bar", 0), 8992L); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + int nextOffset = 0; + nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, abortedOffsets); + completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, false); + buffer.flip(); + + byte[] key = groupMetadataKey(groupId); + + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> {} + ).get(); + Optional groupInCache = groupMetadataManager.getGroup(groupId); + assertFalse(groupInCache.isPresent()); + }); + } + + @Test + public void testGroupLoadedWithPendingCommits() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + long producerId = 1000L; + short producerEpoch = 2; + + Map pendingOffsets = new HashMap<>(); + pendingOffsets.put( + new TopicPartition("foo", 0), 23L); + pendingOffsets.put( + new TopicPartition("foo", 1), 455L); + pendingOffsets.put( + new TopicPartition("bar", 0), 8992L); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + int nextOffset = 0; + appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, pendingOffsets); + buffer.flip(); + + byte[] key = groupMetadataKey(groupId); + + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + GroupMetadata group = onLoadedFuture.get(); + assertSame(group, groupInCache); + assertEquals(groupId, group.groupId()); + assertEquals(Empty, group.currentState()); + // Ensure that no offsets are materialized, but that we have offsets pending. 
+ assertEquals(0, group.allOffsets().size()); + assertTrue(group.hasOffsets()); + assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId)); + }); + } + + @Test + public void testLoadWithCommitedAndAbortedTransactionOffsetCommits() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + long producerId = 1000L; + short producerEpoch = 2; + + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition("foo", 0), 23L); + committedOffsets.put( + new TopicPartition("foo", 1), 455L); + committedOffsets.put( + new TopicPartition("bar", 0), 8992L); + + Map abortedOffsets = new HashMap<>(); + abortedOffsets.put( + new TopicPartition("foo", 2), 231L); + abortedOffsets.put( + new TopicPartition("foo", 3), 4551L); + abortedOffsets.put( + new TopicPartition("bar", 1), 89921L); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + int nextOffset = 0; + nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, abortedOffsets); + nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, false); + nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, committedOffsets); + completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, true); + buffer.flip(); + + byte[] key = groupMetadataKey(groupId); + + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + GroupMetadata group = onLoadedFuture.get(); + assertSame(group, groupInCache); + assertEquals(groupId, group.groupId()); + assertEquals(Empty, group.currentState()); + // Ensure that only the committed offsets are materialized, and that there are no pending + // commits for the producer. This allows us to be certain that the aborted offset commits + // are truly discarded.
+ assertEquals(committedOffsets.size(), group.allOffsets().size()); + committedOffsets.forEach((tp, offset) -> + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset))); + assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)); + }); + } + + @Test + public void testLoadWithCommitedAndAbortedAndPendingTransactionOffsetCommits() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + long producerId = 1000L; + short producerEpoch = 2; + + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition("foo", 0), 23L); + committedOffsets.put( + new TopicPartition("foo", 1), 455L); + committedOffsets.put( + new TopicPartition("bar", 0), 8992L); + + Map abortedOffsets = new HashMap<>(); + abortedOffsets.put( + new TopicPartition("foo", 2), 231L); + abortedOffsets.put( + new TopicPartition("foo", 3), 4551L); + abortedOffsets.put( + new TopicPartition("bar", 1), 89921L); + + Map pendingOffsets = new HashMap<>(); + pendingOffsets.put( + new TopicPartition("foo", 3), 2312L); + pendingOffsets.put( + new TopicPartition("foo", 4), 45512L); + pendingOffsets.put( + new TopicPartition("bar", 2), 899212L); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + int nextOffset = 0; + nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, committedOffsets); + nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, true); + nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, abortedOffsets); + nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, false); + nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, pendingOffsets); + buffer.flip(); + + byte[] key = groupMetadataKey(groupId); + + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + GroupMetadata group = onLoadedFuture.get(); + assertSame(group, groupInCache); + assertEquals(groupId, group.groupId()); + assertEquals(Empty, group.currentState()); + + // Ensure that only the committed offsets are materialized, and that there are no pending commits + // for the producer. This allows us to be certain that the aborted offset commits are truly discarded. + assertEquals(committedOffsets.size(), group.allOffsets().size()); + committedOffsets.forEach((tp, offset) -> + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset))); + + // We should have pending commits. + assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId)); + + // The loaded pending commits should materialize after a commit marker comes in. 
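+ // Passing true to handleTxnCompletion below stands in for that commit marker on the loaded partition.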
+ groupMetadataManager.handleTxnCompletion( + producerId, + Sets.newHashSet(groupMetadataTopicPartition.partition()), + true); + assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)); + pendingOffsets.forEach((tp, offset) -> + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset))); + }); + } + + @Test + public void testLoadTransactionalOffsetCommitsFromMultipleProducers() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + long firstProducerId = 1000L; + short firstProducerEpoch = 2; + long secondProducerId = 1001L; + short secondProducerEpoch = 3; + + Map committedOffsetsFirstProducer = new HashMap<>(); + committedOffsetsFirstProducer.put( + new TopicPartition("foo", 0), 23L); + committedOffsetsFirstProducer.put( + new TopicPartition("foo", 1), 455L); + committedOffsetsFirstProducer.put( + new TopicPartition("bar", 0), 8992L); + + Map committedOffsetsSecondProducer = new HashMap<>(); + committedOffsetsSecondProducer.put( + new TopicPartition("foo", 2), 231L); + committedOffsetsSecondProducer.put( + new TopicPartition("foo", 3), 4551L); + committedOffsetsSecondProducer.put( + new TopicPartition("bar", 1), 89921L); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + int nextOffset = 0; + int firstProduceRecordOffset = nextOffset; + nextOffset += appendTransactionalOffsetCommits( + buffer, firstProducerId, firstProducerEpoch, nextOffset, committedOffsetsFirstProducer + ); + nextOffset += completeTransactionalOffsetCommit( + buffer, firstProducerId, firstProducerEpoch, nextOffset, true + ); + int secondProduceRecordOffset = nextOffset; + nextOffset += appendTransactionalOffsetCommits( + buffer, secondProducerId, secondProducerEpoch, nextOffset, committedOffsetsSecondProducer + ); + nextOffset += completeTransactionalOffsetCommit( + buffer, secondProducerId, secondProducerEpoch, nextOffset, true + ); + buffer.flip(); + + byte[] key = groupMetadataKey(groupId); + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata group = onLoadedFuture.get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + assertSame(group, groupInCache); + + assertEquals(groupId, group.groupId()); + assertEquals(Empty, group.currentState()); + + // Ensure that only the committed offsets are materialized, and that there are no pending commits + // for the producer. This allows us to be certain that the aborted offset commits are truly discarded. 
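+ // Each materialized offset should also remember the base offset of the batch in which its producer's commits were appended.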
+ assertEquals(committedOffsetsFirstProducer.size() + committedOffsetsSecondProducer.size(), + group.allOffsets().size()); + committedOffsetsFirstProducer.forEach((tp, offset) -> { + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)); + assertEquals( + Optional.of((long) firstProduceRecordOffset), + group.offsetWithRecordMetadata(tp).flatMap(CommitRecordMetadataAndOffset::appendedBatchOffset)); + }); + committedOffsetsSecondProducer.forEach((tp, offset) -> { + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)); + assertEquals( + Optional.of((long) secondProduceRecordOffset), + group.offsetWithRecordMetadata(tp).flatMap(CommitRecordMetadataAndOffset::appendedBatchOffset)); + }); + }); + } + + @Test + public void testGroupLoadWithConsumerAndTransactionalOffsetCommitsTransactionWins() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + long producerId = 1000L; + short producerEpoch = 2; + + Map transactionalOffsetCommits = new HashMap<>(); + transactionalOffsetCommits.put( + new TopicPartition("foo", 0), 23L); + + Map consumerOffsetCommits = new HashMap<>(); + consumerOffsetCommits.put( + new TopicPartition("foo", 0), 24L); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + int nextOffset = 0; + nextOffset += appendConsumerOffsetCommit( + buffer, nextOffset, consumerOffsetCommits + ); + nextOffset += appendTransactionalOffsetCommits( + buffer, producerId, producerEpoch, nextOffset, transactionalOffsetCommits + ); + nextOffset += completeTransactionalOffsetCommit( + buffer, producerId, producerEpoch, nextOffset, true + ); + buffer.flip(); + + byte[] key = groupMetadataKey(groupId); + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata group = onLoadedFuture.get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + assertSame(group, groupInCache); + + assertEquals(groupId, group.groupId()); + assertEquals(Empty, group.currentState()); + + // The transactional offset commit, which appears later in the log, should win over the consumer offset commit for the same partition.
+ assertEquals(1, group.allOffsets().size()); + assertTrue(group.hasOffsets()); + assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)); + assertEquals(consumerOffsetCommits.size(), group.allOffsets().size()); + transactionalOffsetCommits.forEach((tp, offset) -> { + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)); + }); + }); + } + @Test public void testGroupNotExits() { // group is not owned @@ -106,7 +845,7 @@ public void testGroupNotExits() { // group is owned but does not exist yet assertTrue(groupMetadataManager.groupNotExists(groupId)); - GroupMetadata group = new GroupMetadata(groupId, GroupState.Empty); + GroupMetadata group = new GroupMetadata(groupId, Empty); groupMetadataManager.addGroup(group); // group is owned but not Dead @@ -118,62 +857,345 @@ public void testGroupNotExits() { } @Test - public void testAddGroup() { - GroupMetadata group = new GroupMetadata("foo", GroupState.Empty); - assertEquals(group, groupMetadataManager.addGroup(group)); - assertEquals(group, groupMetadataManager.addGroup( - new GroupMetadata("foo", GroupState.Empty) + public void testLoadOffsetsWithTombstones() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + TopicPartition tombstonePartition = new TopicPartition("foo", 1); + + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition("foo", 0), 23L); + committedOffsets.put( + tombstonePartition, 455L); + committedOffsets.put( + new TopicPartition("bar", 0), 8992L); + + List offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, groupId); + SimpleRecord tombstone = new SimpleRecord( + offsetCommitKey(groupId, tombstonePartition), + null + ); + offsetCommitRecords.add(tombstone); + + ByteBuffer buffer = newMemoryRecordsBuffer(offsetCommitRecords); + + byte[] key = groupMetadataKey(groupId); + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata group = onLoadedFuture.get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + assertSame(group, groupInCache); + + assertEquals(groupId, group.groupId()); + assertEquals(Empty, group.currentState()); + + // The tombstone should remove the materialized offset for its partition, while the other committed offsets remain.
+ assertEquals(committedOffsets.size() - 1, group.allOffsets().size()); + committedOffsets.forEach((tp, offset) -> { + if (tp == tombstonePartition) { + assertEquals(Optional.empty(), group.offset(tp)); + } else { + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)); + } + }); + }); + } + + @Test + public void testLoadOffsetsAndGroup() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + int generation = 935; + String protocolType = "consumer"; + String protocol = "range"; + + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition("foo", 0), 23L); + committedOffsets.put( + new TopicPartition("foo", 1), 455L); + committedOffsets.put( + new TopicPartition("bar", 0), 8992L); + + List offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, groupId); + String memberId = "98098230493"; + SimpleRecord groupMetadataRecord = buildStableGroupRecordWithMember( + generation, + protocolType, + protocol, + memberId + ); + offsetCommitRecords.add(groupMetadataRecord); + + ByteBuffer buffer = newMemoryRecordsBuffer(offsetCommitRecords); + + byte[] key = groupMetadataKey(groupId); + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata group = onLoadedFuture.get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + assertSame(group, groupInCache); + + assertEquals(groupId, group.groupId()); + assertEquals(Stable, group.currentState()); + assertEquals(memberId, group.leaderOrNull()); + assertEquals(generation, group.generationId()); + assertEquals(Optional.of(protocolType), group.protocolType()); + assertEquals( + Lists.newArrayList(memberId), + group.allMembers().stream().collect(Collectors.toList())); + assertEquals( + committedOffsets.size(), + group.allOffsets().size() + ); + committedOffsets.forEach((tp, offset) -> { + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)); + }); + }); + } + + @Test + public void testLoadGroupWithTombstone() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + int generation = 935; + String memberId = "98098230493"; + String protocolType = "consumer"; + String protocol = "range"; + + SimpleRecord groupMetadataRecord = buildStableGroupRecordWithMember( + generation, + protocolType, + protocol, + memberId + ); + SimpleRecord groupMetadataTombstone = new SimpleRecord( + groupMetadataKey(groupId), + null + ); + ByteBuffer buffer = newMemoryRecordsBuffer(Lists.newArrayList( + groupMetadataRecord, + groupMetadataTombstone )); + + byte[] key = groupMetadataKey(groupId); + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> {} + ).get(); + assertFalse(groupMetadataManager.getGroup(groupId).isPresent()); + }); } - /** - * A 
group metadata manager test runner. - */ - @FunctionalInterface - public interface GroupMetadataManagerTester { + @Test + public void testOffsetWriteAfterGroupRemoved() throws Exception { + // this test case checks the following scenario: + // 1. the group exists at some point in time, but is later removed (because all members left) + // 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets - void test(GroupMetadataManager groupMetadataManager, - Consumer consumer) throws Exception; + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + int generation = 293; + String memberId = "98098230493"; + String protocolType = "consumer"; + String protocol = "range"; + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition("foo", 0), 23L); + committedOffsets.put( + new TopicPartition("foo", 1), 455L); + committedOffsets.put( + new TopicPartition("bar", 0), 8992L); + + List offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, groupId); + SimpleRecord groupMetadataRecord = buildStableGroupRecordWithMember( + generation, + protocolType, + protocol, + memberId + ); + SimpleRecord groupMetadataTombstone = new SimpleRecord( + groupMetadataKey(groupId), + null + ); + + List newOffsetCommitRecords = new ArrayList<>(); + newOffsetCommitRecords.add(groupMetadataRecord); + newOffsetCommitRecords.add(groupMetadataTombstone); + newOffsetCommitRecords.addAll(offsetCommitRecords); + + ByteBuffer buffer = newMemoryRecordsBuffer(newOffsetCommitRecords); + + byte[] key = groupMetadataKey(groupId); + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata group = onLoadedFuture.get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + assertSame(group, groupInCache); + + assertEquals(groupId, group.groupId()); + assertEquals(Empty, group.currentState()); + assertEquals(committedOffsets.size(), group.allOffsets().size()); + committedOffsets.forEach((tp, offset) -> { + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)); + }); + }); } - void runGroupMetadataManagerTester(final String topicName, - GroupMetadataManagerTester tester) throws Exception { - @Cleanup - Producer producer = pulsarClient.newProducer() - .topic(topicName) - .create(); - @Cleanup - Consumer consumer = pulsarClient.newConsumer() - .topic(topicName) - .subscriptionName("test-sub") - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscribe(); - @Cleanup - Reader reader = pulsarClient.newReader() - .topic(topicName) - .startMessageId(MessageId.earliest) - .create(); - groupMetadataManager = new GroupMetadataManager( - 1, - offsetConfig, - producer, - reader, - time + @Test + public void testLoadGroupAndOffsetsFromDifferentSegments() throws Exception { + TopicPartition groupMetadataTopicPartition = groupTopicPartition; + int generation = 293; + String protocolType = "consumer"; + String protocol = "range"; + TopicPartition tp0 = new TopicPartition("foo", 0); + TopicPartition tp1 = new 
TopicPartition("foo", 1); + TopicPartition tp2 = new TopicPartition("bar", 0); + TopicPartition tp3 = new TopicPartition("xxx", 0); + + String segment1MemberId = "a"; + Map segment1Offsets = new HashMap<>(); + segment1Offsets.put(tp0, 23L); + segment1Offsets.put(tp1, 455L); + segment1Offsets.put(tp3, 42L); + List segment1Records = createCommittedOffsetRecords(segment1Offsets, groupId); + SimpleRecord segment1Group = buildStableGroupRecordWithMember( + generation, + protocolType, + protocol, + segment1MemberId ); - tester.test(groupMetadataManager, consumer); + segment1Records.add(segment1Group); + ByteBuffer segment1Buffer = newMemoryRecordsBuffer(segment1Records); + + String segment2MemberId = "a"; + Map segment2Offsets = new HashMap<>(); + segment2Offsets.put(tp0, 33L); + segment2Offsets.put(tp2, 8992L); + segment2Offsets.put(tp3, 10L); + List segment2Records = createCommittedOffsetRecords(segment2Offsets, groupId); + SimpleRecord segment2Group = buildStableGroupRecordWithMember( + generation, + protocolType, + protocol, + segment2MemberId + ); + segment2Records.add(segment2Group); + ByteBuffer segment2Buffer = newMemoryRecordsBuffer(segment2Records); + + byte[] key = groupMetadataKey(groupId); + runGroupMetadataManagerProducerTester("test-load-offsets-without-group", + (groupMetadataManager, producer) -> { + producer.newMessage() + .keyBytes(key) + .value(segment1Buffer) + .eventTime(time.milliseconds()) + .send(); + + producer.newMessage() + .keyBytes(key) + .value(segment2Buffer) + .eventTime(time.milliseconds()) + .send(); + + CompletableFuture onLoadedFuture = new CompletableFuture<>(); + groupMetadataManager.scheduleLoadGroupAndOffsets( + groupMetadataTopicPartition.partition(), + groupMetadata -> onLoadedFuture.complete(groupMetadata) + ).get(); + GroupMetadata group = onLoadedFuture.get(); + GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> { + fail("Group was not loaded into the cache"); + return null; + }); + assertSame(group, groupInCache); + + assertEquals(groupId, group.groupId()); + assertEquals(Stable, group.currentState()); + + assertEquals("segment2 group record member should be elected", + segment2MemberId, group.leaderOrNull()); + assertEquals("segment2 group record member should be only member", + Lists.newArrayList(segment2MemberId), + group.allMembers().stream().collect(Collectors.toList())); + + // offsets of segment1 should be overridden by segment2 offsets of the same topic partitions + Map committedOffsets = new HashMap<>(); + committedOffsets.putAll(segment1Offsets); + committedOffsets.putAll(segment2Offsets); + assertEquals(committedOffsets.size(), group.allOffsets().size()); + committedOffsets.forEach((tp, offset) -> { + assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)); + }); + }); + } + + @Test + public void testAddGroup() { + GroupMetadata group = new GroupMetadata("foo", Empty); + assertEquals(group, groupMetadataManager.addGroup(group)); + assertEquals(group, groupMetadataManager.addGroup( + new GroupMetadata("foo", Empty) + )); } @Test public void testStoreEmptyGroup() throws Exception { final String topicName = "test-store-empty-group"; - runGroupMetadataManagerTester(topicName, (groupMetadataManager, consumer) -> { + runGroupMetadataManagerConsumerTester(topicName, (groupMetadataManager, consumer) -> { int generation = 27; String protocolType = "consumer"; GroupMetadata group = GroupMetadata.loadGroup( groupId, - GroupState.Empty, + Empty, generation, protocolType, null, @@ -185,23 
+1207,36 @@ public void testStoreEmptyGroup() throws Exception { Errors errors = groupMetadataManager.storeGroup(group, Collections.emptyMap()).get(); assertEquals(Errors.NONE, errors); - Message message = consumer.receive(); + Message message = consumer.receive(); assertTrue(message.getEventTime() > 0L); assertTrue(message.hasKey()); byte[] key = message.getKeyBytes(); - byte[] value = message.getValue(); + BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key)); + assertTrue(groupKey instanceof GroupMetadataKey); + GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey; + assertEquals(groupId, groupMetadataKey.key()); - BaseKey bk = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key)); - assertTrue(bk instanceof GroupMetadataKey); - GroupMetadataKey gmk = (GroupMetadataKey) bk; - assertEquals(groupId, gmk.key()); + ByteBuffer value = message.getValue(); + MemoryRecords memRecords = MemoryRecords.readableRecords(value); + AtomicBoolean verified = new AtomicBoolean(false); + memRecords.batches().forEach(batch -> { + for (Record record : batch) { + assertFalse(verified.get()); + BaseKey bk = GroupMetadataConstants.readMessageKey(record.key()); + assertTrue(bk instanceof GroupMetadataKey); + GroupMetadataKey gmk = (GroupMetadataKey) bk; + assertEquals(groupId, gmk.key()); - GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue( - groupId, ByteBuffer.wrap(value) - ); - assertTrue(gm.is(GroupState.Empty)); - assertEquals(generation, gm.generationId()); - assertEquals(Optional.of(protocolType), gm.protocolType()); + GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue( + groupId, record.value() + ); + assertTrue(gm.is(Empty)); + assertEquals(generation, gm.generationId()); + assertEquals(Optional.of(protocolType), gm.protocolType()); + verified.set(true); + } + }); + assertTrue(verified.get()); }); } @@ -209,32 +1244,45 @@ public void testStoreEmptyGroup() throws Exception { public void testStoreEmptySimpleGroup() throws Exception { final String topicName = "test-store-empty-simple-group"; - runGroupMetadataManagerTester(topicName, (groupMetadataManager, consumer) -> { + runGroupMetadataManagerConsumerTester(topicName, (groupMetadataManager, consumer) -> { - GroupMetadata group = new GroupMetadata(groupId, GroupState.Empty); + GroupMetadata group = new GroupMetadata(groupId, Empty); groupMetadataManager.addGroup(group); Errors errors = groupMetadataManager.storeGroup(group, Collections.emptyMap()).get(); assertEquals(Errors.NONE, errors); - Message message = consumer.receive(); + Message message = consumer.receive(); assertTrue(message.getEventTime() > 0L); assertTrue(message.hasKey()); byte[] key = message.getKeyBytes(); - byte[] value = message.getValue(); - BaseKey bk = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key)); - assertTrue(bk instanceof GroupMetadataKey); - GroupMetadataKey gmk = (GroupMetadataKey) bk; - assertEquals(groupId, gmk.key()); + BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key)); + assertTrue(groupKey instanceof GroupMetadataKey); + GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey; + assertEquals(groupId, groupMetadataKey.key()); - GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue( - groupId, ByteBuffer.wrap(value) - ); - assertTrue(gm.is(GroupState.Empty)); - assertEquals(0, gm.generationId()); - assertEquals(Optional.empty(), gm.protocolType()); + ByteBuffer value = message.getValue(); + MemoryRecords memRecords = 
MemoryRecords.readableRecords(value); + AtomicBoolean verified = new AtomicBoolean(false); + memRecords.batches().forEach(batch -> { + for (Record record : batch) { + assertFalse(verified.get()); + BaseKey bk = GroupMetadataConstants.readMessageKey(record.key()); + assertTrue(bk instanceof GroupMetadataKey); + GroupMetadataKey gmk = (GroupMetadataKey) bk; + assertEquals(groupId, gmk.key()); + GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue( + groupId, record.value() + ); + assertTrue(gm.is(Empty)); + assertEquals(0, gm.generationId()); + assertEquals(Optional.empty(), gm.protocolType()); + verified.set(true); + } + }); + assertTrue(verified.get()); }); } @@ -242,12 +1290,12 @@ public void testStoreEmptySimpleGroup() throws Exception { public void testStoreNoneEmptyGroup() throws Exception { final String topicName = "test-store-non-empty-group"; - runGroupMetadataManagerTester(topicName, (groupMetadataManager, consumer) -> { + runGroupMetadataManagerConsumerTester(topicName, (groupMetadataManager, consumer) -> { String memberId = "memberId"; String clientId = "clientId"; String clientHost = "localhost"; - GroupMetadata group = new GroupMetadata(groupId, GroupState.Empty); + GroupMetadata group = new GroupMetadata(groupId, Empty); groupMetadataManager.addGroup(group); Map protocols = new HashMap<>(); @@ -273,26 +1321,627 @@ public void testStoreNoneEmptyGroup() throws Exception { Errors errors = groupMetadataManager.storeGroup(group, assignments).get(); assertEquals(Errors.NONE, errors); - Message message = consumer.receive(); + Message message = consumer.receive(); assertTrue(message.getEventTime() > 0L); assertTrue(message.hasKey()); byte[] key = message.getKeyBytes(); - byte[] value = message.getValue(); + BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key)); + assertTrue(groupKey instanceof GroupMetadataKey); + GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey; + assertEquals(groupId, groupMetadataKey.key()); + + ByteBuffer value = message.getValue(); + MemoryRecords memRecords = MemoryRecords.readableRecords(value); + AtomicBoolean verified = new AtomicBoolean(false); + memRecords.batches().forEach(batch -> { + for (Record record : batch) { + assertFalse(verified.get()); + BaseKey bk = GroupMetadataConstants.readMessageKey(record.key()); + assertTrue(bk instanceof GroupMetadataKey); + GroupMetadataKey gmk = (GroupMetadataKey) bk; + assertEquals(groupId, gmk.key()); + + GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue( + groupId, record.value() + ); + assertEquals(Stable, gm.currentState()); + assertEquals(1, gm.generationId()); + assertEquals(Optional.of(protocolType), gm.protocolType()); + assertEquals("protocol", gm.protocolOrNull()); + assertTrue(gm.has(memberId)); + verified.set(true); + } + }); + assertTrue(verified.get()); + }); + } - BaseKey bk = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key)); - assertTrue(bk instanceof GroupMetadataKey); - GroupMetadataKey gmk = (GroupMetadataKey) bk; + @Test + public void testCommitOffset() throws Exception { + runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> { + String memberId = ""; + TopicPartition topicPartition = new TopicPartition("foo", 0); + groupMetadataManager.addPartitionOwnership(groupPartitionId); + long offset = 37L; + + GroupMetadata group = new GroupMetadata(groupId, Empty); + groupMetadataManager.addGroup(group); + + Map offsets = ImmutableMap.builder() + .put(topicPartition, 
OffsetAndMetadata.apply(offset)) + .build(); + + Map commitErrors = groupMetadataManager.storeOffsets( + group, memberId, offsets + ).get(); + + assertTrue(group.hasOffsets()); + assertFalse(commitErrors.isEmpty()); + Errors maybeError = commitErrors.get(topicPartition); + assertEquals(Errors.NONE, maybeError); + assertTrue(group.hasOffsets()); + + Map cachedOffsets = groupMetadataManager.getOffsets( + groupId, + Optional.of(Lists.newArrayList(topicPartition)) + ); + PartitionData maybePartitionResponse = cachedOffsets.get(topicPartition); + assertNotNull(maybePartitionResponse); + + assertEquals(Errors.NONE, maybePartitionResponse.error); + assertEquals(offset, maybePartitionResponse.offset); + + Message message = consumer.receive(); + assertTrue(message.getEventTime() > 0L); + assertTrue(message.hasKey()); + byte[] key = message.getKeyBytes(); + BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key)); + assertTrue(groupKey instanceof OffsetKey); + + ByteBuffer value = message.getValue(); + MemoryRecords memRecords = MemoryRecords.readableRecords(value); + AtomicBoolean verified = new AtomicBoolean(false); + memRecords.batches().forEach(batch -> { + for (Record record : batch) { + assertFalse(verified.get()); + BaseKey bk = GroupMetadataConstants.readMessageKey(record.key()); + assertTrue(bk instanceof OffsetKey); + OffsetKey ok = (OffsetKey) bk; + GroupTopicPartition gtp = ok.key(); + assertEquals(groupId, gtp.group()); + assertEquals(topicPartition, gtp.topicPartition()); + + OffsetAndMetadata gm = GroupMetadataConstants.readOffsetMessageValue( + record.value() + ); + assertEquals(offset, gm.offset()); + verified.set(true); + } + }); + assertTrue(verified.get()); + }); + } + + @Test + public void testTransactionalCommitOffsetCommitted() throws Exception { + runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> { + String memberId = ""; + TopicPartition topicPartition = new TopicPartition("foo", 0); + long offset = 37L; + long producerId = 232L; + short producerEpoch = 0; + + groupMetadataManager.addPartitionOwnership(groupPartitionId); + + GroupMetadata group = new GroupMetadata(groupId, Empty); + groupMetadataManager.addGroup(group); + + Map offsets = ImmutableMap.builder() + .put(topicPartition, OffsetAndMetadata.apply(offset)) + .build(); + + CompletableFuture writeOffsetMessageFuture = new CompletableFuture<>(); + AtomicReference> realWriteFutureRef = new AtomicReference<>(); + doAnswer(invocationOnMock -> { + CompletableFuture realWriteFuture = + (CompletableFuture) invocationOnMock.callRealMethod(); + realWriteFutureRef.set(realWriteFuture); + return writeOffsetMessageFuture; + }).when(groupMetadataManager).storeOffsetMessage( + any(byte[].class), any(ByteBuffer.class), anyLong() + ); + + CompletableFuture> storeFuture = groupMetadataManager.storeOffsets( + group, memberId, offsets, producerId, producerEpoch + ); + + assertTrue(group.hasOffsets()); + assertTrue(group.allOffsets().isEmpty()); + + // complete the write message + writeOffsetMessageFuture.complete(realWriteFutureRef.get().get()); + Map commitErrors = storeFuture.get(); + + assertFalse(commitErrors.isEmpty()); + Errors maybeError = commitErrors.get(topicPartition); + assertEquals(Errors.NONE, maybeError); + assertTrue(group.hasOffsets()); + assertTrue(group.allOffsets().isEmpty()); + + group.completePendingTxnOffsetCommit(producerId, true); + assertTrue(group.hasOffsets()); + assertFalse(group.allOffsets().isEmpty()); + + assertEquals( + 
Optional.of(OffsetAndMetadata.apply(offset)), + group.offset(topicPartition) + ); + }); + } + + @Test + public void testTransactionalCommitOffsetAppendFailure() throws Exception { + runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> { + String memberId = ""; + TopicPartition topicPartition = new TopicPartition("foo", 0); + long offset = 37L; + long producerId = 232L; + short producerEpoch = 0; + + groupMetadataManager.addPartitionOwnership(groupPartitionId); + + GroupMetadata group = new GroupMetadata(groupId, Empty); + groupMetadataManager.addGroup(group); + + Map offsets = ImmutableMap.builder() + .put(topicPartition, OffsetAndMetadata.apply(offset)) + .build(); + + CompletableFuture writeOffsetMessageFuture = new CompletableFuture<>(); + AtomicReference> realWriteFutureRef = new AtomicReference<>(); + doAnswer(invocationOnMock -> { + CompletableFuture realWriteFuture = + (CompletableFuture) invocationOnMock.callRealMethod(); + realWriteFutureRef.set(realWriteFuture); + return writeOffsetMessageFuture; + }).when(groupMetadataManager).storeOffsetMessage( + any(byte[].class), any(ByteBuffer.class), anyLong() + ); + + CompletableFuture> storeFuture = groupMetadataManager.storeOffsets( + group, memberId, offsets, producerId, producerEpoch + ); + + assertTrue(group.hasOffsets()); + assertTrue(group.allOffsets().isEmpty()); + + // fail the write message + writeOffsetMessageFuture.completeExceptionally( + new Exception("Not enough replicas") + ); + Map commitErrors = storeFuture.get(); + + assertFalse(commitErrors.isEmpty()); + Errors maybeError = commitErrors.get(topicPartition); + assertEquals(Errors.UNKNOWN_SERVER_ERROR, maybeError); + assertFalse(group.hasOffsets()); + assertTrue(group.allOffsets().isEmpty()); + + group.completePendingTxnOffsetCommit(producerId, false); + assertFalse(group.hasOffsets()); + assertTrue(group.allOffsets().isEmpty()); + }); + } + + @Test + public void testTransactionalCommitOffsetAborted() throws Exception { + runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> { + String memberId = ""; + TopicPartition topicPartition = new TopicPartition("foo", 0); + long offset = 37L; + long producerId = 232L; + short producerEpoch = 0; + + groupMetadataManager.addPartitionOwnership(groupPartitionId); + + GroupMetadata group = new GroupMetadata(groupId, Empty); + groupMetadataManager.addGroup(group); + + Map offsets = ImmutableMap.builder() + .put(topicPartition, OffsetAndMetadata.apply(offset)) + .build(); + + CompletableFuture writeOffsetMessageFuture = new CompletableFuture<>(); + AtomicReference> realWriteFutureRef = new AtomicReference<>(); + doAnswer(invocationOnMock -> { + CompletableFuture realWriteFuture = + (CompletableFuture) invocationOnMock.callRealMethod(); + realWriteFutureRef.set(realWriteFuture); + return writeOffsetMessageFuture; + }).when(groupMetadataManager).storeOffsetMessage( + any(byte[].class), any(ByteBuffer.class), anyLong() + ); + + CompletableFuture> storeFuture = groupMetadataManager.storeOffsets( + group, memberId, offsets, producerId, producerEpoch + ); + + assertTrue(group.hasOffsets()); + assertTrue(group.allOffsets().isEmpty()); + + // complete the write message + writeOffsetMessageFuture.complete(realWriteFutureRef.get().get()); + Map commitErrors = storeFuture.get(); + + assertFalse(commitErrors.isEmpty()); + Errors maybeError = commitErrors.get(topicPartition); + assertEquals(Errors.NONE, maybeError); + assertTrue(group.hasOffsets()); +
assertTrue(group.allOffsets().isEmpty()); + + group.completePendingTxnOffsetCommit(producerId, false); + assertFalse(group.hasOffsets()); + assertTrue(group.allOffsets().isEmpty()); + }); + } + + @Test + public void testExpiredOffset() throws Exception { + runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> { + String memberId = ""; + TopicPartition topicPartition1 = new TopicPartition("foo", 0); + TopicPartition topicPartition2 = new TopicPartition("foo", 1); + groupMetadataManager.addPartitionOwnership(groupPartitionId); + long offset = 37L; + + GroupMetadata group = new GroupMetadata(groupId, Empty); + groupMetadataManager.addGroup(group); + + long startMs = time.milliseconds(); + Map offsets = ImmutableMap.builder() + .put(topicPartition1, OffsetAndMetadata.apply( + offset, "", startMs, startMs + 1)) + .put(topicPartition2, OffsetAndMetadata.apply( + offset, "", startMs, startMs + 3)) + .build(); + + Map commitErrors = groupMetadataManager.storeOffsets( + group, memberId, offsets + ).get(); + assertTrue(group.hasOffsets()); + + assertFalse(commitErrors.isEmpty()); + Errors maybeError = commitErrors.get(topicPartition1); + assertEquals(Errors.NONE, maybeError); + + // expire only one of the offsets + time.sleep(2); + + groupMetadataManager.cleanupGroupMetadata(); + + assertEquals(Optional.of(group), groupMetadataManager.getGroup(groupId)); + assertEquals(Optional.empty(), group.offset(topicPartition1)); + assertEquals(Optional.of(offset), group.offset(topicPartition2).map(OffsetAndMetadata::offset)); + + Map cachedOffsets = groupMetadataManager.getOffsets( + groupId, + Optional.of(Lists.newArrayList( + topicPartition1, + topicPartition2 + )) + ); + assertEquals( + OffsetFetchResponse.INVALID_OFFSET, + cachedOffsets.get(topicPartition1).offset); + assertEquals( + offset, + cachedOffsets.get(topicPartition2).offset); + }); + } + + @Test + public void testGroupMetadataRemoval() throws Exception { + runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> { + TopicPartition topicPartition1 = new TopicPartition("foo", 0); + TopicPartition topicPartition2 = new TopicPartition("foo", 1); + + groupMetadataManager.addPartitionOwnership(groupPartitionId); + + GroupMetadata group = new GroupMetadata(groupId, Empty); + groupMetadataManager.addGroup(group); + group.generationId(5); + + groupMetadataManager.cleanupGroupMetadata().get(); + + Message message = consumer.receive(); + assertTrue(message.getEventTime() > 0L); + assertTrue(message.hasKey()); + byte[] key = message.getKeyBytes(); + + BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key)); + assertTrue(groupKey instanceof GroupMetadataKey); + GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey; + assertEquals(groupId, groupMetadataKey.key()); + + ByteBuffer value = message.getValue(); + MemoryRecords memRecords = MemoryRecords.readableRecords(value); + AtomicBoolean verified = new AtomicBoolean(false); + memRecords.batches().forEach(batch -> { + assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + for (Record record : batch) { + assertFalse(verified.get()); + assertTrue(record.hasKey()); + assertFalse(record.hasValue()); + assertTrue(record.timestamp() > 0); + BaseKey bk = GroupMetadataConstants.readMessageKey(record.key()); + assertTrue(bk instanceof GroupMetadataKey); + GroupMetadataKey gmk = (GroupMetadataKey) bk; + assertEquals(groupId, 
gmk.key()); + verified.set(true); + } + }); + assertTrue(verified.get()); + assertEquals(Optional.empty(), groupMetadataManager.getGroup(groupId)); + Map cachedOffsets = groupMetadataManager.getOffsets( + groupId, + Optional.of(Lists.newArrayList( + topicPartition1, + topicPartition2 + )) + ); + assertEquals( + OffsetFetchResponse.INVALID_OFFSET, + cachedOffsets.get(topicPartition1).offset); + assertEquals( + OffsetFetchResponse.INVALID_OFFSET, + cachedOffsets.get(topicPartition2).offset); + }); + } + + @Test + public void testExpireGroupWithOffsetsOnly() throws Exception { + runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> { + // verify that the group is removed properly, but no tombstone is written if + // this is a group which is only using kafka for offset storage + + String memberId = ""; + TopicPartition topicPartition1 = new TopicPartition("foo", 0); + TopicPartition topicPartition2 = new TopicPartition("foo", 1); + long offset = 37; + + groupMetadataManager.addPartitionOwnership(groupPartitionId); + + GroupMetadata group = new GroupMetadata(groupId, Empty); + groupMetadataManager.addGroup(group); + + long startMs = time.milliseconds(); + Map offsets = ImmutableMap.builder() + .put(topicPartition1, OffsetAndMetadata.apply(offset, "", startMs, startMs + 1)) + .put(topicPartition2, OffsetAndMetadata.apply(offset, "", startMs, startMs + 3)) + .build(); + + Map commitErrors = + groupMetadataManager.storeOffsets(group, memberId, offsets).get(); + assertTrue(group.hasOffsets()); + + assertFalse(commitErrors.isEmpty()); + assertEquals( + Errors.NONE, + commitErrors.get(topicPartition1) + ); + + // expire all of the offsets + time.sleep(4); + + groupMetadataManager.cleanupGroupMetadata().get(); + + // skip `storeOffsets` op + consumer.receive(); + + Message message = consumer.receive(); + assertTrue(message.getEventTime() > 0L); + assertTrue(message.hasKey()); + byte[] key = message.getKeyBytes(); + + BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key)); + assertTrue(groupKey instanceof GroupMetadataKey); + GroupMetadataKey gmk = (GroupMetadataKey) groupKey; assertEquals(groupId, gmk.key()); - GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue( - groupId, ByteBuffer.wrap(value) + ByteBuffer value = message.getValue(); + MemoryRecords memRecords = MemoryRecords.readableRecords(value); + AtomicInteger verified = new AtomicInteger(2); + memRecords.batches().forEach(batch -> { + assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + for (Record record : batch) { + verified.decrementAndGet(); + assertTrue(record.hasKey()); + assertFalse(record.hasValue()); + assertTrue(record.timestamp() > 0); + BaseKey bk = GroupMetadataConstants.readMessageKey(record.key()); + assertTrue(bk instanceof OffsetKey); + OffsetKey ok = (OffsetKey) bk; + assertEquals(groupId, ok.key().group()); + assertEquals("foo", ok.key().topicPartition().topic()); + } + }); + assertEquals(0, verified.get()); + assertEquals(Optional.empty(), groupMetadataManager.getGroup(groupId)); + Map cachedOffsets = groupMetadataManager.getOffsets( + groupId, + Optional.of(Lists.newArrayList( + topicPartition1, + topicPartition2 + )) + ); + assertEquals( + OffsetFetchResponse.INVALID_OFFSET, + cachedOffsets.get(topicPartition1).offset); + assertEquals( + OffsetFetchResponse.INVALID_OFFSET, + cachedOffsets.get(topicPartition2).offset); + }); + } + + @Test + public void 
testExpireOffsetsWithActiveGroup() throws Exception { + runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> { + String memberId = "memberId"; + String clientId = "clientId"; + String clientHost = "localhost"; + TopicPartition topicPartition1 = new TopicPartition("foo", 0); + TopicPartition topicPartition2 = new TopicPartition("foo", 1); + long offset = 37; + + groupMetadataManager.addPartitionOwnership(groupPartitionId); + + GroupMetadata group = new GroupMetadata(groupId, Empty); + groupMetadataManager.addGroup(group); + + MemberMetadata member = new MemberMetadata( + memberId, groupId, clientId, clientHost, + rebalanceTimeout, + sessionTimeout, + protocolType, + ImmutableMap.builder() + .put("protocol", new byte[0]) + .build() + ); + CompletableFuture memberJoinFuture = new CompletableFuture<>(); + member.awaitingJoinCallback(memberJoinFuture); + group.add(member); + group.transitionTo(PreparingRebalance); + group.initNextGeneration(); + + long startMs = time.milliseconds(); + Map offsets = ImmutableMap.builder() + .put(topicPartition1, OffsetAndMetadata.apply(offset, "", startMs, startMs + 1)) + .put(topicPartition2, OffsetAndMetadata.apply(offset, "", startMs, startMs + 3)) + .build(); + + Map commitErrors = + groupMetadataManager.storeOffsets(group, memberId, offsets).get(); + assertTrue(group.hasOffsets()); + + assertFalse(commitErrors.isEmpty()); + assertEquals( + Errors.NONE, + commitErrors.get(topicPartition1) + ); + + // expire all of the offsets + time.sleep(4); + + groupMetadataManager.cleanupGroupMetadata().get(); + + // group should still be there, but the offsets should be gone + assertEquals( + Optional.of(group), + groupMetadataManager.getGroup(groupId) + ); + assertEquals( + Optional.empty(), + group.offset(topicPartition1) ); - assertEquals(GroupState.Stable, gm.currentState()); - assertEquals(1, gm.generationId()); - assertEquals(Optional.of(protocolType), gm.protocolType()); - assertEquals("protocol", gm.protocolOrNull()); - assertTrue(gm.has(memberId)); + assertEquals( + Optional.empty(), + group.offset(topicPartition2) + ); + + Map cachedOffsets = groupMetadataManager.getOffsets( + groupId, + Optional.of(Lists.newArrayList( + topicPartition1, + topicPartition2 + )) + ); + assertEquals( + OffsetFetchResponse.INVALID_OFFSET, + cachedOffsets.get(topicPartition1).offset); + assertEquals( + OffsetFetchResponse.INVALID_OFFSET, + cachedOffsets.get(topicPartition2).offset); }); } + /** + * A group metadata manager test runner. + */ + @FunctionalInterface + public interface GroupMetadataManagerProducerTester { + + void test(GroupMetadataManager groupMetadataManager, + Producer consumer) throws Exception; + + } + + /** + * A group metadata manager test runner. 
+ */ + @FunctionalInterface + public interface GroupMetadataManagerConsumerTester { + + void test(GroupMetadataManager groupMetadataManager, + Consumer consumer) throws Exception; + + } + + void runGroupMetadataManagerProducerTester(final String topicName, + GroupMetadataManagerProducerTester tester) throws Exception { + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.BYTEBUFFER) + .topic(topicName) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.BYTEBUFFER) + .topic(topicName) + .subscriptionName("test-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + @Cleanup + Reader reader = pulsarClient.newReader(Schema.BYTEBUFFER) + .topic(topicName) + .startMessageId(MessageId.earliest) + .create(); + groupMetadataManager = spy(new GroupMetadataManager( + 1, + offsetConfig, + producer, + reader, + scheduler, + time + )); + tester.test(groupMetadataManager, producer); + } + + void runGroupMetadataManagerConsumerTester(final String topicName, + GroupMetadataManagerConsumerTester tester) throws Exception { + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.BYTEBUFFER) + .topic(topicName) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.BYTEBUFFER) + .topic(topicName) + .subscriptionName("test-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + @Cleanup + Reader reader = pulsarClient.newReader(Schema.BYTEBUFFER) + .topic(topicName) + .startMessageId(MessageId.earliest) + .create(); + groupMetadataManager = spy(new GroupMetadataManager( + 1, + offsetConfig, + producer, + reader, + scheduler, + time + )); + tester.test(groupMetadataManager, consumer); + } + } diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index 4f9dc8da81eed..d4e681da61196 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -25,8 +25,8 @@ - - + +