Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka/{zookeeper,producer,consumer} #16

Merged
merged 3 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,19 @@ build-ci-image-kafka:
DOCKERFILE: src/kafka/Dockerfile
IMAGE_TAG_SUFFIX: kafka
CONTEXT: .
build-ci-image-orderproducer:
!!merge <<: *build-ci-image
variables:
DOCKERFILE: src/orderproducerservice/Dockerfile
IMAGE_TAG_SUFFIX: orderproducer
CONTEXT: .
.prod-deploy: &prod-deploy
stage: prod-deploy
tags: ["runner:docker", "size:large"]
image: $CI_IMAGE
rules:
- if: '$CI_COMMIT_REF_NAME == "prod"'
- if: '$CI_COMMIT_REF_NAME =~ /-staging$/'
script:
# # For debugging
#- aws sts get-caller-identity
Expand Down
7 changes: 7 additions & 0 deletions ci/datadog-agent-values.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
agents:
image:
tagSuffix: jmx
datadog:
logs:
enabled: true
tags:
- "env:otel-ingest"
otlp:
receiver:
protocols:
Expand Down
8 changes: 7 additions & 1 deletion ci/scripts/ci-deploy-demo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ install_demo() {
# Set the namespace and release name
release_name="opentelemetry-demo"

# Deploy zookeeper which is not a default component.
kubectl apply -f ./src/zookeeperservice/deployment.yaml -n "${namespace}"

# if repo already exists, helm 3+ will skip
helm --debug repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts

# --install will run `helm install` if not already present.
helm --debug upgrade "${release_name}" -n "${namespace}" open-telemetry/opentelemetry-demo --install \
-f ./ci/values.yaml \
--set-string default.image.tag="v$CI_COMMIT_SHORT_SHA"


# Deploy java order producer which is not a default component.
sed -i "s/PLACEHOLDER_COMMIT_SHA/v$CI_COMMIT_SHORT_SHA/g" ./src/orderproducerservice/deployment.yaml
kubectl apply -f ./src/orderproducerservice/deployment.yaml -n "${namespace}"
}

###########################################################################################################
Expand Down
70 changes: 69 additions & 1 deletion ci/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ default:
value: >-
service.name=$(OTEL_SERVICE_NAME),
service.instance.id=$(OTEL_K8S_POD_UID),
ervice.namespace=opentelemetry-demo,
service.namespace=opentelemetry-demo,
k8s.namespace.name=$(OTEL_K8S_NAMESPACE),
k8s.node.name=$(OTEL_K8S_NODE_NAME),
k8s.pod.name=$(OTEL_K8S_POD_NAME),
Expand Down Expand Up @@ -81,6 +81,43 @@ components:
resources:
limits:
memory: 200Mi
ports:
- name: jmx-consumer
value: 1098
env:
- name: KAFKA_SERVICE_ADDR
value: 'opentelemetry-demo-kafka:9092'
- name: OTEL_RESOURCE_ATTRIBUTES
value: >-
service.name=$(OTEL_SERVICE_NAME),
service.instance.id=$(OTEL_K8S_POD_UID),
service.namespace=opentelemetry-demo,
k8s.namespace.name=$(OTEL_K8S_NAMESPACE),
k8s.node.name=$(OTEL_K8S_NODE_NAME),
k8s.pod.name=$(OTEL_K8S_POD_NAME),
deployment.environment=$(OTEL_K8S_NAMESPACE),
k8s.pod.ip=$(POD_IP),
kafka_source=javainstrumentation
podAnnotations:
ad.datadoghq.com/frauddetectionservice.logs: '[{"source":"kafka","service":"frauddetectionservice"}]'
ad.datadoghq.com/frauddetectionservice.checks: |
{
"kafka": {
"init_config": {
"is_jmx": true,
"collect_default_metrics": true
},
"instances": [
{
"host": "opentelemetry-demo-frauddetectionservice",
"port": "1098",
"tags": [
"kafka_source:agent_consumer"
]
}
]
}
}
frontend:
resources:
limits:
Expand Down Expand Up @@ -129,6 +166,37 @@ components:
resources:
limits:
memory: 1Gi
ports:
- name: jmx
value: 1099
- name: plaintext
value: 9092
- name: controller
value: 9093
initContainers:
- name: wait-for-zookeeper
image: busybox:latest
command: ['sh', '-c', 'until nc -z -v -w30 opentelemetry-demo-zookeeper 2181; do echo waiting for zookeeper; sleep 2; done;']
podAnnotations:
ad.datadoghq.com/kafka.logs: '[{"source":"kafka","service":"kafka"}]'
ad.datadoghq.com/kafka.checks: |
{
"kafka": {
"init_config": {
"is_jmx": true,
"collect_default_metrics": true
},
"instances": [
{
"host": "opentelemetry-demo-kafka",
"port": "1099",
"tags": [
"kafka_source:agent"
]
}
]
}
}
serviceAccount:
create: false
opentelemetry-collector:
Expand Down
3 changes: 2 additions & 1 deletion src/frauddetectionservice/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ WORKDIR /usr/src/app/
COPY --from=builder /usr/src/app/build/libs/frauddetectionservice-1.0-all.jar ./
ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v$version/opentelemetry-javaagent.jar /app/opentelemetry-javaagent.jar
# RUN chmod 644 /app/opentelemetry-javaagent.jar
ENV JAVA_TOOL_OPTIONS=-javaagent:/app/opentelemetry-javaagent.jar
ENV JAVA_TOOL_OPTIONS="-javaagent:/app/opentelemetry-javaagent.jar -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=opentelemetry-demo-frauddetectionservice -Dcom.sun.management.jmxremote.port=1098 -Dcom.sun.management.jmxremote.rmi.port=1098"
ENV JMX_PORT=1098

