Skip to content

Commit

Permalink
expose more apis to the preprocessor
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Oct 18, 2023
1 parent 8ea307b commit be1791b
Showing 1 changed file with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

import com.google.common.util.concurrent.AbstractService;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.annotation.Blocking;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.elasticsearchApi.BulkIngestResponse;
import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
Expand Down Expand Up @@ -154,6 +157,41 @@ public OpenSearchBulkIngestApi(
this.kafkaProducer.initTransactions();
}

// along with the bulk API we also need to expose some node info that logstash needs info from
@Get("/")
public HttpResponse getNodeInfo() {
String output =
"""
{
"name" : "node_name",
"cluster_name" : "cluster_name",
"cluster_uuid" : "uuid",
"version" : {
"number" : "7.12.0",
"build_flavor" : "default",
"build_type" : "deb",
},
"tagline" : "You Know, for Search"
}
""";
return HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, output);
}

@Get("/_license")
public HttpResponse getLicenseInfo() {
String output =
"""
{
"license" : {
"status" : "active",
"uid" : "8afdc262-f37a-4b48-ad2e-68e224180640",
"type" : "basic"
}
}
""";
return HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, output);
}

/**
* 1. Kaldb does not support the concept of "updates". It's always an add 2. The "index" is used
* as the span name
Expand Down Expand Up @@ -222,6 +260,7 @@ public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDo
// exit.
new RuntimeHalterImpl().handleFatal(new Throwable("KafkaProducer needs to shutdown ", e));
} catch (Exception e) {
LOG.warn("failed transaction with error", e);
try {
kafkaProducer.abortTransaction();
} catch (ProducerFencedException err) {
Expand Down

0 comments on commit be1791b

Please sign in to comment.