Skip to content

Commit

Permalink
Describe All
Browse files Browse the repository at this point in the history
  • Loading branch information
mahajanadhitya committed Oct 17, 2023
1 parent 42db7f6 commit 08d155c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 33 deletions.
30 changes: 16 additions & 14 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,21 @@ def example_describe_user_scram_credentials(a, args):
"""
Describe User Scram Credentials
"""
futmap = a.describe_user_scram_credentials(args)
if "__all__" not in futmap:
if len(args) == 0:
f = a.describe_user_scram_credentials(args)
try:
results = f.result()
for username, response in results.items():
print("Username : {}\n".format(username))
for scram_credential_info in response.scram_credential_infos:
print(f" Mechanism: {scram_credential_info.mechanism} " +
f"Iterations: {scram_credential_info.iterations}")
except KafkaException as e:
print("Failed to describe all user scram credentials : {}".format(e))
except Exception as e:
raise
else:
futmap = a.describe_user_scram_credentials(args)
for username, fut in futmap.items():
print("Username: {}".format(username))
try:
Expand All @@ -614,18 +627,7 @@ def example_describe_user_scram_credentials(a, args):
except KafkaException as e:
print(" Error: {}".format(e))
except Exception as e:
print(f" Unexpected exception: {e}")
else:
fut = futmap["__all__"]
results = fut.result()
for username, response in results.items():
print("Username : {}\n".format(username))
if isinstance(response, KafkaError):
print(response)
else:
for scram_credential_info in response.scram_credential_infos:
print(f" Mechanism: {scram_credential_info.mechanism} " +
f"Iterations: {scram_credential_info.iterations}")
raise


def example_alter_user_scram_credentials(a, args):
Expand Down
59 changes: 40 additions & 19 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,25 +248,35 @@ def _make_acls_result(f, futmap):
def _make_user_scram_credentials_result(f, futmap):
try:
results = f.result()
if "__all__" in futmap:
fut = futmap["__all__"]
fut.set_result(results)
else:
if len(results) != len(futmap):
raise RuntimeError(
"Result does not have same number of users as queried by the non-empty user list")
for username, value in results.items():
fut = futmap.get(username, None)
if fut is None:
raise RuntimeError(f"username {username} not found in future-map: {futmap}")
if isinstance(value, KafkaError):
fut.set_exception(KafkaException(value))
else:
fut.set_result(value)
len_results = len(results)
len_futures = len(futmap)
if len(results) != len_futures:
raise RuntimeError(
f"Results length {len_results} is different from future-map length {len_futures}")
for username, value in results.items():
fut = futmap.get(username, None)
if fut is None:
raise RuntimeError(f"username {username} not found in future-map: {futmap}")
if isinstance(value, KafkaError):
fut.set_exception(KafkaException(value))
else:
fut.set_result(value)
except Exception as e:
for _, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_describe_all_user_scram_credentials_result(internalfuture, f):
try:
results = internalfuture.result()
for _, value in results.items():
if isinstance(value, KafkaError):
f.set_exception(KafkaException(value))
return
f.set_result(results)
except Exception as e:
f.set_exception(e)

@staticmethod
def _create_future():
f = concurrent.futures.Future()
Expand Down Expand Up @@ -316,6 +326,16 @@ def _make_futures_v2(futmap_keys, class_check, make_result_fn):

return f, futmap

@staticmethod
def _make_futures_v3(make_result_fn):
# Create an internal future for the entire request,
# this future will trigger _make_..._result() and set result/exception
f = AdminClient._create_future()
internalfuture = AdminClient._create_future()
internalfuture.add_done_callback(lambda internalfuture: make_result_fn(internalfuture, f))

return internalfuture, f

@staticmethod
def _has_duplicates(items):
return len(set(items)) != len(items)
Expand Down Expand Up @@ -979,12 +999,13 @@ def describe_user_scram_credentials(self, users, **kwargs):
"""
AdminClient._check_describe_user_scram_credentials_request(users)

if len(users) == 0:
internalfuture, f = AdminClient._make_futures_v3(AdminClient._make_describe_all_user_scram_credentials_result)
super(AdminClient, self).describe_user_scram_credentials(users, internalfuture, **kwargs)
return f

f, futmap = AdminClient._make_futures_v2(users, None,
AdminClient._make_user_scram_credentials_result)

if len(users) == 0:
futmap["__all__"] = AdminClient._create_future()

super(AdminClient, self).describe_user_scram_credentials(users, f, **kwargs)
return futmap

Expand Down

0 comments on commit 08d155c

Please sign in to comment.