From a34f59378dcdd5d4cf87b88a1f0c5f9be01d9bbc Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 26 Sep 2024 21:19:22 -0700 Subject: [PATCH 1/2] bump up to 60s timeout --- .../java/dev/responsive/kafka/internal/utils/SessionUtil.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionUtil.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionUtil.java index 87a793f8b..a60c96de8 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionUtil.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionUtil.java @@ -42,6 +42,7 @@ import java.net.InetSocketAddress; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.bson.Document; @@ -121,6 +122,7 @@ public static MongoClient connect( MongoClientSettings settings = MongoClientSettings.builder() .applyConnectionString(new ConnectionString(connectionString)) + .applyToConnectionPoolSettings(b -> b.maxConnectionIdleTime(60000, TimeUnit.MILLISECONDS)) .readConcern(ReadConcern.MAJORITY) .writeConcern(WriteConcern.MAJORITY) .serverApi(serverApi) From 41aaf842f63b1a281466ee79923bf1b81a01abf1 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 30 Sep 2024 14:32:12 -0700 Subject: [PATCH 2/2] allow generic connection string params --- .../responsive/kafka/api/ResponsiveKafkaStreams.java | 4 +++- .../responsive/kafka/api/config/ResponsiveConfig.java | 10 ++++++++++ .../responsive/kafka/internal/utils/SessionUtil.java | 11 +++++++---- .../kafka/integration/MinimalIntegrationTest.java | 7 ++++++- .../ResponsiveKafkaStreamsIntegrationTest.java | 2 +- ...ResponsiveKeyValueStoreRestoreIntegrationTest.java | 3 ++- .../kafka/internal/db/mongo/MongoKVTableTest.java | 2 +- .../internal/db/mongo/MongoSessionTableTest.java | 2 +- .../kafka/internal/db/mongo/MongoWindowTableTest.java | 2 +- .../internal/db/mongo/MongoWindowedTableTest.java | 2 +- 10 files changed, 33 insertions(+), 12 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 231bd3674..c3adfbad9 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -19,6 +19,7 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.METRICS_ENABLED_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_PASSWORD_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_USERNAME_CONFIG; @@ -493,7 +494,8 @@ public Params build() { final var mongoClient = SessionUtil.connect( hostname, clientId, - clientSecret == null ? null : clientSecret.value() + clientSecret == null ? null : clientSecret.value(), + responsiveConfig.getString(MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG) ); final boolean timestampFirstOrder = responsiveConfig.getBoolean(MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_CONFIG); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index 3b545a2c2..acbc1b63d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -85,6 +85,10 @@ public class ResponsiveConfig extends AbstractConfig { public static final String MONGO_ENDPOINT_CONFIG = "responsive.mongo.endpoint"; private static final String MONGO_ENDPOINT_DOC = "The MongoDB endpoint to connect to."; + public static final String MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG = "responsive.mongo.additional.connection.string.params"; + private static final String MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_DOC = "Additional MongoDB config options to be appended to the " + + "connection string. "; + public static final String MONGO_COLLECTION_SHARDING_ENABLED_CONFIG = "responsive.mongo.collection.sharding.enabled"; private static final boolean MONGO_COLLECTION_SHARDING_ENABLED_DEFAULT = false; private static final String MONGO_COLLECTION_SHARDING_ENABLED_DOC = "Toggles use of sharded collections. Set " @@ -503,6 +507,12 @@ public class ResponsiveConfig extends AbstractConfig { MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_DEFAULT, Importance.LOW, MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_DOC + ).define( + MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG, + Type.STRING, + "", + Importance.LOW, + MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_DOC ).define( WINDOW_BLOOM_FILTER_COUNT_CONFIG, Type.INT, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionUtil.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionUtil.java index a60c96de8..c47039c8e 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionUtil.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/SessionUtil.java @@ -42,7 +42,6 @@ import java.net.InetSocketAddress; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.bson.Document; @@ -94,7 +93,8 @@ public static CqlSession connect( public static MongoClient connect( final String hostname, @Nullable final String clientId, - @Nullable final String clientSecret + @Nullable final String clientSecret, + final String additionalParams ) { final String connectionString; if (clientId != null && clientSecret != null) { @@ -116,13 +116,16 @@ public static MongoClient connect( connectionString = hostname; } + final String connectionStringWithParams = additionalParams.equals("") + ? connectionString + : connectionString + "/?" + additionalParams; + ServerApi serverApi = ServerApi.builder() .version(ServerApiVersion.V1) .build(); MongoClientSettings settings = MongoClientSettings.builder() - .applyConnectionString(new ConnectionString(connectionString)) - .applyToConnectionPoolSettings(b -> b.maxConnectionIdleTime(60000, TimeUnit.MILLISECONDS)) + .applyConnectionString(new ConnectionString(connectionStringWithParams)) .readConcern(ReadConcern.MAJORITY) .writeConcern(WriteConcern.MAJORITY) .serverApi(serverApi) diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java index cba1aad00..40919c1ba 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java @@ -36,6 +36,7 @@ import static org.hamcrest.Matchers.hasItems; import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.config.StorageBackend; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; @@ -72,7 +73,7 @@ public class MinimalIntegrationTest { @RegisterExtension - static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.CASSANDRA); + static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.MONGO_DB); private static final String INPUT_TOPIC = "input"; private static final String OUTPUT_TOPIC = "output"; @@ -160,6 +161,10 @@ private Map getMutableProperties() { properties.put(STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 1); properties.put(COMMIT_INTERVAL_MS_CONFIG, 1); + properties.put( + ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG, "maxIdleTimeMs=60000" + ); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); properties.put(consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); properties.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java index 8e4af4174..c663ae643 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java @@ -147,7 +147,7 @@ public void shouldDefaultToResponsiveStoresWhenUsingDsl() throws Exception { // Then: try ( - final var mongoClient = SessionUtil.connect(mongo.getConnectionString(), null, null); + final var mongoClient = SessionUtil.connect(mongo.getConnectionString(), null, null, ""); final var deserializer = new StringDeserializer(); ) { final List dbs = new ArrayList<>(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java index 2564769ea..22bda7f24 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java @@ -391,7 +391,8 @@ private RemoteKVTable remoteKVTable( final var mongoClient = SessionUtil.connect( hostname, user, - pass == null ? null : pass.value() + pass == null ? null : pass.value(), + "" ); table = new MongoKVTable( mongoClient, diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java index 44df7329e..5bb2a7944 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java @@ -61,7 +61,7 @@ public void before( name = info.getDisplayName().replace("()", ""); final String mongoConnection = (String) props.get(MONGO_ENDPOINT_CONFIG); - client = SessionUtil.connect(mongoConnection, null, null); + client = SessionUtil.connect(mongoConnection, null, null, ""); } @Test diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoSessionTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoSessionTableTest.java index 4ef43fc69..953a3f2e1 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoSessionTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoSessionTableTest.java @@ -61,7 +61,7 @@ public void before( name = info.getDisplayName().replace("()", ""); final String mongoConnection = (String) props.get(MONGO_ENDPOINT_CONFIG); - client = SessionUtil.connect(mongoConnection, null, null); + client = SessionUtil.connect(mongoConnection, null, null, ""); } /* diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTableTest.java index 6c3cfaf7d..70e923ae5 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTableTest.java @@ -57,7 +57,7 @@ public void before( name = info.getDisplayName().replace("()", ""); final String mongoConnection = (String) props.get(MONGO_ENDPOINT_CONFIG); - client = SessionUtil.connect(mongoConnection, null, null); + client = SessionUtil.connect(mongoConnection, null, null, ""); } @Test diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java index 369fe0e44..5ba8bdf10 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java @@ -52,7 +52,7 @@ public void before( name = info.getDisplayName().replace("()", ""); final String mongoConnection = (String) props.get(MONGO_ENDPOINT_CONFIG); - client = SessionUtil.connect(mongoConnection, null, null); + client = SessionUtil.connect(mongoConnection, null, null, ""); partitioner = new WindowSegmentPartitioner(10_000L, 1_000L, false); segment = partitioner.segmenter().activeSegments(0, 100).get(0);