An Apache Pulsar client written in Elixir using the Pulsar websocket API.
Stargate allows you to create producer and consumer connections to a Pulsar cluster and send messages to or receive messages from the cluster. Messages can be received through a Reader connection, which reads from a topic starting at a user-defined initial message in the log, or through a Consumer connection, which receives all messages on the topic after the initial connection.
Per Pulsar's documentation, all messages received from a topic are acknowledged through the websocket API. A Reader acknowledgement only signals to the socket that the reader is ready for additional messages, while a Consumer acknowledgement additionally signals that the message can be deleted from the consumer's subscription.
Stargate receivers must supply a message handler module to tell Stargate how to handle messages received from the socket.
Producers must supply connection settings for a persistent connection. When ad hoc produce is desired, they may instead supply a single URL specifying the persistence, tenant, namespace, and topic to produce to.
Available in Hex, the package can be installed by adding stargate to your list of dependencies in mix.exs:
def deps do
  [
    {:stargate, "~> 0.1.0"}
  ]
end
The docs can be found at https://hexdocs.pm/stargate.
Under the hood, Stargate uses the Registry module to create a via-tuple name for each process within a Stargate-managed supervision tree. The intended API is to create a top-level Stargate.Supervisor and attach optional producers and receivers under it as needed. When you do this, Stargate creates the top-level supervisor named :sg_sup_#{name} as well as a Registry named :sg_reg_#{name}, where name defaults to :default in both cases.
All processes spun up under this pair receive a via-tuple name of the form {:via, Registry, {<registry_name>, {<component>, <persistence>, <tenant>, <namespace>, <topic>}}}, where component is the type of process (:producer, :consumer, etc.), persistence is either "persistent" or "non-persistent", and the remaining values are all strings representing the values supplied when creating the topic in Pulsar.
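To see how a via-tuple of this shape resolves to a process, here is a minimal sketch using only Elixir's standard Registry and Agent modules. The registry name :sg_reg_demo and the Agent standing in for a producer are illustrative assumptions; Stargate starts and names its own Registry and processes for you.

```elixir
# Start a unique-key Registry. The name :sg_reg_demo is hypothetical and only
# mimics the :sg_reg_<name> pattern described above.
{:ok, _registry} = Registry.start_link(keys: :unique, name: :sg_reg_demo)

# The registry key has the shape {component, persistence, tenant, namespace, topic}.
key = {:producer, "persistent", "public", "default", "foo"}
via = {:via, Registry, {:sg_reg_demo, key}}

# Any OTP process can be named with a via-tuple; an Agent stands in for a producer.
{:ok, pid} = Agent.start_link(fn -> [] end, name: via)

# The via-tuple can be used anywhere a pid or registered name is accepted.
:ok = Agent.update(via, fn messages -> ["hello" | messages] end)
messages = Agent.get(via, fn messages -> messages end)

# Looking the key up in the Registry resolves to the same pid.
[{found_pid, _value}] = Registry.lookup(:sg_reg_demo, key)
IO.inspect({messages, found_pid == pid})
```

Because the name is derived entirely from the component and topic coordinates, callers never need to hold on to a pid.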
To assist in finding the correct process to send messages to, Stargate provides the Stargate.registry_key/4 function as a helper to calculate this via-tuple easily. When called as Stargate.registry_key(<tenant>, <namespace>, <topic>), Stargate assumes you're trying to reach the Producer process, as this is the process most likely to be called directly (in conjunction with Stargate.produce/2,3), assumes a persistent topic (because Pulsar assumes topics are persistent by default), and assumes the Supervisor and Registry name suffix to be :default. If necessary, you can supply a keyword list of options to customize the via-tuple returned by Stargate.registry_key/4, using any combination of :name or :registry, :component, and :persistence.
iex> Stargate.registry_key("foo", "bar", "baz")
{:via, Registry, {:sg_reg_default, {:producer, "persistent", "foo", "bar", "baz"}}}
iex> Stargate.registry_key("foo", "bar", "baz", name: :custom, persistence: "non-persistent", component: :consumer)
{:via, Registry, {:sg_reg_custom, {:consumer, "non-persistent", "foo", "bar", "baz"}}}
Producing to Pulsar via Stargate is as simple as passing an Erlang term to the produce function. Stargate takes care of encoding the message payload with the necessary fields and format required by Pulsar with options to specify certain fields as fits the users' specific needs (see below).
To ad hoc produce to Pulsar via Stargate, call Stargate.produce(url, [message]) where url is something like ws://cluster-url:8080/ws/v2/producer/:persistence/:tenant/:namespace/:topic and [message] is a single message or a list of messages, each of which can be one of:
- A string-keyed map containing the "payload" key and optionally the "context" key if a specific message context is desired (Stargate uses the "context" for tracking receipt of produced messages by the cluster).
- A two-element tuple containing a key as the first element and a payload as the second.
- A message payload only.
Stargate will clean up the producer processes once the produce has completed.
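The ad hoc flow above can be sketched as follows. The module name, the host "cluster-url", and the sample keys, payloads, and context value are all placeholders; the run/0 function additionally assumes the :stargate dependency is installed and a cluster is reachable.

```elixir
defmodule AdHocProduceSketch do
  # Builds a producer URL of the form shown above. `host` and `port` are
  # placeholders for your own cluster address.
  def producer_url(host, port, persistence, tenant, namespace, topic) do
    "ws://#{host}:#{port}/ws/v2/producer/#{persistence}/#{tenant}/#{namespace}/#{topic}"
  end

  # One example of each accepted message shape (values are made up).
  def example_messages do
    [
      %{"payload" => "with context", "context" => "ctx-123"},
      {"some-key", "keyed payload"},
      "payload only"
    ]
  end

  # Sends each message with its own ad hoc produce call.
  def run do
    url = producer_url("cluster-url", 8080, "persistent", "public", "default", "foo")
    Enum.map(example_messages(), fn message -> Stargate.produce(url, message) end)
  end
end
```

Sending one message per call keeps the sketch to the single-message form; passing the whole list in one call is the other form the text describes.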
For persistent producer connections to the cluster, you may also start a supervised tree of processes including the socket producer itself and an acknowledger, the job of which is to ensure receipt of messages by the cluster.
To start a supervised producer, call something like the following within your application:
opts = [
  name: :my_producer,             # optional, default :default
  host: ["example.com": 8080],
  protocol: "ws",                 # optional, default "ws"
  producer: [
    persistence: "persistent",    # optional, default "persistent"
    tenant: "public",
    namespace: "default",
    topic: "foo",
    query_params: %{              # all query params are optional
      name: "summer",
      send_timeout: 30_000,       # default 30_000
      batch_enabled: true,        # default false
      batch_max_msg: 1_000,       # default 1_000
      max_pending_msg: 1_000,     # default 1_000
      batch_max_delay: 25,        # default 10
      routing_mode: :round_robin, # (deprecated by Pulsar) options :round_robin | :single
      compression_type: :lz4,     # default uncompressed, options :lz4 | :zlib
      producer_name: "myapp-producer",
      initial_seq_id: 100,
      hashing_scheme: :murmur3    # options :java_string | :murmur3
    }
  ]
]
Stargate.Supervisor.start_link(opts)
By default, Stargate.produce/2 will block until it receives acknowledgement that the message has successfully been produced to the cluster. To acknowledge produce operations in a non-blocking manner, you may also use Stargate.produce/3 and pass an MFA tuple as the third argument, instructing the acknowledger which module, function, and arguments to execute upon receipt of acknowledgement from Pulsar that the produce succeeded.
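A minimal sketch of both styles follows. It assumes a supervised producer for the persistent topic public/default/foo was started under the default name, so that Stargate.registry_key/3 resolves to it. The module ProduceAckSketch and its on_ack/1 callback are hypothetical, and the sketch assumes the acknowledger applies the function with exactly the arguments listed in the tuple.

```elixir
defmodule ProduceAckSketch do
  require Logger

  # Via-tuple for the default-named producer on the persistent topic public/default/foo.
  def producer, do: Stargate.registry_key("public", "default", "foo")

  # Blocking produce: returns once the cluster has acknowledged the message.
  def produce_sync(message), do: Stargate.produce(producer(), message)

  # Non-blocking produce: the acknowledger runs the MFA on acknowledgement.
  def produce_async(message, tag) do
    Stargate.produce(producer(), message, ack_mfa(tag))
  end

  # MFA tuple pointing at the hypothetical callback below.
  def ack_mfa(tag), do: {__MODULE__, :on_ack, [tag]}

  # Assumption: invoked with exactly the arguments from the MFA tuple.
  def on_ack(tag) do
    Logger.info("produce acknowledged: #{inspect(tag)}")
    :ok
  end
end
```

Carrying an identifying tag in the argument list is one way to correlate an acknowledgement with the message that triggered it.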
To consume messages from Pulsar via Stargate, your application needs to create a consumer or reader process by calling something like the following:
opts = [
  name: :my_reader,               # optional, default :default
  host: ["example.com": 8080],
  protocol: "ws",                 # optional, default "ws"
  reader: [
    persistence: "persistent",    # optional, default "persistent"
    tenant: "public",
    namespace: "default",
    topic: "foo",
    processors: 3,                # optional, default 1
    handler: MyApp.Reader.Handler,
    handler_init_args: [],        # optional, default []
    query_params: %{              # all query params are optional
      queue_size: 1_000,          # default 1_000
      name: "morty",
      starting_message: :latest   # default :latest, options :latest | :earliest | "some-message-id"
    }
  ]
]
opts = [
  name: :my_consumer,             # optional, default :default
  host: ["example.com": 8080],
  protocol: "ws",                 # optional, default "ws"
  consumer: [
    persistence: "persistent",    # optional, default "persistent"
    tenant: "public",
    namespace: "default",
    topic: "foo",
    subscription: "my-app",
    processors: 3,                # optional, default 1
    handler: MyApp.Reader.Handler,
    handler_init_args: [],        # optional, default []
    query_params: %{              # all query params are optional
      subscription_type: :shared, # default :exclusive
      ack_timeout: 100,           # default 0
      queue_size: 1_000,          # default 1_000
      name: "rick",
      priority: 10,
      max_redeliver_count: 10,    # default 0
      dead_letter_topic: "ricks-dlq", # default "{topic}-{subscription}-DLQ"
      pull_mode: true             # default false
    }
  ]
]
Stargate.Supervisor.start_link(opts)
Stargate's receivers are implemented using GenStage. This allows the message processing (the processes that actually call your handler function on messages received) to be parallelized, and allows backpressure to be applied to the cluster when the consumer connection is set to pull_mode: true.
Producers and consumers can, but need not, run under different supervision trees. If your application is a link in a chain that will both receive from and produce to the same cluster, you can start a single supervisor with keys for both the receiver and producer configs in the options passed to the Stargate.Supervisor.start_link/1 function.
As an example, the configuration below starts a Stargate Supervisor managing processes that monitor a hypothetical company's internal R&D topic, which publishes application features ready for release, and then produce messages about those features to the application marketing team's topic for public consumption.
options = [
  name: :pulsar_app,
  host: [{:"example.com", 8080}],
  producer: [
    persistence: "non-persistent",
    tenant: "marketing",
    namespace: "public",
    topic: "new-stuff"
  ],
  consumer: [
    tenant: "internal",
    namespace: "research",
    topic: "ready-to-release",
    subscription: "rss-feed",
    handler: Publicizer.MessageHandler
  ]
]
Stargate.Supervisor.start_link(options)
Stargate supports client security in two ways: by allowing TLS encryption of the connection between the client and the Pulsar cluster, and by JWT token authentication of the Websocket connection.
To enable TLS encryption of the websocket connection, the URL to the cluster must use the secure port (Pulsar defaults to 8443) and the wss:// prefix instead of the plaintext ws://.
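As a rough illustration, a supervised producer pointed at the secure port might be configured as below. This assumes the protocol option accepts "wss" in the same way it accepts the default "ws"; the host, tenant, namespace, and topic are placeholders.

```elixir
# Hypothetical values; only the port and protocol differ from the plaintext examples.
opts = [
  host: ["example.com": 8443],
  protocol: "wss",
  producer: [
    tenant: "public",
    namespace: "default",
    topic: "foo"
  ]
]
```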
To pass custom certificates to the client connection, include the following in the init options passed to the producer or receiver connection:
options = [
  ...
  ssl_options: [
    cacertfile: "/certificates/cacert.pem",
    certfile: "/certificates/cert.pem",
    keyfile: "/certificates/key.pem"
  ],
  ...
]
In order to pass a JWT token for authentication, include the following in your connection options for either the producer or receiver process when starting the process.
options = [
...
auth_token: "some-jwt-token-string",
...
]
The message handler module passed to a receiver is expected to implement the Stargate.Receiver.MessageHandler behaviour. The behaviour includes an optional init/1, used if your message handler needs to track state across messages received from the topic, along with handle_message/1 and handle_message/2, where handle_message/1 receives only a message while handle_message/2 receives a message and the state being tracked across all messages handled.
Including use Stargate.Receiver.MessageHandler in your handler module will create default implementations of these functions that can be overridden as needed. handle_message/1 is expected to return either :ack or :continue, while handle_message/2 is expected to return {:ack, state} or {:continue, state}.
Messages received by Stargate are automatically converted to a %Stargate.Message{} struct by the time they reach the message handler module. This struct contains all the information about where the message came from (topic, namespace, tenant, and persistence) as well as the message identifier assigned to it by the Pulsar cluster, the key if one was supplied, any properties (key/value pairs) attached to the message by the producer, the timestamp at which the message was published by the cluster as an Elixir DateTime struct, and of course the message payload decoded from the Base64 encoding received from Pulsar.
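A stateless handler might look like the sketch below. It requires the :stargate dependency. The module name and the processable?/1 helper are hypothetical, the payload field name on the message struct is an assumption based on the description above, and :continue is taken here to mean "move on without acknowledging", which is also an assumption.

```elixir
defmodule MyApp.Consumer.Handler do
  # `use` supplies default callback implementations that can be overridden.
  use Stargate.Receiver.MessageHandler

  # Assumption: the decoded payload lives under the `payload` field of the message.
  def handle_message(%{payload: payload}) do
    if processable?(payload) do
      IO.puts("processed: #{payload}")
      :ack
    else
      :continue
    end
  end

  # Hypothetical rule: only non-empty binaries are worth processing.
  def processable?(payload), do: is_binary(payload) and payload != ""
end
```

A handler that needs to carry state between messages would instead override init/1 and handle_message/2 and return {:ack, state} or {:continue, state}.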
At present, Apache Pulsar does not support cumulative acknowledgement of messages via the websocket API. When this is supported (PR forthcoming with the upstream project), Stargate will be updated to include this as an optional ack strategy for faster processing of large groups of messages.
At the time of initial release, Stargate only provides functionality for producing to and receiving (reading or consuming) from Pulsar topics. Pulsar allows for automatically creating single-partition topics within existing tenants and namespaces.
While Pulsar provides a RESTful management API, Stargate does not currently provide any functions for interacting with this API. This is a planned expansion of Stargate functionality with the intent that management of tenants, namespaces, and topics can all be accomplished natively from Stargate modules.
Stay tuned!
Stargate uses divo and its divo_pulsar plugin for integration testing, using Docker to stand up a single-node Pulsar "cluster" in standalone mode. See the documentation for the divo and divo_pulsar hex packages for further details.
For additional information on Pulsar, including configuring and running a cluster, see the project's official documentation.