Skip to content

Commit

Permalink
Migrate NATS pub/sub to JetStream
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Jul 22, 2023
1 parent 287d6af commit e93502e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 12 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "0.3.1"
version = "0.3.2"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/amphitheatre-app/amphitheatre"
Expand Down
9 changes: 6 additions & 3 deletions apiserver/src/handlers/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use amp_common::sync::Synchronization;
use async_nats::jetstream;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::response::sse::{Event, KeepAlive};
Expand Down Expand Up @@ -207,14 +208,16 @@ pub async fn sync(
Path((pid, name)): Path<(Uuid, String)>,
Json(req): Json<Synchronization>,
) -> Result<impl IntoResponse, ApiError> {
// Connect to NATS server and create a JetStream instance
let client = async_nats::connect(&ctx.config.nats_url)
.await
.map_err(|err| ApiError::NatsConnectError(err.to_string()))?;
let jetstream = jetstream::new(client);

let subject = format!("{}/{}", pid, name);
// Publish a message to the stream
let payload = serde_json::to_vec(&req).map_err(|err| ApiError::SerializeError(err.to_string()))?;
client
.publish(subject, payload.into())
jetstream
.publish(format!("{}-{}", pid, name), payload.into())
.await
.map_err(|err| ApiError::NetsPublishError(err.to_string()))?;

Expand Down
35 changes: 32 additions & 3 deletions syncer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::path::Path;

use amp_common::sync::EventKinds::*;
use amp_common::sync::Synchronization;
use async_nats::jetstream::consumer::pull;
use async_nats::jetstream::{self, stream};
use clap::Parser;
use config::Config;
use futures::StreamExt;
Expand All @@ -42,13 +44,37 @@ async fn main() -> Result<(), async_nats::Error> {
let config = Config::parse();
debug!("the nats url: {:?}", config.nats_url);
debug!("the subject: {:?}", config.subject);

debug!("the workspace: {:?}", config.workspace);
let workspace = Path::new(&config.workspace);

// Connect to NATS server and create a JetStream instance.
let client = async_nats::connect(&config.nats_url).await?;
let mut subscriber = client.subscribe(config.subject).await?;

let workspace = Path::new(&config.workspace);
while let Some(message) = subscriber.next().await {
let jetstream = jetstream::new(client);

// get or create a stream and a consumer
let name = format!("consumers-{}", config.subject);
// First, we create a stream and bind to it.
let consumer = jetstream
.get_or_create_stream(stream::Config {
name: config.subject.clone(),
..Default::default()
})
.await?
// Then, on that `Stream` use method to create Consumer and bind to it.
.get_or_create_consumer(
&name,
pull::Config {
durable_name: Some(name.clone()),
..Default::default()
},
)
.await?;

// Consume messages from the consumer
let mut messages = consumer.messages().await?;
while let Some(Ok(message)) = messages.next().await {
let synchronization = serde_json::from_slice::<Synchronization>(message.payload.as_ref());
if let Err(err) = synchronization {
error!("Received invalid message: {:?} with error: {:?}", message.payload, err);
Expand All @@ -66,6 +92,9 @@ async fn main() -> Result<(), async_nats::Error> {
Override => handle::override_all(workspace, req),
Other => debug!("Received other event, nothing to do!"),
}

// Acknowledge the message
message.ack().await?;
}

Ok(())
Expand Down

0 comments on commit e93502e

Please sign in to comment.