Commit

Merge branch 'master' into bburkholder/bump-recovery-timeout

bryanlb authored Feb 8, 2024
2 parents 0361803 + f96ffbd commit e74d1e3
Showing 12 changed files with 207 additions and 59 deletions.
1 change: 1 addition & 0 deletions config/config.yaml
@@ -141,3 +141,4 @@ preprocessorConfig:
rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0}
useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false}
rateLimitExceededErrorCode: ${KALDB_PREPROCESSOR_RATE_LIMIT_EXCEEDED_ERROR_CODE:-400}
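
The ${VAR:-default} pattern used throughout this file resolves to the environment variable when set and to the literal default otherwise. A minimal Java sketch of that resolution for the new knob (illustrative only; the real substitution happens in the config loader, not in application code):

    // Illustrative sketch, not part of this commit: how the
    // ${KALDB_PREPROCESSOR_RATE_LIMIT_EXCEEDED_ERROR_CODE:-400} default resolves.
    String raw = System.getenv("KALDB_PREPROCESSOR_RATE_LIMIT_EXCEEDED_ERROR_CODE");
    int rateLimitExceededErrorCode =
        (raw == null || raw.isEmpty()) ? 400 : Integer.parseInt(raw);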
kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
@@ -1,9 +1,9 @@
package com.slack.kaldb.bulkIngestApi;

import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.service.murron.trace.Trace;
@@ -33,18 +33,25 @@ public class BulkIngestApi {
private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
private final int rateLimitExceededErrorCode;

public BulkIngestApi(
BulkIngestKafkaProducer bulkIngestKafkaProducer,
DatasetRateLimitingService datasetRateLimitingService,
MeterRegistry meterRegistry) {
MeterRegistry meterRegistry,
int rateLimitExceededErrorCode) {

this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
this.datasetRateLimitingService = datasetRateLimitingService;
this.meterRegistry = meterRegistry;
this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
if (rateLimitExceededErrorCode <= 0 || rateLimitExceededErrorCode > 599) {
this.rateLimitExceededErrorCode = 400;
} else {
this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
}
}

@Post("/_bulk")
@@ -77,7 +84,8 @@ public HttpResponse addDocument(String bulkRequest) {
final String index = indexDocs.getKey();
if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response));
future.complete(
HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode), response));
return HttpResponse.of(future);
}
}
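Taken together, the constructor clamp above and the HttpStatus.valueOf call mean any configured code outside 1-599 silently falls back to 400, while in-range codes such as 429 are used verbatim. A hedged usage sketch (the producer, rate-limiting service, and registry are assumed fixtures, not part of this commit):

    // Sketch only, assuming fixtures for the collaborators:
    BulkIngestApi api =
        new BulkIngestApi(producer, rateLimitingService, meterRegistry, 429); // in range, used as-is
    BulkIngestApi clamped =
        new BulkIngestApi(producer, rateLimitingService, meterRegistry, 600); // out of range, falls back to 400
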
37 changes: 30 additions & 7 deletions kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java
@@ -211,21 +211,30 @@ private void recoveryNodeListener(RecoveryNodeMetadata recoveryNodeMetadata) {
 * fails. To break this cycle, add an enqueue_count value to the recovery task so we can stop
 * recovering it if the task fails a certain number of times.
 */
private void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetadata) {
protected void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetadata) {
try {
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.RECOVERING);
RecoveryTaskMetadata recoveryTaskMetadata =
recoveryTaskMetadataStore.getSync(recoveryNodeMetadata.recoveryTaskName);

boolean success = handleRecoveryTask(recoveryTaskMetadata);
if (success) {
// delete the completed recovery task on success
if (!isValidRecoveryTask(recoveryTaskMetadata)) {
LOG.error(
"Invalid recovery task detected, skipping and deleting invalid task {}",
recoveryTaskMetadata);
recoveryTaskMetadataStore.deleteSync(recoveryTaskMetadata.name);
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeAssignmentSuccess.increment();
} else {
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeAssignmentFailed.increment();
} else {
boolean success = handleRecoveryTask(recoveryTaskMetadata);
if (success) {
// delete the completed recovery task on success
recoveryTaskMetadataStore.deleteSync(recoveryTaskMetadata.name);
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeAssignmentSuccess.increment();
} else {
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeAssignmentFailed.increment();
}
}
} catch (Exception e) {
setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
@@ -234,6 +243,20 @@ private void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetadata) {
}
}

/**
 * Performs a final sanity check on the recovery task to prevent a bad task from halting the
 * recovery pipeline. Bad state should ideally be prevented at creation, as well as prior to
 * assignment, but this serves as a final fail-safe in case an invalid recovery task somehow
 * makes it this far.
 */
private boolean isValidRecoveryTask(RecoveryTaskMetadata recoveryTaskMetadata) {
// todo - consider adding further invalid recovery task detections
if (recoveryTaskMetadata.endOffset <= recoveryTaskMetadata.startOffset) {
return false;
}
return true;
}

/**
* This method does the recovery work from a recovery task. A recovery task indicates the start
* and end offset of a kafka partition to index. To do the recovery work, we create a recovery
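The todo in isValidRecoveryTask invites further checks. One possible extension, not part of this commit (the negative-offset handling is an assumption, not something the commit verifies):

    // Sketch of additional validity checks the todo might cover:
    private boolean isValidRecoveryTask(RecoveryTaskMetadata task) {
      if (task.startOffset < 0 || task.endOffset < 0) {
        return false; // negative offsets can never map to a Kafka position
      }
      // the check this commit actually adds: reject empty or inverted ranges
      return task.endOffset > task.startOffset;
    }
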
6 changes: 5 additions & 1 deletion kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
@@ -399,7 +399,11 @@ private static Set<Service> getServices(
services.add(datasetRateLimitingService);

BulkIngestApi openSearchBulkApiService =
new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
new BulkIngestApi(
bulkIngestKafkaProducer,
datasetRateLimitingService,
meterRegistry,
preprocessorConfig.getRateLimitExceededErrorCode());
armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
} else {
PreprocessorService preprocessorService =
@@ -218,6 +218,10 @@ public long determineStartingOffset(
} else if (indexerConfig.getCreateRecoveryTasksOnStart()
&& indexerConfig.getReadFromLocationOnStart()
== KaldbConfigs.KafkaOffsetLocation.LATEST) {
// Todo - this appears to be able to create recovery tasks that have a start and end
// position of 0, which is invalid. This seems to occur when new clusters are initialized,
// and is especially problematic when indexers are created but never get assigned (i.e.,
// deploy 5, only assign 3).
LOG.info(
"CreateRecoveryTasksOnStart is set and ReadLocationOnStart is set to current. Reading from current and"
+ " spinning up recovery tasks");
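The todo above implies the eventual fix is to skip task creation when the computed range is empty. A hedged sketch of such a guard (the variable and method names here are assumed, not taken from this file):

    // Sketch only; names are assumed:
    if (partitionEndOffset <= partitionStartOffset) {
      LOG.info("Skipping recovery task for partition {}: empty offset range", partitionId);
    } else {
      createRecoveryTask(partitionId, partitionStartOffset, partitionEndOffset);
    }
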
6 changes: 6 additions & 0 deletions kaldb/src/main/proto/kaldb_configs.proto
@@ -270,4 +270,10 @@ message PreprocessorConfig {
// we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future
KafkaConfig kafka_config = 9;
bool use_bulk_api = 10;

// Make the rate limit exceeded error code configurable
// We default to 400 to prioritize fresh logs and drop excess logs
// Set this to 429 for clients to retry the request after a delay
// Only used when we use the bulk API
int32 rate_limit_exceeded_error_code = 11;
}
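
The comment captures the operational trade-off: returning 400 tells well-behaved clients to drop the batch so the pipeline favors fresh logs, while 429 invites a delayed retry. A hedged sketch of client-side handling (the client API here is hypothetical):

    // Hypothetical client reaction to the configurable status code:
    int status = response.status().code();
    if (status == 429) {
      scheduleRetry(request, backoffMs); // server invites a retry after a delay
    } else if (status == 400) {
      dropBatch(request); // server is shedding excess load to keep fresh logs flowing
    }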
@@ -655,6 +655,61 @@ public void testValidateOffsetsWhenRecoveryTaskOverlapsWithEndOfKafkaRange() {
assertThat(offsets.endOffset).isEqualTo(kafkaEndOffset);
}

@Test
public void shouldHandleInvalidRecoveryTasks() throws Exception {
KaldbConfigs.KaldbConfig kaldbCfg = makeKaldbConfig(TEST_S3_BUCKET);
curatorFramework =
CuratorBuilder.build(meterRegistry, kaldbCfg.getMetadataStoreConfig().getZookeeperConfig());

// Start recovery service
recoveryService = new RecoveryService(kaldbCfg, curatorFramework, meterRegistry, blobFs);
recoveryService.startAsync();
recoveryService.awaitRunning(DEFAULT_START_STOP_DURATION);

// Create a recovery task
RecoveryTaskMetadataStore recoveryTaskMetadataStore =
new RecoveryTaskMetadataStore(curatorFramework, false);
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size()).isZero();
RecoveryTaskMetadata recoveryTask =
new RecoveryTaskMetadata("testRecoveryTask", "0", 0, 0, Instant.now().toEpochMilli());
recoveryTaskMetadataStore.createSync(recoveryTask);
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size())
.isEqualTo(1);
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).get(0))
.isEqualTo(recoveryTask);

// Assign the recovery task to node.
RecoveryNodeMetadataStore recoveryNodeMetadataStore =
new RecoveryNodeMetadataStore(curatorFramework, false);
List<RecoveryNodeMetadata> recoveryNodes =
KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore);
assertThat(recoveryNodes.size()).isEqualTo(1);
RecoveryNodeMetadata recoveryNodeMetadata = recoveryNodes.get(0);
assertThat(recoveryNodeMetadata.recoveryNodeState)
.isEqualTo(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
recoveryNodeMetadataStore.updateSync(
new RecoveryNodeMetadata(
recoveryNodeMetadata.getName(),
Metadata.RecoveryNodeMetadata.RecoveryNodeState.ASSIGNED,
recoveryTask.getName(),
Instant.now().toEpochMilli()));
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size())
.isEqualTo(1);

await().until(() -> getCount(RECOVERY_NODE_ASSIGNMENT_FAILED, meterRegistry) == 1);
assertThat(getCount(RECOVERY_NODE_ASSIGNMENT_SUCCESS, meterRegistry)).isZero();
assertThat(getCount(RECOVERY_NODE_ASSIGNMENT_RECEIVED, meterRegistry)).isEqualTo(1);

// Post recovery checks
assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore).size())
.isEqualTo(1);
assertThat(
KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore)
.get(0)
.recoveryNodeState)
.isEqualTo(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
}

// returns startOffset or endOffset based on the supplied OffsetSpec
private static AdminClient getAdminClient(long startOffset, long endOffset) {
AdminClient adminClient = mock(AdminClient.class);
103 changes: 56 additions & 47 deletions kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
@@ -15,14 +15,17 @@
import com.slack.kaldb.bulkIngestApi.BulkIngestKafkaProducer;
import com.slack.kaldb.bulkIngestApi.BulkIngestResponse;
import com.slack.kaldb.bulkIngestApi.DatasetRateLimitingService;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.kaldb.metadata.core.CuratorBuilder;
import com.slack.kaldb.metadata.dataset.DatasetMetadata;
import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
import com.slack.kaldb.preprocessor.PreprocessorRateLimiter;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.testlib.MetricsUtil;
import com.slack.kaldb.testlib.TestKafkaServer;
import com.slack.kaldb.util.JsonUtil;
import com.slack.kaldb.util.TestingZKServer;
import com.slack.service.murron.trace.Trace;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
@@ -31,8 +34,8 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.curator.test.TestingServer;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -67,7 +70,7 @@ public void bootstrapCluster() throws Exception {
Tracing.newBuilder().build();
meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

zkServer = new TestingServer();
zkServer = TestingZKServer.createTestingServer();
KaldbConfigs.ZookeeperConfig zkConfig =
KaldbConfigs.ZookeeperConfig.newBuilder()
.setZkConnectString(zkServer.getConnectString())
@@ -122,7 +125,8 @@ public void bootstrapCluster() throws Exception {
datasetRateLimitingService.awaitRunning(DEFAULT_START_STOP_DURATION);
bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION);

bulkApi = new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
bulkApi =
new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry, 400);
}

// I looked at making this a @BeforeEach. it's possible if you annotate a test with a @Tag and
@@ -195,7 +199,12 @@ public void testBulkApiBasic() throws Exception {
{ "index": {"_index": "testindex", "_id": "1"} }
{ "field1" : "value1" }
""";
updateDatasetThroughput(request1.getBytes(StandardCharsets.UTF_8).length);
// use the way we calculate the throughput in the rate limiter to get the exact bytes
Map<String, List<Trace.Span>> docs =
BulkApiRequestParser.parseRequest(request1.getBytes(StandardCharsets.UTF_8));
int limit = PreprocessorRateLimiter.getSpanBytes(docs.get("testindex"));
// for some reason if we pass the exact limit, the rate limiter doesn't work as expected
updateDatasetThroughput(limit / 2);

// test with empty causes a parse exception
AggregatedHttpResponse response = bulkApi.addDocument("{}\n").aggregate().join();
@@ -208,49 +217,30 @@

// test with request1 twice. first one should succeed, second one will fail because of rate
// limiter
CompletableFuture<AggregatedHttpResponse> response1 =
bulkApi
.addDocument(request1)
.aggregate()
.thenApply(
httpResponse -> {
assertThat(httpResponse.status().isSuccess()).isEqualTo(true);
assertThat(httpResponse.status().code()).isEqualTo(OK.code());
BulkIngestResponse httpResponseObj = null;
try {
httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
} catch (IOException e) {
fail("", e);
}
assertThat(httpResponseObj.totalDocs()).isEqualTo(1);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
return httpResponse;
});

CompletableFuture<AggregatedHttpResponse> response2 =
bulkApi
.addDocument(request1)
.aggregate()
.thenApply(
httpResponse -> {
assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
BulkIngestResponse httpResponseObj = null;
try {
httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
} catch (IOException e) {
fail("", e);
}
assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
return httpResponse;
});

await().until(response1::isDone);
await().until(response2::isDone);
AggregatedHttpResponse httpResponse = bulkApi.addDocument(request1).aggregate().join();
assertThat(httpResponse.status().isSuccess()).isEqualTo(true);
assertThat(httpResponse.status().code()).isEqualTo(OK.code());
try {
BulkIngestResponse httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
assertThat(httpResponseObj.totalDocs()).isEqualTo(1);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
} catch (IOException e) {
fail("", e);
}

httpResponse = bulkApi.addDocument(request1).aggregate().join();
assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
assertThat(httpResponse.status().code()).isEqualTo(400);
try {
BulkIngestResponse httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
} catch (IOException e) {
fail("", e);
}

// test with multiple indexes
String request2 =
@@ -267,6 +257,25 @@
assertThat(responseObj.totalDocs()).isEqualTo(0);
assertThat(responseObj.failedDocs()).isEqualTo(0);
assertThat(responseObj.errorMsg()).isEqualTo("request must contain only 1 unique index");

BulkIngestApi bulkApi2 =
new BulkIngestApi(
bulkIngestKafkaProducer,
datasetRateLimitingService,
meterRegistry,
TOO_MANY_REQUESTS.code());
httpResponse = bulkApi2.addDocument(request1).aggregate().join();
assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
try {
BulkIngestResponse httpResponseObj =
JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
} catch (IOException e) {
fail("", e);
}
}

@Test
@@ -295,6 +295,7 @@ public void testParseKaldbJsonConfigFile() throws IOException {
assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);
assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false);
assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(400);

final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
config.getPreprocessorConfig().getKafkaConfig();
@@ -476,6 +477,7 @@ public void testParseKaldbYamlConfigFile() throws IOException {
assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");

assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true);
assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(429);

final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);