fix(consumer): fix datahub usage event topic consumer (datahub-projec…
david-leifker authored May 1, 2023
1 parent 9ef281a commit 87aa792
Showing 5 changed files with 36 additions and 15 deletions.
1 change: 1 addition & 0 deletions datahub-frontend/conf/application.conf
@@ -196,6 +196,7 @@ auth.native.enabled = ${?AUTH_NATIVE_ENABLED}
auth.session.ttlInHours = 720
auth.session.ttlInHours = ${?AUTH_SESSION_TTL_HOURS}

analytics.enabled = true
analytics.enabled = ${?DATAHUB_ANALYTICS_ENABLED}

# Kafka Producer Configuration
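For context, this pattern relies on HOCON's optional substitution: `${?DATAHUB_ANALYTICS_ENABLED}` only overrides the preceding value when the environment variable is actually set, so without the new hard-coded default the `analytics.enabled` key would be missing entirely whenever the variable is unset. A minimal sketch of that behavior using the Typesafe Config library (the class name and inline HOCON string are illustrative, not part of this commit):

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class AnalyticsFlagDemo {
    public static void main(String[] args) {
        // Same pattern as the application.conf change: a hard-coded default
        // followed by an optional env-var override.
        String hocon = "analytics.enabled = true\n"
                     + "analytics.enabled = ${?DATAHUB_ANALYTICS_ENABLED}\n";
        // resolve() consults environment variables by default, so the optional
        // substitution is dropped (keeping the default of true) when
        // DATAHUB_ANALYTICS_ENABLED is unset, and wins when it is set.
        Config config = ConfigFactory.parseString(hocon).resolve();
        System.out.println(config.getBoolean("analytics.enabled"));
    }
}
```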
10 changes: 5 additions & 5 deletions docs/deploy/environment-vars.md
@@ -9,11 +9,11 @@ DataHub works.

## Feature Flags

| Variable | Default | Unit/Type | Components | Description |
|---------------------------------------------------|---------|-----------|-----------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
| `UI_INGESTION_ENABLED` | `true` | boolean | [`GMS`, `MCE Consumer`] | Enable UI based ingestion. |
| `DATAHUB_ANALYTICS_ENABLED` | `true` | boolean | [`Frontend`, `GMS`] | Enabled analytics within DataHub. |
| `BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE` | `true` | boolean | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Do not wait for the `system-update` to complete before starting. This should typically only be disabled during development. |
| Variable | Default | Unit/Type | Components | Description |
|--------------------------------------------------|---------|-----------|-----------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
| `UI_INGESTION_ENABLED` | `true` | boolean | [`GMS`, `MCE Consumer`] | Enable UI based ingestion. |
| `DATAHUB_ANALYTICS_ENABLED`                       | `true`  | boolean   | [`Frontend`, `GMS`]                      | Collect DataHub usage to populate the analytics dashboard.                                                                   |
| `BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE` | `true` | boolean | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Do not wait for the `system-update` to complete before starting. This should typically only be disabled during development. |

## Ingestion

@@ -11,8 +11,8 @@
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.Topics;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -52,19 +52,36 @@ public void consume(final ConsumerRecord<String, String> consumerRecord) {

Optional<DataHubUsageEventTransformer.TransformedDocument> eventDocument =
dataHubUsageEventTransformer.transformDataHubUsageEvent(record);
if (!eventDocument.isPresent()) {
if (eventDocument.isEmpty()) {
log.warn("Failed to apply usage events transform to record: {}", record);
return;
}
JsonElasticEvent elasticEvent = new JsonElasticEvent(eventDocument.get().getDocument());
try {
elasticEvent.setId(URLEncoder.encode(eventDocument.get().getId(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
log.error("Failed to encode the urn with error: {}", e.toString());
return;
}
elasticEvent.setId(generateDocumentId(eventDocument.get().getId(), consumerRecord.offset()));
elasticEvent.setIndex(indexName);
elasticEvent.setActionType(ChangeType.CREATE);
elasticSearchConnector.feedElasticEvent(elasticEvent);
}

/**
* DataHub usage events are written to an append-only index called a data stream. Even though the
* event's id contains an epoch millisecond, duplicate ids can still occur in the index, and a
* collision stalls processing of the topic. To prevent collisions, the last 5 digits of the kafka
* offset, padded with zeros, are appended to the id.
* @param eventId the event's id
* @param kafkaOffset the kafka offset for the message
* @return unique identifier for event
*/
private static String generateDocumentId(String eventId, long kafkaOffset) {
return URLEncoder.encode(String.format("%s_%05d", eventId, leastSignificant(kafkaOffset, 5)), StandardCharsets.UTF_8);
}

private static int leastSignificant(long kafkaOffset, int digits) {
final String input = String.valueOf(kafkaOffset);
if (input.length() > digits) {
return Integer.parseInt(input.substring(input.length() - digits));
} else {
return Integer.parseInt(input);
}
}
}
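As a quick illustration of the new id scheme, the following self-contained sketch reuses the two helpers introduced above; the event id value is made up purely for the example:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class UsageEventIdDemo {
    // Copies of the helpers from the diff above, reproduced for illustration.
    private static String generateDocumentId(String eventId, long kafkaOffset) {
        return URLEncoder.encode(
            String.format("%s_%05d", eventId, leastSignificant(kafkaOffset, 5)),
            StandardCharsets.UTF_8);
    }

    private static int leastSignificant(long kafkaOffset, int digits) {
        final String input = String.valueOf(kafkaOffset);
        return input.length() > digits
            ? Integer.parseInt(input.substring(input.length() - digits))
            : Integer.parseInt(input);
    }

    public static void main(String[] args) {
        // Two events that happen to share the same id (e.g. the same millisecond
        // timestamp) now map to distinct Elasticsearch document ids because the
        // trailing Kafka offset digits differ.
        String eventId = "HomePageViewEvent_1682899200000"; // hypothetical event id
        System.out.println(generateDocumentId(eventId, 1_234_567L)); // ..._34567
        System.out.println(generateDocumentId(eventId, 1_234_568L)); // ..._34568
    }
}
```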
@@ -23,6 +23,9 @@ public ElasticsearchConnector(ESBulkProcessor bulkProcessor, int numRetries) {
_numRetries = numRetries;
}

/*
Be careful here, we are mixing `DataHub` change type semantics with `Elasticsearch` concepts.
*/
public void feedElasticEvent(@Nonnull ElasticEvent event) {
if (event.getActionType().equals(ChangeType.DELETE)) {
_bulkProcessor.add(createDeleteRequest(event));
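The usage event processor above submits its documents with `ChangeType.CREATE`, which matters here because the target index is a data stream, and Elasticsearch data streams only accept the `create` operation; with `create`, a duplicate document id fails with a version conflict, which is exactly the stall the offset suffix avoids. A hedged sketch of what such a request can look like with the Elasticsearch 7.x Java client (this helper is illustrative only; the actual mapping lives in the collapsed part of this class):

```java
import java.util.Map;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;

// Illustrative sketch, not the DataHub implementation.
class UsageEventRequestSketch {
    static IndexRequest createRequest(String indexName, String docId, Map<String, Object> document) {
        return new IndexRequest(indexName)
                .id(docId)                              // id from generateDocumentId(...)
                .source(document)                       // transformed usage event document
                .opType(DocWriteRequest.OpType.CREATE); // data streams only allow "create"
    }
}
```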
4 changes: 2 additions & 2 deletions smoke-test/cypress-dev.sh
@@ -15,6 +15,6 @@ python -c 'from tests.cypress.integration_test import ingest_data; ingest_data()
cd tests/cypress
npm install

source ./set-cypress-creds.sh
source ../../set-cypress-creds.sh

npx cypress open
npx cypress open
