Adding projects created for content about How to ingest data to Elasticsearch through Apache Kafka #353

Merged
@@ -0,0 +1,2 @@
/docker/elasticsearch/
/elasticsearch/
@@ -0,0 +1,43 @@
# Data Ingestion with Apache Kafka and Elasticsearch

This project demonstrates a data ingestion pipeline using **Apache Kafka** and **Elasticsearch** with **Python**. Messages are produced and consumed through Kafka, indexed in Elasticsearch, and visualized in Kibana.

## Project Structure

The infrastructure is managed with **Docker Compose**, which starts the following services:

- **Zookeeper**: Manages and coordinates the Kafka brokers.
- **Kafka**: Responsible for distributing and storing messages.
- **Elasticsearch**: Stores and indexes the messages for analysis.
- **Kibana**: Visualization interface for data stored in Elasticsearch.

The **Producer** code sends messages to Kafka, while the **Consumer** reads and indexes these messages in Elasticsearch.
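
To bring the whole stack up, run Docker Compose from the project root (a minimal example, assuming the `docker-compose.yml` included in this project):

````
docker compose up -d
````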

---

## Prerequisites

- **Docker and Docker Compose**: Make sure both are installed on your machine.
- **Python 3.x**: Required to run the Producer and Consumer scripts (see the dependency install step below).
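
Both scripts use the `kafka-python` and `elasticsearch` client libraries pinned in this project; one way to install them (ideally inside a virtual environment) is:

````
pip install kafka-python==2.0.2 elasticsearch==7.10.0
````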

---

## Configure the Producer and Consumer

### Producer
The `producer.py` script sends messages to the `logs` topic in Kafka in batches.
It uses the `batch_size` and `linger_ms` settings to optimize how messages are sent.
````
python producer.py
````
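
For reference, the batching-related settings in `producer.py` (the full script is included in this project) boil down to:

````
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
    batch_size=16384,  # maximum buffered batch size in bytes (16 KB)
    linger_ms=10,      # maximum delay in ms before a batch is sent
    acks="all",        # wait for all replicas to acknowledge
)
````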

### Consumer
The `consumer.py` script reads messages from the `logs` topic and indexes them in Elasticsearch. It consumes messages in batches and commits offsets automatically after processing.

````
python consumer.py
````
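
At its core, `consumer.py` polls Kafka in batches and hands each batch to the Elasticsearch bulk helper (condensed from the full script in this project):

````
messages = consumer.poll(timeout_ms=1000)
for _, records in messages.items():
    logs = [json.loads(record.value) for record in records]
    helpers.bulk(es, create_bulk_actions(logs))
````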

## Data Verification in Kibana
After running the `producer.py` and `consumer.py` scripts, open Kibana at http://localhost:5601 to explore the indexed data. Messages sent by the producer and processed by the consumer will be available in the `logs` index in Elasticsearch.
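
You can also check the data directly against the Elasticsearch REST API, for example by counting the documents in the `logs` index:

````
curl "http://localhost:9200/logs/_count?pretty"
````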

@@ -0,0 +1,89 @@
version: "3"

services:

zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9094:9094"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

kafka-connect:
image: confluentinc/cp-kafka-connect-base:6.0.0
container_name: kafka-connect
platform: linux/amd64
depends_on:
- zookeeper
- kafka
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
volumes:
- $PWD/data:/data
command:
- bash
- -c
- |
echo "Installing Connector"
confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:10.0.1
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity

elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1
container_name: elasticsearch-8.15.1
environment:
- node.name=elasticsearch
- xpack.security.enabled=false
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- ./elasticsearch:/usr/share/elasticsearch/data
ports:
- 9200:9200

kibana:
image: docker.elastic.co/kibana/kibana:8.15.1
container_name: kibana-8.15.1
ports:
- 5601:5601
environment:
ELASTICSEARCH_URL: http://elasticsearch:9200
ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'
@@ -0,0 +1,9 @@
name=elasticsearch-sink-connector
topics=logs
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=http://localhost:9200
type.name=_doc
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true
key.ignore=true
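
This sink configuration can be registered with the Kafka Connect worker that the Compose file exposes on port 8083. A sketch of the equivalent REST call (note that from inside the kafka-connect container Elasticsearch is reachable at http://elasticsearch:9200 rather than localhost):

````
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "elasticsearch-sink-connector",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "topics": "logs",
      "connection.url": "http://elasticsearch:9200",
      "type.name": "_doc",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "schema.ignore": "true",
      "key.ignore": "true"
    }
  }'
````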
@@ -0,0 +1,49 @@
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch, helpers
from datetime import datetime
import json

es = Elasticsearch(["http://localhost:9200"])

consumer = KafkaConsumer(
    "logs",  # Topic name
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="latest",  # Ensures reading from the latest offset if the group has no offset stored
    enable_auto_commit=True,  # Automatically commits the offset after processing
    group_id="log_consumer_group",  # Specifies the consumer group to manage offset tracking
    max_poll_records=10,  # Maximum number of messages per batch
    fetch_max_wait_ms=2000,  # Maximum wait time to form a batch (in ms)
)


def create_bulk_actions(logs):
    for log in logs:
        yield {
            "_index": "logs",
            "_source": {
                "level": log["level"],
                "message": log["message"],
                "timestamp": log["timestamp"],
            },
        }


if __name__ == "__main__":
    try:
        print("Starting message consumption...")
        while True:
            messages = consumer.poll(timeout_ms=1000)

            # Process each batch of messages, grouped by topic partition
            for _, records in messages.items():
                logs = [json.loads(record.value) for record in records]
                bulk_actions = create_bulk_actions(logs)
                response = helpers.bulk(es, bulk_actions)
                print(f"Indexed {response[0]} logs.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        consumer.close()
        print("Finished")
@@ -0,0 +1,62 @@
from datetime import datetime

from kafka import KafkaProducer
import json
import time
import logging
import random

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("log_producer")

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],  # Specifies the Kafka server to connect to
    value_serializer=lambda x: json.dumps(x).encode(
        "utf-8"
    ),  # Serializes data as JSON and encodes it to UTF-8 before sending
    batch_size=16384,  # Sets the maximum batch size in bytes (here, 16 KB) for buffered messages before sending
    linger_ms=10,  # Sets the maximum delay (in milliseconds) before sending the batch
    acks="all",  # Specifies acknowledgment level; 'all' ensures message durability by waiting for all replicas to acknowledge
)


def generate_log_message():
    # Spread timestamps over the last 5-10 minutes
    diff_seconds = random.uniform(300, 600)
    timestamp = time.time() - diff_seconds

    log_messages = {
        "INFO": [
            "User login successful",
            "Database connection established",
            "Service started",
            "Payment processed",
        ],
        "WARNING": ["Service stopped", "Payment may not have been processed"],
        "ERROR": ["User login failed", "Database connection failed", "Payment failed"],
        "DEBUG": ["Debugging user login flow", "Debugging database connection"],
    }

    level = random.choice(list(log_messages.keys()))
    message = random.choice(log_messages[level])

    log_entry = {"level": level, "message": message, "timestamp": timestamp}

    return log_entry


def send_log_batches(topic, num_batches=5, batch_size=10):
    for i in range(num_batches):
        logger.info(f"Sending batch {i + 1}/{num_batches}")
        for _ in range(batch_size):
            log_message = generate_log_message()
            producer.send(topic, value=log_message)
        # Flush after each batch, then pause briefly before the next one
        producer.flush()
        time.sleep(1)


if __name__ == "__main__":
    topic = "logs"
    send_log_batches(topic)
    producer.close()
@@ -0,0 +1,2 @@
kafka-python==2.0.2
elasticsearch==7.10.0