Skip to content

Commit

Permalink
fix(consumer): fix datahub usage event topic consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Apr 20, 2023
1 parent 535e1ab commit d8eb575
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.Topics;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -52,19 +52,14 @@ public void consume(final ConsumerRecord<String, String> consumerRecord) {

Optional<DataHubUsageEventTransformer.TransformedDocument> eventDocument =
dataHubUsageEventTransformer.transformDataHubUsageEvent(record);
if (!eventDocument.isPresent()) {
if (eventDocument.isEmpty()) {
log.warn("Failed to apply usage events transform to record: {}", record);
return;
}
JsonElasticEvent elasticEvent = new JsonElasticEvent(eventDocument.get().getDocument());
try {
elasticEvent.setId(URLEncoder.encode(eventDocument.get().getId(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
log.error("Failed to encode the urn with error: {}", e.toString());
return;
}
elasticEvent.setId(URLEncoder.encode(eventDocument.get().getId(), StandardCharsets.UTF_8));
elasticEvent.setIndex(indexName);
elasticEvent.setActionType(ChangeType.CREATE);
elasticSearchConnector.feedElasticEvent(elasticEvent);
elasticSearchConnector.feedElasticEvent(elasticEvent, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ public ElasticsearchConnector(ESBulkProcessor bulkProcessor, int numRetries) {
_numRetries = numRetries;
}

public void feedElasticEvent(@Nonnull ElasticEvent event) {
/*
Be careful here, we are mixing `DataHub` change type semantics with `Elasticsearch` concepts.
*/
public void feedElasticEvent(@Nonnull ElasticEvent event, boolean allowReplacement) {
if (event.getActionType().equals(ChangeType.DELETE)) {
_bulkProcessor.add(createDeleteRequest(event));
} else if (event.getActionType().equals(ChangeType.CREATE)) {
_bulkProcessor.add(createIndexRequest(event));
if (allowReplacement) {
createIndexRequestWithReplace(event);
} else {
_bulkProcessor.add(createIndexRequest(event));
}
} else if (event.getActionType().equals(ChangeType.UPDATE)) {
_bulkProcessor.add(createUpsertRequest(event));
}
Expand All @@ -40,6 +47,13 @@ private static IndexRequest createIndexRequest(@Nonnull ElasticEvent event) {
.opType(DocWriteRequest.OpType.CREATE);
}

@Nonnull
private static IndexRequest createIndexRequestWithReplace(@Nonnull ElasticEvent event) {
return new IndexRequest(event.getIndex()).id(event.getId())
.source(event.buildJson())
.opType(DocWriteRequest.OpType.INDEX);
}

@Nonnull
private static DeleteRequest createDeleteRequest(@Nonnull ElasticEvent event) {
return new DeleteRequest(event.getIndex()).id(event.getId());
Expand Down

0 comments on commit d8eb575

Please sign in to comment.