Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 396 ListOffsets #2086

Merged
merged 13 commits into from
Oct 19, 2023
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
## Enhancements

- References librdkafka.redist 2.3.0. Refer to the [librdkafka v2.3.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.3.0) for more information.
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in describe responses (#2021, @jainruchir).
- [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484): Added support for ListOffsets Admin API (#2086).
- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).
- Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` (#2021, @jainruchir).
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in describe responses (#2021, @jainruchir).


# 2.2.0
Expand Down
78 changes: 59 additions & 19 deletions examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,59 +242,85 @@ static List<UserScramCredentialAlteration> ParseUserScramCredentialAlterations(
return alterations;
}

static List<TopicPartitionOffsetSpec> ParseTopicPartitionOffsetSpecs(string[] args)
static Tuple<IsolationLevel, List<TopicPartitionOffsetSpec>> ParseListOffsetsArgs(string[] args)
{
if (args.Length == 0)
{
Console.WriteLine("usage: .. <bootstrapServers> list-offsets " +
"<topic> <partition> <EARLIEST/LATEST/MAXTIMESTAMP/TIMESTAMP t1> ..");
Console.WriteLine("usage: .. <bootstrapServers> list-offsets <isolation_level> " +
"<topic1> <partition1> <EARLIEST/LATEST/MAXTIMESTAMP/TIMESTAMP t1> ..");
Environment.ExitCode = 1;
return null;
}

var isolationLevel = Enum.Parse<IsolationLevel>(args[0]);
var topicPartitionOffsetSpecs = new List<TopicPartitionOffsetSpec>();
for (int i = 0; i < args.Length;) {
for (int i = 1; i < args.Length;)
{
if (args.Length < i+3)
{
throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}");
}

string topic = args[i];
var partition = Int32.Parse(args[i + 1]);
var offsetSpec = args[i + 2];
if (offsetSpec == "TIMESTAMP")
{
if (args.Length < i+4)
{
throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}");
}

var timestamp = Int64.Parse(args[i + 3]);
i = i + 1;
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.ForTimestamp(timestamp)
});
}
else if (offsetSpec == "MAXTIMESTAMP")
else if (offsetSpec == "MAX_TIMESTAMP")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.MaxTimestamp()
});
}
else if (offsetSpec == "EARLIEST")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.Earliest()
});
}
else if (offsetSpec == "LATEST")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.Latest()
});
}
else
{
throw new ArgumentException(
"offsetSpec can be EARLIEST, LATEST, MAXTIMESTAMP or TIMESTAMP T1.");
"offsetSpec can be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP T1.");
}
i = i + 3;
}
return topicPartitionOffsetSpecs;
return Tuple.Create(isolationLevel, topicPartitionOffsetSpecs);
}

static void PrintListOffsetsResultInfos(List<ListOffsetsResultInfo> ListOffsetsResultInfos)
{
foreach(var listOffsetsResultInfo in ListOffsetsResultInfos)
{
Console.WriteLine(" ListOffsetsResultInfo:");
Console.WriteLine($" TopicPartitionOffsetError: {listOffsetsResultInfo.TopicPartitionOffsetError}");
Console.WriteLine($" Timestamp: {listOffsetsResultInfo.Timestamp}");
}
}

static async Task CreateAclsAsync(string bootstrapServers, string[] commandArgs)
Expand Down Expand Up @@ -850,21 +876,35 @@ await adminClient.AlterUserScramCredentialsAsync(alterations,

static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs) {

var topicPartitionOffsetSpecs = ParseTopicPartitionOffsetSpecs(commandArgs);
var listOffsetsArgs = ParseListOffsetsArgs(commandArgs);
if (listOffsetsArgs == null) { return; }

var isolationLevel = listOffsetsArgs.Item1;
var topicPartitionOffsets = listOffsetsArgs.Item2;

var timeout = TimeSpan.FromSeconds(30);
ListOffsetsOptions options = new ListOffsetsOptions(){RequestTimeout = timeout, IsolationLevel = Confluent.Kafka.Admin.IsolationLevel.ReadUncommitted};
ListOffsetsOptions options = new ListOffsetsOptions(){ RequestTimeout = timeout, IsolationLevel = isolationLevel };

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
var ListOffsetsResult = await adminClient.ListOffsetsAsync(topicPartitionOffsetSpecs, options);
foreach(var ListOffsetsResultInfo in ListOffsetsResult.ListOffsetsResultInfos)
try
{
TopicPartitionOffsetError topicPartition = ListOffsetsResultInfo.TopicPartitionOffsetError;
long Timestamp = ListOffsetsResultInfo.Timestamp;
Console.WriteLine($"{topicPartition.Topic} ${topicPartition.Partition} ${topicPartition.Error.Code} ${topicPartition.Offset} ${Timestamp}");
var listOffsetsResult = await adminClient.ListOffsetsAsync(topicPartitionOffsets, options);
Console.WriteLine("ListOffsetsResult:");
PrintListOffsetsResultInfos(listOffsetsResult.ListOffsetsResultInfos);
}
catch (ListOffsetsException e)
{
Console.WriteLine("ListOffsetsReport:");
Console.WriteLine($" Error: {e.Error}");
PrintListOffsetsResultInfos(e.Result.ListOffsetsResultInfos);
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred listing offsets: {e}");
Environment.ExitCode = 1;
}
}

}
static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
{
Expand Down
36 changes: 0 additions & 36 deletions src/Confluent.Kafka/Admin/IsolationLevel.cs

This file was deleted.

4 changes: 2 additions & 2 deletions src/Confluent.Kafka/Admin/ListOffsetsException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ListOffsetsException : KafkaException
/// (whether or not they were in error). At least one of these
/// topic partiton in result will be in error.
/// </param>
public ListOffsetsException(ListOffsetsResult result)
public ListOffsetsException(ListOffsetsReport result)
: base(new Error(ErrorCode.Local_Partial,
"An error occurred in list offsets, check individual topic partiton in result."))
{
Expand All @@ -43,6 +43,6 @@ public ListOffsetsException(ListOffsetsResult result)
/// (whether or not they were in error). At least one of these
/// results will be in error.
/// </summary>
public ListOffsetsResult Result { get; }
public ListOffsetsReport Result { get; }
}
}
27 changes: 16 additions & 11 deletions src/Confluent.Kafka/Admin/ListOffsetsReport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,41 @@
//
// Refer to LICENSE for more information.
using System.Collections.Generic;
using System.Text;
using System.Linq;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of a list offsets operation.
/// Represents an error that occurred during a ListOffsets request.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result of ListOffsets request (including error status).

/// </summary>
public class ListOffsetsReport
{
/// <summary>
/// ListOffsetsResultInfo Elements for all the TopicPartitions queried
/// for ListOffsets.
/// Result information for all the partitions queried
/// with ListOffsets. At least one of these
/// results will be in error.
/// </summary>
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }

/// <summary>
/// List of non-client level errors encountered while listing offsets.
/// Operation error status, null if successful.
/// </summary>
public Error Error { get; set; }

/// <summary>
/// Returns a human readable representation of this object.
/// Returns a JSON representation of the object.
/// </summary>
/// <returns>
/// A JSON representation of the object.
/// </returns>
public override string ToString()
{
string res = "ListOffsetsReport :\n";
foreach (var listoffsetsresultinfo in ListOffsetsResultInfos)
{
res += listoffsetsresultinfo.ToString();
}
return res;
var result = new StringBuilder();
result.Append($"{{\"ListOffsetsResultInfos\": [");
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
result.Append($"], \"Error\": \"{Error.Code}\"}}");
return result.ToString();
}
}
}
25 changes: 15 additions & 10 deletions src/Confluent.Kafka/Admin/ListOffsetsResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,36 @@
//
// Refer to LICENSE for more information.
using System.Collections.Generic;
using System.Text;
using System.Linq;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of a list offsets operation.
/// Represents the result of a ListOffsets request.
/// </summary>
public class ListOffsetsResult
{
/// <summary>
/// ListOffsetsResultInfo Elements for all the TopicPartitions queried
/// for ListOffsets
/// Result information for all the partitions queried
/// with ListOffsets.
/// </summary>
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }


/// <summary>
/// Returns a human readable representation of this object.
/// Returns a JSON representation of the object.
/// </summary>
/// <returns>
/// A JSON representation of the object.
/// </returns>
public override string ToString()
{
string res = "ListOffsetsResult:\n";
foreach (var listoffsetsresultinfo in ListOffsetsResultInfos)
{
res += listoffsetsresultinfo.ToString();
}
return res;
var result = new StringBuilder();
result.Append($"{{\"ListOffsetsResultInfos\": [");
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
result.Append("]}");
return result.ToString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.
//
// Refer to LICENSE for more information.
using System.Collections.Generic;

using System.Text;


namespace Confluent.Kafka.Admin
{
Expand All @@ -28,20 +30,23 @@ public class ListOffsetsResultInfo
public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }

/// <summary>
/// Timestamp Corresponding to the Offset, -1 if not set by broker.
/// Timestamp Corresponding to the offset, -1 if not set by the broker.
/// </summary>
public long Timestamp { get; set; }

/// <summary>
/// Returns a human readable representation of this object.
/// Returns a JSON representation of the object.
/// </summary>
/// <returns>
/// A JSON representation of the object.
/// </returns>
public override string ToString()
{
string res = "TopicPartitionOffsetError:\n";
res += TopicPartitionOffsetError.ToString();
res += "\n";
res += $"Timestamp : {Timestamp}\n";
return res;
var result = new StringBuilder();
result.Append($"{{\"TopicPartitionOffsetError\": {TopicPartitionOffsetError.ToString().Quote()}");
result.Append($", \"Timestamp\": {Timestamp}");
result.Append("}");
return result.ToString();
}
}
}
Loading