add a new schema config file
vthacker committed Feb 27, 2024
1 parent 172f0b3 commit 2a70a39
Showing 21 changed files with 503 additions and 69 deletions.
1 change: 1 addition & 0 deletions config/config.yaml
@@ -129,6 +129,7 @@ preprocessorConfig:
kafkaTopic: ${KAFKA_TOPIC:-test-topic}
kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
kaldbSchemaFile: ${PREPROCESSOR_SCHEMA_FILE:-schema.yaml}

serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
1 change: 1 addition & 0 deletions config/schema.yaml
@@ -0,0 +1 @@
fields:
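For illustration only (not part of this commit): a sketch of how this schema file could be populated, assuming each entry under fields is keyed by field name and carries a type matching one of the Schema.SchemaFieldType values handled in BulkApiRequestParser below. The field names here are hypothetical, and the exact enum spellings depend on the proto definition.

fields:
  host_ip:
    type: IP
  http_status:
    type: INTEGER
  success:
    type: BOOLEAN
  message:
    type: TEXT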
BulkIngestApi.java
@@ -6,6 +6,7 @@
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
@@ -34,12 +35,14 @@ public class BulkIngestApi {
private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
private final int rateLimitExceededErrorCode;
private final Schema.KaldbSchema kaldbSchema;

public BulkIngestApi(
BulkIngestKafkaProducer bulkIngestKafkaProducer,
DatasetRateLimitingService datasetRateLimitingService,
MeterRegistry meterRegistry,
- int rateLimitExceededErrorCode) {
+ int rateLimitExceededErrorCode,
+ Schema.KaldbSchema kaldbSchema) {

this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
this.datasetRateLimitingService = datasetRateLimitingService;
@@ -52,6 +55,7 @@ public BulkIngestApi(
} else {
this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
}
this.kaldbSchema = kaldbSchema;
}

@Post("/_bulk")
@@ -65,7 +69,8 @@ public HttpResponse addDocument(String bulkRequest) {
try {
byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
incomingByteTotal.increment(bulkRequestBytes.length);
- Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes);
+ Map<String, List<Trace.Span>> docs =
+     BulkApiRequestParser.parseRequest(bulkRequestBytes, kaldbSchema);

// todo - our rate limiter doesn't have a way to acquire permits across multiple
// datasets
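For illustration only (not part of this commit): a sketch of a client call against the /_bulk endpoint above, assuming the preprocessor is listening on the default port 8086 from config.yaml and that the body follows the standard OpenSearch bulk NDJSON format that BulkApiRequestParser consumes. The dataset and field names are hypothetical.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BulkIngestClientSketch {
  public static void main(String[] args) throws Exception {
    // Two NDJSON lines per document: an index action naming the dataset, then the document itself.
    String body =
        "{\"index\": {\"_index\": \"test-dataset\"}}\n"
            + "{\"service_name\": \"test-dataset\", \"http_status\": 200, \"message\": \"hello\"}\n";
    HttpRequest request =
        HttpRequest.newBuilder(URI.create("http://localhost:8086/_bulk"))
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}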
kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java
@@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.kaldb.writer.SpanFormatter;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
@@ -36,8 +37,9 @@ public class BulkApiRequestParser {

private static final String SERVICE_NAME_KEY = "service_name";

- public static Map<String, List<Trace.Span>> parseRequest(byte[] postBody) throws IOException {
-   return convertIndexRequestToTraceFormat(parseBulkRequest(postBody));
+ public static Map<String, List<Trace.Span>> parseRequest(
+     byte[] postBody, Schema.KaldbSchema kaldbSchema) throws IOException {
+   return convertIndexRequestToTraceFormat(parseBulkRequest(postBody), kaldbSchema);
}

/**
@@ -81,7 +83,8 @@ public static long getTimestampFromIngestDocument(IngestDocument ingestDocument)
}

@VisibleForTesting
- public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
+ public static Trace.Span fromIngestDocument(
+     IngestDocument ingestDocument, Schema.KaldbSchema kaldbSchema) {

long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);

@@ -99,6 +102,15 @@ public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
// Trace.Span proto expects duration in microseconds today
spanBuilder.setTimestamp(
TimeUnit.MICROSECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS));
// back-compat. Remove it once all components have moved to reading service_name from the top
// level
spanBuilder.addTags(
Trace.KeyValue.newBuilder()
.setKey(SERVICE_NAME_KEY)
.setVType(Trace.ValueType.STRING)
.setVStr(index)
.build());
spanBuilder.setServiceName(index);

// Remove the following internal metadata fields that OpenSearch adds
sourceAndMetadata.remove(IngestDocument.Metadata.ROUTING.getFieldName());
@@ -113,18 +125,86 @@ public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
sourceAndMetadata.remove("@timestamp");

sourceAndMetadata.forEach(
- (key, value) -> spanBuilder.addTags(SpanFormatter.convertKVtoProto(key, value)));
- spanBuilder.addTags(
-     Trace.KeyValue.newBuilder()
-         .setKey(SERVICE_NAME_KEY)
-         .setVType(Trace.ValueType.STRING)
-         .setVStr(index)
-         .build());
+ (key, value) -> spanBuilder.addTags(convertFieldsToProto(key, value, kaldbSchema)));

return spanBuilder.build();
}

public static Trace.KeyValue convertFieldsToProto(
String key, Object value, Schema.KaldbSchema kaldbSchema) {
if (kaldbSchema.containsFields(key)) {
Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder();
tagBuilder.setKey(key);
switch (kaldbSchema.getFieldsMap().get(key).getType()) {
case KEYWORD -> {
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD);
tagBuilder.setVStr(value.toString());
}
case TEXT -> {
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.TEXT);
tagBuilder.setVStr(value.toString());
}
case IP -> {
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.IP);
tagBuilder.setVStr(value.toString());
}
case DATE -> {
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.DATE);
tagBuilder.setVStr(value.toString());
}
case BOOLEAN -> {
tagBuilder.setVType(Trace.ValueType.BOOL);
tagBuilder.setFieldType(Schema.SchemaFieldType.BOOLEAN);
tagBuilder.setVBool((boolean) value);
}
case DOUBLE -> {
tagBuilder.setVType(Trace.ValueType.FLOAT64);
tagBuilder.setFieldType(Schema.SchemaFieldType.DOUBLE);
tagBuilder.setVFloat64((double) value);
}
case FLOAT -> {
tagBuilder.setVType(Trace.ValueType.FLOAT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.FLOAT);
tagBuilder.setVFloat32((float) value);
}
case HALF_FLOAT -> {
tagBuilder.setVType(Trace.ValueType.FLOAT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.HALF_FLOAT);
tagBuilder.setVFloat32((float) value);
}
case INTEGER -> {
tagBuilder.setVType(Trace.ValueType.INT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.INTEGER);
tagBuilder.setVInt32((int) value);
}
case LONG -> {
tagBuilder.setVType(Trace.ValueType.INT64);
tagBuilder.setFieldType(Schema.SchemaFieldType.LONG);
tagBuilder.setVInt64((long) value);
}
case SCALED_LONG -> {
tagBuilder.setVType(Trace.ValueType.INT64);
tagBuilder.setFieldType(Schema.SchemaFieldType.SCALED_LONG);
tagBuilder.setVInt64((long) value);
}
case BYTE -> {
tagBuilder.setVType(Trace.ValueType.INT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.BYTE);
tagBuilder.setVInt32((int) value);
}
}
return tagBuilder.build();
} else {
return SpanFormatter.convertKVtoProto(key, value);
}
}

protected static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
- List<IndexRequest> indexRequests) {
+ List<IndexRequest> indexRequests, Schema.KaldbSchema kaldbSchema) {
// key - index. value - list of docs to be indexed
Map<String, List<Trace.Span>> indexDocs = new HashMap<>();

@@ -135,7 +215,7 @@ protected static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
}
IngestDocument ingestDocument = convertRequestToDocument(indexRequest);
List<Trace.Span> docs = indexDocs.computeIfAbsent(index, key -> new ArrayList<>());
- docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument));
+ docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument, kaldbSchema));
}
return indexDocs;
}
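For illustration only (not part of this commit): a minimal sketch of how convertFieldsToProto consults the schema, assuming the generated protos from this repo are on the classpath. Schema.SchemaField is an assumed name for the value message of the fields map (check the actual proto), and the field names are hypothetical.

import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;

public class ConvertFieldsSketch {
  public static void main(String[] args) {
    // Hypothetical schema declaring "http_status" as an INTEGER field.
    // "Schema.SchemaField" is an assumed message name; verify against the proto definition.
    Schema.KaldbSchema schema =
        Schema.KaldbSchema.newBuilder()
            .putFields(
                "http_status",
                Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.INTEGER).build())
            .build();

    // Field present in the schema: the tag is typed per the declared INTEGER type.
    Trace.KeyValue typed = BulkApiRequestParser.convertFieldsToProto("http_status", 200, schema);

    // Field absent from the schema: falls back to SpanFormatter.convertKVtoProto, as before this change.
    Trace.KeyValue untyped = BulkApiRequestParser.convertFieldsToProto("free_form", "hello", schema);

    System.out.println(typed + "\n" + untyped);
  }
}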
50 changes: 50 additions & 0 deletions kaldb/src/main/java/com/slack/kaldb/metadata/schema/KaldbSchemaUtil.java
@@ -0,0 +1,50 @@
package com.slack.kaldb.metadata.schema;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.slack.kaldb.proto.schema.Schema;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.text.StringSubstitutor;
import org.apache.commons.text.lookup.StringLookup;

public class KaldbSchemaUtil {

public static Schema.KaldbSchema parseSchema(Path schemaPath) throws IOException {
String filename = schemaPath.getFileName().toString();
if (filename.endsWith(".yaml")) {
return parseSchemaYaml(Files.readString(schemaPath), System::getenv);
} else if (filename.endsWith(".json")) {
return parseJsonSchema(Files.readString(schemaPath));
} else {
throw new RuntimeException(
"Invalid config file format provided - must be either .json or .yaml");
}
}

@VisibleForTesting
public static Schema.KaldbSchema parseSchemaYaml(String yamlStr, StringLookup variableResolver)
throws JsonProcessingException, InvalidProtocolBufferException {
StringSubstitutor substitute = new StringSubstitutor(variableResolver);
ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory());
ObjectMapper jsonWriter = new ObjectMapper();

Object obj = yamlReader.readValue(substitute.replace(yamlStr), Object.class);
return parseJsonSchema(jsonWriter.writeValueAsString(obj));
}

@VisibleForTesting
public static Schema.KaldbSchema parseJsonSchema(String jsonStr)
throws InvalidProtocolBufferException {
Schema.KaldbSchema.Builder kaldbSchemaBuilder = Schema.KaldbSchema.newBuilder();
JsonFormat.parser().merge(jsonStr, kaldbSchemaBuilder);
Schema.KaldbSchema kaldbSchema = kaldbSchemaBuilder.build();
// TODO: validate schema
return kaldbSchema;
}
}
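For illustration only (not part of this commit): a sketch of loading a schema through these helpers. parseSchema resolves ${...} placeholders against System::getenv for .yaml files; the snippet below also passes a custom lookup so it runs without any environment setup. The field name and variable are hypothetical.

import com.slack.kaldb.metadata.schema.KaldbSchemaUtil;
import com.slack.kaldb.proto.schema.Schema;
import java.nio.file.Path;

public class SchemaParsingSketch {
  public static void main(String[] args) throws Exception {
    // From a file on disk; only .yaml and .json are accepted, anything else throws.
    // Assumes config/schema.yaml has been populated with field entries.
    Schema.KaldbSchema fromFile = KaldbSchemaUtil.parseSchema(Path.of("config/schema.yaml"));

    // From an in-memory YAML string, resolving ${FIELD_TYPE} with a custom StringLookup.
    // "KEYWORD" assumes the proto enum value name matches the Java constant; adjust to the actual proto.
    String yaml = "fields:\n  status:\n    type: ${FIELD_TYPE}\n";
    Schema.KaldbSchema fromYaml =
        KaldbSchemaUtil.parseSchemaYaml(yaml, key -> "FIELD_TYPE".equals(key) ? "KEYWORD" : null);

    System.out.println(fromFile.getFieldsCount() + " / " + fromYaml.getFieldsCount());
  }
}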
8 changes: 7 additions & 1 deletion kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
@@ -31,11 +31,13 @@
import com.slack.kaldb.metadata.recovery.RecoveryNodeMetadataStore;
import com.slack.kaldb.metadata.recovery.RecoveryTaskMetadataStore;
import com.slack.kaldb.metadata.replica.ReplicaMetadataStore;
import com.slack.kaldb.metadata.schema.KaldbSchemaUtil;
import com.slack.kaldb.metadata.search.SearchMetadataStore;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
import com.slack.kaldb.preprocessor.PreprocessorService;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.proto.metadata.Metadata;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.kaldb.recovery.RecoveryService;
import com.slack.kaldb.util.RuntimeHalterImpl;
import com.slack.kaldb.zipkinApi.ZipkinService;
@@ -398,12 +400,16 @@ private static Set<Service> getServices(
new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry);
services.add(datasetRateLimitingService);

Schema.KaldbSchema kaldbSchema =
KaldbSchemaUtil.parseSchema(Path.of(preprocessorConfig.getKaldbSchemaFile()));

BulkIngestApi openSearchBulkApiService =
new BulkIngestApi(
bulkIngestKafkaProducer,
datasetRateLimitingService,
meterRegistry,
- preprocessorConfig.getRateLimitExceededErrorCode());
+ preprocessorConfig.getRateLimitExceededErrorCode(),
+ kaldbSchema);
armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
} else {
PreprocessorService preprocessorService =
(Diffs for the remaining 15 changed files are not shown.)
