add a new schema config file
vthacker committed Feb 26, 2024
1 parent 172f0b3 commit 5f8e58d
Showing 19 changed files with 319 additions and 21 deletions.
1 change: 1 addition & 0 deletions config/config.yaml
@@ -129,6 +129,7 @@ preprocessorConfig:
kafkaTopic: ${KAFKA_TOPIC:-test-topic}
kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
kaldbSchemaFile: ${PREPROCESSOR_SCHEMA_FILE:-schema.yaml}

serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
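Like every other preprocessor setting in this file, the new kaldbSchemaFile entry uses the ${NAME:-default} substitution form, so setting PREPROCESSOR_SCHEMA_FILE in the environment points the preprocessor at a different schema file; otherwise it falls back to schema.yaml.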
1 change: 1 addition & 0 deletions config/schema.yaml
@@ -0,0 +1 @@
fields:
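The committed schema.yaml is just a stub with an empty fields map. Going by the KaldbSchema message defined in schema.proto later in this commit, a populated schema might look like the following sketch (the field names and types here are hypothetical examples):

```yaml
# Hypothetical populated schema; the commit itself only adds the `fields:` key.
fields:
  service_name:
    type: KEYWORD
  message:
    type: TEXT
  client_ip:
    type: IP
```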
kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
@@ -6,6 +6,7 @@
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
@@ -34,12 +35,14 @@ public class BulkIngestApi {
private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
private final int rateLimitExceededErrorCode;
private final Schema.KaldbSchema kaldbSchema;

public BulkIngestApi(
BulkIngestKafkaProducer bulkIngestKafkaProducer,
DatasetRateLimitingService datasetRateLimitingService,
MeterRegistry meterRegistry,
int rateLimitExceededErrorCode) {
int rateLimitExceededErrorCode,
Schema.KaldbSchema kaldbSchema) {

this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
this.datasetRateLimitingService = datasetRateLimitingService;
@@ -52,6 +55,7 @@ public BulkIngestApi(
} else {
this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
}
this.kaldbSchema = kaldbSchema;
}

@Post("/_bulk")
@@ -65,7 +69,8 @@ public HttpResponse addDocument(String bulkRequest) {
try {
byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
incomingByteTotal.increment(bulkRequestBytes.length);
Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes);
Map<String, List<Trace.Span>> docs =
BulkApiRequestParser.parseRequest(bulkRequestBytes, kaldbSchema);

// todo - our rate limiter doesn't have a way to acquire permits across multiple
// datasets
kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java
@@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.kaldb.writer.SpanFormatter;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
@@ -36,8 +37,9 @@ public class BulkApiRequestParser {

private static final String SERVICE_NAME_KEY = "service_name";

public static Map<String, List<Trace.Span>> parseRequest(byte[] postBody) throws IOException {
return convertIndexRequestToTraceFormat(parseBulkRequest(postBody));
public static Map<String, List<Trace.Span>> parseRequest(
byte[] postBody, Schema.KaldbSchema kaldbSchema) throws IOException {
return convertIndexRequestToTraceFormat(parseBulkRequest(postBody), kaldbSchema);
}

/**
@@ -81,7 +83,8 @@ public static long getTimestampFromIngestDocument(IngestDocument ingestDocument)
}

@VisibleForTesting
public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
public static Trace.Span fromIngestDocument(
IngestDocument ingestDocument, Schema.KaldbSchema kaldbSchema) {

long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);

@@ -124,7 +127,7 @@ public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
}

protected static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
List<IndexRequest> indexRequests) {
List<IndexRequest> indexRequests, Schema.KaldbSchema kaldbSchema) {
// key - index. value - list of docs to be indexed
Map<String, List<Trace.Span>> indexDocs = new HashMap<>();

@@ -135,7 +138,7 @@ protected static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
}
IngestDocument ingestDocument = convertRequestToDocument(indexRequest);
List<Trace.Span> docs = indexDocs.computeIfAbsent(index, key -> new ArrayList<>());
docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument));
docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument, kaldbSchema));
}
return indexDocs;
}
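A minimal sketch of calling the new two-argument parseRequest entry point. Passing an empty KaldbSchema, as the updated tests later in this commit do, appears to preserve the previous schema-less behavior; the bulk payload below is a hypothetical example in standard OpenSearch NDJSON form:

```java
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class ParseRequestExample {
  public static void main(String[] args) throws Exception {
    // One action line plus one source document, targeting the "test" index.
    String bulk =
        "{ \"index\": { \"_index\": \"test\", \"_id\": \"1\" } }\n"
            + "{ \"service_name\": \"api\", \"message\": \"hello\" }\n";
    Map<String, List<Trace.Span>> docs =
        BulkApiRequestParser.parseRequest(
            bulk.getBytes(StandardCharsets.UTF_8), Schema.KaldbSchema.newBuilder().build());
    System.out.println(docs.get("test").size()); // expect 1
  }
}
```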
kaldb/src/main/java/com/slack/kaldb/metadata/schema/KaldbSchemaUtil.java
@@ -0,0 +1,50 @@
package com.slack.kaldb.metadata.schema;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.slack.kaldb.proto.schema.Schema;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.text.StringSubstitutor;
import org.apache.commons.text.lookup.StringLookup;

public class KaldbSchemaUtil {

public static Schema.KaldbSchema parseSchema(Path schemaPath) throws IOException {
String filename = schemaPath.getFileName().toString();
if (filename.endsWith(".yaml")) {
return parseSchemaYaml(Files.readString(schemaPath), System::getenv);
} else if (filename.endsWith(".json")) {
return parseJsonSchema(Files.readString(schemaPath));
} else {
throw new RuntimeException(
"Invalid config file format provided - must be either .json or .yaml");
}
}

@VisibleForTesting
public static Schema.KaldbSchema parseSchemaYaml(String yamlStr, StringLookup variableResolver)
throws JsonProcessingException, InvalidProtocolBufferException {
StringSubstitutor substitute = new StringSubstitutor(variableResolver);
ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory());
ObjectMapper jsonWriter = new ObjectMapper();

Object obj = yamlReader.readValue(substitute.replace(yamlStr), Object.class);
return parseJsonSchema(jsonWriter.writeValueAsString(obj));
}

@VisibleForTesting
public static Schema.KaldbSchema parseJsonSchema(String jsonStr)
throws InvalidProtocolBufferException {
Schema.KaldbSchema.Builder kaldbSchemaBuilder = Schema.KaldbSchema.newBuilder();
JsonFormat.parser().merge(jsonStr, kaldbSchemaBuilder);
Schema.KaldbSchema kaldbSchema = kaldbSchemaBuilder.build();
// TODO: validate schema
return kaldbSchema;
}
}
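A sketch of how these utilities compose, exercising the @VisibleForTesting overload with a map-backed StringLookup in place of System::getenv; the YAML snippet and the HOST_FIELD_TYPE variable are hypothetical:

```java
import com.slack.kaldb.metadata.schema.KaldbSchemaUtil;
import com.slack.kaldb.proto.schema.Schema;
import java.util.Map;

public class SchemaParseExample {
  public static void main(String[] args) throws Exception {
    // ${...} placeholders are resolved by the StringLookup before the YAML
    // is converted to JSON and merged into the proto builder.
    String yaml = "fields:\n  host:\n    type: ${HOST_FIELD_TYPE:-KEYWORD}\n";
    Map<String, String> vars = Map.of("HOST_FIELD_TYPE", "IP");
    Schema.KaldbSchema schema = KaldbSchemaUtil.parseSchemaYaml(yaml, vars::get);
    System.out.println(schema.getFieldsOrThrow("host").getType()); // IP
  }
}
```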
8 changes: 7 additions & 1 deletion kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
@@ -31,11 +31,13 @@
import com.slack.kaldb.metadata.recovery.RecoveryNodeMetadataStore;
import com.slack.kaldb.metadata.recovery.RecoveryTaskMetadataStore;
import com.slack.kaldb.metadata.replica.ReplicaMetadataStore;
import com.slack.kaldb.metadata.schema.KaldbSchemaUtil;
import com.slack.kaldb.metadata.search.SearchMetadataStore;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
import com.slack.kaldb.preprocessor.PreprocessorService;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.proto.metadata.Metadata;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.kaldb.recovery.RecoveryService;
import com.slack.kaldb.util.RuntimeHalterImpl;
import com.slack.kaldb.zipkinApi.ZipkinService;
@@ -398,12 +400,16 @@ private static Set<Service> getServices(
new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry);
services.add(datasetRateLimitingService);

Schema.KaldbSchema kaldbSchema =
KaldbSchemaUtil.parseSchema(Path.of(preprocessorConfig.getKaldbSchemaFile()));

BulkIngestApi openSearchBulkApiService =
new BulkIngestApi(
bulkIngestKafkaProducer,
datasetRateLimitingService,
meterRegistry,
preprocessorConfig.getRateLimitExceededErrorCode());
preprocessorConfig.getRateLimitExceededErrorCode(),
kaldbSchema);
armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
} else {
PreprocessorService preprocessorService =
2 changes: 2 additions & 0 deletions kaldb/src/main/proto/kaldb_configs.proto
@@ -276,4 +276,6 @@ message PreprocessorConfig {
// Set this to 429 for clients to retry the request after a delay
// Only used when we use the bulk API
int32 rate_limit_exceeded_error_code = 11;

string kaldb_schema_file = 12;
}
33 changes: 33 additions & 0 deletions kaldb/src/main/proto/schema.proto
@@ -0,0 +1,33 @@
syntax = "proto3";

package slack.proto.kaldb.schema;

option java_package = "com.slack.kaldb.proto.schema";

message KaldbSchema {
// convert to a map
map<string, SchemaField> fields = 1;
}

message SchemaField {
SchemaFieldType type = 2;
// other field definitions in the future
}

// https://opensearch.org/docs/2.4/opensearch/supported-field-types/index/
// Add the remaining types as needed
enum SchemaFieldType {
KEYWORD = 0;
TEXT = 1;
IP = 2;
DATE = 3;
BOOLEAN = 4;
DOUBLE = 5;
FLOAT = 6;
HALF_FLOAT = 7;
INTEGER = 8;
LONG = 9;
SCALED_LONG = 10;
SHORT = 11;
BYTE = 12;
};
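For callers that build a schema in code rather than parsing YAML or JSON, the protobuf-generated builder exposes the fields map directly. A small sketch (the field name is hypothetical):

```java
import com.slack.kaldb.proto.schema.Schema;

public class SchemaBuilderExample {
  public static void main(String[] args) {
    Schema.KaldbSchema schema =
        Schema.KaldbSchema.newBuilder()
            .putFields(
                "client_ip",
                Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.IP).build())
            .build();
    System.out.println(schema.getFieldsCount()); // 1
  }
}
```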
kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
@@ -4,6 +4,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.io.Resources;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
import java.nio.charset.Charset;
@@ -38,7 +39,8 @@ public void testSimpleIndexRequest() throws Exception {
assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(3);

Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
BulkApiRequestParser.convertIndexRequestToTraceFormat(
indexRequests, Schema.KaldbSchema.newBuilder().build());
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("test").size()).isEqualTo(1);

@@ -62,7 +64,8 @@ public void testIndexNoFields() throws Exception {
List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);

Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
BulkApiRequestParser.convertIndexRequestToTraceFormat(
indexRequests, Schema.KaldbSchema.newBuilder().build());
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("test").size()).isEqualTo(1);

@@ -85,7 +88,8 @@ public void testIndexNoFieldsNoId() throws Exception {
List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);

Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
BulkApiRequestParser.convertIndexRequestToTraceFormat(
indexRequests, Schema.KaldbSchema.newBuilder().build());
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("test").size()).isEqualTo(1);

@@ -108,7 +112,8 @@ public void testIndexEmptyRequest() throws Exception {
List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);

Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
BulkApiRequestParser.convertIndexRequestToTraceFormat(
indexRequests, Schema.KaldbSchema.newBuilder().build());
assertThat(indexDocs.keySet().size()).isEqualTo(0);
}

@@ -125,7 +130,8 @@ public void testIndexRequestWithSpecialChars() throws Exception {
List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(1);
Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
BulkApiRequestParser.convertIndexRequestToTraceFormat(
indexRequests, Schema.KaldbSchema.newBuilder().build());
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("index_name").size()).isEqualTo(1);

@@ -148,7 +154,8 @@ public void testBulkRequests() throws Exception {
assertThat(indexRequests.size()).isEqualTo(2);

Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
BulkApiRequestParser.convertIndexRequestToTraceFormat(
indexRequests, Schema.KaldbSchema.newBuilder().build());
assertThat(indexDocs.keySet().size()).isEqualTo(2);
assertThat(indexDocs.get("test1").size()).isEqualTo(1);
assertThat(indexDocs.get("test3").size()).isEqualTo(1);
@@ -187,7 +194,8 @@ public void testUpdatesAgainstTwoIndexes() throws Exception {
assertThat(indexRequests.size()).isEqualTo(2);

Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
BulkApiRequestParser.convertIndexRequestToTraceFormat(
indexRequests, Schema.KaldbSchema.newBuilder().build());
assertThat(indexDocs.keySet().size()).isEqualTo(2);
assertThat(indexDocs.get("test1").size()).isEqualTo(1);
assertThat(indexDocs.get("test2").size()).isEqualTo(1);
@@ -204,7 +212,9 @@ public void testTraceSpanGeneratedTimestamp() throws IOException {
assertThat(indexRequests.size()).isEqualTo(1);

IngestDocument ingestDocument = convertRequestToDocument(indexRequests.get(0));
Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument);
Trace.Span span =
BulkApiRequestParser.fromIngestDocument(
ingestDocument, Schema.KaldbSchema.newBuilder().build());

// timestamp is in microseconds based on the trace.proto definition
Instant ingestDocumentTime =