Skip to content

Commit

Permalink
Merge pull request #16 from DataDog/mackjmr/add-kafka-staging
Browse files Browse the repository at this point in the history
Add kafka/{zookeeper,producer,consumer}
  • Loading branch information
mackjmr authored Aug 7, 2023
2 parents b41eb32 + 8715391 commit e8318c8
Show file tree
Hide file tree
Showing 18 changed files with 871 additions and 16 deletions.
9 changes: 9 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,19 @@ build-ci-image-kafka:
DOCKERFILE: src/kafka/Dockerfile
IMAGE_TAG_SUFFIX: kafka
CONTEXT: .
build-ci-image-orderproducer:
!!merge <<: *build-ci-image
variables:
DOCKERFILE: src/orderproducerservice/Dockerfile
IMAGE_TAG_SUFFIX: orderproducer
CONTEXT: .
.prod-deploy: &prod-deploy
stage: prod-deploy
tags: ["runner:docker", "size:large"]
image: $CI_IMAGE
rules:
- if: '$CI_COMMIT_REF_NAME == "prod"'
- if: '$CI_COMMIT_REF_NAME =~ /-staging$/'
script:
# # For debugging
#- aws sts get-caller-identity
Expand Down
7 changes: 7 additions & 0 deletions ci/datadog-agent-values.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
agents:
image:
tagSuffix: jmx
datadog:
logs:
enabled: true
tags:
- "env:otel-ingest"
otlp:
receiver:
protocols:
Expand Down
8 changes: 7 additions & 1 deletion ci/scripts/ci-deploy-demo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ install_demo() {
# Set the namespace and release name
release_name="opentelemetry-demo"

# Deploy zookeeper which is not a default component.
kubectl apply -f ./src/zookeeperservice/deployment.yaml -n "${namespace}"

# if repo already exists, helm 3+ will skip
helm --debug repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts

# --install will run `helm install` if not already present.
helm --debug upgrade "${release_name}" -n "${namespace}" open-telemetry/opentelemetry-demo --install \
-f ./ci/values.yaml \
--set-string default.image.tag="v$CI_COMMIT_SHORT_SHA"


# Deploy java order producer which is not a default component.
sed -i "s/PLACEHOLDER_COMMIT_SHA/v$CI_COMMIT_SHORT_SHA/g" ./src/orderproducerservice/deployment.yaml
kubectl apply -f ./src/orderproducerservice/deployment.yaml -n "${namespace}"
}

