Stream to SSE from another endpoint (upload) #446
AlexMikhalev asked this question in Q&A
-
Hello all. Firstly, poem is great, exactly what I need to move from Python FastAPI.
So, what's the right way to trigger the streaming of events from one OpenAPI handler to another?
Answered by sunli829, Dec 10, 2022
-
I wrote an example, it's easy. Try it with:

curl -X 'POST' 'http://localhost:3000/api/exec' -H 'Content-Type: text/plain; charset=utf-8' -d 'ls' -v

Cargo.toml

[package]
name = "exec"
version = "0.1.0"
edition = "2021"

[dependencies]
poem = { version = "1.3.49", features = ["session"] }
poem-openapi = { version = "2.0.21", features = ["swagger-ui"] }
tokio = { version = "1.21.2", features = [
    "rt-multi-thread",
    "macros",
    "net",
    "sync",
    "process",
    "io-util",
] }
tokio-stream = { version = "0.1.11", features = ["sync"] }

main.rs

use std::process::Stdio;

use poem::{error::BadRequest, listener::TcpListener, Result, Route, Server};
use poem_openapi::{
    payload::{EventStream, PlainText},
    OpenApi, OpenApiService,
};
use tokio::{
    io::{AsyncBufReadExt, BufReader},
    process::Command,
    sync::mpsc,
};
use tokio_stream::wrappers::UnboundedReceiverStream;

struct Api;

#[OpenApi]
impl Api {
    /// Runs the command given in the request body and streams its stdout as SSE.
    #[oai(path = "/exec", method = "post")]
    async fn exec(
        &self,
        command: PlainText<String>,
    ) -> Result<EventStream<UnboundedReceiverStream<String>>> {
        let mut child = Command::new(command.0)
            .stdout(Stdio::piped())
            .spawn()
            .map_err(BadRequest)?;
        let (tx, rx) = mpsc::unbounded_channel();

        // Forward each line of the child's stdout into the channel.
        tokio::spawn(async move {
            if let Some(stdout) = child.stdout.take() {
                let mut reader = BufReader::new(stdout);
                let mut line = String::new();
                loop {
                    // read_line appends to the buffer, so clear it before each read.
                    line.clear();
                    if matches!(reader.read_line(&mut line).await, Ok(0) | Err(_)) {
                        break;
                    }
                    _ = tx.send(line.clone());
                }
            }
        });

        // The receiving half of the channel becomes the event stream.
        Ok(EventStream::new(UnboundedReceiverStream::new(rx)))
    }
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let app = OpenApiService::new(Api, "demo", "1.0").server("http://localhost:3000/api");
    let ui = app.swagger_ui();
    Server::new(TcpListener::bind("127.0.0.1:3000"))
        .name("hello-world")
        .run(Route::new().nest("/api", app).nest("/", ui))
        .await
}
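
A note on the read loop in this example: `read_line` appends to the buffer it is given rather than overwriting it (this holds for both `std::io::BufRead::read_line` and Tokio's `AsyncBufReadExt::read_line`). The sketch below uses only the standard library, with a synchronous reader standing in for the child's stdout, to show what a forwarding loop emits with and without clearing the buffer. The function name `collect_lines` and its `clear_between_reads` flag are hypothetical and exist only for illustration.

```rust
use std::io::{BufRead, Cursor};

/// Reads `input` line by line and returns what a forwarding loop would send.
/// `collect_lines` is a hypothetical helper, not part of poem or tokio.
fn collect_lines(input: &str, clear_between_reads: bool) -> Vec<String> {
    // A Cursor over bytes stands in for the child process's stdout.
    let mut reader = Cursor::new(input.as_bytes());
    let mut line = String::new();
    let mut sent = Vec::new();
    loop {
        if clear_between_reads {
            line.clear();
        }
        // read_line appends to `line`; Ok(0) means end of input.
        if matches!(reader.read_line(&mut line), Ok(0) | Err(_)) {
            break;
        }
        sent.push(line.clone());
    }
    sent
}
```

Calling `collect_lines("a\nb\n", false)` yields `["a\n", "a\nb\n"]`, so every event repeats all earlier output, while `collect_lines("a\nb\n", true)` yields `["a\n", "b\n"]`.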
-
Execute bash script with POST:

curl -X 'POST' 'http://localhost:3000/api/exec' -H 'Content-Type: text/plain; charset=utf-8' -d 'ls' -v

Get the outputs with SSE:

curl 'http://localhost:3000/api/output/afdec250-b5a6-42fe-bd37-d14aeadf9cf4' -H 'Content-Type: text/plain; charset=utf-8' -v

Cargo.toml

[package]
name = "ws_example"
version = "0.1.0"
edition = "2021"

[dependencies]
poem = { version = "1.3.49", features = ["session"] }
poem-openapi = { version = "2.0.21", features = ["swagger-ui", "uuid"] }
tokio = { version = "1.21.2", features = [
    "rt-multi-thread",
    "macros",
    "net",
    "sync",
    "process",
    "io-util",
] }
tokio-stream = { version = "0.1.11", features = ["sync"] }
uuid = { version = "1.2.2", features = ["v4"] }

main.rs

use std::{collections::HashMap, process::Stdio};

use poem::{
    error::BadRequest, http::StatusCode, listener::TcpListener, Error, Result, Route, Server,
};
use poem_openapi::{
    param::Path,
    payload::{EventStream, Json, PlainText},
    OpenApi, OpenApiService,
};
use tokio::{
    io::{AsyncBufReadExt, BufReader},
    process::Command,
    sync::{mpsc, Mutex},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;

type OutputStream = EventStream<UnboundedReceiverStream<String>>;

#[derive(Default)]
struct Api {
    // Pending output streams, keyed by the id returned from /exec.
    outputs: Mutex<HashMap<Uuid, OutputStream>>,
}

#[OpenApi]
impl Api {
    /// Starts the command and returns an id that identifies its output stream.
    #[oai(path = "/exec", method = "post")]
    async fn exec(&self, command: PlainText<String>) -> Result<Json<Uuid>> {
        let mut child = Command::new(command.0)
            .stdout(Stdio::piped())
            .spawn()
            .map_err(BadRequest)?;
        let (tx, rx) = mpsc::unbounded_channel();

        // Forward each line of the child's stdout into the channel.
        tokio::spawn(async move {
            if let Some(stdout) = child.stdout.take() {
                let mut reader = BufReader::new(stdout);
                let mut line = String::new();
                loop {
                    // read_line appends to the buffer, so clear it before each read.
                    line.clear();
                    if matches!(reader.read_line(&mut line).await, Ok(0) | Err(_)) {
                        break;
                    }
                    _ = tx.send(line.clone());
                }
            }
        });

        let id = Uuid::new_v4();
        self.outputs
            .lock()
            .await
            .insert(id, EventStream::new(UnboundedReceiverStream::new(rx)));
        Ok(Json(id))
    }

    /// Hands out the stream for `id` once; later requests for the same id get 404.
    #[oai(path = "/output/:id", method = "get")]
    async fn output(&self, id: Path<Uuid>) -> Result<OutputStream> {
        self.outputs
            .lock()
            .await
            .remove(&id.0)
            .ok_or_else(|| Error::from_status(StatusCode::NOT_FOUND))
    }
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let app =
        OpenApiService::new(Api::default(), "demo", "1.0").server("http://localhost:3000/api");
    let ui = app.swagger_ui();
    Server::new(TcpListener::bind("127.0.0.1:3000"))
        .name("hello-world")
        .run(Route::new().nest("/api", app).nest("/", ui))
        .await
}
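
The second example hinges on one pattern: the POST handler parks the receiving end of a channel in a shared map under a fresh id, and the GET handler removes it from the map and returns it. Because the entry is removed, each output can be claimed only once, and anything sent before the claim stays buffered in the channel. Below is a rough standard-library sketch of just that pattern, assuming `std::sync::mpsc` and a `u64` counter as stand-ins for Tokio's channel and `Uuid`; the names `Outputs`, `register`, and `take` are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical registry mapping a job id to the receiver for its output.
#[derive(Default)]
struct Outputs {
    next_id: Mutex<u64>,
    streams: Mutex<HashMap<u64, Receiver<String>>>,
}

impl Outputs {
    /// Plays the role of the POST handler: creates a channel, stores the
    /// receiver under a new id, and returns the id plus the sender that the
    /// producer writes to.
    fn register(&self) -> (u64, Sender<String>) {
        let (tx, rx) = mpsc::channel();
        let mut next = self.next_id.lock().unwrap();
        let id = *next;
        *next += 1;
        self.streams.lock().unwrap().insert(id, rx);
        (id, tx)
    }

    /// Plays the role of the GET handler: removes and returns the receiver,
    /// so a second call with the same id finds nothing.
    fn take(&self, id: u64) -> Option<Receiver<String>> {
        self.streams.lock().unwrap().remove(&id)
    }
}
```

One consequence of this design is that an output nobody ever fetches stays in the map, so a longer-lived service would probably want some form of expiry.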
Answer selected by
AlexMikhalev