diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index bd02e88f0ff81f..c6513c6dff052e 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -74,7 +74,6 @@ unitTest( const reqBody = await request.text(); assertEquals("hello world", reqBody); await respondWith(new Response("")); - break; } break; } @@ -83,7 +82,9 @@ unitTest( const resp = await fetch("http://127.0.0.1:4501/", { body: stream.readable, method: "POST", + headers: { "connection": "close" }, }); + await resp.arrayBuffer(); await promise; }, diff --git a/runtime/js/40_http.js b/runtime/js/40_http.js index 9a008f3cfabd53..0248891329884b 100644 --- a/runtime/js/40_http.js +++ b/runtime/js/40_http.js @@ -186,11 +186,7 @@ ); if (read > 0) { // We read some data. Enqueue it onto the stream. - if (chunk.length == read) { - controller.enqueue(chunk); - } else { - controller.enqueue(chunk.subarray(0, read)); - } + controller.enqueue(chunk.subarray(0, read)); } else { // We have reached the end of the body, so we close the stream. controller.close(); diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index aeca0ca880110f..664776b4b029c1 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -157,13 +157,13 @@ pub struct NextRequestResponse( pub async fn op_next_request( state: Rc>, - rid: ResourceId, + conn_rid: ResourceId, _data: Option, ) -> Result { let conn_resource = state .borrow() .resource_table - .get::(rid) + .get::(conn_rid) .ok_or_else(bad_resource_id)?; poll_fn(|cx| { @@ -209,6 +209,7 @@ pub async fn op_next_request( let stream_reader = StreamReader::new(stream); let mut state = state.borrow_mut(); let request_body_rid = state.resource_table.add(RequestBodyResource { + conn_rid, reader: AsyncRefCell::new(stream_reader), cancel: CancelHandle::default(), }); @@ -221,7 +222,7 @@ pub async fn op_next_request( let response_sender_rid = state.resource_table.add(ResponseSenderResource { sender: tx, - conn_rid: rid, + conn_rid, }); let req_json = NextRequestResponse( @@ -346,12 +347,11 @@ pub fn op_respond( let (sender, body) = Body::channel(); res = builder.body(body)?; - let response_body_rid = - state.resource_table.add(DyperResponseBodyResource { - body: AsyncRefCell::new(sender), - cancel: CancelHandle::default(), - conn_rid: response_sender.conn_rid, - }); + let response_body_rid = state.resource_table.add(ResponseBodyResource { + body: AsyncRefCell::new(sender), + cancel: CancelHandle::default(), + conn_rid: response_sender.conn_rid, + }); Some(response_body_rid) }; @@ -404,10 +404,39 @@ pub async fn op_read_request( .resource_table .get::(rid as u32) .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; let cancel = RcRef::map(resource, |r| &r.cancel); - let read = reader.read(&mut data).try_or_cancel(cancel).await?; - Ok(read) + let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); + + poll_fn(|cx| { + let r = read_fut.poll_unpin(cx); + + let poll_result = match &conn_resource.hyper_connection { + ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx), + ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx), + }; + + if let Poll::Ready(Err(e)) = poll_result { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(AnyError::from(e))); + } + + if let Poll::Ready(result) = r { + return Poll::Ready(result.map_err(AnyError::from)); + } + + Poll::Pending + }) + .await } pub async fn op_write_response( @@ -419,7 +448,7 @@ pub async fn op_write_response( let resource = state .borrow() .resource_table - .get::(rid as u32) + .get::(rid as u32) .ok_or_else(bad_resource_id)?; let conn_resource = state @@ -466,6 +495,7 @@ type BytesStream = Pin> + Unpin>>; struct RequestBodyResource { + conn_rid: ResourceId, reader: AsyncRefCell>, cancel: CancelHandle, } @@ -487,13 +517,13 @@ impl Resource for ResponseSenderResource { } } -struct DyperResponseBodyResource { +struct ResponseBodyResource { body: AsyncRefCell, cancel: CancelHandle, conn_rid: ResourceId, } -impl Resource for DyperResponseBodyResource { +impl Resource for ResponseBodyResource { fn name(&self) -> Cow { "responseBody".into() }