###########################################################################################################
Expand Down
70 changes: 69 additions & 1 deletion ci/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ default:
value: >-
service.name=$(OTEL_SERVICE_NAME),
service.instance.id=$(OTEL_K8S_POD_UID),
ervice.namespace=opentelemetry-demo,
service.namespace=opentelemetry-demo,
k8s.namespace.name=$(OTEL_K8S_NAMESPACE),
k8s.node.name=$(OTEL_K8S_NODE_NAME),
k8s.pod.name=$(OTEL_K8S_POD_NAME),
Expand Down Expand Up @@ -81,6 +81,43 @@ components:
resources:
limits:
memory: 200Mi
ports:
- name: jmx-consumer
value: 1098
env:
- name: KAFKA_SERVICE_ADDR
value: 'opentelemetry-demo-kafka:9092'
- name: OTEL_RESOURCE_ATTRIBUTES
value: >-
service.name=$(OTEL_SERVICE_NAME),
service.instance.id=$(OTEL_K8S_POD_UID),
service.namespace=opentelemetry-demo,
k8s.namespace.name=$(OTEL_K8S_NAMESPACE),
k8s.node.name=$(OTEL_K8S_NODE_NAME),
k8s.pod.name=$(OTEL_K8S_POD_NAME),
deployment.environment=$(OTEL_K8S_NAMESPACE),
k8s.pod.ip=$(POD_IP),
kafka_source=javainstrumentation
podAnnotations:
ad.datadoghq.com/frauddetectionservice.logs: '[{"source":"kafka","service":"frauddetectionservice"}]'
ad.datadoghq.com/frauddetectionservice.checks: |
{
"kafka": {
"init_config": {
"is_jmx": true,
"collect_default_metrics": true
},
"instances": [
{
"host": "opentelemetry-demo-frauddetectionservice",
"port": "1098",
"tags": [
"kafka_source:agent_consumer"
]
}
]
}
}
frontend:
resources:
limits:
Expand Down Expand Up @@ -129,6 +166,37 @@ components:
resources:
limits:
memory: 1Gi
ports:
- name: jmx
value: 1099
- name: plaintext
value: 9092
- name: controller
value: 9093
initContainers:
- name: wait-for-zookeeper
image: busybox:latest
command: ['sh', '-c', 'until nc -z -v -w30 opentelemetry-demo-zookeeper 2181; do echo waiting for zookeeper; sleep 2; done;']
podAnnotations:
ad.datadoghq.com/kafka.logs: '[{"source":"kafka","service":"kafka"}]'
ad.datadoghq.com/kafka.checks: |
{
"kafka": {
"init_config": {
"is_jmx": true,
"collect_default_metrics": true
},
"instances": [
{
"host": "opentelemetry-demo-kafka",
"port": "1099",
"tags": [
"kafka_source:agent"
]
}
]
}
}
serviceAccount:
create: false
opentelemetry-collector:
Expand Down
3 changes: 2 additions & 1 deletion src/frauddetectionservice/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ WORKDIR /usr/src/app/
COPY --from=builder /usr/src/app/build/libs/frauddetectionservice-1.0-all.jar ./
ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v$version/opentelemetry-javaagent.jar /app/opentelemetry-javaagent.jar
# RUN chmod 644 /app/opentelemetry-javaagent.jar
ENV JAVA_TOOL_OPTIONS=-javaagent:/app/opentelemetry-javaagent.jar
ENV JAVA_TOOL_OPTIONS="-javaagent:/app/opentelemetry-javaagent.jar -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=opentelemetry-demo-frauddetectionservice -Dcom.sun.management.jmxremote.port=1098 -Dcom.sun.management.jmxremote.rmi.port=1098"
ENV JMX_PORT=1098

ENTRYPOINT [ "java", "-jar", "frauddetectionservice-1.0-all.jar" ]
21 changes: 8 additions & 13 deletions src/kafka/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,18 @@ FROM registry.ddbuild.io/images/mirror/confluentinc/cp-kafka:7.4.0

USER root
ARG version=1.23.0
ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v$version/opentelemetry-javaagent.jar /tmp/opentelemetry-javaagent.jar
RUN chmod go+r /tmp/opentelemetry-javaagent.jar

USER appuser

ENV KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
ENV KAFKA_CONTROLLER_QUORUM_VOTERS='[email protected]:9093'
ENV KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
ENV KAFKA_BROKER_ID=1
ENV KAFKA_ZOOKEEPER_CONNECT='opentelemetry-demo-zookeeper:2181'
ENV KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
ENV KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
ENV KAFKA_PROCESS_ROLES=controller,broker
ENV KAFKA_NODE_ID=1
ENV KAFKA_METADATA_LOG_SEGMENT_MS=15000
ENV KAFKA_METADATA_MAX_RETENTION_MS=60000
ENV KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS=2800
ENV KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
ENV KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092
ENV KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
ENV KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
ENV KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
ENV KAFKA_OPTS="-javaagent:/tmp/opentelemetry-javaagent.jar -Dotel.jmx.target.system=kafka-broker"
ENV CLUSTER_ID=ckjPoprWQzOf0-FuNkGfFQ
ENV KAFKA_AUTO_CREATE_TOPICS_ENABLE=true

