Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example on usage via async #185

Closed
delijati opened this issue Apr 21, 2021 · 12 comments
Closed

Example on usage via async #185

delijati opened this issue Apr 21, 2021 · 12 comments
Labels
feature-request A feature should be added or improved. p2 This is a standard priority issue

Comments

@delijati
Copy link

Platform/OS/Device
What are you running the sdk on? linux, python >= 3.7

Describe the question
There are for v1 and v2 no examples on how to use it with async/ await

@delijati delijati added guidance Question that needs advice or information. needs-triage This issue or PR still needs to be triaged. labels Apr 21, 2021
@jmklix jmklix added feature-request A feature should be added or improved. and removed guidance Question that needs advice or information. labels Apr 22, 2021
@jmklix
Copy link
Member

jmklix commented Apr 22, 2021

Is there something specific that you would want shown in said example?

@jmklix jmklix added response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 2 days. and removed needs-triage This issue or PR still needs to be triaged. labels Apr 22, 2021
@delijati
Copy link
Author

No not really just a asyncio example so i can integrate it in my fastapi application

@delijati
Copy link
Author

delijati commented Apr 23, 2021

For now i have a working solution with pure paho and asyncio over websocket but i would prefer to use the sdk.

@github-actions github-actions bot removed the response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 2 days. label Apr 23, 2021
@lllama
Copy link

lllama commented Jul 4, 2021

I would like to see an example which used a async/await instead of callbacks. Simple examples would be:

  • Subscribing to a list of topics using async for
  • Publishing to a topic and having it accepted/rejected using await syntax
  • Awaiting pending job messages (notify or notify-next) and then awaiting the job descriptions (for notify) and awaiting setting the job status to IN_PROGRESS. Bonus points for also showing how to await getting a pre-signed URL from s3 😀

@hwestman
Copy link

This would be sweet!

@BestITUserEUW
Copy link

I have a Question Related to the Topic. Is it even possible to use Asyncio with Mqtt without using a seperate Event Loop? Because aws is using its own asnyc

@golkedj
Copy link

golkedj commented Nov 16, 2021

I am on a small team making use of this project and we are experiencing difficulties incorporating this into an app that leverages python's official asyncio solution as well. Documentation on how to cleanly use this package alongside asyncio would be very beneficial.

@laurentrivard
Copy link

laurentrivard commented Dec 8, 2021

I am in the same boat as @golkedj above. Can we use the same event loop the rest of the application uses?

@jmklix jmklix added the p2 This is a standard priority issue label Nov 9, 2022
@GardarGardarsson
Copy link

Any intention on releasing such an example?
I would like to build a server class that may be notified of incoming messages from different process threads, through an async event.
How to handle the MQTT Connection accordingly would be greatly appreciated.

@jmklix
Copy link
Member

jmklix commented Nov 13, 2023

This is not something that we are going to add a sample for in the near future, so I will be closing this feature request for now. Right now it is not possible to use async/await with this sdk because of how it was designed. It would require changes to the bindings to allow it to work. Sorry if you where waiting for a sample for this, I just want to be transparent that this is not something we are working on adding

@jmklix jmklix closed this as completed Nov 13, 2023
Copy link

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see.
If you need more assistance, please either tag a team member or open a new issue that references this one.
If you wish to keep having a conversation with other community members under this issue feel free to do so.

@ValorenceCLE
Copy link

Any intention on releasing such an example? I would like to build a server class that may be notified of incoming messages from different process threads, through an async event. How to handle the MQTT Connection accordingly would be greatly appreciated.

I have figured out a way to use it in an asynchronous context I THINK, I linked a file in one of my repositories that contains an asynchronous AWSIoTClient

import asyncio
from awsiot import mqtt5_client_builder
from awscrt import mqtt5
from utils.logging_setup import local_logger as logger
from utils.config import settings
import json

class AWSIoTClient:
_instance = None

def __new__(cls, *args, **kwargs):
    if not cls._instance:
        cls._instance = super().__new__(cls)
    return cls._instance

def __init__(self):
    if hasattr(self, "_initialized") and self._initialized:
        return
    self._initialized = True
    self.client = self._initialize_client()
    self.device_id = settings.AWS_CLIENT_ID
    self.TIMEOUT = 100