ENTRYPOINT [ "java", "-jar", "frauddetectionservice-1.0-all.jar" ]
21 changes: 8 additions & 13 deletions src/kafka/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,18 @@ FROM registry.ddbuild.io/images/mirror/confluentinc/cp-kafka:7.4.0

USER root
ARG version=1.23.0
ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v$version/opentelemetry-javaagent.jar /tmp/opentelemetry-javaagent.jar
RUN chmod go+r /tmp/opentelemetry-javaagent.jar

USER appuser

ENV KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
ENV KAFKA_CONTROLLER_QUORUM_VOTERS='[email protected]:9093'
ENV KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
ENV KAFKA_BROKER_ID=1
ENV KAFKA_ZOOKEEPER_CONNECT='opentelemetry-demo-zookeeper:2181'
ENV KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
ENV KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
ENV KAFKA_PROCESS_ROLES=controller,broker
ENV KAFKA_NODE_ID=1
ENV KAFKA_METADATA_LOG_SEGMENT_MS=15000
ENV KAFKA_METADATA_MAX_RETENTION_MS=60000
ENV KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS=2800
ENV KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
ENV KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092
ENV KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
ENV KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
ENV KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
ENV KAFKA_OPTS="-javaagent:/tmp/opentelemetry-javaagent.jar -Dotel.jmx.target.system=kafka-broker"
ENV CLUSTER_ID=ckjPoprWQzOf0-FuNkGfFQ
ENV KAFKA_AUTO_CREATE_TOPICS_ENABLE=true

ENV KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=opentelemetry-demo-kafka -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.rmi.port=1099"
ENV JMX_PORT=1099
9 changes: 9 additions & 0 deletions src/orderproducerservice/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# https://help.github.com/articles/dealing-with-line-endings/
#
# Linux start script should use lf
/gradlew text eol=lf

# These are Windows script files and should use crlf
*.bat text eol=crlf

5 changes: 5 additions & 0 deletions src/orderproducerservice/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Ignore Gradle project-specific cache directory
.gradle

# Ignore Gradle build output directory
build
15 changes: 15 additions & 0 deletions src/orderproducerservice/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM openjdk:17-buster

RUN apt-get update -y; apt-get install curl -y

WORKDIR /home/otel
RUN curl -Lo opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
COPY ./src/orderproducerservice/ orderproducerservice/
WORKDIR /home/otel/orderproducerservice
COPY ./pb/ ./app/src/main/proto/
RUN ./gradlew shadowJar

ENV JAVA_TOOL_OPTIONS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=opentelemetry-demo-orderproducer -Dcom.sun.management.jmxremote.port=1097 -Dcom.sun.management.jmxremote.rmi.port=1097"
ENV JMX_PORT=1097

