feat(schema-registry): replace confluent schema registry (#7930)
Co-authored-by: Pedro Silva <[email protected]>
Co-authored-by: Shirshanka Das <[email protected]>
Co-authored-by: Ryan Holstien <[email protected]>
4 people authored May 1, 2023
1 parent ebb2af6 commit cd05f5b
Showing 128 changed files with 7,787 additions and 310 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -202,6 +202,7 @@ project.ext.externalDependency = [
'testContainersPostgresql':'org.testcontainers:postgresql:' + testContainersVersion,
'testContainersElasticsearch': 'org.testcontainers:elasticsearch:' + testContainersVersion,
'testContainersCassandra': 'org.testcontainers:cassandra:' + testContainersVersion,
'testContainersKafka': 'org.testcontainers:kafka:' + testContainersVersion,
'typesafeConfig':'com.typesafe:config:1.4.1',
'wiremock':'com.github.tomakehurst:wiremock:2.10.0',
'zookeeper': 'org.apache.zookeeper:zookeeper:3.4.14'
1 change: 1 addition & 0 deletions docker/build.gradle
@@ -44,6 +44,7 @@ task quickstart(type: Exec, dependsOn: ':metadata-ingestion:install') {
'--version', "v${version}",
'--dump-logs-on-failure'
]

commandLine 'bash', '-c', cmd.join(" ")
}

1 change: 1 addition & 0 deletions docker/datahub-actions/env/docker.env
@@ -4,6 +4,7 @@ DATAHUB_GMS_PORT=8080

KAFKA_BOOTSTRAP_SERVER=broker:29092
SCHEMA_REGISTRY_URL=http://schema-registry:8081
# SCHEMA_REGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
METADATA_AUDIT_EVENT_NAME=MetadataAuditEvent_v4
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=MetadataChangeLog_Versioned_v1

1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker-without-neo4j.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=fal
EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.cassandra.env
@@ -1,6 +1,7 @@
DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=fal
EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.mariadb.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_URL=jdbc:mariadb://mariadb:3306/datahub
EBEAN_DATASOURCE_DRIVER=org.mariadb.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.postgres.env
@@ -9,6 +9,7 @@ EBEAN_DATASOURCE_DRIVER=org.postgresql.Driver
# EBEAN_POSTGRES_USE_AWS_IAM_AUTH=true
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
3 changes: 2 additions & 1 deletion docker/datahub-mae-consumer/env/docker-without-neo4j.env
@@ -3,9 +3,10 @@ DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080

MAE_CONSUMER_ENABLED=true
-PE_CONSUMER_ENABLED=true
+PE_CONSUMER_ENABLED=false
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
3 changes: 2 additions & 1 deletion docker/datahub-mae-consumer/env/docker.env
@@ -3,9 +3,10 @@ DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080

MAE_CONSUMER_ENABLED=true
-PE_CONSUMER_ENABLED=true
+PE_CONSUMER_ENABLED=false
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-mce-consumer/Dockerfile
@@ -34,6 +34,7 @@ RUN chmod +x /datahub/datahub-mce-consumer/scripts/start.sh
FROM base as dev-install
# Dummy stage for development. Assumes code is built on your machine and mounted to this image.
# See this excellent thread https://github.com/docker/cli/issues/1134
COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-mce-consumer/resources/entity-registry.yml

FROM ${APP_ENV}-install as final

1 change: 1 addition & 0 deletions docker/datahub-mce-consumer/env/docker-without-neo4j.env
@@ -7,6 +7,7 @@ EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=fal
EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
3 changes: 2 additions & 1 deletion docker/datahub-mce-consumer/env/docker.env
@@ -3,10 +3,11 @@ MCE_CONSUMER_ENABLED=true
EBEAN_DATASOURCE_USERNAME=datahub
EBEAN_DATASOURCE_PASSWORD=datahub
EBEAN_DATASOURCE_HOST=mysql:3306
-EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8&enabledTLSProtocols=TLSv1.2
+EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8
EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-upgrade/env/docker-without-neo4j.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver

KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/

ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
1 change: 1 addition & 0 deletions docker/datahub-upgrade/env/docker.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver

KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
# KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/

ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
2 changes: 1 addition & 1 deletion docker/docker-compose-with-cassandra.yml
@@ -37,7 +37,7 @@ services:
depends_on:
- broker
ports:
- "8081:8081"
- ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081

elasticsearch:
image: elasticsearch:7.10.1
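
With the host port mapping now parameterized, the registry can be exposed on a different port without editing the file. A minimal sketch, assuming the compose file is invoked from the docker/ directory:

# Expose the standalone schema registry on host port 8082 instead of 8081
DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT=8082 docker compose -f docker-compose-with-cassandra.yml up -d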
8 changes: 4 additions & 4 deletions docker/docker-compose.consumers.dev.yml
@@ -3,8 +3,8 @@ services:
datahub-mae-consumer:
image: linkedin/datahub-mae-consumer:debug
build:
-      context: datahub-mae-consumer
-      dockerfile: Dockerfile
+      context: ../
+      dockerfile: docker/datahub-mae-consumer/Dockerfile
args:
APP_ENV: dev
volumes:
@@ -16,8 +16,8 @@
datahub-mce-consumer:
image: linkedin/datahub-mce-consumer:debug
build:
-      context: datahub-mce-consumer
-      dockerfile: Dockerfile
+      context: ../
+      dockerfile: docker/datahub-mce-consumer/Dockerfile
args:
APP_ENV: dev
volumes:
2 changes: 1 addition & 1 deletion docker/docker-compose.tools.yml
@@ -24,7 +24,7 @@ services:
depends_on:
- zookeeper
- broker
-      - schema-registry
+      - schema-registry # -datahub-gms
- kafka-rest-proxy

kibana:
1 change: 1 addition & 0 deletions docker/kafka-rest-proxy/env/docker.env
@@ -1,4 +1,5 @@
KAFKA_REST_LISTENERS=http://0.0.0.0:8082/
KAFKA_REST_SCHEMA_REGISTRY_URL=http://schema-registry:8081/
# KAFKA_REST_SCHEMA_REGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
KAFKA_REST_HOST_NAME=kafka-rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS=PLAINTEXT://broker:29092
1 change: 1 addition & 0 deletions docker/kafka-setup/Dockerfile
@@ -62,6 +62,7 @@ ENV METADATA_CHANGE_PROPOSAL_TOPIC_NAME="MetadataChangeProposal_v1"
ENV FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME="FailedMetadataChangeProposal_v1"
ENV PLATFORM_EVENT_TOPIC_NAME="PlatformEvent_v1"
ENV DATAHUB_UPGRADE_HISTORY_TOPIC_NAME="DataHubUpgradeHistory_v1"
ENV USE_CONFLUENT_SCHEMA_REGISTRY="TRUE"

COPY docker/kafka-setup/kafka-setup.sh ./kafka-setup.sh
COPY docker/kafka-setup/kafka-config.sh ./kafka-config.sh
1 change: 1 addition & 0 deletions docker/kafka-setup/env/docker.env
@@ -1,5 +1,6 @@
KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
KAFKA_BOOTSTRAP_SERVER=broker:29092
USE_CONFLUENT_SCHEMA_REGISTRY=TRUE

# Configure the topics that are created by kafka-setup
# Make sure these names are consistent across the whole deployment
8 changes: 7 additions & 1 deletion docker/kafka-setup/kafka-setup.sh
@@ -141,7 +141,13 @@ echo "Topic Creation Complete."
# End Topic Creation Logic
############################################################

-kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact
+## If using confluent schema registry as a standalone component, then configure compact cleanup policy.
+if [[ $USE_CONFLUENT_SCHEMA_REGISTRY == "TRUE" ]]; then
+  kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
+    --entity-type topics \
+    --entity-name _schemas \
+    --alter --add-config cleanup.policy=compact
+fi

# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite
# Please see the bug report below for details
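
Because the _schemas topic exists only for Confluent's registry, deployments that use the internal registry can skip the compaction step by flipping the new flag, for example in the kafka-setup container environment (a sketch mirroring docker/kafka-setup/env/docker.env above):

# Sketch: disable the Confluent-specific _schemas compaction step
# when GMS serves the schema registry.
USE_CONFLUENT_SCHEMA_REGISTRY=FALSE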
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose-m1.quickstart.yml
@@ -167,6 +167,7 @@ services:
- DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS:-false}
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- USE_CONFLUENT_SCHEMA_REGISTRY=TRUE
hostname: kafka-setup
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-linkedin/datahub-kafka-setup}:${DATAHUB_VERSION:-head}
labels:
@@ -158,6 +158,7 @@ services:
- DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS:-false}
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- USE_CONFLUENT_SCHEMA_REGISTRY=TRUE
hostname: kafka-setup
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-linkedin/datahub-kafka-setup}:${DATAHUB_VERSION:-head}
labels:
@@ -158,6 +158,7 @@ services:
- DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS:-false}
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- USE_CONFLUENT_SCHEMA_REGISTRY=TRUE
hostname: kafka-setup
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-linkedin/datahub-kafka-setup}:${DATAHUB_VERSION:-head}
labels:
@@ -10,7 +10,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- MAE_CONSUMER_ENABLED=true
-      - PE_CONSUMER_ENABLED=true
+      - PE_CONSUMER_ENABLED=false
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
4 changes: 2 additions & 2 deletions docker/quickstart/docker-compose.consumers.quickstart.yml
@@ -12,7 +12,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- MAE_CONSUMER_ENABLED=true
-      - PE_CONSUMER_ENABLED=true
+      - PE_CONSUMER_ENABLED=false
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
@@ -41,7 +41,7 @@ services:
- EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
- EBEAN_DATASOURCE_HOST=mysql:3306
- EBEAN_DATASOURCE_PASSWORD=datahub
-      - EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8&enabledTLSProtocols=TLSv1.2
+      - EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8
- EBEAN_DATASOURCE_USERNAME=datahub
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose.quickstart.yml
@@ -167,6 +167,7 @@ services:
- DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS:-false}
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- USE_CONFLUENT_SCHEMA_REGISTRY=TRUE
hostname: kafka-setup
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-linkedin/datahub-kafka-setup}:${DATAHUB_VERSION:-head}
labels:
1 change: 1 addition & 0 deletions docker/schema-registry-ui/env/docker.env
@@ -1,4 +1,5 @@
SCHEMAREGISTRY_URL=http://schema-registry:8081
# SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ALLOW_GLOBAL=true
ALLOW_TRANSITIVE=true
ALLOW_DELETION=true
13 changes: 9 additions & 4 deletions docs/deploy/environment-vars.md
@@ -67,7 +67,12 @@ In general, there are **lots** of Kafka configuration environment variables for
These environment variables follow the standard Spring representation of properties as environment variables.
Simply replace the dot, `.`, with an underscore, `_`, and convert to uppercase.
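
For example, the Kafka client property spring.kafka.properties.security.protocol (an illustrative property, not one introduced by this commit) maps as follows:

# Illustrative mapping; any spring.kafka.* property follows the same rule.
# spring.kafka.properties.security.protocol -> SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL
export SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL=SSL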

-| Variable | Default | Unit/Type | Components | Description |
-|---|---|---|---|---|
-| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
-| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+| Variable | Default | Unit/Type | Components | Description |
+|---|---|---|---|---|
+| `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
+| `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+| `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL`, `KAFKA`, or `AWS_GLUE`. |
+| `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry URL. Used for `INTERNAL` and `KAFKA`. The default suits `GMS`; for the `MCE Consumer` and `MAE Consumer`, point it at the `GMS` hostname and port. |
+| `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | AWS Glue registry region; applies when `SCHEMA_REGISTRY_TYPE` is `AWS_GLUE`. |
+| `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | AWS Glue registry name; applies when `SCHEMA_REGISTRY_TYPE` is `AWS_GLUE`. |
+| `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
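
Taken together with the commented-out URLs added throughout the env files above, a deployment opting into the new internal registry would, as a rough sketch, set:

# Sketch: point a standalone consumer at the GMS-hosted internal registry.
# Values mirror the commented examples in the env files in this commit.
SCHEMA_REGISTRY_TYPE=INTERNAL
KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/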
@@ -20,6 +20,7 @@
import io.opentelemetry.extension.annotations.WithSpan;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
@@ -95,7 +96,7 @@ record = EventUtils.pegasusToAvroMAE(metadataAuditEvent);

@Override
@WithSpan
-  public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec aspectSpec,
+  public Future<?> produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec aspectSpec,
@Nonnull final MetadataChangeLog metadataChangeLog) {
GenericRecord record;
try {
@@ -112,14 +113,14 @@ record = EventUtils.pegasusToAvroMCL(metadataChangeLog);
if (aspectSpec.isTimeseries()) {
topic = _topicConvention.getMetadataChangeLogTimeseriesTopicName();
}
-    _producer.send(new ProducerRecord(topic, urn.toString(), record),
+    return _producer.send(new ProducerRecord(topic, urn.toString(), record),
_kafkaHealthChecker.getKafkaCallBack("MCL", urn.toString()));
}

@Override
@WithSpan
-  public void produceMetadataChangeProposal(@Nonnull final Urn urn, @Nonnull final MetadataChangeProposal
-      metadataChangeProposal) {
+  public Future<?> produceMetadataChangeProposal(@Nonnull final Urn urn,
+      @Nonnull final MetadataChangeProposal metadataChangeProposal) {
GenericRecord record;

try {
@@ -133,12 +134,12 @@ record = EventUtils.pegasusToAvroMCP(metadataChangeProposal);
}

String topic = _topicConvention.getMetadataChangeProposalTopicName();
-    _producer.send(new ProducerRecord(topic, urn.toString(), record),
+    return _producer.send(new ProducerRecord(topic, urn.toString(), record),
_kafkaHealthChecker.getKafkaCallBack("MCP", urn.toString()));
}

@Override
-  public void producePlatformEvent(@Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event) {
+  public Future<?> producePlatformEvent(@Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event) {
GenericRecord record;
try {
log.debug(String.format("Converting Pegasus Event to Avro Event urn %s\nEvent: %s",
@@ -151,7 +152,7 @@ record = EventUtils.pegasusToAvroPE(event);
}

final String topic = _topicConvention.getPlatformEventTopicName();
-    _producer.send(new ProducerRecord(topic, key == null ? name : key, record),
+    return _producer.send(new ProducerRecord(topic, key == null ? name : key, record),
_kafkaHealthChecker.getKafkaCallBack("Platform Event", name));
}

21 changes: 21 additions & 0 deletions metadata-ingestion/src/datahub/cli/docker_cli.py
@@ -870,6 +870,27 @@ def download_compose_files(
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")
if kafka_setup:
kafka_setup_github_file = f"{base_url}/{KAFKA_SETUP_QUICKSTART_COMPOSE_FILE}"

default_kafka_compose_file = (
Path(DATAHUB_ROOT_FOLDER) / "quickstart/docker-compose.kafka-setup.yml"
)
with open(
default_kafka_compose_file, "wb"
) if default_kafka_compose_file else tempfile.NamedTemporaryFile(
suffix=".yml", delete=False
) as tmp_file:
path = pathlib.Path(tmp_file.name)
quickstart_compose_file_list.append(path)
click.echo(
f"Fetching kafka-setup docker-compose file {kafka_setup_github_file} from GitHub"
)
# Download the kafka-setup docker-compose file from GitHub.
quickstart_download_response = request_session.get(kafka_setup_github_file)
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")


def valid_restore_options(
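
Judging by the new kafka_setup parameter, the quickstart CLI can now pull this extra compose file on demand. Presumably it is surfaced as a CLI flag; the name below is inferred from the parameter, not confirmed by the visible diff:

# Hypothetical invocation; the --kafka-setup flag name is an assumption.
datahub docker quickstart --kafka-setup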
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/configuration/kafka.py
@@ -9,7 +9,7 @@ class _KafkaConnectionConfig(ConfigModel):
bootstrap: str = "localhost:9092"

# schema registry location
schema_registry_url: str = "http://localhost:8081"
schema_registry_url: str = "http://localhost:8080/schema-registry/api/"

schema_registry_config: dict = Field(
default_factory=dict,
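
With the ingestion default now pointing at the GMS-hosted registry, a kafka source recipe can omit schema_registry_url entirely. A sketch of such a recipe (standard DataHub recipe layout; hosts and ports are illustrative):

# Sketch: kafka ingestion relying on the new default of
# http://localhost:8080/schema-registry/api/
source:
  type: kafka
  config:
    connection:
      bootstrap: "localhost:9092"
      # schema_registry_url omitted, so the new default applies
sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"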