def _initialize_client(self):
    logger.info("Initializing AWS IoT client...")
    return mqtt5_client_builder.mtls_from_path(
        endpoint=settings.AWS_ENDPOINT,
        port=8883,
        cert_filepath=settings.DEVICE_COMBINED_CRT,
        pri_key_filepath=settings.DEVICE_KEY,
        ca_filepath=settings.AWS_ROOT_CA,
        http_proxy_options=None,
        on_publish_received=self.on_publish_received,
        on_lifecycle_stopped=self.on_lifecycle_stopped,
        on_lifecycle_connection_success=self.on_lifecycle_connection_success,
        on_lifecycle_connection_failure=self.on_lifecycle_connection_failure,
        client_id=settings.AWS_CLIENT_ID,
    )

async def start(self):
    try:
        logger.info("Starting AWS IoT client...")
        self.future_connection_success = asyncio.Future()
        self.client.start()
        await asyncio.wait_for(self.future_connection_success, timeout=self.TIMEOUT)
    except asyncio.TimeoutError:
        logger.error("Timeout while starting AWS IoT client.")
        raise
    except Exception as e:
        logger.error(f"Error starting AWS IoT client: {e}")
        raise

async def stop(self):
    try:
        logger.info("Stopping AWS IoT client...")
        self.future_stopped = asyncio.Future()
        self.client.stop()
        await asyncio.wait_for(self.future_stopped, timeout=self.TIMEOUT)
    except asyncio.TimeoutError:
        logger.error("Timeout while stopping AWS IoT client.")
        raise
    except Exception as e:
        logger.error(f"Error stopping AWS IoT client: {e}")
        raise

async def publish(self, topic, payload, source=None):
    if not isinstance(payload, dict):
        logger.error("Payload must be a dictionary.")
        return
    payload["device_id"] = self.device_id
    if source:
        payload["source"] = source
    json_payload = json.dumps(payload)
    prefixed_topic = f"{self.device_id}/{topic}"
    logger.debug(f"Publishing to topic '{prefixed_topic}' with payload: {json_payload}")
    publish_future = self.client.publish(
        mqtt5.PublishPacket(
            topic=prefixed_topic,
            payload=json_payload.encode("utf-8"),
            qos=mqtt5.QoS.AT_LEAST_ONCE,
        )
    )
    await self._wrap_future(publish_future)

async def subscribe(self, topic, callback=None):
    try:
        logger.info(f"Subscribing to topic '{topic}'...")
        subscribe_future = self.client.subscribe(
            subscribe_packet=mqtt5.SubscribePacket(
                subscriptions=[
                    mqtt5.Subscription(
                        topic_filter=f"{self.device_id}/{topic}",
                        qos=mqtt5.QoS.AT_LEAST_ONCE,
                    )
                ]
            )
        )
        await self._wrap_future(subscribe_future)
        if callback:
            self.client.on_publish_received = callback
    except Exception as e:
        logger.error(f"Error subscribing to topic '{topic}': {e}")
        raise

def _wrap_future(self, sdk_future):
    """
    Converts a concurrent.futures.Future into an asyncio.Future.
    """
    asyncio_future = asyncio.Future()

    def callback(_sdk_future):
        try:
            result = _sdk_future.result()
            asyncio_future.set_result(result)
        except Exception as e:
            asyncio_future.set_exception(e)

    sdk_future.add_done_callback(callback)
    return asyncio_future

# Callbacks
def on_publish_received(self, publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    logger.info(
        f"Received message from topic '{publish_packet.topic}': {publish_packet.payload}"
    )

def on_lifecycle_stopped(self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
    logger.info("MQTT connection stopped")
    self.future_stopped.set_result(lifecycle_stopped_data)

def on_lifecycle_connection_success(
    self, lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData
):
    logger.info("Lifecycle Connection Success")
    self.future_connection_success.set_result(lifecycle_connect_success_data)

def on_lifecycle_connection_failure(
    self, lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData
):
    logger.error(
        f"Connection failed with exception: {lifecycle_connection_failure.exception}"
    )

Plain Functions for Easy Usage

_client_instance = None
def _get_client_instance():
global _client_instance
if _client_instance is None:
_client_instance = AWSIoTClient()
return _client_instance

async def start():
await _get_client_instance().start()

async def stop():
await _get_client_instance().stop()

async def publish(topic, payload, source=None):
await _get_client_instance().publish(topic, payload, source)

async def subscribe(topic, callback=None):
await _get_client_instance().subscribe(topic, callback)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature-request A feature should be added or improved. p2 This is a standard priority issue
Projects
None yet
Development

No branches or pull requests

9 participants