ENV KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=opentelemetry-demo-kafka -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.rmi.port=1099"
ENV JMX_PORT=1099
9 changes: 9 additions & 0 deletions src/orderproducerservice/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# https://help.github.com/articles/dealing-with-line-endings/
#
# Linux start script should use lf
/gradlew text eol=lf

# These are Windows script files and should use crlf
*.bat text eol=crlf

5 changes: 5 additions & 0 deletions src/orderproducerservice/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Ignore Gradle project-specific cache directory
.gradle

# Ignore Gradle build output directory
build
15 changes: 15 additions & 0 deletions src/orderproducerservice/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM openjdk:17-buster

# curl is required below to fetch the OpenTelemetry Java agent.
RUN apt-get update -y; apt-get install curl -y

WORKDIR /home/otel
# Fetch the latest OpenTelemetry Java auto-instrumentation agent.
# FIX: the URL previously pointed at github.com (a link-rewriting
# artifact); the canonical release host is github.com.
RUN curl -Lo opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
COPY ./src/orderproducerservice/ orderproducerservice/
WORKDIR /home/otel/orderproducerservice
# Shared protobuf definitions are compiled into the app's source set.
COPY ./pb/ ./app/src/main/proto/
RUN ./gradlew shadowJar

# Expose an unauthenticated, non-SSL JMX endpoint on 1097 so an in-cluster
# metrics agent can scrape JVM metrics. NOTE(review): unauthenticated JMX is
# only acceptable inside a trusted cluster network — confirm before reuse.
ENV JAVA_TOOL_OPTIONS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=opentelemetry-demo-orderproducer -Dcom.sun.management.jmxremote.port=1097 -Dcom.sun.management.jmxremote.rmi.port=1097"
ENV JMX_PORT=1097

ENTRYPOINT ["java","-javaagent:../opentelemetry-javaagent.jar", "-jar" , "app/build/libs/app-all.jar"]
90 changes: 90 additions & 0 deletions src/orderproducerservice/app/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
 * Gradle build for orderproducerservice: a Kafka producer that publishes
 * protobuf-serialized OrderResult messages. Originally generated by the
 * Gradle 'init' task; see
 * https://docs.gradle.org/8.2.1/userguide/building_java_projects.html.
 *
 * Produces a single self-contained jar (app-all.jar) via the Shadow plugin,
 * which is what the service Dockerfile runs.
 */

plugins {
// Apply the application plugin to add support for building a CLI application in Java.
id 'application'
// Shadow builds the fat "app-all.jar" consumed by the Dockerfile ENTRYPOINT.
id 'com.github.johnrengelman.shadow' version '7.0.0'
// Compiles the .proto files copied into app/src/main/proto at image build time.
id "com.google.protobuf" version "0.9.4"
}

repositories {
// Use Maven Central for resolving dependencies.
mavenCentral()
}

shadowJar {
// Merge META-INF/services entries so gRPC/protobuf service loaders survive shading.
mergeServiceFiles()
}

// Single source of truth for the gRPC/protobuf versions used below.
ext {
grpcVersion = "1.56.1"
protobufVersion = "3.23.4"
}

dependencies {
// Use JUnit test framework.
testImplementation 'junit:junit:4.13.2'

// This dependency is used by the application.
implementation 'com.google.guava:guava:31.1-jre'

implementation 'org.apache.kafka:kafka_2.13:3.5.1'

// NOTE(review): kafka-clients 2.3.0 is far older than kafka_2.13 3.5.1 above,
// which already brings its own client classes — this pin looks unintentional
// and may be shadowed at resolution time. Confirm which version actually wins.
implementation 'org.apache.kafka:kafka-clients:2.3.0'

implementation("com.google.api.grpc:proto-google-common-protos:2.22.1")

implementation("io.grpc:grpc-protobuf:${grpcVersion}")

implementation "com.google.protobuf:protobuf-java:${protobufVersion}"

implementation "io.grpc:grpc-stub:${grpcVersion}" // required by the generated gRPC stubs

implementation("javax.annotation:javax.annotation-api:1.3.2")

// slf4j-simple provides the runtime binding for slf4j-api (logs to stderr).
implementation 'org.slf4j:slf4j-api:2.0.7'

implementation 'org.slf4j:slf4j-simple:2.0.7'
}

