Skip to content

ntent/kafka4net

Repository files navigation

NuGet version kafka4net: kafka-0.8 client

Install:

Install-Package kafka4net

##Features:

  • Event-driven architecture, all asynchronous
  • Automatic following changes of Leader Partition in case of broker failure
  • Connection sharing: one connection per kafka broker is used
  • Flow control: slow consumer will suspend fetching and prevent memory exhausting
  • Integration tests are part of the codebase. Use Vagrant to provision 1 zookeeper and 3 kafka virtual servers
  • Use RxExtensions library to expose API and for internal implementation
  • Support compression (gzip, lz4, snappy). Unit-tested to be interoperable with Java implementation

##Not implemented:

  • Offset Fetch/Commit API
  • Only protocol for 0.8 (aka v0) is implemented at the moment

Documentation

Usage

Consumer

using System;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using kafka4net;
using kafka4net.ConsumerImpl;

namespace examples
{
    public static class ConsumerExample
    {
        // Notice, unlike java driver, port is not mandatory and will resolve to default value 9092.
        // Explicit ports are allowed though, for example "kafka1:9092, kafka2:9093, kafka3:9094"
        readonly static string _connectionString = "kafka1, kafka2, kafka3";
        readonly static string _topic = "some.topic";

        /// <summary>Simplest consumer with termination example</summary>
        public static async Task TakeForOneMinute()
        {
            // If you want to consume all messages in the topic, use TopicPositionFactory.Start
            // TopicPositionFactory.End will start waiting for new messages starting from the moment of subscription
            var consumer = new Consumer(new ConsumerConfiguration(_connectionString, _topic, new StartPositionTopicEnd()));

            consumer.OnMessageArrived.Subscribe(msg => {
                // Perform your own deserialization here
                var text = Encoding.UTF8.GetString(msg.Value);
                Console.WriteLine($"Got message: '{text}' Partition: {msg.Partition} Offset: {msg.Offset} Lag: {msg.HighWaterMarkOffset - msg.Offset}");
            });

            // Connecting starts when subscribing to OnMessageArrived. If you need to know when connection is actually one, wait for IsConnected task completion
            await consumer.IsConnected;
            Console.WriteLine("Connected");

            // Consume for one minute
            await Task.Delay(TimeSpan.FromMinutes(1));

            await consumer.CloseAsync();
            Console.WriteLine("Closed");
        }
    }
}

Consumer from multiple partitions

using System;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using kafka4net;
using kafka4net.ConsumerImpl;

namespace examples
{
    public static class ConsumerExample
    {
        // Notice, unlike java driver, port is not mandatory and will resolve to default value 9092.
        // Explicit ports are allowed though, for example "kafka1:9092, kafka2:9093, kafka3:9094"
        readonly static string _connectionString = "kafka1, kafka2, kafka3";
        readonly static string _topic = "some.topic";

        public static async Task SubscribeToMultipleTopics()
        {
            var rx = new Regex("some\\.topic\\..+");
            var cluster = new Cluster(_connectionString);
            await cluster.ConnectAsync();
            var allTopics = await cluster.GetAllTopicsAsync();
            var wantedTopics = allTopics.Where(topic => rx.IsMatch(topic)).ToArray();

            var consumers = wantedTopics.Select(topic => new Consumer(new ConsumerConfiguration(_connectionString, topic, new StartPositionTopicEnd()))).ToArray();
            consumers.AsParallel().ForAll(consumer => consumer.OnMessageArrived.Subscribe(msg => {
                var text = Encoding.UTF8.GetString(msg.Value);
                Console.WriteLine($"Got message '{text}' from topic '{msg.Topic}' partition {msg.Partition} offset {msg.Offset}");
            }));

            await Task.Delay(TimeSpan.FromMinutes(1));

            await Task.WhenAll(consumers.Select(consumer => consumer.CloseAsync()));
        }

    }
}

Producer

using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using kafka4net;

namespace examples
{
    public static class ProducerExample
    {
        // Notice, unlike java driver, port is not mandatory and will resolve to default value 9092.
        // Explicit ports are allowed though, for example "kafka1:9092, kafka2:9093, kafka3:9094"
        readonly static string _connectionString = "kafka1, kafka2, kafka3";
        readonly static string _topic = "some.topic";

        public async static Task Produce100RandomMessages()
        {
            var rnd = new Random();
            var randomNumbers = Enumerable.Range(1, 100).Select(_ => rnd.Next().ToString());
            var producer = new Producer(_connectionString, new ProducerConfiguration(_topic));

            // Technically not mandatory, but good idea to listen to possible errors and act accordingly to your application requirements
            producer.OnPermError += (exception, messages) => Console.WriteLine($"Failed to write {messages.Length} because of {exception.Message}");

            // When message is confirmed by kafka broker to be persisted, OnSuccess is called. Can be used if upstream requires acknowlegement for extra reliability.
            producer.OnSuccess += messages => Console.WriteLine($"Sent {messages.Length} messages");

            await producer.ConnectAsync();

            foreach(var str in randomNumbers)
            {
                // Message.Key is optional. If not set, then driver will partition messages at random. In this case, for sake of example, partition by 1st character of the string
                var key = BitConverter.GetBytes(str[0]);
                // Implement your own serialization here.
                var msg = new Message { Value = Encoding.UTF8.GetBytes(str),  Key = key};
                producer.Send(msg);
            }

            // Await for every producer to complete. Note, that calling CloseAsync is safe: all bessages currently in buffers will be awaited until flushed.
            await producer.CloseAsync(TimeSpan.FromMinutes(1));
        }
    }
}

Releases

No releases published

Packages

No packages published

Contributors 3

  •  
  •  
  •  

Languages