Skip to content

Commit

Permalink
make rate limit code configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Feb 7, 2024
1 parent 42aa91c commit c945afd
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.slack.kaldb.bulkIngestApi;

import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.service.murron.trace.Trace;
Expand Down Expand Up @@ -33,18 +33,25 @@ public class BulkIngestApi {
private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
private final int rateLimitExceededErrorCode;

public BulkIngestApi(
BulkIngestKafkaProducer bulkIngestKafkaProducer,
DatasetRateLimitingService datasetRateLimitingService,
MeterRegistry meterRegistry) {
MeterRegistry meterRegistry,
int rateLimitExceededErrorCode) {

this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
this.datasetRateLimitingService = datasetRateLimitingService;
this.meterRegistry = meterRegistry;
this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
if (rateLimitExceededErrorCode <= 0 || rateLimitExceededErrorCode > 599) {
this.rateLimitExceededErrorCode = 400;
} else {
this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
}
}

@Post("/_bulk")
Expand Down Expand Up @@ -77,7 +84,8 @@ public HttpResponse addDocument(String bulkRequest) {
final String index = indexDocs.getKey();
if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response));
future.complete(
HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode), response));
return HttpResponse.of(future);
}
}
Expand Down
6 changes: 5 additions & 1 deletion kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,11 @@ private static Set<Service> getServices(
services.add(datasetRateLimitingService);

BulkIngestApi openSearchBulkApiService =
new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
new BulkIngestApi(
bulkIngestKafkaProducer,
datasetRateLimitingService,
meterRegistry,
preprocessorConfig.getRateLimitExceededErrorCode());
armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
} else {
PreprocessorService preprocessorService =
Expand Down
6 changes: 6 additions & 0 deletions kaldb/src/main/proto/kaldb_configs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,10 @@ message PreprocessorConfig {
// we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future
KafkaConfig kafka_config = 9;
bool use_bulk_api = 10;

// Make the rate limit exceeded error code confugurable
// We default to 400 to prioritize fresh logs and drop excess logs
// Set this to 429 for clients to retry the request after a delay
// Only used when we use the bulk API
int32 rate_limit_exceeded_error_code = 11;
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public void bootstrapCluster() throws Exception {
datasetRateLimitingService.awaitRunning(DEFAULT_START_STOP_DURATION);
bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION);

bulkApi = new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
bulkApi =
new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry, 429);
}

// I looked at making this a @BeforeEach. it's possible if you annotate a test with a @Tag and
Expand Down

0 comments on commit c945afd

Please sign in to comment.