Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP:430 Describe ConsumerGroups/Topic/Cluster with authorized operations #1543

Closed
wants to merge 13 commits into from
77 changes: 72 additions & 5 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,9 @@ def example_describe_consumer_groups(a, args):
"""
Describe Consumer Groups
"""

futureMap = a.describe_consumer_groups(args, request_timeout=10)
include_auth_ops = bool(int(args[0]))
args = args[1:]
futureMap = a.describe_consumer_groups(args, include_authorized_operations=include_auth_ops, request_timeout=10)

for group_id, future in futureMap.items():
try:
Expand All @@ -475,12 +476,72 @@ def example_describe_consumer_groups(a, args):
print(" Assignments :")
for toppar in member.assignment.topic_partitions:
print(" {} [{}]".format(toppar.topic, toppar.partition))
print(" Authorized operations: ")
op_string = ""
for acl_op in g.authorized_operations:
op_string += acl_op.name + " "
print(" {}".format(op_string))
except KafkaException as e:
print("Error while describing group id '{}': {}".format(group_id, e))
except Exception:
raise


def example_describe_topics(a, args):
"""
Describe Topics
"""
include_auth_ops = bool(int(args[0]))
args = args[1:]
futureMap = a.describe_topics(args, request_timeout=10, include_topic_authorized_operations=include_auth_ops)

for topic, future in futureMap.items():
try:
t = future.result()
print("Topic Name: {}".format(t.topic))
print(" Partitions: ")
for partition in t.partitions:
print(" Id : {}".format(partition.id))
print(" Leader : {}".format(partition.leader))
print(" Replicas : {}".format(partition.replicas))
print(" In-Sync Replicas : {}".format(partition.isrs))
print("")
print(" Authorized operations: ")
op_string = ""
for acl_op in t.authorized_operations:
op_string += acl_op.name + " "
print(" {}".format(op_string))
except KafkaException as e:
print("Error while describing topic '{}': {}".format(topic, e))
except Exception:
raise


def example_describe_cluster(a, args):
"""
Describe Cluster
"""
include_auth_ops = bool(int(args[0]))
args = args[1:]
future = a.describe_cluster(request_timeout=10, include_cluster_authorized_operations=include_auth_ops)
try:
c = future.result()
print("Cluster_id : {}".format(c.cluster_id))
print("Controller_id : {}".format(c.controller_id))
print("Nodes :")
for node in c.nodes:
print(" Node: ({}) {}:{}".format(node.id, node.host, node.port))
print("Authorized operations: ")
op_string = ""
for acl_op in c.authorized_operations:
op_string += acl_op.name + " "
print(" {}".format(op_string))
except KafkaException as e:
print("Error while describing cluster: {}".format(e))
except Exception:
raise


def example_delete_consumer_groups(a, args):
"""
Delete Consumer Groups
Expand Down Expand Up @@ -580,7 +641,9 @@ def example_alter_consumer_group_offsets(a, args):
'<principal1> <host1> <operation1> <permission_type1> ..\n')
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
sys.stderr.write(' describe_consumer_groups <group1> <group2> ..\n')
sys.stderr.write(' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n')
sys.stderr.write(' describe_topics <include_topic_authorized_operations> <topic1> <topic2> ..\n')
sys.stderr.write(' describe_cluster <include_cluster_authorized_operations>\n')
sys.stderr.write(' delete_consumer_groups <group1> <group2> ..\n')
sys.stderr.write(' list_consumer_group_offsets <group> [<topic1> <partition1> <topic2> <partition2> ..]\n')
sys.stderr.write(
Expand All @@ -593,8 +656,9 @@ def example_alter_consumer_group_offsets(a, args):
operation = sys.argv[2]
args = sys.argv[3:]

conf = {'bootstrap.servers': broker}
# Create Admin client
a = AdminClient({'bootstrap.servers': broker})
a = AdminClient(conf)

opsmap = {'create_topics': example_create_topics,
'delete_topics': example_delete_topics,
Expand All @@ -608,9 +672,12 @@ def example_alter_consumer_group_offsets(a, args):
'list': example_list,
'list_consumer_groups': example_list_consumer_groups,
'describe_consumer_groups': example_describe_consumer_groups,
'describe_topics': example_describe_topics,
'describe_cluster': example_describe_cluster,
'delete_consumer_groups': example_delete_consumer_groups,
'list_consumer_group_offsets': example_list_consumer_group_offsets,
'alter_consumer_group_offsets': example_alter_consumer_group_offsets}
'alter_consumer_group_offsets': example_alter_consumer_group_offsets,
'describe_topics': example_describe_topics}

if operation not in opsmap:
sys.stderr.write('Unknown operation: %s\n' % operation)
Expand Down
84 changes: 84 additions & 0 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
ConsumerGroupDescription,
MemberAssignment,
MemberDescription)
from ._topic import (TopicDescription, # noqa: F401
PartitionDescription)
from ._cluster import (ClusterDescription) # noqa: F401
from ..cimpl import (KafkaException, # noqa: F401
KafkaError,
_AdminClientImpl,
Expand Down Expand Up @@ -181,6 +184,31 @@ def _make_consumer_groups_result(f, futmap):
for _, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_describe_topics_result(f, futmap):
"""
Map per-topic results to per-topic futures in futmap.
"""
try:

results = f.result()
futmap_values = list(futmap.values())
len_results = len(results)
len_futures = len(futmap_values)
if len_results != len_futures:
raise RuntimeError(
"Results length {} is different from future-map length {}".format(len_results, len_futures))
for i, result in enumerate(results):
fut = futmap_values[i]
if isinstance(result, KafkaError):
fut.set_exception(KafkaException(result))
else:
fut.set_result(result)
except Exception as e:
# Request-level exception, raise the same for all topics
for _, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_consumer_group_offsets_result(f, futmap):
"""
Expand Down Expand Up @@ -664,6 +692,7 @@ def describe_consumer_groups(self, group_ids, **kwargs):
Describe consumer groups.

