Memphis{dev} is a Go-based message broker for developers, born out of developers' struggles with building around message brokers.
It lets developers get the benefits of other message brokers in a fraction of the time.
It focuses on automatic optimization, schema management, inline processing, and troubleshooting abilities, all under the same hood.
It utilizes NATS core.
$ pip3 install memphis-py
import asyncio
from memphis import Memphis
from memphis import retention_types, storage_types
First, create a Memphis object, then connect to Memphis by calling memphis.connect.
async def main():
    try:
        memphis = Memphis()
        await memphis.connect(
            host="<memphis-host>",
            username="<application-type username>",
            connection_token="<broker-token>",
            port="<port>", # defaults to 6666
            reconnect=True, # defaults to True
            max_reconnect=3, # defaults to 3
            reconnect_interval_ms=1500, # defaults to 1500
            timeout_ms=1500 # defaults to 1500
        )
        ...
    except Exception as e:
        print(e)
    finally:
        await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())
Once connected, all the functionality offered by Memphis is available.
To disconnect from Memphis, call close() on the memphis object.
await memphis.close()
factory = memphis.factory(name="<factory-name>", description="")
Destroying a factory will remove all its resources (stations/producers/consumers)
factory.destroy()
station = memphis.station(
    name="<station-name>",
    factory_name="<factory-name>",
    retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES. Defaults to MAX_MESSAGE_AGE_SECONDS
    retention_value=604800, # defaults to 604800
    storage_type=storage_types.FILE, # storage_types.FILE/storage_types.MEMORY. Defaults to MEMORY
    replicas=1, # defaults to 1
    dedup_enabled=False, # defaults to False
    dedup_window_ms=0, # defaults to 0
)
Memphis currently supports the following types of retention:
memphis.retention_types.MAX_MESSAGE_AGE_SECONDS
Every message persists for the time set in the retention value field (in seconds).
memphis.retention_types.MESSAGES
Once the maximum number of saved messages (set in the retention value) is reached, the oldest messages are deleted.
memphis.retention_types.BYTES
Once the maximum number of saved bytes (set in the retention value) is reached, the oldest messages are deleted.
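The three retention types can be illustrated with a small stand-alone sketch. It does not use the Memphis client and is not how the broker is implemented; the function apply_retention, the string type names, and the (timestamp, payload) tuple layout are all hypothetical, chosen only to show which messages each policy would keep.

```python
from collections import deque


def apply_retention(messages, retention_type, retention_value, now):
    """Return the messages kept under one retention policy (illustration only).

    `messages` is a list of (timestamp_seconds, payload_bytes) tuples, oldest first.
    """
    kept = deque(messages)
    if retention_type == "MAX_MESSAGE_AGE_SECONDS":
        # Drop every message older than retention_value seconds.
        while kept and now - kept[0][0] > retention_value:
            kept.popleft()
    elif retention_type == "MESSAGES":
        # Keep at most retention_value messages, dropping the oldest first.
        while len(kept) > retention_value:
            kept.popleft()
    elif retention_type == "BYTES":
        # Keep at most retention_value payload bytes, dropping the oldest first.
        total = sum(len(payload) for _, payload in kept)
        while kept and total > retention_value:
            _, payload = kept.popleft()
            total -= len(payload)
    else:
        raise ValueError(f"unknown retention type: {retention_type}")
    return list(kept)


msgs = [(0, b"aaaa"), (50, b"bbbb"), (90, b"cccc")]
print(apply_retention(msgs, "MAX_MESSAGE_AGE_SECONDS", 60, now=100))
print(apply_retention(msgs, "MESSAGES", 1, now=100))
print(apply_retention(msgs, "BYTES", 8, now=100))
```

With these sample values, the age and byte policies both keep the two newest messages, and the message-count policy keeps only the newest one.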
Memphis currently supports the following types of message storage:
memphis.storage_types.FILE
Messages persist on the file system.
memphis.storage_types.MEMORY
Messages persist in main memory.
Destroying a station will remove all its resources (producers/consumers)
station.destroy()
The most common client operations are produce to send messages and consume to receive messages.
Messages are published to a station and consumed from it by creating a consumer. Consumers are pull-based and consume all the messages in a station unless you are using a consumer group, in which case messages are spread across all members of the group.
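The difference between a lone consumer and a consumer group can be shown with a broker-free simulation. This is only a sketch: the deliver function and the round-robin split inside a group are assumptions made for illustration, and the real distribution depends on which group member pulls next.

```python
from collections import defaultdict
from itertools import cycle


def deliver(messages, consumers):
    """Simulate delivery. `consumers` maps consumer name -> consumer group name."""
    groups = defaultdict(list)
    for name, group in consumers.items():
        groups[group].append(name)

    received = {name: [] for name in consumers}
    for members in groups.values():
        # Every group sees every message; inside a group each message
        # goes to exactly one member. Round-robin is an assumption here.
        for msg, member in zip(messages, cycle(members)):
            received[member].append(msg)
    return received


# "a" is alone in its own group (the group defaults to the consumer name);
# "w1" and "w2" share the hypothetical group "workers".
consumers = {"a": "a", "w1": "workers", "w2": "workers"}
result = deliver(["m1", "m2", "m3", "m4"], consumers)
print(result)
```

In this simulation the lone consumer receives all four messages, while the two group members receive two each.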
Memphis messages are payload agnostic. In Python, payloads are passed as bytearray objects.
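Because the broker treats payloads as opaque bytes, the application decides how to serialize them. A minimal sketch, assuming JSON over UTF-8 as the wire format; the helpers encode_payload and decode_payload are hypothetical and not part of the Memphis client:

```python
import json


def encode_payload(obj):
    """Serialize a JSON-compatible object to a bytearray payload."""
    return bytearray(json.dumps(obj), "utf-8")


def decode_payload(data):
    """Turn a bytes-like payload back into a Python object."""
    return json.loads(bytes(data).decode("utf-8"))


payload = encode_payload({"order_id": 17, "status": "paid"})
print(type(payload).__name__, decode_payload(payload))
```

A producer would pass the encoded bytearray as the message, and a consumer would decode the value it gets back from msg.get_data().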
To stop receiving messages, call consumer.destroy(). Destroy terminates the consumer regardless of whether there are messages in flight for the client.
producer = await memphis.producer(station_name="<station-name>", producer_name="<producer-name>")
await producer.produce(
    message=bytearray(msg, 'utf-8'), # msg is a str; the payload is sent as a bytearray
    ack_wait_sec=15) # defaults to 15
producer.destroy()
consumer = await memphis.consumer(
    station_name="<station-name>",
    consumer_name="<consumer-name>",
    consumer_group="<group-name>", # defaults to the consumer name
    pull_interval_ms=1000, # defaults to 1000
    batch_size=10, # defaults to 10
    batch_max_time_to_wait_ms=5000, # defaults to 5000
    max_ack_time_ms=30000, # defaults to 30000
    max_msg_deliveries=10, # defaults to 10
)
Once all the messages in the station have been consumed, the msg_handler will receive the error Memphis: TimeoutError.
async def msg_handler(msgs, error):
    for msg in msgs:
        print("message: ", msg.get_data())
        await msg.ack()
    if error:
        print(error)

consumer.consume(msg_handler)
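A handler with the (msgs, error) signature shown above can be exercised without a running broker. The sketch below uses a hypothetical FakeMessage stand-in that mimics only the get_data() and ack() calls used in the handler; it is not a Memphis class, and passing an empty batch together with the error string is an assumption about how the timeout case arrives.

```python
import asyncio


class FakeMessage:
    """Hypothetical stand-in for a consumed message (not part of memphis-py)."""

    def __init__(self, data):
        self._data = data
        self.acked = False

    def get_data(self):
        return self._data

    async def ack(self):
        self.acked = True


processed = []
errors = []


async def msg_handler(msgs, error):
    # Same order as the handler above: process the batch, then report any error.
    for msg in msgs:
        processed.append(msg.get_data())
        await msg.ack()
    if error:
        errors.append(error)


batch = [FakeMessage(bytearray("hello", "utf-8")), FakeMessage(bytearray("world", "utf-8"))]
asyncio.run(msg_handler(batch, None))
# Assumed shape of the "station drained" case: an empty batch plus an error.
asyncio.run(msg_handler([], "Memphis: TimeoutError"))
print(processed, [m.acked for m in batch], errors)
```

Keeping the handler free of broker-specific state like this makes it straightforward to unit test before wiring it to consumer.consume.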
Acknowledging a message tells the Memphis server not to re-send the same message to the same consumer or consumer group.
await message.ack()
consumer.destroy()