ENTRYPOINT ["java","-javaagent:../opentelemetry-javaagent.jar", "-jar" , "app/build/libs/app-all.jar"]
90 changes: 90 additions & 0 deletions src/orderproducerservice/app/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Java application project to get you started.
* For more details on building Java & JVM projects, please refer to https://docs.gradle.org/8.2.1/userguide/building_java_projects.html in the Gradle documentation.
*/

plugins {
// Apply the application plugin to add support for building a CLI application in Java.
id 'application'
id 'com.github.johnrengelman.shadow' version '7.0.0'
id "com.google.protobuf" version "0.9.4"
}

repositories {
// Use Maven Central for resolving dependencies.
mavenCentral()
}

shadowJar {
mergeServiceFiles()
}

ext {
grpcVersion = "1.56.1"
protobufVersion = "3.23.4"
}

dependencies {
// Use JUnit test framework.
testImplementation 'junit:junit:4.13.2'

// This dependency is used by the application.
implementation 'com.google.guava:guava:31.1-jre'

implementation 'org.apache.kafka:kafka_2.13:3.5.1'

implementation 'org.apache.kafka:kafka-clients:2.3.0'

implementation("com.google.api.grpc:proto-google-common-protos:2.22.1")

implementation("io.grpc:grpc-protobuf:${grpcVersion}")

implementation "com.google.protobuf:protobuf-java:${protobufVersion}"

implementation "io.grpc:grpc-stub:${grpcVersion}" // Add this line

implementation("javax.annotation:javax.annotation-api:1.3.2")

implementation 'org.slf4j:slf4j-api:2.0.7'

implementation 'org.slf4j:slf4j-simple:2.0.7'
}

// Apply a specific Java toolchain to ease working on different environments.
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}

application {
// Define the main class for the application.
mainClass = 'orderproducerservice.App'
}

jar {
manifest {
attributes 'Main-Class': 'orderproducerservice.App'
}
}

protobuf {
protoc {
artifact = "com.google.protobuf:protoc:${protobufVersion}"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
}
}
generateProtoTasks {
ofSourceSet("main").forEach {
it.plugins {
grpc {}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package orderproducerservice;

import java.util.Properties;
import java.util.Random;
import java.lang.Thread;
import java.lang.InterruptedException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;

import oteldemo.Demo.*;

public class App {
private static Logger logger = LoggerFactory.getLogger(App.class);

public static void main(String[] args) {

String kafkaAddr = System.getenv("KAFKA_SERVICE_ADDR");
if (kafkaAddr != null) {
logger.info("Using Kafka Broker Address: " + kafkaAddr);
} else {
throw new RuntimeException("Environment variable KAFKA_SERVICE_ADDR is not set.");
}

Properties props = new Properties();
props.put("bootstrap.servers", kafkaAddr);
props.put("acks", "all");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

Producer<String, byte[]> producer = new KafkaProducer<>(props);

while (true) {
Random random = new Random();
try {
String orderId = "ORDER-" + random.nextInt(1000);
String shippingTrackingId = "TRACK-" + random.nextInt(1000);
double shippingCost = 10 + random.nextDouble() * 90;
OrderResult orderResult = OrderResult.newBuilder()
.setOrderId(orderId)
.setShippingTrackingId(shippingTrackingId)
.setShippingCost(Money.newBuilder()
.setCurrencyCode("EUR")
.setUnits((long) shippingCost)
.setNanos((int) ((shippingCost - (long) shippingCost) * 1e9))
.build())
.setShippingAddress(Address.newBuilder()
.setStreetAddress("21 rue Chateaudun")
.setCity("Paris")
.setState("Paris")
.setCountry("France")
.setZipCode("75009")
.build())
.build();
ProducerRecord<String, byte[]> record = new ProducerRecord<>("orders", null, orderResult.toByteArray());
producer.send(record);
} catch (Exception e) {
logger.error("Unable to send record: ", e);
} finally {
logger.info("Message sent successfully!");
}
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
logger.error("Unable to sleep: ", e);
}
// producer is never closed because of while loop above. Leaving in case the code changes, to not forget to close producer.
// producer.close();
}
}
}
Loading