A proxy, written in Node.js, that lets applications use Kafka over HTTP.
Native clients for Kafka are tricky. The protocol is moving fast and there is a lot to manage. This project gives applications a bridge so that languages that aren't "supported" can still use Kafka.
The inspiration for this project came from the Confluent REST Proxy. We found that project too heavy for what we wanted, and it introduced a lot of lag between messages going from producer to consumer. We have tried to keep the API consistent with it so that clients already using it can easily switch over, but we wanted to simplify as much as possible.
Get the source...
git clone https://github.com/hoppity/kafka-http-proxy
cd kafka-http-proxy
npm install
export ZOOKEEPER_CONNECT=127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181
node server.js
Or using docker...
docker run \
-p 8085:8085 \
-e "ZOOKEEPER_CONNECT=127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181" \
--name kafka-http-proxy \
hoppity/kafka-http-proxy
Most configuration is contained in config/default.json.
kafka
  zkConnect - the Zookeeper connection string
  clientId - the client identifier for the application
logging
  logName - ?
  level - the level of logging to output
accessLogPath - the path to an Apache standard compliant access log
port - the port to expose the application on
consumer
  timeoutMs - the number of milliseconds to wait between polls before a consumer is deemed inactive and timed out
These values can be changed or overridden by placing them in another file in the config directory and setting the NODE_ENV variable to match (e.g. production.json and export NODE_ENV=production).
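To illustrate how an override file interacts with the defaults, here is a minimal sketch. It assumes override values are deep-merged over config/default.json key by key, as the `config` npm package does; this README does not name the loader, so treat that as an assumption. The sample values and the `mergeConfig` helper are hypothetical.

```javascript
// Hypothetical defaults, shaped like the keys documented above.
const defaults = {
  kafka: { zkConnect: '127.0.0.1:2181', clientId: 'kafka-http-proxy' },
  logging: { level: 'info' },
  port: 8085,
  consumer: { timeoutMs: 60000 },
};

// Hypothetical contents of config/production.json: only the keys that differ.
const production = {
  kafka: { zkConnect: '10.0.0.1:2181,10.0.0.2:2181' },
  logging: { level: 'warn' },
};

// True for plain nested objects, false for arrays, null and primitives.
function isPlainObject(value) {
  return value !== null && typeof value === 'object' && !Array.isArray(value);
}

// Recursively merge `override` onto `base` without mutating either input.
function mergeConfig(base, override) {
  const result = { ...base };
  for (const [key, value] of Object.entries(override)) {
    result[key] =
      isPlainObject(value) && isPlainObject(base[key])
        ? mergeConfig(base[key], value)
        : value;
  }
  return result;
}

const effective = mergeConfig(defaults, production);
console.log(effective.kafka.clientId); // inherited from the defaults
console.log(effective.logging.level); // taken from the override file
```

Under this assumption an override file only needs the keys that differ; everything else falls through to the defaults.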
TODO:
Write about standard errors.
Creates a topic with the name specified using the default topic creation settings. This will fail if automatic topic creation is disabled in Kafka.
Input
- params
  topic - the name of the topic to create
Output
- JSON response
  message - "All created"
E.g.
$ curl -X PUT http://localhost:8085/topics/test-topic
{"message":"All created"}
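For callers that are not using curl, the same request can be sketched in JavaScript. Only the method and path are taken from the example above; `createTopicRequest` is a hypothetical helper, not part of this project.

```javascript
// Build the request for creating a topic.
// `createTopicRequest` is a hypothetical helper for illustration only.
function createTopicRequest(proxyUrl, topic) {
  if (!topic) throw new Error('topic name is required');
  const root = proxyUrl.replace(/\/+$/, ''); // drop any trailing slashes
  return {
    method: 'PUT',
    // encodeURIComponent guards against characters that are unsafe in a path.
    url: `${root}/topics/${encodeURIComponent(topic)}`,
  };
}

const topicRequest = createTopicRequest('http://localhost:8085', 'test-topic');
console.log(topicRequest.method, topicRequest.url);

// To send it (Node 18+ ships a global fetch):
// fetch(topicRequest.url, { method: topicRequest.method })
//   .then((res) => res.json())
//   .then(console.log);
```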
Publishes a message to the specified topic.
Input
- params
  topic - the name of the topic to publish to
- JSON body
  records - array of messages to publish
    key
    value
    partition
E.g.
$ curl -X POST \
-H 'Content-Type: application/kafka.binary.v1+json' \
-d '{"records":[{"value":"1234"}]}' \
http://localhost:8085/topics/test-topic
{"offsets":[{"partition":"0","offset":0}]}
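The request body above can be assembled programmatically. This is a minimal sketch: the Content-Type and the `records` shape come from the curl example, while treating `key` and `partition` as optional and `value` as required is an assumption based on that example. `publishRequest` is a hypothetical helper.

```javascript
// Content type copied from the curl example above.
const KAFKA_BINARY_V1 = 'application/kafka.binary.v1+json';

// Build a publish request. Assumes `value` is required and that `key` and
// `partition` may be omitted, as in the curl example.
function publishRequest(proxyUrl, topic, records) {
  if (!Array.isArray(records) || records.length === 0) {
    throw new Error('records must be a non-empty array');
  }
  const cleaned = records.map((record) => {
    if (record.value === undefined) throw new Error('each record needs a value');
    const out = { value: record.value };
    if (record.key !== undefined) out.key = record.key;
    if (record.partition !== undefined) out.partition = record.partition;
    return out;
  });
  return {
    method: 'POST',
    url: `${proxyUrl}/topics/${encodeURIComponent(topic)}`,
    headers: { 'Content-Type': KAFKA_BINARY_V1 },
    body: JSON.stringify({ records: cleaned }),
  };
}

const publish = publishRequest('http://localhost:8085', 'test-topic', [{ value: '1234' }]);
console.log(publish.body);

// To send it (Node 18+ ships a global fetch):
// fetch(publish.url, publish).then((res) => res.json()).then(console.log);
```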
Create a consumer with the specified group name.
Input
- params
  group - the name of the consumer group
- JSON body
  auto.offset.reset - the point to automatically reset the offset to if the consumer group does not exist. Options: smallest, largest. Default: largest.
  auto.commit.enable - enable/disable the consumer to autocommit on the server side. When true, the server will commit the offset after every get request. When false, you must commit manually (as below). Default: true.
  value.encode - enable/disable base64 encoded messages to be returned from the consumer. Default: true.
  request.max.messages - the number of messages to return in a single get request. Configurable in config.consumer.requestMaxMessages. Default: 100.
Output
- JSON response
  instance_id - the identifier of the consumer instance
  base_uri - the base URI for consumers to use when polling for messages
E.g.
$ curl -X POST http://localhost:8085/consumers/test-group
{
"instance_id":"3bae5d25-b0bc-4e4a-b937-bdc75ceece39",
"base_uri":"http://localhost:8085/consumers/test-group/instances/3bae5d25-b0bc-4e4a-b937-bdc75ceece39"
}
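The curl example relies on the defaults. A sketch of a request that sets the body options explicitly might look like the following. The option names are the ones listed above; sending them with `Content-Type: application/json` is an assumption, and `createConsumerRequest` is a hypothetical helper.

```javascript
// The only values the documentation lists for auto.offset.reset.
const OFFSET_RESET_VALUES = ['smallest', 'largest'];

// Build a create-consumer request.
// The application/json content type is an assumption, not confirmed here.
function createConsumerRequest(proxyUrl, group, options = {}) {
  const reset = options['auto.offset.reset'];
  if (reset !== undefined && !OFFSET_RESET_VALUES.includes(reset)) {
    throw new Error(`auto.offset.reset must be one of: ${OFFSET_RESET_VALUES.join(', ')}`);
  }
  return {
    method: 'POST',
    url: `${proxyUrl}/consumers/${encodeURIComponent(group)}`,
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(options),
  };
}

// A consumer that starts from the oldest offset and commits manually.
const consumerRequest = createConsumerRequest('http://localhost:8085', 'test-group', {
  'auto.offset.reset': 'smallest',
  'auto.commit.enable': false,
  'request.max.messages': 10,
});
console.log(consumerRequest.url, consumerRequest.body);
```

Keep the `base_uri` from the response; every later call for this consumer instance is built from it.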
Consumes messages from the topic on the specified consumer. The URI should be formed by concatenating the base_uri returned when creating the consumer and /topics/{topic}.
Input
- params
  group - the name of the consumer group
  instance_id - the consumer instance id
  topic - the name of the topic to consume
Output
- JSON response
- array of messages
  topic
  partition
  offset
  key
  value
E.g.
$ curl http://localhost:8085/consumers/test-group/instances/3bae5d25-b0bc-4e4a-b937-bdc75ceece39/topics/test-topic
[
{
"topic":"test-topic",
"partition":0,
"offset":0,
"key":"",
"value":"1234"
}
]
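The URI rule and the `value.encode` option can be sketched together. The helpers below are hypothetical. They assume that with `value.encode` enabled only the `value` field is base64 encoded and that payloads are UTF-8 text; neither detail is confirmed by this README.

```javascript
// Form the consume URI from the base_uri returned at consumer creation.
function consumeUrl(baseUri, topic) {
  return `${baseUri}/topics/${encodeURIComponent(topic)}`;
}

// Decode base64 values returned when value.encode is true.
// Assumes UTF-8 payloads and that `key` is returned unencoded.
function decodeMessages(messages) {
  return messages.map((message) => ({
    ...message,
    value: Buffer.from(message.value, 'base64').toString('utf8'),
  }));
}

const baseUri =
  'http://localhost:8085/consumers/test-group/instances/3bae5d25-b0bc-4e4a-b937-bdc75ceece39';
console.log(consumeUrl(baseUri, 'test-topic'));

// A hypothetical response carrying the base64 form of "1234".
const sample = [{ topic: 'test-topic', partition: 0, offset: 0, key: '', value: 'MTIzNA==' }];
console.log(decodeMessages(sample)[0].value);
```

If the consumer was created with `value.encode` set to false, skip the decoding step and use `value` as returned.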
Commit the current offset of the consumer instance. The URI should be formed by concatenating the base_uri returned when creating the consumer and /offsets.
Input
- params
  group - the name of the consumer group
  instance_id - the consumer instance id
Output
- JSON response
- empty array
E.g.
$ curl -X POST http://localhost:8085/consumers/test-group/instances/3bae5d25-b0bc-4e4a-b937-bdc75ceece39/offsets
[]
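When a consumer is created with `auto.commit.enable` set to false, a common pattern is to commit only after a batch has been processed. The sketch below shows one way to do that. `pollAndCommit` is hypothetical, and `send` is an injected function `(method, url) => Promise<parsed JSON>` so that the flow does not depend on a particular HTTP client.

```javascript
// Poll once, process every message, then commit.
// `send(method, url)` must resolve to the parsed JSON response.
async function pollAndCommit(send, baseUri, topic, handle) {
  const messages = await send('GET', `${baseUri}/topics/${encodeURIComponent(topic)}`);
  for (const message of messages) {
    await handle(message); // a throw here skips the commit below
  }
  if (messages.length > 0) {
    await send('POST', `${baseUri}/offsets`);
  }
  return messages.length;
}

// A possible `send` for Node 18+ (global fetch), shown for illustration:
// const send = (method, url) => fetch(url, { method }).then((res) => res.json());
// pollAndCommit(send, baseUri, 'test-topic', async (m) => console.log(m.value));
```

Because the commit happens only after every handler call has returned, a failure part-way through a batch leaves the offset uncommitted.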
Shut down a consumer instance. The URI should be the base_uri returned when creating the consumer.
Input
- params
  group - the name of the consumer group
  instance_id - the consumer instance id
Output
- JSON response
- empty object
E.g.
$ curl -X DELETE http://localhost:8085/consumers/test-group/instances/3bae5d25-b0bc-4e4a-b937-bdc75ceece39
{}
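An inactive consumer is eventually timed out (see consumer.timeoutMs), but shutting it down explicitly releases it sooner. One way to make sure the DELETE is always sent is a try/finally wrapper, sketched below. `withConsumer` is hypothetical, and `send` is an injected function `(method, url) => Promise<parsed JSON>`.

```javascript
// Create a consumer, run `work` with its base_uri, and always shut it down.
// `send(method, url)` must resolve to the parsed JSON response.
async function withConsumer(send, proxyUrl, group, work) {
  const consumer = await send('POST', `${proxyUrl}/consumers/${encodeURIComponent(group)}`);
  try {
    return await work(consumer.base_uri);
  } finally {
    // Runs whether `work` succeeded or threw.
    await send('DELETE', consumer.base_uri);
  }
}

// A possible `send` for Node 18+ (global fetch), shown for illustration:
// const send = (method, url) => fetch(url, { method }).then((res) => res.json());
// withConsumer(send, 'http://localhost:8085', 'test-group', async (baseUri) => {
//   const messages = await send('GET', `${baseUri}/topics/test-topic`);
//   console.log(messages);
// });
```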