Skip to content

Commit

Permalink
httpServerStreamingRequest passes
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Apr 7, 2021
1 parent 0be87c8 commit 3a68712
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 20 deletions.
3 changes: 2 additions & 1 deletion cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ unitTest(
const reqBody = await request.text();
assertEquals("hello world", reqBody);
await respondWith(new Response(""));
break;
}
break;
}
Expand All @@ -83,7 +82,9 @@ unitTest(
const resp = await fetch("http://127.0.0.1:4501/", {
body: stream.readable,
method: "POST",
headers: { "connection": "close" },
});

await resp.arrayBuffer();
await promise;
},
Expand Down
6 changes: 1 addition & 5 deletions runtime/js/40_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,7 @@
);
if (read > 0) {
// We read some data. Enqueue it onto the stream.
if (chunk.length == read) {
controller.enqueue(chunk);
} else {
controller.enqueue(chunk.subarray(0, read));
}
controller.enqueue(chunk.subarray(0, read));
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
Expand Down
58 changes: 44 additions & 14 deletions runtime/ops/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ pub struct NextRequestResponse(

pub async fn op_next_request(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
conn_rid: ResourceId,
_data: Option<ZeroCopyBuf>,
) -> Result<NextRequestResponse, AnyError> {
let conn_resource = state
.borrow()
.resource_table
.get::<ConnResource>(rid)
.get::<ConnResource>(conn_rid)
.ok_or_else(bad_resource_id)?;

poll_fn(|cx| {
Expand Down Expand Up @@ -209,6 +209,7 @@ pub async fn op_next_request(
let stream_reader = StreamReader::new(stream);
let mut state = state.borrow_mut();
let request_body_rid = state.resource_table.add(RequestBodyResource {
conn_rid,
reader: AsyncRefCell::new(stream_reader),
cancel: CancelHandle::default(),
});
Expand All @@ -221,7 +222,7 @@ pub async fn op_next_request(
let response_sender_rid =
state.resource_table.add(ResponseSenderResource {
sender: tx,
conn_rid: rid,
conn_rid,
});

let req_json = NextRequestResponse(
Expand Down Expand Up @@ -346,12 +347,11 @@ pub fn op_respond(
let (sender, body) = Body::channel();
res = builder.body(body)?;

let response_body_rid =
state.resource_table.add(DyperResponseBodyResource {
body: AsyncRefCell::new(sender),
cancel: CancelHandle::default(),
conn_rid: response_sender.conn_rid,
});
let response_body_rid = state.resource_table.add(ResponseBodyResource {
body: AsyncRefCell::new(sender),
cancel: CancelHandle::default(),
conn_rid: response_sender.conn_rid,
});

Some(response_body_rid)
};
Expand Down Expand Up @@ -404,10 +404,39 @@ pub async fn op_read_request(
.resource_table
.get::<RequestBodyResource>(rid as u32)
.ok_or_else(bad_resource_id)?;

let conn_resource = state
.borrow()
.resource_table
.get::<ConnResource>(resource.conn_rid)
.ok_or_else(bad_resource_id)?;

let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
let read = reader.read(&mut data).try_or_cancel(cancel).await?;
Ok(read)
let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();

poll_fn(|cx| {
let r = read_fut.poll_unpin(cx);

let poll_result = match &conn_resource.hyper_connection {
ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx),
ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx),
};

if let Poll::Ready(Err(e)) = poll_result {
// close ConnResource
// close RequestResource associated with connection
// close ResponseBodyResource associated with connection
return Poll::Ready(Err(AnyError::from(e)));
}

if let Poll::Ready(result) = r {
return Poll::Ready(result.map_err(AnyError::from));
}

Poll::Pending
})
.await
}

pub async fn op_write_response(
Expand All @@ -419,7 +448,7 @@ pub async fn op_write_response(
let resource = state
.borrow()
.resource_table
.get::<DyperResponseBodyResource>(rid as u32)
.get::<ResponseBodyResource>(rid as u32)
.ok_or_else(bad_resource_id)?;

let conn_resource = state
Expand Down Expand Up @@ -466,6 +495,7 @@ type BytesStream =
Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;

struct RequestBodyResource {
conn_rid: ResourceId,
reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
cancel: CancelHandle,
}
Expand All @@ -487,13 +517,13 @@ impl Resource for ResponseSenderResource {
}
}

struct DyperResponseBodyResource {
struct ResponseBodyResource {
body: AsyncRefCell<hyper::body::Sender>,
cancel: CancelHandle,
conn_rid: ResourceId,
}

impl Resource for DyperResponseBodyResource {
impl Resource for ResponseBodyResource {
fn name(&self) -> Cow<str> {
"responseBody".into()
}
Expand Down

0 comments on commit 3a68712

Please sign in to comment.