:param list(str) group_ids: List of group_ids which need to be described.
:param bool include_authorized_operations: If True, fetches group AclOperations. Default: False
:param float request_timeout: Maximum response time before timing out, or -1 for infinite timeout.
Default: `socket.timeout.ms*1000.0`

Expand All @@ -690,6 +719,61 @@ def describe_consumer_groups(self, group_ids, **kwargs):

return futmap

def describe_topics(self, topics, **kwargs):
"""
Describe topics.

:param list(str) topics: List of topics which need to be described.
:param bool include_topic_authorized_operations: If True, fetches topic AclOperations. Default: False
:param float request_timeout: Maximum response time before timing out, or -1 for infinite timeout.
Default: `socket.timeout.ms*1000.0`

:returns: A dict of futures for each topic, keyed by the topic.
The future result() method returns :class:`TopicDescription`.

:rtype: dict[str, future]

:raises KafkaException: Operation failed locally or on broker.
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""

if not isinstance(topics, list):
raise TypeError("Expected input to be list of topic names to be described")

if len(topics) == 0:
raise ValueError("Expected at least one topic to be described")

f, futmap = AdminClient._make_futures(topics, None,
AdminClient._make_describe_topics_result)

super(AdminClient, self).describe_topics(topics, f, **kwargs)

return futmap

def describe_cluster(self, **kwargs):
"""
Describe cluster.

:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`

:returns: A future returning description of the cluster as result

:rtype: future containing the description of the cluster in result.

:raises KafkaException: Operation failed locally or on broker.
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""

f = AdminClient._create_future()

super(AdminClient, self).describe_cluster(f, **kwargs)

return f

def delete_consumer_groups(self, group_ids, **kwargs):
"""
Delete the given consumer groups.
Expand Down
44 changes: 44 additions & 0 deletions src/confluent_kafka/admin/_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from .._util import ConversionUtil
from ._acl import AclOperation


class ClusterDescription:
"""
Represents cluster description information used in describe cluster operation.
Used by :meth:`AdminClient.describe_cluster`.

Parameters
----------
cluster_id : str
The current cluster id in the cluster.
controller_id : int
The current controller id in the cluster.
nodes : list(Node)
Information about each node in the cluster.
authorized_operations: list(AclOperation)
AclOperations allowed for the cluster.
"""

def __init__(self, cluster_id, controller_id, nodes, authorized_operations):
self.cluster_id = cluster_id
self.controller_id = controller_id
self.nodes = nodes
self.authorized_operations = []
if authorized_operations:
for op in authorized_operations:
self.authorized_operations.append(ConversionUtil.convert_to_enum(op, AclOperation))
16 changes: 14 additions & 2 deletions src/confluent_kafka/admin/_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from .._util import ConversionUtil
from .._model import ConsumerGroupState
from ._acl import AclOperation


class ConsumerGroupListing:
Expand All @@ -31,6 +32,7 @@ class ConsumerGroupListing:
state : ConsumerGroupState
Current state of the consumer group.
"""

def __init__(self, group_id, is_simple_consumer_group, state=None):
self.group_id = group_id
self.is_simple_consumer_group = is_simple_consumer_group
Expand All @@ -50,6 +52,7 @@ class ListConsumerGroupsResult:
errors : list(KafkaException)
List of errors encountered during the operation, if any.
"""

def __init__(self, valid=None, errors=None):
self.valid = valid
self.errors = errors
Expand All @@ -65,6 +68,7 @@ class MemberAssignment:
topic_partitions : list(TopicPartition)
The topic partitions assigned to a group member.
"""

def __init__(self, topic_partitions=[]):
self.topic_partitions = topic_partitions
if self.topic_partitions is None:
Expand All @@ -89,6 +93,7 @@ class MemberDescription:
group_instance_id : str
The instance id of the group member.
"""

def __init__(self, member_id, client_id, host, assignment, group_instance_id=None):
self.member_id = member_id
self.client_id = client_id
Expand All @@ -109,19 +114,26 @@ class ConsumerGroupDescription:
is_simple_consumer_group : bool
Whether a consumer group is simple or not.
members: list(MemberDescription)
Description of the memebers of the consumer group.
Description of the members of the consumer group.
authorized_operations: list(AclOperation)
AclOperations allowed for the consumer group.
partition_assignor: str
Partition assignor.
state : ConsumerGroupState
Current state of the consumer group.
coordinator: Node
Consumer group coordinator.
"""
def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state,

def __init__(self, group_id, is_simple_consumer_group, members, authorized_operations, partition_assignor, state,
coordinator):
self.group_id = group_id
self.is_simple_consumer_group = is_simple_consumer_group
self.members = members
self.authorized_operations = []
if authorized_operations:
for op in authorized_operations:
self.authorized_operations.append(ConversionUtil.convert_to_enum(op, AclOperation))
self.partition_assignor = partition_assignor
if state is not None:
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
Expand Down
Loading