// Apply a specific Java toolchain to ease working on different environments.
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}

application {
// Define the main class for the application.
mainClass = 'orderproducerservice.App'
}

// Also stamp Main-Class into the plain jar, not just the shadow jar.
jar {
manifest {
attributes 'Main-Class': 'orderproducerservice.App'
}
}

// Wire protoc + the gRPC codegen plugin into every "main" proto compile task.
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:${protobufVersion}"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
}
}
generateProtoTasks {
ofSourceSet("main").forEach {
it.plugins {
grpc {}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package orderproducerservice;

import java.util.Properties;
import java.util.Random;
import java.lang.Thread;
import java.lang.InterruptedException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;

import oteldemo.Demo.*;

public class App {
    // final: the logger is created once and never reassigned.
    private static final Logger logger = LoggerFactory.getLogger(App.class);

    /**
     * Entry point. Reads the broker address from KAFKA_SERVICE_ADDR, then
     * loops forever producing one randomly generated, protobuf-serialized
     * OrderResult to the "orders" topic every 4 seconds.
     *
     * @param args unused
     * @throws RuntimeException if KAFKA_SERVICE_ADDR is not set
     */
    public static void main(String[] args) {

        String kafkaAddr = System.getenv("KAFKA_SERVICE_ADDR");
        if (kafkaAddr != null) {
            logger.info("Using Kafka Broker Address: " + kafkaAddr);
        } else {
            throw new RuntimeException("Environment variable KAFKA_SERVICE_ADDR is not set.");
        }

        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaAddr);
        props.put("acks", "all");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        Producer<String, byte[]> producer = new KafkaProducer<>(props);

        // Hoisted out of the loop: one RNG for the lifetime of the process
        // (the original constructed a new Random on every iteration).
        Random random = new Random();

        while (true) {
            try {
                String orderId = "ORDER-" + random.nextInt(1000);
                String shippingTrackingId = "TRACK-" + random.nextInt(1000);
                double shippingCost = 10 + random.nextDouble() * 90;
                OrderResult orderResult = OrderResult.newBuilder()
                        .setOrderId(orderId)
                        .setShippingTrackingId(shippingTrackingId)
                        .setShippingCost(Money.newBuilder()
                                .setCurrencyCode("EUR")
                                .setUnits((long) shippingCost)
                                // fractional part of the cost, in nanos
                                .setNanos((int) ((shippingCost - (long) shippingCost) * 1e9))
                                .build())
                        .setShippingAddress(Address.newBuilder()
                                .setStreetAddress("21 rue Chateaudun")
                                .setCity("Paris")
                                .setState("Paris")
                                .setCountry("France")
                                .setZipCode("75009")
                                .build())
                        .build();
                ProducerRecord<String, byte[]> record = new ProducerRecord<>("orders", null, orderResult.toByteArray());
                producer.send(record);
                // BUG FIX: this log line was previously in a finally block, so
                // "success" was reported even when building or sending the
                // record threw. Note send() is asynchronous, so this only
                // confirms the record was enqueued, not acknowledged.
                logger.info("Message sent successfully!");
            } catch (Exception e) {
                logger.error("Unable to send record: ", e);
            }
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                // NOTE(review): interrupt status is swallowed here, matching
                // the original behavior — restoring it would make every later
                // sleep() throw immediately in this endless loop.
                logger.error("Unable to sleep: ", e);
            }
            // producer is never closed because of while loop above. Leaving in case the code changes, to not forget to close producer.
            // producer.close();
        }
    }
}
Loading

0 comments on commit e8318c8

Please sign in to comment.