Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 396 ListOffsets #2086

Merged
merged 13 commits into from
Oct 19, 2023
82 changes: 78 additions & 4 deletions examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
// Copyright 2016-2023 Confluent Inc., 2015-2016 Andreas Heider
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -241,6 +241,61 @@ static List<UserScramCredentialAlteration> ParseUserScramCredentialAlterations(
return alterations;
}

static List<TopicPartitionOffsetSpec> ParseTopicPartitionOffsetSpecs(string[] args)
{
if (args.Length == 0)
{
Console.WriteLine("usage: .. <bootstrapServers> list-offsets " +
"<topic> <partition> <EARLIEST/LATEST/MAXTIMESTAMP/TIMESTAMP t1> ..");
Environment.ExitCode = 1;
return null;
}

var topicPartitionOffsetSpecs = new List<TopicPartitionOffsetSpec>();
for (int i = 0; i < args.Length;) {
string topic = args[i];
var partition = Int32.Parse(args[i + 1]);
var offsetSpec = args[i + 2];
if (offsetSpec == "TIMESTAMP")
{
var timestamp = Int64.Parse(args[i + 3]);
i = i + 1;
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.ForTimestamp(timestamp)
});
}
else if (offsetSpec == "MAXTIMESTAMP")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.MaxTimestamp()
});
}
else if (offsetSpec == "EARLIEST")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.Earliest()
});
}
else if (offsetSpec == "LATEST")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.Latest()
});
}
else
{
throw new ArgumentException(
"offsetSpec can be EARLIEST, LATEST, MAXTIMESTAMP or TIMESTAMP T1.");
}
i = i + 3;
}
return topicPartitionOffsetSpecs;
}

static async Task CreateAclsAsync(string bootstrapServers, string[] commandArgs)
{
List<AclBinding> aclBindings;
Expand Down Expand Up @@ -757,19 +812,35 @@ await adminClient.AlterUserScramCredentialsAsync(alterations,
}
}

static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs) {

var topicPartitionOffsetSpecs = ParseTopicPartitionOffsetSpecs(commandArgs);
var timeout = TimeSpan.FromSeconds(30);
ListOffsetsOptions options = new ListOffsetsOptions(){RequestTimeout = timeout, IsolationLevel = Confluent.Kafka.Admin.IsolationLevel.ReadUncommitted};

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
var ListOffsetsResult = await adminClient.ListOffsetsAsync(topicPartitionOffsetSpecs, options);
foreach(var ListOffsetsResultInfo in ListOffsetsResult.ListOffsetsResultInfos)
{
TopicPartitionOffsetError topicPartition = ListOffsetsResultInfo.TopicPartitionOffsetError;
long Timestamp = ListOffsetsResultInfo.Timestamp;
Console.WriteLine($"{topicPartition.Topic} ${topicPartition.Partition} ${topicPartition.Error.Code} ${topicPartition.Offset} ${Timestamp}");
}
}

}
public static async Task Main(string[] args)
{
if (args.Length < 2)
{
Console.WriteLine(
"usage: .. <bootstrapServers> " + String.Join("|", new string[] {
"list-groups", "metadata", "library-version", "create-topic", "create-acls",
"describe-acls", "delete-acls",
"list-consumer-groups", "describe-consumer-groups",
"list-consumer-group-offsets", "alter-consumer-group-offsets",
"incremental-alter-configs", "describe-user-scram-credentials",
"alter-user-scram-credentials"

"alter-user-scram-credentials", "list-offsets"
}) +
" ..");
Environment.ExitCode = 1;
Expand Down Expand Up @@ -824,6 +895,9 @@ public static async Task Main(string[] args)
case "alter-user-scram-credentials":
await AlterUserScramCredentialsAsync(bootstrapServers, commandArgs);
break;
case "list-offsets":
await ListOffsetsAsync(bootstrapServers, commandArgs);
break;
default:
Console.WriteLine($"unknown command: {command}");
break;
Expand Down
36 changes: 36 additions & 0 deletions src/Confluent.Kafka/Admin/IsolationLevel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.
using System.Collections.Generic;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the enum for different IsolationLevel.
/// </summary>
public enum IsolationLevel : int
{
/// <summary>
/// ReadUncommitted
/// </summary>
ReadUncommitted = 0,

/// <summary>
/// ReadCommitted
/// </summary>
ReadCommitted = 1,

}
}
47 changes: 47 additions & 0 deletions src/Confluent.Kafka/Admin/ListOffsetResultInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.
using System.Collections.Generic;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of a list offset request for a single topic partition.
/// </summary>
public class ListOffsetsResultInfo
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// TopicPartitionOffsetError of the ListOffset Result Element which is a Topic Partition
/// </summary>
public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }

