This is an Amazon MSK Library in Python. This library provides a function to generates a base 64 encoded signed url to enable authentication/authorization with an MSK Cluster. The signed url is generated by using your IAM credentials.
- Free software: Apache Software License 2.0
- Provides a function to generate auth token using IAM credentials from the AWS default credentials chain.
- Provides a function to generate auth token using IAM credentials from the AWS named profile.
- Provides a function to generate auth token using assumed IAM role's credentials.
- Provides a function to generate auth token using a CredentialProvider. The CredentialProvider should be inherited from botocore.credentials.CredentialProvider class.
- For installation, refer to installation guide
- In order to use the signer library with a Kafka client library with SASL/OAUTHBEARER mechanism, add the callback function in your code.
- For example, here is the sample code to use with dpkp/kafka-python library:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
class MSKTokenProvider():
def token(self):
token, _ = MSKAuthTokenProvider.generate_auth_token('<my aws region>')
return token
tp = MSKTokenProvider()
producer = KafkaProducer(
bootstrap_servers='<my bootstrap string>',
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=tp,
client_id=socket.gethostname(),
)
topic = "<my-topic>"
while True:
try:
inp=input(">")
producer.send(topic, inp.encode())
producer.flush()
print("Produced!")
except Exception:
print("Failed to send message:", e)
producer.close()
- Here is a sample consumer with confluent-kafka-python library :
from confluent_kafka import Consumer
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
def oauth_cb(oauth_config):
auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("<my aws region>")
# Note that this library expects oauth_cb to return expiry time in seconds since epoch, while the token generator returns expiry in ms
return auth_token, expiry_ms/1000
c = Consumer({
"debug": "all",
'bootstrap.servers': "<my bootstrap string>",
'client.id': socket.gethostname(),
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
'oauth_cb': oauth_cb,
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
})
c.subscribe(['<my-topic>'])
print("Starting consumer!")
while True:
msg = c.poll(5)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
- In order to use a named profile to generate token, replace the token() function with code below :
class MSKTokenProvider():
def token(self):
oauth2_token, _ = MSKAuthTokenProvider.generate_auth_token_from_profile('<your aws region>', '<named_profile>')
return oauth2_token
- In order to use a role arn to generate token, replace the token() function with code below :
class MSKTokenProvider():
def token(self):
oauth2_token, _ = MSKAuthTokenProvider.generate_auth_token_from_role_arn('<your aws region>', '<role_arn>')
return oauth2_token
- In order to use a custom credentials provider, replace the token() function with code below :
class MSKTokenProvider():
def token(self):
oauth2_token, _ = MSKAuthTokenProvider.generate_auth_token_from_credentials_provider('<your aws region>', '<your_credentials_provider')
return oauth2_token
You can run tests in all supported Python versions using pytest
. By default,
it will run all of the unit tests.
$ pytest
You can also run tests with setup.py:
$ python setup.py test
To fix lint issues, run the pre-commit command:
$ pre-commit run --all-files
To run tests with coverage information, run:
$ coverage run --source=aws_msk_iam_sasl_signer.MSKAuthTokenProvider -m pytest tests/test_auth_token_provider.py
$ coverage report -m
You may receive an Access denied error and there may be some doubt as to which credential is being exactly used. The credential may be sourced from a role ARN, EC2 instance profile, credential profile etc. When calling generate_auth_token(), you can set aws_debug_creds argument to True along with client side logging set to DEBUG then the signer library will print a debug log of the form:
MSKAuthTokenProvider.generate_auth_token('<my aws region>', aws_debug_creds = True)
Credentials Identity: {UserId: ABCD:test124, Account: 1234567890, Arn: arn:aws:sts::1234567890:assumed-role/abc/test124}
The log line provides the IAM Account, IAM user id and the ARN of the IAM Principal corresponding to the credential being used.
Please use these community resources for getting help. We use the GitHub issues for tracking bugs and feature requests.
- Ask a question or open a discussion.
- If you think you may have found a bug, please open an issue.
- Open a support case with AWS Support.
This repository provides a pluggable library with any Python Kafka client for SASL/OAUTHBEARER mechanism. For more information about SASL/OAUTHBEARER mechanism please go to KIP 255.
If you encounter a bug with the AWS MSK IAM SASL Signer for Python, we would like to hear about it. Search the Issues and see if others are also experiencing the same issue before opening a new issue. Please include the version of AWS MSK IAM SASL Signer for Python, Python, and OS you’re using. Please also include reproduction case when appropriate.
The GitHub issues are intended for bug reports and feature requests. For help and questions with using AWS MSK IAM SASL Signer for Python, please make use of the resources listed in the Getting Help section. Keeping the list of open issues lean will help us respond in a timely manner.
We value feedback and contributions from our community. Whether it's a bug report, new feature, correction, or additional documentation, we welcome your issues and pull requests. Please read through this CONTRIBUTING document before submitting any issues or pull requests to ensure we have all the necessary information to effectively respond to your contribution.
This package was created with Cookiecutter and the audreyr/cookiecutter-pypackage project template.