Confluent.RestClient is a .NET client library for the Confluent.io REST API
REST API documentation: http://confluent.io/docs/current/kafka-rest/docs/index.html
Confluent client settings can be provided by implementing IConfluentClientSettings,
for example:
/// <summary>
/// Example <see cref="IConfluentClientSettings"/> implementation that reads
/// both settings from the application's appSettings configuration.
/// </summary>
public class MyConfluentClientSettings : IConfluentClientSettings
{
    /// <summary>
    /// Value of the "Confluent.KafkaBaseUrl" app setting.
    /// </summary>
    public string KafkaBaseUrl
    {
        get { return ConfigurationManager.AppSettings["Confluent.KafkaBaseUrl"]; }
    }

    /// <summary>
    /// Value of the "Confluent.RequestTimeout" app setting, parsed as a <see cref="TimeSpan"/>.
    /// </summary>
    public TimeSpan RequestTimeout
    {
        get
        {
            string rawTimeout = ConfigurationManager.AppSettings["Confluent.RequestTimeout"];
            return TimeSpan.Parse(rawTimeout);
        }
    }
}
You can check out all the client operations in the Confluent.TestHarness
app. They are very easy to use, for example:
/// <summary>
/// Data contract for the value that is published as Avro data.
/// </summary>
[DataContract]
public class Person
{
    /// <summary>Name of the person.</summary>
    [DataMember] public string Name { get; set; }

    /// <summary>Age of the person.</summary>
    [DataMember] public int Age { get; set; }
}
// Example: publish two Person records to "TestTopic" as Avro data.
IConfluentClient client = new ConfluentClient(settings);

var records = new[]
{
    // Record with an explicit PartitionId of 0.
    new AvroRecord<string, Person>
    {
        PartitionId = 0,
        Value = new Person { Name = Guid.NewGuid().ToString("N"), Age = 25 }
    },
    // Record with no PartitionId set.
    new AvroRecord<string, Person>
    {
        Value = new Person { Name = Guid.NewGuid().ToString("N"), Age = 26 }
    }
};

var recordSet = new AvroRecordSet<string, Person>(records)
{
    // Creating schema using "Microsoft.Hadoop.Avro" - https://www.nuget.org/packages/Microsoft.Hadoop.Avro/
    ValueSchema = AvroSerializer.Create<Person>().ReaderSchema.ToString()
};

await client.PublishAsAvroAsync("TestTopic", recordSet);
// Example: create a consumer instance in the "TestConsumerGroup" group.
IConfluentClient client = new ConfluentClient(settings);

var consumerRequest = new CreateConsumerRequest
{
    // Confluent API will create a new InstanceId if not supplied
    InstanceId = "TestConsumerInstance",
    MessageFormat = MessageFormat.Avro
};

ConfluentResponse<ConsumerInstance> createResponse =
    await client.CreateConsumerAsync("TestConsumerGroup", consumerRequest);

// The created consumer instance is carried in the response payload.
ConsumerInstance consumerInstance = createResponse.Payload;
// Example: consume Avro messages from "TestTopic" with an existing consumer
// instance, print each Person, then call CommitOffsetAsync for that instance.
IConfluentClient client = new ConfluentClient(settings);

ConfluentResponse<List<AvroMessage<string, Person>>> consumeResponse =
    await client.ConsumeAsAvroAsync<string, Person>(consumerInstance, "TestTopic");

var consumedMessages = consumeResponse.Payload;
foreach (AvroMessage<string, Person> avroMessage in consumedMessages)
{
    Person consumedPerson = avroMessage.Value;
    Console.WriteLine("Name: {0}, Age: {1}", consumedPerson.Name, consumedPerson.Age);
}

await client.CommitOffsetAsync(consumerInstance);