/// <summary>
/// Timestamp Corresponding to the Offset, -1 if not set by broker.
/// </summary>
public long Timestamp { get; set; }

/// <summary>
/// Returns a human readable representation of this object.
/// </summary>
public override string ToString()
{
string res = "TopicPartitionOffsetError:\n";
res += TopicPartitionOffsetError.ToString();
res += "\n";
res += $"Timestamp : {Timestamp}\n";
return res;
}
}
}
48 changes: 48 additions & 0 deletions src/Confluent.Kafka/Admin/ListOffsetsException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Collections.Generic;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents an error that occured during list offsets.
/// </summary>
public class ListOffsetsException : KafkaException
{
/// <summary>
/// Initializes a new instance of ListOffsetsException.
/// </summary>
/// <param name="results">
/// The result corresponding to all partitions in the request
/// (whether or not they were in error). At least one of these
/// topic partiton in result will be in error.
/// </param>
public ListOffsetsException(ListOffsetsResult result)
: base(new Error(ErrorCode.Local_Partial,
"An error occurred in list offsets, check individual topic partiton in result."))
{
Result = result;
}

/// <summary>
/// The result corresponding to all partitions in the request
/// (whether or not they were in error). At least one of these
/// results will be in error.
/// </summary>
public ListOffsetsResult Result { get; }
}
}
45 changes: 45 additions & 0 deletions src/Confluent.Kafka/Admin/ListOffsetsOption.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System;
using System.Collections.Generic;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Options for the "AdminClient.ListOffsetsAsync" method.
/// </summary>
public class ListOffsetsOptions
{
/// <summary>
/// The overall request timeout, including broker lookup, request
/// transmission, operation time on broker, and response. If set
/// to null, the default request timeout for the AdminClient will
/// be used.
///
/// Default: null
/// </summary>
public TimeSpan? RequestTimeout { get; set; }

/// <summary>
/// Isolation Level to fetch the offset for. This is on the request level
/// rather than different isolation level for different TopicPartitions
///
/// Default: ReadUncommitted
/// </summary>
public IsolationLevel IsolationLevel { get; set; } = IsolationLevel.ReadUncommitted;
}
}
49 changes: 49 additions & 0 deletions src/Confluent.Kafka/Admin/ListOffsetsReport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.
using System.Collections.Generic;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of a list offsets operation.
/// </summary>
public class ListOffsetsReport
{
/// <summary>
/// ListOffsetsResultInfo Elements for all the TopicPartitions queried
/// for ListOffsets.
/// </summary>
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }

/// <summary>
/// List of non-client level errors encountered while listing offsets.
/// </summary>
public Error Error { get; set; }

/// <summary>
/// Returns a human readable representation of this object.
/// </summary>
public override string ToString()
{
string res = "ListOffsetsReport :\n";
foreach (var listoffsetsresultinfo in ListOffsetsResultInfos)
{
res += listoffsetsresultinfo.ToString();
}
return res;
}
}
}
44 changes: 44 additions & 0 deletions src/Confluent.Kafka/Admin/ListOffsetsResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.
using System.Collections.Generic;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of a list offsets operation.
/// </summary>
public class ListOffsetsResult
{
/// <summary>
/// ListOffsetsResultInfo Elements for all the TopicPartitions queried
/// for ListOffsets
/// </summary>
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }

/// <summary>
/// Returns a human readable representation of this object.
/// </summary>
public override string ToString()
{
string res = "ListOffsetsResult:\n";
foreach (var listoffsetsresultinfo in ListOffsetsResultInfos)
{
res += listoffsetsresultinfo.ToString();
}
return res;
}
}
}
Loading