From d8c2857224c5ea50e4862edd6214c24f75c647a1 Mon Sep 17 00:00:00 2001 From: Bazen <49089563+bazen-teklehaymanot@users.noreply.github.com> Date: Mon, 5 Feb 2024 18:45:23 +0200 Subject: [PATCH] Add station partitions update on consumer creation --- src/Memphis.Client/MemphisClient.Consumer.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Memphis.Client/MemphisClient.Consumer.cs b/src/Memphis.Client/MemphisClient.Consumer.cs index 3ccb44c..27561d4 100644 --- a/src/Memphis.Client/MemphisClient.Consumer.cs +++ b/src/Memphis.Client/MemphisClient.Consumer.cs @@ -77,6 +77,11 @@ public async Task CreateConsumer(MemphisConsumerOptions consume var consumer = new MemphisConsumer(this, consumerOptions, createConsumerResponse.PartitionsUpdate.PartitionsList); _consumerCache.AddOrUpdate(consumer.Key, consumer, (_, _) => consumer); + if(createConsumerResponse is { PartitionsUpdate: { } partitionsUpdate }) + { + _stationPartitions.AddOrUpdate(consumerOptions.StationName, partitionsUpdate, (_, _) => partitionsUpdate); + } + await ListenForSchemaUpdate(consumerOptions.StationName); return consumer;