diff --git a/config/config.yaml b/config/config.yaml
index 6c95431fd1..a261caccb0 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -141,3 +141,4 @@ preprocessorConfig:
   rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
   kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0}
   useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false}
+  rateLimitExceededErrorCode: ${KALDB_PREPROCESSOR_RATE_LIMIT_EXCEEDED_ERROR_CODE:-400}
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
index b67a9d8a84..37cb47e00f 100644
--- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
@@ -1,9 +1,9 @@
 package com.slack.kaldb.bulkIngestApi;
 
 import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
-import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;
 
 import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.common.HttpStatus;
 import com.linecorp.armeria.server.annotation.Post;
 import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
 import com.slack.service.murron.trace.Trace;
@@ -33,11 +33,13 @@ public class BulkIngestApi {
   private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
   private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
   private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
+  private final int rateLimitExceededErrorCode;
 
   public BulkIngestApi(
       BulkIngestKafkaProducer bulkIngestKafkaProducer,
       DatasetRateLimitingService datasetRateLimitingService,
-      MeterRegistry meterRegistry) {
+      MeterRegistry meterRegistry,
+      int rateLimitExceededErrorCode) {
 
     this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
     this.datasetRateLimitingService = datasetRateLimitingService;
@@ -45,6 +47,11 @@ public BulkIngestApi(
     this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
     this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
     this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
+    if (rateLimitExceededErrorCode <= 0 || rateLimitExceededErrorCode > 599) {
+      this.rateLimitExceededErrorCode = 400;
+    } else {
+      this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
+    }
   }
 
   @Post("/_bulk")
@@ -77,7 +84,8 @@ public HttpResponse addDocument(String bulkRequest) {
           final String index = indexDocs.getKey();
           if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
             BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
-            future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response));
+            future.complete(
+                HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode), response));
             return HttpResponse.of(future);
           }
         }
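One consequence of the constructor guard above: a misconfigured code falls back to 400 instead of failing startup. A minimal standalone sketch of that behavior (illustrative only, not part of this patch; the only real API it touches is Armeria's HttpStatus.valueOf, which resolves an integer to a status object):

    import com.linecorp.armeria.common.HttpStatus;

    class RateLimitCodeSketch {
      // same guard as the BulkIngestApi constructor above
      static int effectiveCode(int configured) {
        return (configured <= 0 || configured > 599) ? 400 : configured;
      }

      public static void main(String[] args) {
        System.out.println(effectiveCode(429)); // 429: in range, honored as-is
        System.out.println(effectiveCode(0)); // 400: out-of-range input falls back
        System.out.println(effectiveCode(700)); // 400: out-of-range input falls back
        // prints the resolved status, e.g. 429 Too Many Requests
        System.out.println(HttpStatus.valueOf(effectiveCode(429)));
      }
    }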
diff --git a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java
index 47515a7d15..1bc97b4d12 100644
--- a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java
+++ b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java
@@ -211,21 +211,30 @@ private void recoveryNodeListener(RecoveryNodeMetadata recoveryNodeMetadata) {
    * fails. To break this cycle add a enqueue_count value to recovery task so we can stop recovering
    * it if the task fails a certain number of times.
    */
-  private void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetadata) {
+  protected void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetadata) {
     try {
       setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.RECOVERING);
       RecoveryTaskMetadata recoveryTaskMetadata =
           recoveryTaskMetadataStore.getSync(recoveryNodeMetadata.recoveryTaskName);
 
-      boolean success = handleRecoveryTask(recoveryTaskMetadata);
-      if (success) {
-        // delete the completed recovery task on success
+      if (!isValidRecoveryTask(recoveryTaskMetadata)) {
+        LOG.error(
+            "Invalid recovery task detected, skipping and deleting invalid task {}",
+            recoveryTaskMetadata);
         recoveryTaskMetadataStore.deleteSync(recoveryTaskMetadata.name);
-        setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
-        recoveryNodeAssignmentSuccess.increment();
-      } else {
         setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
         recoveryNodeAssignmentFailed.increment();
+      } else {
+        boolean success = handleRecoveryTask(recoveryTaskMetadata);
+        if (success) {
+          // delete the completed recovery task on success
+          recoveryTaskMetadataStore.deleteSync(recoveryTaskMetadata.name);
+          setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
+          recoveryNodeAssignmentSuccess.increment();
+        } else {
+          setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
+          recoveryNodeAssignmentFailed.increment();
+        }
       }
     } catch (Exception e) {
       setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
@@ -234,6 +243,20 @@ private void handleRecoveryTaskAssignment(RecoveryNodeMetad
           )
     }
   }
 
+  /**
+   * Performs a final sanity check on the recovery task to prevent a bad task from halting the
+   * recovery pipeline. Bad state should ideally be prevented at creation, as well as prior to
+   * assignment, but this serves as a final fail-safe in case an invalid recovery task has made it
+   * this far.
+   */
+  private boolean isValidRecoveryTask(RecoveryTaskMetadata recoveryTaskMetadata) {
+    // todo - consider adding further invalid recovery task detections
+    if (recoveryTaskMetadata.endOffset <= recoveryTaskMetadata.startOffset) {
+      return false;
+    }
+    return true;
+  }
+
   /**
    * This method does the recovery work from a recovery task. A recovery task indicates the start
    * and end offset of a kafka partition to index. To do the recovery work, we create a recovery
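The todo in isValidRecoveryTask leaves room for more checks. A hypothetical extension (a sketch only, not in this patch; it uses only the startOffset and endOffset fields shown above) could also reject negative offsets:

    // Hypothetical stricter check, not part of this patch.
    private boolean isValidRecoveryTask(RecoveryTaskMetadata recoveryTaskMetadata) {
      // offsets must be non-negative Kafka positions
      if (recoveryTaskMetadata.startOffset < 0 || recoveryTaskMetadata.endOffset < 0) {
        return false;
      }
      // the range must be non-empty; this is the check the patch introduces
      return recoveryTaskMetadata.endOffset > recoveryTaskMetadata.startOffset;
    }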
diff --git a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
index 8c93374235..8c8a6fbf53 100644
--- a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
+++ b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
@@ -399,7 +399,11 @@ private static Set getServices(
       services.add(datasetRateLimitingService);
 
       BulkIngestApi openSearchBulkApiService =
-          new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
+          new BulkIngestApi(
+              bulkIngestKafkaProducer,
+              datasetRateLimitingService,
+              meterRegistry,
+              preprocessorConfig.getRateLimitExceededErrorCode());
       armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
     } else {
       PreprocessorService preprocessorService =
diff --git a/kaldb/src/main/java/com/slack/kaldb/server/RecoveryTaskCreator.java b/kaldb/src/main/java/com/slack/kaldb/server/RecoveryTaskCreator.java
index 8bfda6112e..c13f793e1d 100644
--- a/kaldb/src/main/java/com/slack/kaldb/server/RecoveryTaskCreator.java
+++ b/kaldb/src/main/java/com/slack/kaldb/server/RecoveryTaskCreator.java
@@ -218,6 +218,10 @@ public long determineStartingOffset(
     } else if (indexerConfig.getCreateRecoveryTasksOnStart()
         && indexerConfig.getReadFromLocationOnStart()
             == KaldbConfigs.KafkaOffsetLocation.LATEST) {
+      // Todo - this appears to be able to create recovery tasks that have a start and end
+      // position of 0, which is invalid. This seems to occur when new clusters are initialized,
+      // and is especially problematic when indexers are created but never get assigned (i.e.,
+      // deploy 5, only assign 3).
       LOG.info(
           "CreateRecoveryTasksOnStart is set and ReadLocationOnStart is set to current. Reading from current and"
               + " spinning up recovery tasks");
diff --git a/kaldb/src/main/proto/kaldb_configs.proto b/kaldb/src/main/proto/kaldb_configs.proto
index 21ac233489..8784eb3b24 100644
--- a/kaldb/src/main/proto/kaldb_configs.proto
+++ b/kaldb/src/main/proto/kaldb_configs.proto
@@ -270,4 +270,10 @@ message PreprocessorConfig {
   // we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future
   KafkaConfig kafka_config = 9;
   bool use_bulk_api = 10;
+
+  // Make the rate limit exceeded error code configurable.
+  // We default to 400 to prioritize fresh logs and drop excess logs.
+  // Set this to 429 for clients to retry the request after a delay.
+  // Only used when we use the bulk API.
+  int32 rate_limit_exceeded_error_code = 11;
 }
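The proto comment above captures the tradeoff: 400 tells well-behaved clients to drop the excess and keep fresh logs flowing, while 429 invites a delayed retry. A hypothetical client-side sketch of the two behaviors (the host, port, payload, and retry policy are placeholders; only the /_bulk path and the "rate limit exceeded" body come from this patch):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Hypothetical client, not part of this patch.
    public class BulkClientSketch {
      public static void main(String[] args) throws Exception {
        String payload =
            "{ \"index\": {\"_index\": \"testindex\", \"_id\": \"1\"} }\n{ \"field1\" : \"value1\" }\n";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request =
            HttpRequest.newBuilder(URI.create("http://localhost:8086/_bulk")) // placeholder endpoint
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() == 429) {
          Thread.sleep(1000); // preprocessor configured with 429: back off, then retry
          client.send(request, HttpResponse.BodyHandlers.ofString());
        } else if (response.statusCode() == 400) {
          // default config: treat the batch as dropped and move on to fresher logs;
          // the body's errorMsg ("rate limit exceeded") distinguishes this from other 400s
        }
      }
    }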
diff --git a/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java
index 138dd53c62..d3e23fb654 100644
--- a/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java
@@ -655,6 +655,61 @@ public void testValidateOffsetsWhenRecoveryTaskOverlapsWithEndOfKafkaRange() {
     assertThat(offsets.endOffset).isEqualTo(kafkaEndOffset);
   }
 
+  @Test
+  public void shouldHandleInvalidRecoveryTasks() throws Exception {
+    KaldbConfigs.KaldbConfig kaldbCfg = makeKaldbConfig(TEST_S3_BUCKET);
+    curatorFramework =
+        CuratorBuilder.build(meterRegistry, kaldbCfg.getMetadataStoreConfig().getZookeeperConfig());
+
+    // Start recovery service
+    recoveryService = new RecoveryService(kaldbCfg, curatorFramework, meterRegistry, blobFs);
+    recoveryService.startAsync();
+    recoveryService.awaitRunning(DEFAULT_START_STOP_DURATION);
+
+    // Create a recovery task
+    RecoveryTaskMetadataStore recoveryTaskMetadataStore =
+        new RecoveryTaskMetadataStore(curatorFramework, false);
+    assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size()).isZero();
+    RecoveryTaskMetadata recoveryTask =
+        new RecoveryTaskMetadata("testRecoveryTask", "0", 0, 0, Instant.now().toEpochMilli());
+    recoveryTaskMetadataStore.createSync(recoveryTask);
+    assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size())
+        .isEqualTo(1);
+    assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).get(0))
+        .isEqualTo(recoveryTask);
+
+    // Assign the recovery task to a node.
+    RecoveryNodeMetadataStore recoveryNodeMetadataStore =
+        new RecoveryNodeMetadataStore(curatorFramework, false);
+    List<RecoveryNodeMetadata> recoveryNodes =
+        KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore);
+    assertThat(recoveryNodes.size()).isEqualTo(1);
+    RecoveryNodeMetadata recoveryNodeMetadata = recoveryNodes.get(0);
+    assertThat(recoveryNodeMetadata.recoveryNodeState)
+        .isEqualTo(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
+    recoveryNodeMetadataStore.updateSync(
+        new RecoveryNodeMetadata(
+            recoveryNodeMetadata.getName(),
+            Metadata.RecoveryNodeMetadata.RecoveryNodeState.ASSIGNED,
+            recoveryTask.getName(),
+            Instant.now().toEpochMilli()));
+    assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size())
+        .isEqualTo(1);
+
+    await().until(() -> getCount(RECOVERY_NODE_ASSIGNMENT_FAILED, meterRegistry) == 1);
+    assertThat(getCount(RECOVERY_NODE_ASSIGNMENT_SUCCESS, meterRegistry)).isZero();
+    assertThat(getCount(RECOVERY_NODE_ASSIGNMENT_RECEIVED, meterRegistry)).isEqualTo(1);
+
+    // Post recovery checks
+    assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore).size())
+        .isEqualTo(1);
+    assertThat(
+            KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore)
+                .get(0)
+                .recoveryNodeState)
+        .isEqualTo(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
+  }
+
   // returns startOffset or endOffset based on the supplied OffsetSpec
   private static AdminClient getAdminClient(long startOffset, long endOffset) {
     AdminClient adminClient = mock(AdminClient.class);
diff --git a/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
index c112d35bc0..fca2e5e8be 100644
--- a/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
@@ -15,14 +15,17 @@
 import com.slack.kaldb.bulkIngestApi.BulkIngestKafkaProducer;
 import com.slack.kaldb.bulkIngestApi.BulkIngestResponse;
 import com.slack.kaldb.bulkIngestApi.DatasetRateLimitingService;
+import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
 import com.slack.kaldb.metadata.core.CuratorBuilder;
 import com.slack.kaldb.metadata.dataset.DatasetMetadata;
 import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
 import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
+import com.slack.kaldb.preprocessor.PreprocessorRateLimiter;
 import com.slack.kaldb.proto.config.KaldbConfigs;
 import com.slack.kaldb.testlib.MetricsUtil;
 import com.slack.kaldb.testlib.TestKafkaServer;
 import com.slack.kaldb.util.JsonUtil;
+import com.slack.kaldb.util.TestingZKServer;
 import com.slack.service.murron.trace.Trace;
 import io.micrometer.prometheus.PrometheusConfig;
 import io.micrometer.prometheus.PrometheusMeterRegistry;
@@ -31,8 +34,8 @@
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -67,7 +70,7 @@ public void bootstrapCluster() throws Exception {
     Tracing.newBuilder().build();
     meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
 
-    zkServer = new TestingServer();
+    zkServer = TestingZKServer.createTestingServer();
     KaldbConfigs.ZookeeperConfig zkConfig =
         KaldbConfigs.ZookeeperConfig.newBuilder()
             .setZkConnectString(zkServer.getConnectString())
@@ -122,7 +125,8 @@ public void bootstrapCluster() throws Exception {
     datasetRateLimitingService.awaitRunning(DEFAULT_START_STOP_DURATION);
     bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION);
 
-    bulkApi = new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
+    bulkApi =
+        new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry, 400);
   }
 
   // I looked at making this a @BeforeEach. it's possible if you annotate a test with a @Tag and
@@ -195,7 +199,12 @@ public void testBulkApiBasic() throws Exception {
         { "index": {"_index": "testindex", "_id": "1"} }
         { "field1" : "value1" }
         """;
-    updateDatasetThroughput(request1.getBytes(StandardCharsets.UTF_8).length);
+    // use the same byte calculation as the rate limiter to get the exact span bytes
+    Map<String, List<Trace.Span>> docs =
+        BulkApiRequestParser.parseRequest(request1.getBytes(StandardCharsets.UTF_8));
+    int limit = PreprocessorRateLimiter.getSpanBytes(docs.get("testindex"));
+    // for some reason, if we pass the exact limit the rate limiter doesn't work as expected
+    updateDatasetThroughput(limit / 2);
 
     // test with empty causes a parse exception
     AggregatedHttpResponse response = bulkApi.addDocument("{}\n").aggregate().join();
@@ -208,49 +217,30 @@ public void testBulkApiBasic() throws Exception {
 
     // test with request1 twice. first one should succeed, second one will fail because of the
     // rate limiter
-    CompletableFuture<AggregatedHttpResponse> response1 =
-        bulkApi
-            .addDocument(request1)
-            .aggregate()
-            .thenApply(
-                httpResponse -> {
-                  assertThat(httpResponse.status().isSuccess()).isEqualTo(true);
-                  assertThat(httpResponse.status().code()).isEqualTo(OK.code());
-                  BulkIngestResponse httpResponseObj = null;
-                  try {
-                    httpResponseObj =
-                        JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
-                  } catch (IOException e) {
-                    fail("", e);
-                  }
-                  assertThat(httpResponseObj.totalDocs()).isEqualTo(1);
-                  assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
-                  return httpResponse;
-                });
-
-    CompletableFuture<AggregatedHttpResponse> response2 =
-        bulkApi
-            .addDocument(request1)
-            .aggregate()
-            .thenApply(
-                httpResponse -> {
-                  assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
-                  assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
-                  BulkIngestResponse httpResponseObj = null;
-                  try {
-                    httpResponseObj =
-                        JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
-                  } catch (IOException e) {
-                    fail("", e);
-                  }
-                  assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
-                  assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
-                  assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
-                  return httpResponse;
-                });
-
-    await().until(response1::isDone);
-    await().until(response2::isDone);
+    AggregatedHttpResponse httpResponse = bulkApi.addDocument(request1).aggregate().join();
+    assertThat(httpResponse.status().isSuccess()).isEqualTo(true);
+    assertThat(httpResponse.status().code()).isEqualTo(OK.code());
+    try {
+      BulkIngestResponse httpResponseObj =
+          JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
+      assertThat(httpResponseObj.totalDocs()).isEqualTo(1);
+      assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
+    } catch (IOException e) {
+      fail("", e);
+    }
+
+    httpResponse = bulkApi.addDocument(request1).aggregate().join();
+    assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
+    assertThat(httpResponse.status().code()).isEqualTo(400);
+    try {
+      BulkIngestResponse httpResponseObj =
+          JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
+      assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
+      assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
+      assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
+    } catch (IOException e) {
+      fail("", e);
+    }
 
     // test with multiple indexes
     String request2 =
@@ -267,6 +257,25 @@ public void testBulkApiBasic() throws Exception {
     assertThat(responseObj.totalDocs()).isEqualTo(0);
     assertThat(responseObj.failedDocs()).isEqualTo(0);
     assertThat(responseObj.errorMsg()).isEqualTo("request must contain only 1 unique index");
+
+    BulkIngestApi bulkApi2 =
+        new BulkIngestApi(
+            bulkIngestKafkaProducer,
+            datasetRateLimitingService,
+            meterRegistry,
+            TOO_MANY_REQUESTS.code());
+    httpResponse = bulkApi2.addDocument(request1).aggregate().join();
+    assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
+    assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
+    try {
+      BulkIngestResponse httpResponseObj =
+          JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
+      assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
+      assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
+      assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
+    } catch (IOException e) {
+      fail("", e);
+    }
   }
 
   @Test
diff --git a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
index adb80f0fea..474a635701 100644
--- a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
@@ -295,6 +295,7 @@ public void testParseKaldbJsonConfigFile() throws IOException {
     assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
     assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);
     assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false);
+    assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(400);
 
     final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
         config.getPreprocessorConfig().getKafkaConfig();
@@ -476,6 +477,7 @@ public void testParseKaldbYamlConfigFile() throws IOException {
     assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");
 
     assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true);
+    assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(429);
     final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
     assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
diff --git a/kaldb/src/test/java/com/slack/kaldb/util/TestingZKServer.java b/kaldb/src/test/java/com/slack/kaldb/util/TestingZKServer.java
new file mode 100644
index 0000000000..048b4d16f8
--- /dev/null
+++ b/kaldb/src/test/java/com/slack/kaldb/util/TestingZKServer.java
@@ -0,0 +1,34 @@
+package com.slack.kaldb.util;
+
+import static org.assertj.core.api.Assertions.fail;
+
+import org.apache.curator.test.TestingServer;
+
+/**
+ * This class is responsible for creating a ZK server for testing purposes. We create the ZK server
+ * in a separate thread to avoid blocking the main thread, which improves the reliability of the
+ * tests, i.e., a breakpoint can be set while running a test without the ZK server getting blocked.
+ * A blocked ZK server leads to a session expiry, which causes Curator (the client) to disconnect
+ * and invoke the runtime halter.
+ */
+public class TestingZKServer {
+
+  public static TestingServer createTestingServer() throws InterruptedException {
+    ZKTestingServer zkTestingServer = new ZKTestingServer();
+    Thread.ofVirtual().start(zkTestingServer).join();
+    return zkTestingServer.zkServer;
+  }
+
+  private static class ZKTestingServer implements Runnable {
+    public TestingServer zkServer;
+
+    @Override
+    public void run() {
+      try {
+        zkServer = new TestingServer();
+      } catch (Exception e) {
+        fail("Failed to start ZK server", e);
+      }
+    }
+  }
+}
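For reference, the new helper is a drop-in replacement for constructing TestingServer directly, as the BulkIngestApiTest change above shows. A minimal sketch of a test using it (the class and method names here are illustrative, not from this patch; note that Thread.ofVirtual() requires a JDK with virtual threads, i.e. Java 21+):

    import com.slack.kaldb.util.TestingZKServer;
    import org.apache.curator.test.TestingServer;

    class SomeZkBackedTest { // hypothetical test class
      private TestingServer zkServer;

      void setUp() throws Exception {
        // created off the main thread, per the helper's javadoc above
        zkServer = TestingZKServer.createTestingServer();
        String connectString = zkServer.getConnectString(); // wire into ZookeeperConfig as usual
      }

      void tearDown() throws Exception {
        zkServer.close(); // TestingServer implements Closeable
      }
    }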
diff --git a/kaldb/src/test/resources/test_config.json b/kaldb/src/test/resources/test_config.json
index 1d1f4e6833..74ec5e6ce7 100644
--- a/kaldb/src/test/resources/test_config.json
+++ b/kaldb/src/test/resources/test_config.json
@@ -157,7 +157,8 @@
     "dataTransformer": "api_log",
     "rateLimiterMaxBurstSeconds": 2,
     "bootstrapServers": "localhost:9092",
-    "useBulkApi": false
+    "useBulkApi": false,
+    "rateLimitExceededErrorCode": 400
   },
   "clusterConfig": {
     "clusterName": "test_kaldb_json_cluster",
diff --git a/kaldb/src/test/resources/test_config.yaml b/kaldb/src/test/resources/test_config.yaml
index e5c62cd740..8c179af084 100644
--- a/kaldb/src/test/resources/test_config.yaml
+++ b/kaldb/src/test/resources/test_config.yaml
@@ -128,6 +128,7 @@ preprocessorConfig:
   rateLimiterMaxBurstSeconds: 2
   bootstrapServers: localhost:9092
   useBulkApi: true
+  rateLimitExceededErrorCode: 429
 
 clusterConfig:
   clusterName: "test_kaldb_cluster"