From 3ae1581a539b67363bd87d9d8fc8635a204eec5d Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 7 Jan 2021 17:22:12 -0800 Subject: [PATCH] feat(ffi): Initial C API for hyper --- .github/workflows/CI.yml | 49 +++- .gitignore | 4 +- Cargo.toml | 8 +- capi/README.md | 17 ++ capi/cbindgen.toml | 14 + capi/examples/Makefile | 22 ++ capi/examples/client.c | 343 ++++++++++++++++++++++++ capi/examples/upload.c | 386 +++++++++++++++++++++++++++ capi/gen_header.sh | 72 +++++ capi/include/hyper.h | 554 +++++++++++++++++++++++++++++++++++++++ src/body/body.rs | 28 ++ src/error.rs | 14 +- src/ffi/body.rs | 233 ++++++++++++++++ src/ffi/client.rs | 148 +++++++++++ src/ffi/error.rs | 80 ++++++ src/ffi/http_types.rs | 267 +++++++++++++++++++ src/ffi/io.rs | 173 ++++++++++++ src/ffi/macros.rs | 23 ++ src/ffi/mod.rs | 55 ++++ src/ffi/task.rs | 415 +++++++++++++++++++++++++++++ src/lib.rs | 3 + src/proto/h1/dispatch.rs | 16 +- 22 files changed, 2910 insertions(+), 14 deletions(-) create mode 100644 capi/README.md create mode 100644 capi/cbindgen.toml create mode 100644 capi/examples/Makefile create mode 100644 capi/examples/client.c create mode 100644 capi/examples/upload.c create mode 100755 capi/gen_header.sh create mode 100644 capi/include/hyper.h create mode 100644 src/ffi/body.rs create mode 100644 src/ffi/client.rs create mode 100644 src/ffi/error.rs create mode 100644 src/ffi/http_types.rs create mode 100644 src/ffi/io.rs create mode 100644 src/ffi/macros.rs create mode 100644 src/ffi/mod.rs create mode 100644 src/ffi/task.rs diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 7aeceb3f10..ca864d38a6 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -16,6 +16,7 @@ jobs: - style - test - features + - ffi - doc steps: - run: exit 0 @@ -111,7 +112,53 @@ jobs: run: cargo install cargo-hack - name: check --each-feature - run: cargo hack check --each-feature -Z avoid-dev-deps + run: cargo hack check --each-feature --skip ffi -Z avoid-dev-deps + + ffi: + name: Test C API (FFI) + needs: [style] + + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v1 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Install cbindgen + uses: actions-rs/cargo@v1 + with: + command: install + args: cbindgen + + - name: Build FFI + uses: actions-rs/cargo@v1 + env: + RUSTFLAGS: --cfg hyper_unstable_ffi + with: + command: build + args: --features client,http1,http2,ffi + + # TODO: re-enable check once figuring out how to get it working in CI + # - name: Verify cbindgen + # run: ./capi/gen_header.sh --verify + + - name: Make Examples + run: cd capi/examples && make client + + - name: Run FFI unit tests + uses: actions-rs/cargo@v1 + env: + RUSTFLAGS: --cfg hyper_unstable_ffi + with: + command: test + args: --features full,ffi --lib doc: name: Build docs diff --git a/.gitignore b/.gitignore index 4fffb2f89c..a9d37c560c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -/target -/Cargo.lock +target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index 50b1463da5..f5357ed5ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,9 @@ include = [ #"build.rs", ] +[lib] +crate-type = ["lib", "staticlib", "cdylib"] + [dependencies] bytes = "1" futures-core = { version = "0.3", default-features = false } @@ -38,6 +41,7 @@ want = "0.3" # Optional +libc = { version = "0.2", optional = true } socket2 = { version = "0.3.16", optional = true } [dev-dependencies] @@ -94,7 +98,6 @@ server = [] stream = [] # Tokio support - runtime = [ "tcp", "tokio/rt", @@ -106,6 +109,9 @@ tcp = [ "tokio/time", ] +# C-API support (currently unstable (no semver)) +ffi = ["libc"] + # internal features used in CI nightly = [] __internal_happy_eyeballs_tests = [] diff --git a/capi/README.md b/capi/README.md new file mode 100644 index 0000000000..9d6f9f6d14 --- /dev/null +++ b/capi/README.md @@ -0,0 +1,17 @@ +# C API for hyper + +This provides auxiliary pieces for a C API to use the hyper library. + +## Unstable + +The C API of hyper is currently **unstable**, which means it's not part of the semver contract as the rest of the Rust API is. + +Because of that, it's only accessible if `--cfg hyper_unstable_ffi` is passed to `rustc` when compiling. The easiest way to do that is setting the `RUSTFLAGS` environment variable. + +## Building + +The C API is part of the Rust library, but isn't compiled by default. Using `cargo`, it can be compiled with the following command: + +``` +RUSTFLAGS="--cfg hyper_unstable_ffi" cargo build --features client,http1,http2,ffi +``` diff --git a/capi/cbindgen.toml b/capi/cbindgen.toml new file mode 100644 index 0000000000..fd611e18c4 --- /dev/null +++ b/capi/cbindgen.toml @@ -0,0 +1,14 @@ +language = "C" +include_guard = "_HYPER_H" +no_includes = true +sys_includes = ["stdint.h", "stddef.h"] +cpp_compat = true +documentation_style = "c" + +[parse.expand] +crates = ["hyper-capi"] + +[export.rename] +"Exec" = "hyper_executor" +"Io" = "hyper_io" +"Task" = "hyper_task" diff --git a/capi/examples/Makefile b/capi/examples/Makefile new file mode 100644 index 0000000000..6cc0a69575 --- /dev/null +++ b/capi/examples/Makefile @@ -0,0 +1,22 @@ +# +# Build the example client +# + +TARGET = client + +OBJS = client.o + +RPATH=$(PWD)/../../target/debug +CFLAGS = -I../include +LDFLAGS = -L$(RPATH) -Wl,-rpath,$(RPATH) +LIBS = -lhyper + +$(TARGET): $(OBJS) + $(CC) -o $(TARGET) $(OBJS) $(LDFLAGS) $(LIBS) + +upload: upload.o + $(CC) -o upload upload.o $(LDFLAGS) $(LIBS) + +clean: + rm -f $(OBJS) $(TARGET) + rm -f upload upload.o diff --git a/capi/examples/client.c b/capi/examples/client.c new file mode 100644 index 0000000000..6ed66a46db --- /dev/null +++ b/capi/examples/client.c @@ -0,0 +1,343 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "hyper.h" + + +struct conn_data { + int fd; + hyper_waker *read_waker; + hyper_waker *write_waker; +}; + +static size_t read_cb(void *userdata, hyper_context *ctx, uint8_t *buf, size_t buf_len) { + struct conn_data *conn = (struct conn_data *)userdata; + ssize_t ret = read(conn->fd, buf, buf_len); + + if (ret < 0) { + int err = errno; + if (err == EAGAIN) { + // would block, register interest + if (conn->read_waker != NULL) { + hyper_waker_free(conn->read_waker); + } + conn->read_waker = hyper_context_waker(ctx); + return HYPER_IO_PENDING; + } else { + // kaboom + return HYPER_IO_ERROR; + } + } else { + return ret; + } +} + +static size_t write_cb(void *userdata, hyper_context *ctx, const uint8_t *buf, size_t buf_len) { + struct conn_data *conn = (struct conn_data *)userdata; + ssize_t ret = write(conn->fd, buf, buf_len); + + if (ret < 0) { + int err = errno; + if (err == EAGAIN) { + // would block, register interest + if (conn->write_waker != NULL) { + hyper_waker_free(conn->write_waker); + } + conn->write_waker = hyper_context_waker(ctx); + return HYPER_IO_PENDING; + } else { + // kaboom + return HYPER_IO_ERROR; + } + } else { + return ret; + } +} + +static void free_conn_data(struct conn_data *conn) { + if (conn->read_waker) { + hyper_waker_free(conn->read_waker); + conn->read_waker = NULL; + } + if (conn->write_waker) { + hyper_waker_free(conn->write_waker); + conn->write_waker = NULL; + } + + free(conn); +} + +static int connect_to(const char *host, const char *port) { + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + struct addrinfo *result, *rp; + if (getaddrinfo(host, port, &hints, &result) != 0) { + printf("dns failed for %s\n", host); + return -1; + } + + int sfd; + for (rp = result; rp != NULL; rp = rp->ai_next) { + sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) { + continue; + } + + if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1) { + break; + } else { + close(sfd); + } + } + + freeaddrinfo(result); + + // no address succeeded + if (rp == NULL) { + printf("connect failed for %s\n", host); + return -1; + } + + return sfd; +} + +static int print_each_header(void *userdata, + const uint8_t *name, + size_t name_len, + const uint8_t *value, + size_t value_len) { + printf("%.*s: %.*s\n", (int) name_len, name, (int) value_len, value); + return HYPER_ITER_CONTINUE; +} + +static int print_each_chunk(void *userdata, const hyper_buf *chunk) { + const uint8_t *buf = hyper_buf_bytes(chunk); + size_t len = hyper_buf_len(chunk); + + write(1, buf, len); + + return HYPER_ITER_CONTINUE; +} + +typedef enum { + EXAMPLE_NOT_SET = 0, // tasks we don't know about won't have a userdata set + EXAMPLE_HANDSHAKE, + EXAMPLE_SEND, + EXAMPLE_RESP_BODY +} example_id; + +#define STR_ARG(XX) (uint8_t *)XX, strlen(XX) + +int main(int argc, char *argv[]) { + const char *host = argc > 1 ? argv[1] : "httpbin.org"; + const char *port = argc > 2 ? argv[2] : "80"; + const char *path = argc > 3 ? argv[3] : "/"; + printf("connecting to port %s on %s...\n", port, host); + + int fd = connect_to(host, port); + if (fd < 0) { + return 1; + } + printf("connected to %s, now get %s\n", host, path); + + if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) { + printf("failed to set socket to non-blocking\n"); + return 1; + } + + fd_set fds_read; + fd_set fds_write; + fd_set fds_excep; + + struct conn_data *conn = malloc(sizeof(struct conn_data)); + + conn->fd = fd; + conn->read_waker = NULL; + conn->write_waker = NULL; + + + // Hookup the IO + hyper_io *io = hyper_io_new(); + hyper_io_set_userdata(io, (void *)conn); + hyper_io_set_read(io, read_cb); + hyper_io_set_write(io, write_cb); + + printf("http handshake ...\n"); + + // We need an executor generally to poll futures + const hyper_executor *exec = hyper_executor_new(); + + // Prepare client options + hyper_clientconn_options *opts = hyper_clientconn_options_new(); + hyper_clientconn_options_exec(opts, exec); + + hyper_task *handshake = hyper_clientconn_handshake(io, opts); + hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE); + + // Let's wait for the handshake to finish... + hyper_executor_push(exec, handshake); + + // In case a task errors... + hyper_error *err; + + // The polling state machine! + while (1) { + // Poll all ready tasks and act on them... + while (1) { + hyper_task *task = hyper_executor_poll(exec); + if (!task) { + break; + } + switch ((example_id) hyper_task_userdata(task)) { + case EXAMPLE_HANDSHAKE: + ; + if (hyper_task_type(task) == HYPER_TASK_ERROR) { + printf("handshake error!\n"); + err = hyper_task_value(task); + goto fail; + } + assert(hyper_task_type(task) == HYPER_TASK_CLIENTCONN); + + printf("preparing http request ...\n"); + + hyper_clientconn *client = hyper_task_value(task); + hyper_task_free(task); + + // Prepare the request + hyper_request *req = hyper_request_new(); + if (hyper_request_set_method(req, STR_ARG("GET"))) { + printf("error setting method\n"); + return 1; + } + if (hyper_request_set_uri(req, STR_ARG(path))) { + printf("error setting uri\n"); + return 1; + } + + hyper_headers *req_headers = hyper_request_headers(req); + hyper_headers_set(req_headers, STR_ARG("host"), STR_ARG(host)); + + // Send it! + hyper_task *send = hyper_clientconn_send(client, req); + hyper_task_set_userdata(send, (void *)EXAMPLE_SEND); + printf("sending ...\n"); + hyper_executor_push(exec, send); + + // For this example, no longer need the client + hyper_clientconn_free(client); + + break; + case EXAMPLE_SEND: + ; + if (hyper_task_type(task) == HYPER_TASK_ERROR) { + printf("send error!\n"); + err = hyper_task_value(task); + goto fail; + } + assert(hyper_task_type(task) == HYPER_TASK_RESPONSE); + + // Take the results + hyper_response *resp = hyper_task_value(task); + hyper_task_free(task); + + uint16_t http_status = hyper_response_status(resp); + + printf("\nResponse Status: %d\n", http_status); + + hyper_headers *headers = hyper_response_headers(resp); + hyper_headers_foreach(headers, print_each_header, NULL); + printf("\n"); + + hyper_body *resp_body = hyper_response_body(resp); + hyper_task *foreach = hyper_body_foreach(resp_body, print_each_chunk, NULL); + hyper_task_set_userdata(foreach, (void *)EXAMPLE_RESP_BODY); + hyper_executor_push(exec, foreach); + + // No longer need the response + hyper_response_free(resp); + + break; + case EXAMPLE_RESP_BODY: + ; + if (hyper_task_type(task) == HYPER_TASK_ERROR) { + printf("body error!\n"); + err = hyper_task_value(task); + goto fail; + } + + assert(hyper_task_type(task) == HYPER_TASK_EMPTY); + + printf("\n -- Done! -- \n"); + + // Cleaning up before exiting + hyper_task_free(task); + hyper_executor_free(exec); + free_conn_data(conn); + + return 0; + case EXAMPLE_NOT_SET: + // A background task for hyper completed... + hyper_task_free(task); + break; + } + } + + // All futures are pending on IO work, so select on the fds. + + FD_ZERO(&fds_read); + FD_ZERO(&fds_write); + FD_ZERO(&fds_excep); + + if (conn->read_waker) { + FD_SET(conn->fd, &fds_read); + } + if (conn->write_waker) { + FD_SET(conn->fd, &fds_write); + } + + int sel_ret = select(conn->fd + 1, &fds_read, &fds_write, &fds_excep, NULL); + + if (sel_ret < 0) { + printf("select() error\n"); + return 1; + } else { + if (FD_ISSET(conn->fd, &fds_read)) { + hyper_waker_wake(conn->read_waker); + conn->read_waker = NULL; + } + if (FD_ISSET(conn->fd, &fds_write)) { + hyper_waker_wake(conn->write_waker); + conn->write_waker = NULL; + } + } + + } + + return 0; + +fail: + if (err) { + printf("error code: %d\n", hyper_error_code(err)); + // grab the error details + char errbuf [256]; + size_t errlen = hyper_error_print(err, errbuf, sizeof(errbuf)); + printf("details: %.*s\n", (int) errlen, errbuf); + + // clean up the error + hyper_error_free(err); + } + return 1; +} diff --git a/capi/examples/upload.c b/capi/examples/upload.c new file mode 100644 index 0000000000..ed6f37a709 --- /dev/null +++ b/capi/examples/upload.c @@ -0,0 +1,386 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "hyper.h" + + +struct conn_data { + int fd; + hyper_waker *read_waker; + hyper_waker *write_waker; +}; + +static size_t read_cb(void *userdata, hyper_context *ctx, uint8_t *buf, size_t buf_len) { + struct conn_data *conn = (struct conn_data *)userdata; + ssize_t ret = read(conn->fd, buf, buf_len); + + if (ret < 0) { + int err = errno; + if (err == EAGAIN) { + // would block, register interest + if (conn->read_waker != NULL) { + hyper_waker_free(conn->read_waker); + } + conn->read_waker = hyper_context_waker(ctx); + return HYPER_IO_PENDING; + } else { + // kaboom + return HYPER_IO_ERROR; + } + } else { + return ret; + } +} + +static size_t write_cb(void *userdata, hyper_context *ctx, const uint8_t *buf, size_t buf_len) { + struct conn_data *conn = (struct conn_data *)userdata; + ssize_t ret = write(conn->fd, buf, buf_len); + + if (ret < 0) { + int err = errno; + if (err == EAGAIN) { + // would block, register interest + if (conn->write_waker != NULL) { + hyper_waker_free(conn->write_waker); + } + conn->write_waker = hyper_context_waker(ctx); + return HYPER_IO_PENDING; + } else { + // kaboom + return HYPER_IO_ERROR; + } + } else { + return ret; + } +} + +static void free_conn_data(struct conn_data *conn) { + if (conn->read_waker) { + hyper_waker_free(conn->read_waker); + conn->read_waker = NULL; + } + if (conn->write_waker) { + hyper_waker_free(conn->write_waker); + conn->write_waker = NULL; + } + + free(conn); +} + +static int connect_to(const char *host, const char *port) { + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + struct addrinfo *result, *rp; + if (getaddrinfo(host, port, &hints, &result) != 0) { + printf("dns failed for %s\n", host); + return -1; + } + + int sfd; + for (rp = result; rp != NULL; rp = rp->ai_next) { + sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) { + continue; + } + + if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1) { + break; + } else { + close(sfd); + } + } + + freeaddrinfo(result); + + // no address succeeded + if (rp == NULL) { + printf("connect failed for %s\n", host); + return -1; + } + + return sfd; +} + +struct upload_body { + int fd; + char *buf; + size_t len; +}; + +static int poll_req_upload(void *userdata, + hyper_context *ctx, + hyper_buf **chunk) { + struct upload_body* upload = userdata; + + ssize_t res = read(upload->fd, upload->buf, upload->len); + if (res < 0) { + printf("error reading upload file: %d", errno); + return HYPER_POLL_ERROR; + } else if (res == 0) { + // All done! + *chunk = NULL; + return HYPER_POLL_READY; + } else { + *chunk = hyper_buf_copy(upload->buf, res); + return HYPER_POLL_READY; + } +} + +static int print_each_header(void *userdata, + const uint8_t *name, + size_t name_len, + const uint8_t *value, + size_t value_len) { + printf("%.*s: %.*s\n", (int) name_len, name, (int) value_len, value); + return HYPER_ITER_CONTINUE; +} + +typedef enum { + EXAMPLE_NOT_SET = 0, // tasks we don't know about won't have a userdata set + EXAMPLE_HANDSHAKE, + EXAMPLE_SEND, + EXAMPLE_RESP_BODY +} example_id; + +#define STR_ARG(XX) (uint8_t *)XX, strlen(XX) + +int main(int argc, char *argv[]) { + const char *file = argc > 1 ? argv[1] : NULL; + const char *host = argc > 2 ? argv[2] : "httpbin.org"; + const char *port = argc > 3 ? argv[3] : "80"; + const char *path = argc > 4 ? argv[4] : "/post"; + + if (!file) { + printf("Pass a file path as the first argument.\n"); + return 1; + } + + struct upload_body upload; + upload.fd = open(file, O_RDONLY); + + if (upload.fd < 0) { + printf("error opening file to upload: %d", errno); + return 1; + } + printf("connecting to port %s on %s...\n", port, host); + + int fd = connect_to(host, port); + if (fd < 0) { + return 1; + } + printf("connected to %s, now upload to %s\n", host, path); + + if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) { + printf("failed to set socket to non-blocking\n"); + return 1; + } + + upload.len = 8192; + upload.buf = malloc(upload.len); + + fd_set fds_read; + fd_set fds_write; + fd_set fds_excep; + + struct conn_data *conn = malloc(sizeof(struct conn_data)); + + conn->fd = fd; + conn->read_waker = NULL; + conn->write_waker = NULL; + + + // Hookup the IO + hyper_io *io = hyper_io_new(); + hyper_io_set_userdata(io, (void *)conn); + hyper_io_set_read(io, read_cb); + hyper_io_set_write(io, write_cb); + + printf("http handshake ...\n"); + + // We need an executor generally to poll futures + const hyper_executor *exec = hyper_executor_new(); + + // Prepare client options + hyper_clientconn_options *opts = hyper_clientconn_options_new(); + hyper_clientconn_options_exec(opts, exec); + + hyper_task *handshake = hyper_clientconn_handshake(io, opts); + hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE); + + // Let's wait for the handshake to finish... + hyper_executor_push(exec, handshake); + + // This body will get filled in eventually... + hyper_body *resp_body = NULL; + + // The polling state machine! + while (1) { + // Poll all ready tasks and act on them... + while (1) { + hyper_task *task = hyper_executor_poll(exec); + if (!task) { + break; + } + hyper_task_return_type task_type = hyper_task_type(task); + + switch ((example_id) hyper_task_userdata(task)) { + case EXAMPLE_HANDSHAKE: + ; + if (task_type == HYPER_TASK_ERROR) { + printf("handshake error!\n"); + return 1; + } + assert(task_type == HYPER_TASK_CLIENTCONN); + + printf("preparing http request ...\n"); + + hyper_clientconn *client = hyper_task_value(task); + hyper_task_free(task); + + // Prepare the request + hyper_request *req = hyper_request_new(); + if (hyper_request_set_method(req, STR_ARG("POST"))) { + printf("error setting method\n"); + return 1; + } + if (hyper_request_set_uri(req, STR_ARG(path))) { + printf("error setting uri\n"); + return 1; + } + + hyper_headers *req_headers = hyper_request_headers(req); + hyper_headers_set(req_headers, STR_ARG("host"), STR_ARG(host)); + + // Prepare the req body + hyper_body *body = hyper_body_new(); + hyper_body_set_userdata(body, &upload); + hyper_body_set_data_func(body, poll_req_upload); + hyper_request_set_body(req, body); + + // Send it! + hyper_task *send = hyper_clientconn_send(client, req); + hyper_task_set_userdata(send, (void *)EXAMPLE_SEND); + printf("sending ...\n"); + hyper_executor_push(exec, send); + + // For this example, no longer need the client + hyper_clientconn_free(client); + + break; + case EXAMPLE_SEND: + ; + if (task_type == HYPER_TASK_ERROR) { + printf("send error!\n"); + return 1; + } + assert(task_type == HYPER_TASK_RESPONSE); + + // Take the results + hyper_response *resp = hyper_task_value(task); + hyper_task_free(task); + + uint16_t http_status = hyper_response_status(resp); + + printf("\nResponse Status: %d\n", http_status); + + hyper_headers *headers = hyper_response_headers(resp); + hyper_headers_foreach(headers, print_each_header, NULL); + printf("\n"); + + resp_body = hyper_response_body(resp); + + // Set us up to peel data from the body a chunk at a time + hyper_task *body_data = hyper_body_data(resp_body); + hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY); + hyper_executor_push(exec, body_data); + + // No longer need the response + hyper_response_free(resp); + + break; + case EXAMPLE_RESP_BODY: + ; + if (task_type == HYPER_TASK_ERROR) { + printf("body error!\n"); + return 1; + } + + if (task_type == HYPER_TASK_BUF) { + hyper_buf *chunk = hyper_task_value(task); + write(1, hyper_buf_bytes(chunk), hyper_buf_len(chunk)); + hyper_buf_free(chunk); + hyper_task_free(task); + + hyper_task *body_data = hyper_body_data(resp_body); + hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY); + hyper_executor_push(exec, body_data); + + break; + } else { + assert(task_type == HYPER_TASK_EMPTY); + hyper_task_free(task); + hyper_body_free(resp_body); + + printf("\n -- Done! -- \n"); + + // Cleaning up before exiting + hyper_executor_free(exec); + free_conn_data(conn); + free(upload.buf); + + return 0; + } + case EXAMPLE_NOT_SET: + // A background task for hyper completed... + hyper_task_free(task); + break; + } + } + + // All futures are pending on IO work, so select on the fds. + + FD_ZERO(&fds_read); + FD_ZERO(&fds_write); + FD_ZERO(&fds_excep); + + if (conn->read_waker) { + FD_SET(conn->fd, &fds_read); + } + if (conn->write_waker) { + FD_SET(conn->fd, &fds_write); + } + + int sel_ret = select(conn->fd + 1, &fds_read, &fds_write, &fds_excep, NULL); + + if (sel_ret < 0) { + printf("select() error\n"); + return 1; + } else { + if (FD_ISSET(conn->fd, &fds_read)) { + hyper_waker_wake(conn->read_waker); + conn->read_waker = NULL; + } + if (FD_ISSET(conn->fd, &fds_write)) { + hyper_waker_wake(conn->write_waker); + conn->write_waker = NULL; + } + } + + } + + + return 0; +} diff --git a/capi/gen_header.sh b/capi/gen_header.sh new file mode 100755 index 0000000000..4cd1a26c23 --- /dev/null +++ b/capi/gen_header.sh @@ -0,0 +1,72 @@ +#!/usr/bin/env bash + +CAPI_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +WORK_DIR=`mktemp -d` + + +# check if tmp dir was created +if [[ ! "$WORK_DIR" || ! -d "$WORK_DIR" ]]; then + echo "Could not create temp dir" + exit 1 +fi + +header_file_backup="$CAPI_DIR/include/hyper.h.backup" + +function cleanup { + #echo "$WORK_DIR" + rm -rf "$WORK_DIR" + rm "$header_file_backup" +} + +trap cleanup EXIT + +mkdir "$WORK_DIR/src" + +# Fake a library +cat > "$WORK_DIR/src/lib.rs" << EOF +#[path = "$CAPI_DIR/../src/ffi/mod.rs"] +pub mod ffi; +EOF + +# And its Cargo.toml +cat > "$WORK_DIR/Cargo.toml" << EOF +[package] +name = "hyper" +version = "0.0.0" +edition = "2018" +publish = false + +[dependencies] +EOF + +cp "$CAPI_DIR/include/hyper.h" "$header_file_backup" + +#cargo metadata --no-default-features --features ffi --format-version 1 > "$WORK_DIR/metadata.json" + +cd $WORK_DIR + +# Expand just the ffi module +cargo rustc -- -Z unstable-options --pretty=expanded > expanded.rs 2>/dev/null + +# Replace the previous copy with the single expanded file +rm -rf ./src +mkdir src +mv expanded.rs src/lib.rs + + +# Bindgen! +cbindgen\ + -c "$CAPI_DIR/cbindgen.toml"\ + --lockfile "$CAPI_DIR/../Cargo.lock"\ + -o "$CAPI_DIR/include/hyper.h"\ + $1 + +bindgen_exit_code=$? + +if [[ "--verify" == "$1" && "$bindgen_exit_code" != 0 ]]; then + echo "diff generated (<) vs backup (>)" + diff "$CAPI_DIR/include/hyper.h" "$header_file_backup" +fi + +exit $bindgen_exit_code diff --git a/capi/include/hyper.h b/capi/include/hyper.h new file mode 100644 index 0000000000..f2a6f8dbfb --- /dev/null +++ b/capi/include/hyper.h @@ -0,0 +1,554 @@ +#ifndef _HYPER_H +#define _HYPER_H + +#include +#include + +#define HYPER_ITER_CONTINUE 0 + +#define HYPER_ITER_BREAK 1 + +#define HYPER_HTTP_VERSION_NONE 0 + +#define HYPER_HTTP_VERSION_1_0 10 + +#define HYPER_HTTP_VERSION_1_1 11 + +#define HYPER_HTTP_VERSION_2 20 + +#define HYPER_IO_PENDING 4294967295 + +#define HYPER_IO_ERROR 4294967294 + +#define HYPER_POLL_READY 0 + +#define HYPER_POLL_PENDING 1 + +#define HYPER_POLL_ERROR 3 + +typedef enum { + /* + All is well. + */ + HYPERE_OK, + /* + General error, details in the `hyper_error *`. + */ + HYPERE_ERROR, + /* + A function argument was invalid. + */ + HYPERE_INVALID_ARG, + /* + The IO transport returned an EOF when one wasn't expected. + + This typically means an HTTP request or response was expected, but the + connection closed cleanly without sending (all of) it. + */ + HYPERE_UNEXPECTED_EOF, + /* + Aborted by a user supplied callback. + */ + HYPERE_ABORTED_BY_CALLBACK, + /* + An optional hyper feature was not enabled. + */ + HYPERE_FEATURE_NOT_ENABLED, +} hyper_code; + +typedef enum { + /* + The value of this task is null (does not imply an error). + */ + HYPER_TASK_EMPTY, + /* + The value of this task is `hyper_error *`. + */ + HYPER_TASK_ERROR, + /* + The value of this task is `hyper_clientconn *`. + */ + HYPER_TASK_CLIENTCONN, + /* + The value of this task is `hyper_response *`. + */ + HYPER_TASK_RESPONSE, + /* + The value of this task is `hyper_buf *`. + */ + HYPER_TASK_BUF, +} hyper_task_return_type; + +typedef struct hyper_executor hyper_executor; + +typedef struct hyper_io hyper_io; + +typedef struct hyper_task hyper_task; + +typedef struct hyper_body hyper_body; + +typedef struct hyper_buf hyper_buf; + +typedef struct hyper_clientconn hyper_clientconn; + +typedef struct hyper_clientconn_options hyper_clientconn_options; + +typedef struct hyper_context hyper_context; + +typedef struct hyper_error hyper_error; + +typedef struct hyper_headers hyper_headers; + +typedef struct hyper_request hyper_request; + +typedef struct hyper_response hyper_response; + +typedef struct hyper_waker hyper_waker; + +typedef int (*hyper_body_foreach_callback)(void*, const hyper_buf*); + +typedef int (*hyper_body_data_callback)(void*, hyper_context*, hyper_buf**); + +typedef int (*hyper_headers_foreach_callback)(void*, const uint8_t*, size_t, const uint8_t*, size_t); + +typedef size_t (*hyper_io_read_callback)(void*, hyper_context*, uint8_t*, size_t); + +typedef size_t (*hyper_io_write_callback)(void*, hyper_context*, const uint8_t*, size_t); + +#ifdef __cplusplus +extern "C" { +#endif // __cplusplus + +/* + Returns a static ASCII (null terminated) string of the hyper version. + */ +const char *hyper_version(void); + +/* + Create a new "empty" body. + + If not configured, this body acts as an empty payload. + */ +hyper_body *hyper_body_new(void); + +/* + Free a `hyper_body *`. + */ +void hyper_body_free(hyper_body *body); + +/* + Return a task that will poll the body for the next buffer of data. + + The task value may have different types depending on the outcome: + + - `HYPER_TASK_BUF`: Success, and more data was received. + - `HYPER_TASK_ERROR`: An error retrieving the data. + - `HYPER_TASK_EMPTY`: The body has finished streaming data. + + This does not consume the `hyper_body *`, so it may be used to again. + However, it MUST NOT be used or freed until the related task completes. + */ +hyper_task *hyper_body_data(hyper_body *body); + +/* + Return a task that will poll the body and execute the callback with each + body chunk that is received. + + The `hyper_buf` pointer is only a borrowed reference, it cannot live outside + the execution of the callback. You must make a copy to retain it. + + The callback should return `HYPER_ITER_CONTINUE` to continue iterating + chunks as they are received, or `HYPER_ITER_BREAK` to cancel. + + This will consume the `hyper_body *`, you shouldn't use it anymore or free it. + */ +hyper_task *hyper_body_foreach(hyper_body *body, hyper_body_foreach_callback func, void *userdata); + +/* + Set userdata on this body, which will be passed to callback functions. + */ +void hyper_body_set_userdata(hyper_body *body, void *userdata); + +/* + Set the data callback for this body. + + The callback is called each time hyper needs to send more data for the + body. It is passed the value from `hyper_body_set_userdata`. + + If there is data available, the `hyper_buf **` argument should be set + to a `hyper_buf *` containing the data, and `HYPER_POLL_READY` should + be returned. + + Returning `HYPER_POLL_READY` while the `hyper_buf **` argument points + to `NULL` will indicate the body has completed all data. + + If there is more data to send, but it isn't yet available, a + `hyper_waker` should be saved from the `hyper_context *` argument, and + `HYPER_POLL_PENDING` should be returned. You must wake the saved waker + to signal the task when data is available. + + If some error has occurred, you can return `HYPER_POLL_ERROR` to abort + the body. + */ +void hyper_body_set_data_func(hyper_body *body, hyper_body_data_callback func); + +/* + Create a new `hyper_buf *` by copying the provided bytes. + + This makes an owned copy of the bytes, so the `buf` argument can be + freed or changed afterwards. + */ +hyper_buf *hyper_buf_copy(const uint8_t *buf, size_t len); + +/* + Get a pointer to the bytes in this buffer. + + This should be used in conjunction with `hyper_buf_len` to get the length + of the bytes data. + + This pointer is borrowed data, and not valid once the `hyper_buf` is + consumed/freed. + */ +const uint8_t *hyper_buf_bytes(const hyper_buf *buf); + +/* + Get the length of the bytes this buffer contains. + */ +size_t hyper_buf_len(const hyper_buf *buf); + +/* + Free this buffer. + */ +void hyper_buf_free(hyper_buf *buf); + +/* + Starts an HTTP client connection handshake using the provided IO transport + and options. + + Both the `io` and the `options` are consumed in this function call. + + The returned `hyper_task *` must be polled with an executor until the + handshake completes, at which point the value can be taken. + */ +hyper_task *hyper_clientconn_handshake(hyper_io *io, hyper_clientconn_options *options); + +/* + Send a request on the client connection. + + Returns a task that needs to be polled until it is ready. When ready, the + task yields a `hyper_response *`. + */ +hyper_task *hyper_clientconn_send(hyper_clientconn *conn, hyper_request *req); + +/* + Free a `hyper_clientconn *`. + */ +void hyper_clientconn_free(hyper_clientconn *conn); + +/* + Creates a new set of HTTP clientconn options to be used in a handshake. + */ +hyper_clientconn_options *hyper_clientconn_options_new(void); + +/* + Free a `hyper_clientconn_options *`. + */ +void hyper_clientconn_options_free(hyper_clientconn_options *opts); + +/* + Set the client background task executor. + + This does not consume the `options` or the `exec`. + */ +void hyper_clientconn_options_exec(hyper_clientconn_options *opts, const hyper_executor *exec); + +/* + Set the whether to use HTTP2. + + Pass `0` to disable, `1` to enable. + */ +hyper_code hyper_clientconn_options_http2(hyper_clientconn_options *opts, int enabled); + +/* + Frees a `hyper_error`. + */ +void hyper_error_free(hyper_error *err); + +/* + Get an equivalent `hyper_code` from this error. + */ +hyper_code hyper_error_code(const hyper_error *err); + +/* + Print the details of this error to a buffer. + + The `dst_len` value must be the maximum length that the buffer can + store. + + The return value is number of bytes that were written to `dst`. + */ +size_t hyper_error_print(const hyper_error *err, uint8_t *dst, size_t dst_len); + +/* + Construct a new HTTP request. + */ +hyper_request *hyper_request_new(void); + +/* + Free an HTTP request if not going to send it on a client. + */ +void hyper_request_free(hyper_request *req); + +/* + Set the HTTP Method of the request. + */ +hyper_code hyper_request_set_method(hyper_request *req, const uint8_t *method, size_t method_len); + +/* + Set the URI of the request. + */ +hyper_code hyper_request_set_uri(hyper_request *req, const uint8_t *uri, size_t uri_len); + +/* + Set the preferred HTTP version of the request. + + The version value should be one of the `HYPER_HTTP_VERSION_` constants. + + Note that this won't change the major HTTP version of the connection, + since that is determined at the handshake step. + */ +hyper_code hyper_request_set_version(hyper_request *req, int version); + +/* + Gets a reference to the HTTP headers of this request + + This is not an owned reference, so it should not be accessed after the + `hyper_request` has been consumed. + */ +hyper_headers *hyper_request_headers(hyper_request *req); + +/* + Set the body of the request. + + The default is an empty body. + + This takes ownership of the `hyper_body *`, you must not use it or + free it after setting it on the request. + */ +hyper_code hyper_request_set_body(hyper_request *req, hyper_body *body); + +/* + Free an HTTP response after using it. + */ +void hyper_response_free(hyper_response *resp); + +/* + Get the HTTP-Status code of this response. + + It will always be within the range of 100-599. + */ +uint16_t hyper_response_status(const hyper_response *resp); + +/* + Get the HTTP version used by this response. + + The returned value could be: + + - `HYPER_HTTP_VERSION_1_0` + - `HYPER_HTTP_VERSION_1_1` + - `HYPER_HTTP_VERSION_2` + - `HYPER_HTTP_VERSION_NONE` if newer (or older). + */ +int hyper_response_version(const hyper_response *resp); + +/* + Gets a reference to the HTTP headers of this response. + + This is not an owned reference, so it should not be accessed after the + `hyper_response` has been freed. + */ +hyper_headers *hyper_response_headers(hyper_response *resp); + +/* + Take ownership of the body of this response. + + It is safe to free the response even after taking ownership of its body. + */ +hyper_body *hyper_response_body(hyper_response *resp); + +/* + Iterates the headers passing each name and value pair to the callback. + + The `userdata` pointer is also passed to the callback. + + The callback should return `HYPER_ITER_CONTINUE` to keep iterating, or + `HYPER_ITER_BREAK` to stop. + */ +void hyper_headers_foreach(const hyper_headers *headers, + hyper_headers_foreach_callback func, + void *userdata); + +/* + Sets the header with the provided name to the provided value. + + This overwrites any previous value set for the header. + */ +hyper_code hyper_headers_set(hyper_headers *headers, + const uint8_t *name, + size_t name_len, + const uint8_t *value, + size_t value_len); + +/* + Adds the provided value to the list of the provided name. + + If there were already existing values for the name, this will append the + new value to the internal list. + */ +hyper_code hyper_headers_add(hyper_headers *headers, + const uint8_t *name, + size_t name_len, + const uint8_t *value, + size_t value_len); + +/* + Create a new IO type used to represent a transport. + + The read and write functions of this transport should be set with + `hyper_io_set_read` and `hyper_io_set_write`. + */ +hyper_io *hyper_io_new(void); + +/* + Free an unused `hyper_io *`. + + This is typically only useful if you aren't going to pass ownership + of the IO handle to hyper, such as with `hyper_clientconn_handshake()`. + */ +void hyper_io_free(hyper_io *io); + +/* + Set the user data pointer for this IO to some value. + + This value is passed as an argument to the read and write callbacks. + */ +void hyper_io_set_userdata(hyper_io *io, void *data); + +/* + Set the read function for this IO transport. + + Data that is read from the transport should be put in the `buf` pointer, + up to `buf_len` bytes. The number of bytes read should be the return value. + + It is undefined behavior to try to access the bytes in the `buf` pointer, + unless you have already written them yourself. It is also undefined behavior + to return that more bytes have been written than actually set on the `buf`. + + If there is no data currently available, a waker should be claimed from + the `ctx` and registered with whatever polling mechanism is used to signal + when data is available later on. The return value should be + `HYPER_IO_PENDING`. + + If there is an irrecoverable error reading data, then `HYPER_IO_ERROR` + should be the return value. + */ +void hyper_io_set_read(hyper_io *io, hyper_io_read_callback func); + +/* + Set the write function for this IO transport. + + Data from the `buf` pointer should be written to the transport, up to + `buf_len` bytes. The number of bytes written should be the return value. + + If no data can currently be written, the `waker` should be cloned and + registered with whatever polling mechanism is used to signal when data + is available later on. The return value should be `HYPER_IO_PENDING`. + + Yeet. + + If there is an irrecoverable error reading data, then `HYPER_IO_ERROR` + should be the return value. + */ +void hyper_io_set_write(hyper_io *io, hyper_io_write_callback func); + +/* + Creates a new task executor. + */ +const hyper_executor *hyper_executor_new(void); + +/* + Frees an executor and any incomplete tasks still part of it. + */ +void hyper_executor_free(const hyper_executor *exec); + +/* + Push a task onto the executor. + + The executor takes ownership of the task, it should not be accessed + again unless returned back to the user with `hyper_executor_poll`. + */ +hyper_code hyper_executor_push(const hyper_executor *exec, hyper_task *task); + +/* + Polls the executor, trying to make progress on any tasks that have notified + that they are ready again. + + If ready, returns a task from the executor that has completed. + + If there are no ready tasks, this returns `NULL`. + */ +hyper_task *hyper_executor_poll(const hyper_executor *exec); + +/* + Free a task. + */ +void hyper_task_free(hyper_task *task); + +/* + Takes the output value of this task. + + This must only be called once polling the task on an executor has finished + this task. + + Use `hyper_task_type` to determine the type of the `void *` return value. + */ +void *hyper_task_value(hyper_task *task); + +/* + Query the return type of this task. + */ +hyper_task_return_type hyper_task_type(hyper_task *task); + +/* + Set a user data pointer to be associated with this task. + + This value will be passed to task callbacks, and can be checked later + with `hyper_task_userdata`. + */ +void hyper_task_set_userdata(hyper_task *task, void *userdata); + +/* + Retrieve the userdata that has been set via `hyper_task_set_userdata`. + */ +void *hyper_task_userdata(hyper_task *task); + +/* + Copies a waker out of the task context. + */ +hyper_waker *hyper_context_waker(hyper_context *cx); + +/* + Free a waker that hasn't been woken. + */ +void hyper_waker_free(hyper_waker *waker); + +/* + Free a waker that hasn't been woken. + */ +void hyper_waker_wake(hyper_waker *waker); + +#ifdef __cplusplus +} // extern "C" +#endif // __cplusplus + +#endif /* _HYPER_H */ diff --git a/src/body/body.rs b/src/body/body.rs index 4a1d6210bc..e50e9f123e 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -51,6 +51,8 @@ enum Kind { content_length: DecodedLength, recv: h2::RecvStream, }, + #[cfg(feature = "ffi")] + Ffi(crate::ffi::UserBody), #[cfg(feature = "stream")] Wrapped( SyncWrapper< @@ -260,6 +262,21 @@ impl Body { } } + #[cfg(feature = "ffi")] + pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { + match self.kind { + Kind::Ffi(ref mut body) => return body, + _ => { + self.kind = Kind::Ffi(crate::ffi::UserBody::new()); + } + } + + match self.kind { + Kind::Ffi(ref mut body) => body, + _ => unreachable!(), + } + } + fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll>> { match self.kind { Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), @@ -294,6 +311,9 @@ impl Body { None => Poll::Ready(None), }, + #[cfg(feature = "ffi")] + Kind::Ffi(ref mut body) => body.poll_data(cx), + #[cfg(feature = "stream")] Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), @@ -348,6 +368,10 @@ impl HttpBody for Body { } Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), }, + + #[cfg(feature = "ffi")] + Kind::Ffi(ref mut body) => body.poll_trailers(cx), + _ => Poll::Ready(Ok(None)), } } @@ -358,6 +382,8 @@ impl HttpBody for Body { Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), + #[cfg(feature = "ffi")] + Kind::Ffi(..) => false, #[cfg(feature = "stream")] Kind::Wrapped(..) => false, } @@ -384,6 +410,8 @@ impl HttpBody for Body { Kind::Chan { content_length, .. } => opt_len!(content_length), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { content_length, .. } => opt_len!(content_length), + #[cfg(feature = "ffi")] + Kind::Ffi(..) => SizeHint::default(), } } } diff --git a/src/error.rs b/src/error.rs index 16bacd163e..68b042f0c8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -116,6 +116,10 @@ pub(crate) enum User { /// User polled for an upgrade, but low-level API is not using upgrades. #[cfg(feature = "http1")] ManualUpgrade, + + /// User aborted in an FFI callback. + #[cfg(feature = "ffi")] + AbortedByCallback, } // Sentinel type to indicate the error was caused by a timeout. @@ -179,8 +183,7 @@ impl Error { self } - #[cfg(feature = "http1")] - #[cfg(feature = "server")] + #[cfg(any(all(feature = "http1", feature = "server"), feature = "ffi"))] pub(crate) fn kind(&self) -> &Kind { &self.inner.kind } @@ -336,6 +339,11 @@ impl Error { Error::new(Kind::Shutdown).with(cause) } + #[cfg(feature = "ffi")] + pub(crate) fn new_user_aborted_by_callback() -> Error { + Error::new_user(User::AbortedByCallback) + } + #[cfg(feature = "http2")] pub(crate) fn new_h2(cause: ::h2::Error) -> Error { if cause.is_io() { @@ -406,6 +414,8 @@ impl Error { Kind::User(User::NoUpgrade) => "no upgrade available", #[cfg(feature = "http1")] Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use", + #[cfg(feature = "ffi")] + Kind::User(User::AbortedByCallback) => "operation aborted by an application callback", } } } diff --git a/src/ffi/body.rs b/src/ffi/body.rs new file mode 100644 index 0000000000..1c8f1a48c0 --- /dev/null +++ b/src/ffi/body.rs @@ -0,0 +1,233 @@ +use std::ffi::c_void; +use std::mem::ManuallyDrop; +use std::ptr; +use std::task::{Context, Poll}; + +use http::HeaderMap; +use libc::{c_int, size_t}; + +use super::task::{hyper_context, hyper_task_return_type, AsTaskType, Task}; +use super::{UserDataPointer, HYPER_ITER_CONTINUE}; +use crate::body::{Body, Bytes, HttpBody as _}; + +pub struct hyper_body(pub(super) Body); + +pub struct hyper_buf(pub(super) Bytes); + +pub(crate) struct UserBody { + data_func: hyper_body_data_callback, + userdata: *mut c_void, +} + +// ===== Body ===== + +type hyper_body_foreach_callback = extern "C" fn(*mut c_void, *const hyper_buf) -> c_int; + +type hyper_body_data_callback = + extern "C" fn(*mut c_void, *mut hyper_context, *mut *mut hyper_buf) -> c_int; + +ffi_fn! { + /// Create a new "empty" body. + /// + /// If not configured, this body acts as an empty payload. + fn hyper_body_new() -> *mut hyper_body { + Box::into_raw(Box::new(hyper_body(Body::empty()))) + } +} + +ffi_fn! { + /// Free a `hyper_body *`. + fn hyper_body_free(body: *mut hyper_body) { + if body.is_null() { + return; + } + + drop(unsafe { Box::from_raw(body) }); + } +} + +ffi_fn! { + /// Return a task that will poll the body for the next buffer of data. + /// + /// The task value may have different types depending on the outcome: + /// + /// - `HYPER_TASK_BUF`: Success, and more data was received. + /// - `HYPER_TASK_ERROR`: An error retrieving the data. + /// - `HYPER_TASK_EMPTY`: The body has finished streaming data. + /// + /// This does not consume the `hyper_body *`, so it may be used to again. + /// However, it MUST NOT be used or freed until the related task completes. + fn hyper_body_data(body: *mut hyper_body) -> *mut Task { + // This doesn't take ownership of the Body, so don't allow destructor + let mut body = ManuallyDrop::new(unsafe { Box::from_raw(body) }); + + Box::into_raw(Task::boxed(async move { + body.0.data().await.map(|res| res.map(hyper_buf)) + })) + } +} + +ffi_fn! { + /// Return a task that will poll the body and execute the callback with each + /// body chunk that is received. + /// + /// The `hyper_buf` pointer is only a borrowed reference, it cannot live outside + /// the execution of the callback. You must make a copy to retain it. + /// + /// The callback should return `HYPER_ITER_CONTINUE` to continue iterating + /// chunks as they are received, or `HYPER_ITER_BREAK` to cancel. + /// + /// This will consume the `hyper_body *`, you shouldn't use it anymore or free it. + fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void) -> *mut Task { + if body.is_null() { + return ptr::null_mut(); + } + + let mut body = unsafe { Box::from_raw(body) }; + let userdata = UserDataPointer(userdata); + + Box::into_raw(Task::boxed(async move { + while let Some(item) = body.0.data().await { + let chunk = item?; + if HYPER_ITER_CONTINUE != func(userdata.0, &hyper_buf(chunk)) { + return Err(crate::Error::new_user_aborted_by_callback()); + } + } + Ok(()) + })) + } +} + +ffi_fn! { + /// Set userdata on this body, which will be passed to callback functions. + fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void) { + let b = unsafe { &mut *body }; + b.0.as_ffi_mut().userdata = userdata; + } +} + +ffi_fn! { + /// Set the data callback for this body. + /// + /// The callback is called each time hyper needs to send more data for the + /// body. It is passed the value from `hyper_body_set_userdata`. + /// + /// If there is data available, the `hyper_buf **` argument should be set + /// to a `hyper_buf *` containing the data, and `HYPER_POLL_READY` should + /// be returned. + /// + /// Returning `HYPER_POLL_READY` while the `hyper_buf **` argument points + /// to `NULL` will indicate the body has completed all data. + /// + /// If there is more data to send, but it isn't yet available, a + /// `hyper_waker` should be saved from the `hyper_context *` argument, and + /// `HYPER_POLL_PENDING` should be returned. You must wake the saved waker + /// to signal the task when data is available. + /// + /// If some error has occurred, you can return `HYPER_POLL_ERROR` to abort + /// the body. + fn hyper_body_set_data_func(body: *mut hyper_body, func: hyper_body_data_callback) { + let b = unsafe { &mut *body }; + b.0.as_ffi_mut().data_func = func; + } +} + +// ===== impl UserBody ===== + +impl UserBody { + pub(crate) fn new() -> UserBody { + UserBody { + data_func: data_noop, + userdata: std::ptr::null_mut(), + } + } + + pub(crate) fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll>> { + let mut out = std::ptr::null_mut(); + match (self.data_func)(self.userdata, hyper_context::wrap(cx), &mut out) { + super::task::HYPER_POLL_READY => { + if out.is_null() { + Poll::Ready(None) + } else { + let buf = unsafe { Box::from_raw(out) }; + Poll::Ready(Some(Ok(buf.0))) + } + } + super::task::HYPER_POLL_PENDING => Poll::Pending, + super::task::HYPER_POLL_ERROR => { + Poll::Ready(Some(Err(crate::Error::new_body_write_aborted()))) + } + unexpected => Poll::Ready(Some(Err(crate::Error::new_body_write(format!( + "unexpected hyper_body_data_func return code {}", + unexpected + ))))), + } + } + + pub(crate) fn poll_trailers( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll>> { + Poll::Ready(Ok(None)) + } +} + +/// cbindgen:ignore +extern "C" fn data_noop( + _userdata: *mut c_void, + _: *mut hyper_context<'_>, + _: *mut *mut hyper_buf, +) -> c_int { + super::task::HYPER_POLL_READY +} + +unsafe impl Send for UserBody {} +unsafe impl Sync for UserBody {} + +// ===== Bytes ===== + +ffi_fn! { + /// Create a new `hyper_buf *` by copying the provided bytes. + /// + /// This makes an owned copy of the bytes, so the `buf` argument can be + /// freed or changed afterwards. + fn hyper_buf_copy(buf: *const u8, len: size_t) -> *mut hyper_buf { + let slice = unsafe { + std::slice::from_raw_parts(buf, len) + }; + Box::into_raw(Box::new(hyper_buf(Bytes::copy_from_slice(slice)))) + } +} + +ffi_fn! { + /// Get a pointer to the bytes in this buffer. + /// + /// This should be used in conjunction with `hyper_buf_len` to get the length + /// of the bytes data. + /// + /// This pointer is borrowed data, and not valid once the `hyper_buf` is + /// consumed/freed. + fn hyper_buf_bytes(buf: *const hyper_buf) -> *const u8 { + unsafe { (*buf).0.as_ptr() } + } +} + +ffi_fn! { + /// Get the length of the bytes this buffer contains. + fn hyper_buf_len(buf: *const hyper_buf) -> size_t { + unsafe { (*buf).0.len() } + } +} + +ffi_fn! { + /// Free this buffer. + fn hyper_buf_free(buf: *mut hyper_buf) { + drop(unsafe { Box::from_raw(buf) }); + } +} + +unsafe impl AsTaskType for hyper_buf { + fn as_task_type(&self) -> hyper_task_return_type { + hyper_task_return_type::HYPER_TASK_BUF + } +} diff --git a/src/ffi/client.rs b/src/ffi/client.rs new file mode 100644 index 0000000000..2c2ef6b262 --- /dev/null +++ b/src/ffi/client.rs @@ -0,0 +1,148 @@ +use std::sync::Arc; + +use libc::c_int; + +use crate::client::conn; +use crate::rt::Executor as _; + +use super::error::hyper_code; +use super::http_types::{hyper_request, hyper_response}; +use super::io::Io; +use super::task::{hyper_task_return_type, AsTaskType, Exec, Task, WeakExec}; + +pub struct hyper_clientconn_options { + builder: conn::Builder, + /// Use a `Weak` to prevent cycles. + exec: WeakExec, +} + +pub struct hyper_clientconn { + tx: conn::SendRequest, +} + +// ===== impl hyper_clientconn ===== + +ffi_fn! { + /// Starts an HTTP client connection handshake using the provided IO transport + /// and options. + /// + /// Both the `io` and the `options` are consumed in this function call. + /// + /// The returned `hyper_task *` must be polled with an executor until the + /// handshake completes, at which point the value can be taken. + fn hyper_clientconn_handshake(io: *mut Io, options: *mut hyper_clientconn_options) -> *mut Task { + if io.is_null() { + return std::ptr::null_mut(); + } + if options.is_null() { + return std::ptr::null_mut(); + } + + let options = unsafe { Box::from_raw(options) }; + let io = unsafe { Box::from_raw(io) }; + + Box::into_raw(Task::boxed(async move { + options.builder.handshake::<_, crate::Body>(io) + .await + .map(|(tx, conn)| { + options.exec.execute(Box::pin(async move { + let _ = conn.await; + })); + hyper_clientconn { tx } + }) + })) + } +} + +ffi_fn! { + /// Send a request on the client connection. + /// + /// Returns a task that needs to be polled until it is ready. When ready, the + /// task yields a `hyper_response *`. + fn hyper_clientconn_send(conn: *mut hyper_clientconn, req: *mut hyper_request) -> *mut Task { + if conn.is_null() { + return std::ptr::null_mut(); + } + if req.is_null() { + return std::ptr::null_mut(); + } + + let req = unsafe { Box::from_raw(req) }; + let fut = unsafe { &mut *conn }.tx.send_request(req.0); + + let fut = async move { + fut.await.map(hyper_response) + }; + + Box::into_raw(Task::boxed(fut)) + } +} + +ffi_fn! { + /// Free a `hyper_clientconn *`. + fn hyper_clientconn_free(conn: *mut hyper_clientconn) { + drop(unsafe { Box::from_raw(conn) }); + } +} + +unsafe impl AsTaskType for hyper_clientconn { + fn as_task_type(&self) -> hyper_task_return_type { + hyper_task_return_type::HYPER_TASK_CLIENTCONN + } +} + +// ===== impl hyper_clientconn_options ===== + +ffi_fn! { + /// Creates a new set of HTTP clientconn options to be used in a handshake. + fn hyper_clientconn_options_new() -> *mut hyper_clientconn_options { + Box::into_raw(Box::new(hyper_clientconn_options { + builder: conn::Builder::new(), + exec: WeakExec::new(), + })) + } +} + +ffi_fn! { + /// Free a `hyper_clientconn_options *`. + fn hyper_clientconn_options_free(opts: *mut hyper_clientconn_options) { + drop(unsafe { Box::from_raw(opts) }); + } +} + +ffi_fn! { + /// Set the client background task executor. + /// + /// This does not consume the `options` or the `exec`. + fn hyper_clientconn_options_exec(opts: *mut hyper_clientconn_options, exec: *const Exec) { + let opts = unsafe { &mut *opts }; + + let exec = unsafe { Arc::from_raw(exec) }; + let weak_exec = Exec::downgrade(&exec); + std::mem::forget(exec); + + opts.builder.executor(weak_exec.clone()); + opts.exec = weak_exec; + } +} + +ffi_fn! { + /// Set the whether to use HTTP2. + /// + /// Pass `0` to disable, `1` to enable. + fn hyper_clientconn_options_http2(opts: *mut hyper_clientconn_options, enabled: c_int) -> hyper_code { + #[cfg(feature = "http2")] + { + let opts = unsafe { &mut *opts }; + opts.builder.http2_only(enabled != 0); + hyper_code::HYPERE_OK + } + + #[cfg(not(feature = "http2"))] + { + drop(opts); + drop(enabled); + hyper_code::HYPERE_FEATURE_NOT_ENABLED + } + } +} diff --git a/src/ffi/error.rs b/src/ffi/error.rs new file mode 100644 index 0000000000..8cd672fe1e --- /dev/null +++ b/src/ffi/error.rs @@ -0,0 +1,80 @@ +use libc::size_t; + +pub struct hyper_error(crate::Error); + +#[repr(C)] +pub enum hyper_code { + /// All is well. + HYPERE_OK, + /// General error, details in the `hyper_error *`. + HYPERE_ERROR, + /// A function argument was invalid. + HYPERE_INVALID_ARG, + /// The IO transport returned an EOF when one wasn't expected. + /// + /// This typically means an HTTP request or response was expected, but the + /// connection closed cleanly without sending (all of) it. + HYPERE_UNEXPECTED_EOF, + /// Aborted by a user supplied callback. + HYPERE_ABORTED_BY_CALLBACK, + /// An optional hyper feature was not enabled. + #[cfg_attr(feature = "http2", allow(unused))] + HYPERE_FEATURE_NOT_ENABLED, +} + +// ===== impl hyper_error ===== + +impl hyper_error { + fn code(&self) -> hyper_code { + use crate::error::Kind as ErrorKind; + use crate::error::User; + + match self.0.kind() { + ErrorKind::IncompleteMessage => hyper_code::HYPERE_UNEXPECTED_EOF, + ErrorKind::User(User::AbortedByCallback) => hyper_code::HYPERE_ABORTED_BY_CALLBACK, + // TODO: add more variants + _ => hyper_code::HYPERE_ERROR + } + } + + fn print_to(&self, dst: &mut [u8]) -> usize { + use std::io::Write; + + let mut dst = std::io::Cursor::new(dst); + + // A write! error doesn't matter. As much as possible will have been + // written, and the Cursor position will know how far that is (even + // if that is zero). + let _ = write!(dst, "{}", &self.0); + dst.position() as usize + } +} + +ffi_fn! { + /// Frees a `hyper_error`. + fn hyper_error_free(err: *mut hyper_error) { + drop(unsafe { Box::from_raw(err) }); + } +} + +ffi_fn! { + /// Get an equivalent `hyper_code` from this error. + fn hyper_error_code(err: *const hyper_error) -> hyper_code { + unsafe { &*err }.code() + } +} + +ffi_fn! { + /// Print the details of this error to a buffer. + /// + /// The `dst_len` value must be the maximum length that the buffer can + /// store. + /// + /// The return value is number of bytes that were written to `dst`. + fn hyper_error_print(err: *const hyper_error, dst: *mut u8, dst_len: size_t) -> size_t { + let dst = unsafe { + std::slice::from_raw_parts_mut(dst, dst_len) + }; + unsafe { &*err }.print_to(dst) + } +} diff --git a/src/ffi/http_types.rs b/src/ffi/http_types.rs new file mode 100644 index 0000000000..49e2027cac --- /dev/null +++ b/src/ffi/http_types.rs @@ -0,0 +1,267 @@ +use libc::{c_int, size_t}; +use std::ffi::c_void; + +use super::body::hyper_body; +use super::error::hyper_code; +use super::task::{hyper_task_return_type, AsTaskType}; +use super::HYPER_ITER_CONTINUE; +use crate::header::{HeaderName, HeaderValue}; +use crate::{Body, HeaderMap, Method, Request, Response, Uri}; + +// ===== impl Request ===== + +pub struct hyper_request(pub(super) Request); + +pub struct hyper_response(pub(super) Response); + +pub struct hyper_headers(pub(super) HeaderMap); + +ffi_fn! { + /// Construct a new HTTP request. + fn hyper_request_new() -> *mut hyper_request { + Box::into_raw(Box::new(hyper_request(Request::new(Body::empty())))) + } +} + +ffi_fn! { + /// Free an HTTP request if not going to send it on a client. + fn hyper_request_free(req: *mut hyper_request) { + drop(unsafe { Box::from_raw(req) }); + } +} + +ffi_fn! { + /// Set the HTTP Method of the request. + fn hyper_request_set_method(req: *mut hyper_request, method: *const u8, method_len: size_t) -> hyper_code { + let bytes = unsafe { + std::slice::from_raw_parts(method, method_len as usize) + }; + match Method::from_bytes(bytes) { + Ok(m) => { + *unsafe { &mut *req }.0.method_mut() = m; + hyper_code::HYPERE_OK + }, + Err(_) => { + hyper_code::HYPERE_INVALID_ARG + } + } + } +} + +ffi_fn! { + /// Set the URI of the request. + fn hyper_request_set_uri(req: *mut hyper_request, uri: *const u8, uri_len: size_t) -> hyper_code { + let bytes = unsafe { + std::slice::from_raw_parts(uri, uri_len as usize) + }; + match Uri::from_maybe_shared(bytes) { + Ok(u) => { + *unsafe { &mut *req }.0.uri_mut() = u; + hyper_code::HYPERE_OK + }, + Err(_) => { + hyper_code::HYPERE_INVALID_ARG + } + } + } +} + +ffi_fn! { + /// Set the preferred HTTP version of the request. + /// + /// The version value should be one of the `HYPER_HTTP_VERSION_` constants. + /// + /// Note that this won't change the major HTTP version of the connection, + /// since that is determined at the handshake step. + fn hyper_request_set_version(req: *mut hyper_request, version: c_int) -> hyper_code { + use http::Version; + + *unsafe { &mut *req }.0.version_mut() = match version { + super::HYPER_HTTP_VERSION_NONE => Version::HTTP_11, + super::HYPER_HTTP_VERSION_1_0 => Version::HTTP_10, + super::HYPER_HTTP_VERSION_1_1 => Version::HTTP_11, + super::HYPER_HTTP_VERSION_2 => Version::HTTP_2, + _ => { + // We don't know this version + return hyper_code::HYPERE_INVALID_ARG; + } + }; + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Gets a reference to the HTTP headers of this request + /// + /// This is not an owned reference, so it should not be accessed after the + /// `hyper_request` has been consumed. + fn hyper_request_headers(req: *mut hyper_request) -> *mut hyper_headers { + hyper_headers::wrap(unsafe { &mut *req }.0.headers_mut()) + } +} + +ffi_fn! { + /// Set the body of the request. + /// + /// The default is an empty body. + /// + /// This takes ownership of the `hyper_body *`, you must not use it or + /// free it after setting it on the request. + fn hyper_request_set_body(req: *mut hyper_request, body: *mut hyper_body) -> hyper_code { + let body = unsafe { Box::from_raw(body) }; + *unsafe { &mut *req }.0.body_mut() = body.0; + hyper_code::HYPERE_OK + } +} + +// ===== impl Response ===== + +ffi_fn! { + /// Free an HTTP response after using it. + fn hyper_response_free(resp: *mut hyper_response) { + drop(unsafe { Box::from_raw(resp) }); + } +} + +ffi_fn! { + /// Get the HTTP-Status code of this response. + /// + /// It will always be within the range of 100-599. + fn hyper_response_status(resp: *const hyper_response) -> u16 { + unsafe { &*resp }.0.status().as_u16() + } +} + +ffi_fn! { + /// Get the HTTP version used by this response. + /// + /// The returned value could be: + /// + /// - `HYPER_HTTP_VERSION_1_0` + /// - `HYPER_HTTP_VERSION_1_1` + /// - `HYPER_HTTP_VERSION_2` + /// - `HYPER_HTTP_VERSION_NONE` if newer (or older). + fn hyper_response_version(resp: *const hyper_response) -> c_int { + use http::Version; + + match unsafe { &*resp }.0.version() { + Version::HTTP_10 => super::HYPER_HTTP_VERSION_1_0, + Version::HTTP_11 => super::HYPER_HTTP_VERSION_1_1, + Version::HTTP_2 => super::HYPER_HTTP_VERSION_2, + _ => super::HYPER_HTTP_VERSION_NONE, + } + } +} + +ffi_fn! { + /// Gets a reference to the HTTP headers of this response. + /// + /// This is not an owned reference, so it should not be accessed after the + /// `hyper_response` has been freed. + fn hyper_response_headers(resp: *mut hyper_response) -> *mut hyper_headers { + hyper_headers::wrap(unsafe { &mut *resp }.0.headers_mut()) + } +} + +ffi_fn! { + /// Take ownership of the body of this response. + /// + /// It is safe to free the response even after taking ownership of its body. + fn hyper_response_body(resp: *mut hyper_response) -> *mut hyper_body { + let body = std::mem::take(unsafe { &mut *resp }.0.body_mut()); + Box::into_raw(Box::new(hyper_body(body))) + } +} + +unsafe impl AsTaskType for hyper_response { + fn as_task_type(&self) -> hyper_task_return_type { + hyper_task_return_type::HYPER_TASK_RESPONSE + } +} + +// ===== impl Headers ===== + +type hyper_headers_foreach_callback = + extern "C" fn(*mut c_void, *const u8, size_t, *const u8, size_t) -> c_int; + +impl hyper_headers { + pub(crate) fn wrap(cx: &mut HeaderMap) -> &mut hyper_headers { + // A struct with only one field has the same layout as that field. + unsafe { std::mem::transmute::<&mut HeaderMap, &mut hyper_headers>(cx) } + } +} + +ffi_fn! { + /// Iterates the headers passing each name and value pair to the callback. + /// + /// The `userdata` pointer is also passed to the callback. + /// + /// The callback should return `HYPER_ITER_CONTINUE` to keep iterating, or + /// `HYPER_ITER_BREAK` to stop. + fn hyper_headers_foreach(headers: *const hyper_headers, func: hyper_headers_foreach_callback, userdata: *mut c_void) { + for (name, value) in unsafe { &*headers }.0.iter() { + let name_ptr = name.as_str().as_bytes().as_ptr(); + let name_len = name.as_str().as_bytes().len(); + let val_ptr = value.as_bytes().as_ptr(); + let val_len = value.as_bytes().len(); + + if HYPER_ITER_CONTINUE != func(userdata, name_ptr, name_len, val_ptr, val_len) { + break; + } + } + } +} + +ffi_fn! { + /// Sets the header with the provided name to the provided value. + /// + /// This overwrites any previous value set for the header. + fn hyper_headers_set(headers: *mut hyper_headers, name: *const u8, name_len: size_t, value: *const u8, value_len: size_t) -> hyper_code { + let headers = unsafe { &mut *headers }; + match unsafe { raw_name_value(name, name_len, value, value_len) } { + Ok((name, value)) => { + headers.0.insert(name, value); + hyper_code::HYPERE_OK + } + Err(code) => code, + } + } +} + +ffi_fn! { + /// Adds the provided value to the list of the provided name. + /// + /// If there were already existing values for the name, this will append the + /// new value to the internal list. + fn hyper_headers_add(headers: *mut hyper_headers, name: *const u8, name_len: size_t, value: *const u8, value_len: size_t) -> hyper_code { + let headers = unsafe { &mut *headers }; + + match unsafe { raw_name_value(name, name_len, value, value_len) } { + Ok((name, value)) => { + headers.0.append(name, value); + hyper_code::HYPERE_OK + } + Err(code) => code, + } + } +} + +unsafe fn raw_name_value( + name: *const u8, + name_len: size_t, + value: *const u8, + value_len: size_t, +) -> Result<(HeaderName, HeaderValue), hyper_code> { + let name = std::slice::from_raw_parts(name, name_len); + let name = match HeaderName::from_bytes(name) { + Ok(name) => name, + Err(_) => return Err(hyper_code::HYPERE_INVALID_ARG), + }; + let value = std::slice::from_raw_parts(value, value_len); + let value = match HeaderValue::from_bytes(value) { + Ok(val) => val, + Err(_) => return Err(hyper_code::HYPERE_INVALID_ARG), + }; + + Ok((name, value)) +} diff --git a/src/ffi/io.rs b/src/ffi/io.rs new file mode 100644 index 0000000000..5d84168486 --- /dev/null +++ b/src/ffi/io.rs @@ -0,0 +1,173 @@ +use std::ffi::c_void; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use libc::size_t; +use tokio::io::{AsyncRead, AsyncWrite}; + +use super::task::hyper_context; + +pub const HYPER_IO_PENDING: size_t = 0xFFFFFFFF; +pub const HYPER_IO_ERROR: size_t = 0xFFFFFFFE; + +type hyper_io_read_callback = + extern "C" fn(*mut c_void, *mut hyper_context<'_>, *mut u8, size_t) -> size_t; +type hyper_io_write_callback = + extern "C" fn(*mut c_void, *mut hyper_context<'_>, *const u8, size_t) -> size_t; + +pub struct Io { + read: hyper_io_read_callback, + write: hyper_io_write_callback, + userdata: *mut c_void, +} + +ffi_fn! { + /// Create a new IO type used to represent a transport. + /// + /// The read and write functions of this transport should be set with + /// `hyper_io_set_read` and `hyper_io_set_write`. + fn hyper_io_new() -> *mut Io { + Box::into_raw(Box::new(Io { + read: read_noop, + write: write_noop, + userdata: std::ptr::null_mut(), + })) + } +} + +ffi_fn! { + /// Free an unused `hyper_io *`. + /// + /// This is typically only useful if you aren't going to pass ownership + /// of the IO handle to hyper, such as with `hyper_clientconn_handshake()`. + fn hyper_io_free(io: *mut Io) { + drop(unsafe { Box::from_raw(io) }); + } +} + +ffi_fn! { + /// Set the user data pointer for this IO to some value. + /// + /// This value is passed as an argument to the read and write callbacks. + fn hyper_io_set_userdata(io: *mut Io, data: *mut c_void) { + unsafe { &mut *io }.userdata = data; + } +} + +ffi_fn! { + /// Set the read function for this IO transport. + /// + /// Data that is read from the transport should be put in the `buf` pointer, + /// up to `buf_len` bytes. The number of bytes read should be the return value. + /// + /// It is undefined behavior to try to access the bytes in the `buf` pointer, + /// unless you have already written them yourself. It is also undefined behavior + /// to return that more bytes have been written than actually set on the `buf`. + /// + /// If there is no data currently available, a waker should be claimed from + /// the `ctx` and registered with whatever polling mechanism is used to signal + /// when data is available later on. The return value should be + /// `HYPER_IO_PENDING`. + /// + /// If there is an irrecoverable error reading data, then `HYPER_IO_ERROR` + /// should be the return value. + fn hyper_io_set_read(io: *mut Io, func: hyper_io_read_callback) { + unsafe { &mut *io }.read = func; + } +} + +ffi_fn! { + /// Set the write function for this IO transport. + /// + /// Data from the `buf` pointer should be written to the transport, up to + /// `buf_len` bytes. The number of bytes written should be the return value. + /// + /// If no data can currently be written, the `waker` should be cloned and + /// registered with whatever polling mechanism is used to signal when data + /// is available later on. The return value should be `HYPER_IO_PENDING`. + /// + /// Yeet. + /// + /// If there is an irrecoverable error reading data, then `HYPER_IO_ERROR` + /// should be the return value. + fn hyper_io_set_write(io: *mut Io, func: hyper_io_write_callback) { + unsafe { &mut *io }.write = func; + } +} + +/// cbindgen:ignore +extern "C" fn read_noop( + _userdata: *mut c_void, + _: *mut hyper_context<'_>, + _buf: *mut u8, + _buf_len: size_t, +) -> size_t { + 0 +} + +/// cbindgen:ignore +extern "C" fn write_noop( + _userdata: *mut c_void, + _: *mut hyper_context<'_>, + _buf: *const u8, + _buf_len: size_t, +) -> size_t { + 0 +} + +impl AsyncRead for Io { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let buf_ptr = unsafe { buf.unfilled_mut() }.as_mut_ptr() as *mut u8; + let buf_len = buf.remaining(); + + match (self.read)(self.userdata, hyper_context::wrap(cx), buf_ptr, buf_len) { + HYPER_IO_PENDING => Poll::Pending, + HYPER_IO_ERROR => Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "io error", + ))), + ok => { + // We have to trust that the user's read callback actually + // filled in that many bytes... :( + unsafe { buf.assume_init(ok) }; + buf.advance(ok); + Poll::Ready(Ok(())) + } + } + } +} + +impl AsyncWrite for Io { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let buf_ptr = buf.as_ptr(); + let buf_len = buf.len(); + + match (self.write)(self.userdata, hyper_context::wrap(cx), buf_ptr, buf_len) { + HYPER_IO_PENDING => Poll::Pending, + HYPER_IO_ERROR => Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "io error", + ))), + ok => Poll::Ready(Ok(ok)), + } + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +unsafe impl Send for Io {} +unsafe impl Sync for Io {} diff --git a/src/ffi/macros.rs b/src/ffi/macros.rs new file mode 100644 index 0000000000..f4e031a07d --- /dev/null +++ b/src/ffi/macros.rs @@ -0,0 +1,23 @@ +macro_rules! ffi_fn { + ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),*) -> $ret:ty $body:block) => { + $(#[$doc])* + #[no_mangle] + pub extern fn $name($($arg: $arg_ty),*) -> $ret { + use std::panic::{self, AssertUnwindSafe}; + + match panic::catch_unwind(AssertUnwindSafe(move || $body)) { + Ok(v) => v, + Err(_) => { + // TODO: We shouldn't abort, but rather figure out how to + // convert into the return type that the function errored. + eprintln!("panic unwind caught, aborting"); + std::process::abort(); + } + } + } + }; + + ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),*) $body:block) => { + ffi_fn!($(#[$doc])* fn $name($($arg: $arg_ty),*) -> () $body); + }; +} diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs new file mode 100644 index 0000000000..cee653d7d0 --- /dev/null +++ b/src/ffi/mod.rs @@ -0,0 +1,55 @@ +// We have a lot of c-types in here, stop warning about their names! +#![allow(non_camel_case_types)] + +// We may eventually allow the FFI to be enabled without `client` or `http1`, +// that is why we don't auto enable them as `ffi = ["client", "http1"]` in +// the `Cargo.toml`. +// +// But for now, give a clear message that this compile error is expected. +#[cfg(not(all(feature = "client", feature = "http1")))] +compile_error!("The `ffi` feature currently requires the `client` and `http1` features."); + +#[cfg(not(hyper_unstable_ffi))] +compile_error!( + "\ + The `ffi` feature is unstable, and requires the \ + `RUSTFLAGS='--cfg hyper_unstable_ffi'` environment variable to be set.\ +" +); + +#[macro_use] +mod macros; + +mod body; +mod client; +mod error; +mod http_types; +mod io; +mod task; + +pub(crate) use self::body::UserBody; + +pub const HYPER_ITER_CONTINUE: libc::c_int = 0; +#[allow(unused)] +pub const HYPER_ITER_BREAK: libc::c_int = 1; + +pub const HYPER_HTTP_VERSION_NONE: libc::c_int = 0; +pub const HYPER_HTTP_VERSION_1_0: libc::c_int = 10; +pub const HYPER_HTTP_VERSION_1_1: libc::c_int = 11; +pub const HYPER_HTTP_VERSION_2: libc::c_int = 20; + +struct UserDataPointer(*mut std::ffi::c_void); + +// We don't actually know anything about this pointer, it's up to the user +// to do the right thing. +unsafe impl Send for UserDataPointer {} + +/// cbindgen:ignore +static VERSION_CSTR: &str = concat!(env!("CARGO_PKG_VERSION"), "\0"); + +ffi_fn! { + /// Returns a static ASCII (null terminated) string of the hyper version. + fn hyper_version() -> *const libc::c_char { + VERSION_CSTR.as_ptr() as _ + } +} diff --git a/src/ffi/task.rs b/src/ffi/task.rs new file mode 100644 index 0000000000..61641bd193 --- /dev/null +++ b/src/ffi/task.rs @@ -0,0 +1,415 @@ +use std::ffi::c_void; +use std::future::Future; +use std::pin::Pin; +use std::ptr; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, Weak, +}; +use std::task::{Context, Poll}; + +use futures_util::stream::{FuturesUnordered, Stream}; +use libc::c_int; + +use super::error::hyper_code; +use super::UserDataPointer; + +type BoxFuture = Pin + Send>>; +type BoxAny = Box; + +pub const HYPER_POLL_READY: c_int = 0; +pub const HYPER_POLL_PENDING: c_int = 1; +pub const HYPER_POLL_ERROR: c_int = 3; + +pub struct Exec { + /// The executor of all task futures. + /// + /// There should never be contention on the mutex, as it is only locked + /// to drive the futures. However, we cannot gaurantee proper usage from + /// `hyper_executor_poll()`, which in C could potentially be called inside + /// one of the stored futures. The mutex isn't re-entrant, so doing so + /// would result in a deadlock, but that's better than data corruption. + driver: Mutex>, + + /// The queue of futures that need to be pushed into the `driver`. + /// + /// This is has a separate mutex since `spawn` could be called from inside + /// a future, which would mean the driver's mutex is already locked. + spawn_queue: Mutex>, + + /// This is used to track when a future calls `wake` while we are within + /// `Exec::poll_next`. + is_woken: Arc, +} + +#[derive(Clone)] +pub(crate) struct WeakExec(Weak); + +struct ExecWaker(AtomicBool); + +pub struct Task { + future: BoxFuture, + output: Option, + userdata: UserDataPointer, +} + +struct TaskFuture { + task: Option>, +} + +pub struct hyper_context<'a>(Context<'a>); + +pub struct hyper_waker { + waker: std::task::Waker, +} + +#[repr(C)] +pub enum hyper_task_return_type { + /// The value of this task is null (does not imply an error). + HYPER_TASK_EMPTY, + /// The value of this task is `hyper_error *`. + HYPER_TASK_ERROR, + /// The value of this task is `hyper_clientconn *`. + HYPER_TASK_CLIENTCONN, + /// The value of this task is `hyper_response *`. + HYPER_TASK_RESPONSE, + /// The value of this task is `hyper_buf *`. + HYPER_TASK_BUF, +} + +pub(crate) unsafe trait AsTaskType { + fn as_task_type(&self) -> hyper_task_return_type; +} + +pub(crate) trait IntoDynTaskType { + fn into_dyn_task_type(self) -> BoxAny; +} + +// ===== impl Exec ===== + +impl Exec { + fn new() -> Arc { + Arc::new(Exec { + driver: Mutex::new(FuturesUnordered::new()), + spawn_queue: Mutex::new(Vec::new()), + is_woken: Arc::new(ExecWaker(AtomicBool::new(false))), + }) + } + + pub(crate) fn downgrade(exec: &Arc) -> WeakExec { + WeakExec(Arc::downgrade(exec)) + } + + fn spawn(&self, task: Box) { + self.spawn_queue + .lock() + .unwrap() + .push(TaskFuture { task: Some(task) }); + } + + fn poll_next(&self) -> Option> { + // Drain the queue first. + self.drain_queue(); + + let waker = futures_util::task::waker_ref(&self.is_woken); + let mut cx = Context::from_waker(&waker); + + loop { + match Pin::new(&mut *self.driver.lock().unwrap()).poll_next(&mut cx) { + Poll::Ready(val) => return val, + Poll::Pending => { + // Check if any of the pending tasks tried to spawn + // some new tasks. If so, drain into the driver and loop. + if self.drain_queue() { + continue; + } + + // If the driver called `wake` while we were polling, + // we should poll again immediately! + if self.is_woken.0.swap(false, Ordering::SeqCst) { + continue; + } + + return None; + } + } + } + } + + fn drain_queue(&self) -> bool { + let mut queue = self.spawn_queue.lock().unwrap(); + if queue.is_empty() { + return false; + } + + let driver = self.driver.lock().unwrap(); + + for task in queue.drain(..) { + driver.push(task); + } + + true + } +} + +impl futures_util::task::ArcWake for ExecWaker { + fn wake_by_ref(me: &Arc) { + me.0.store(true, Ordering::SeqCst); + } +} + +// ===== impl WeakExec ===== + +impl WeakExec { + pub(crate) fn new() -> Self { + WeakExec(Weak::new()) + } +} + +impl crate::rt::Executor> for WeakExec { + fn execute(&self, fut: BoxFuture<()>) { + if let Some(exec) = self.0.upgrade() { + exec.spawn(Task::boxed(fut)); + } + } +} + +ffi_fn! { + /// Creates a new task executor. + fn hyper_executor_new() -> *const Exec { + Arc::into_raw(Exec::new()) + } +} + +ffi_fn! { + /// Frees an executor and any incomplete tasks still part of it. + fn hyper_executor_free(exec: *const Exec) { + drop(unsafe { Arc::from_raw(exec) }); + } +} + +ffi_fn! { + /// Push a task onto the executor. + /// + /// The executor takes ownership of the task, it should not be accessed + /// again unless returned back to the user with `hyper_executor_poll`. + fn hyper_executor_push(exec: *const Exec, task: *mut Task) -> hyper_code { + if exec.is_null() || task.is_null() { + return hyper_code::HYPERE_INVALID_ARG; + } + let exec = unsafe { &*exec }; + let task = unsafe { Box::from_raw(task) }; + exec.spawn(task); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Polls the executor, trying to make progress on any tasks that have notified + /// that they are ready again. + /// + /// If ready, returns a task from the executor that has completed. + /// + /// If there are no ready tasks, this returns `NULL`. + fn hyper_executor_poll(exec: *const Exec) -> *mut Task { + // We only want an `&Arc` in here, so wrap in a `ManuallyDrop` so we + // don't accidentally trigger a ref_dec of the Arc. + let exec = unsafe { &*exec }; + match exec.poll_next() { + Some(task) => Box::into_raw(task), + None => ptr::null_mut(), + } + } +} + +// ===== impl Task ===== + +impl Task { + pub(crate) fn boxed(fut: F) -> Box + where + F: Future + Send + 'static, + F::Output: IntoDynTaskType + Send + Sync + 'static, + { + Box::new(Task { + future: Box::pin(async move { fut.await.into_dyn_task_type() }), + output: None, + userdata: UserDataPointer(ptr::null_mut()), + }) + } + + fn output_type(&self) -> hyper_task_return_type { + match self.output { + None => hyper_task_return_type::HYPER_TASK_EMPTY, + Some(ref val) => val.as_task_type(), + } + } +} + +impl Future for TaskFuture { + type Output = Box; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.task.as_mut().unwrap().future).poll(cx) { + Poll::Ready(val) => { + let mut task = self.task.take().unwrap(); + task.output = Some(val); + Poll::Ready(task) + } + Poll::Pending => Poll::Pending, + } + } +} + +ffi_fn! { + /// Free a task. + fn hyper_task_free(task: *mut Task) { + drop(unsafe { Box::from_raw(task) }); + } +} + +ffi_fn! { + /// Takes the output value of this task. + /// + /// This must only be called once polling the task on an executor has finished + /// this task. + /// + /// Use `hyper_task_type` to determine the type of the `void *` return value. + fn hyper_task_value(task: *mut Task) -> *mut c_void { + if task.is_null() { + return ptr::null_mut(); + } + + let task = unsafe { &mut *task }; + + if let Some(val) = task.output.take() { + let p = Box::into_raw(val) as *mut c_void; + // protect from returning fake pointers to empty types + if p == std::ptr::NonNull::::dangling().as_ptr() { + ptr::null_mut() + } else { + p + } + } else { + ptr::null_mut() + } + } +} + +ffi_fn! { + /// Query the return type of this task. + fn hyper_task_type(task: *mut Task) -> hyper_task_return_type { + if task.is_null() { + // instead of blowing up spectacularly, just say this null task + // doesn't have a value to retrieve. + return hyper_task_return_type::HYPER_TASK_EMPTY; + } + + unsafe { &*task }.output_type() + } +} + +ffi_fn! { + /// Set a user data pointer to be associated with this task. + /// + /// This value will be passed to task callbacks, and can be checked later + /// with `hyper_task_userdata`. + fn hyper_task_set_userdata(task: *mut Task, userdata: *mut c_void) { + if task.is_null() { + return; + } + + unsafe { (*task).userdata = UserDataPointer(userdata) }; + } +} + +ffi_fn! { + /// Retrieve the userdata that has been set via `hyper_task_set_userdata`. + fn hyper_task_userdata(task: *mut Task) -> *mut c_void { + if task.is_null() { + return ptr::null_mut(); + } + + unsafe { &*task }.userdata.0 + } +} + +// ===== impl AsTaskType ===== + +unsafe impl AsTaskType for () { + fn as_task_type(&self) -> hyper_task_return_type { + hyper_task_return_type::HYPER_TASK_EMPTY + } +} + +unsafe impl AsTaskType for crate::Error { + fn as_task_type(&self) -> hyper_task_return_type { + hyper_task_return_type::HYPER_TASK_ERROR + } +} + +impl IntoDynTaskType for T +where + T: AsTaskType + Send + Sync + 'static, +{ + fn into_dyn_task_type(self) -> BoxAny { + Box::new(self) + } +} + +impl IntoDynTaskType for crate::Result +where + T: IntoDynTaskType + Send + Sync + 'static, +{ + fn into_dyn_task_type(self) -> BoxAny { + match self { + Ok(val) => val.into_dyn_task_type(), + Err(err) => Box::new(err), + } + } +} + +impl IntoDynTaskType for Option +where + T: IntoDynTaskType + Send + Sync + 'static, +{ + fn into_dyn_task_type(self) -> BoxAny { + match self { + Some(val) => val.into_dyn_task_type(), + None => ().into_dyn_task_type(), + } + } +} + +// ===== impl hyper_context ===== + +impl hyper_context<'_> { + pub(crate) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> { + // A struct with only one field has the same layout as that field. + unsafe { std::mem::transmute::<&mut Context<'_>, &mut hyper_context<'_>>(cx) } + } +} + +ffi_fn! { + /// Copies a waker out of the task context. + fn hyper_context_waker(cx: *mut hyper_context<'_>) -> *mut hyper_waker { + let waker = unsafe { &mut *cx }.0.waker().clone(); + Box::into_raw(Box::new(hyper_waker { waker })) + } +} + +// ===== impl hyper_waker ===== + +ffi_fn! { + /// Free a waker that hasn't been woken. + fn hyper_waker_free(waker: *mut hyper_waker) { + drop(unsafe { Box::from_raw(waker) }); + } +} + +ffi_fn! { + /// Free a waker that hasn't been woken. + fn hyper_waker_wake(waker: *mut hyper_waker) { + let waker = unsafe { Box::from_raw(waker) }; + waker.waker.wake(); + } +} diff --git a/src/lib.rs b/src/lib.rs index 7268725e5f..e9b04229ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -87,6 +87,9 @@ pub mod rt; pub mod service; pub mod upgrade; +#[cfg(feature = "ffi")] +mod ffi; + cfg_proto! { mod headers; mod proto; diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index b9f658af9f..ab8616fec6 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -58,10 +58,10 @@ cfg_client! { impl Dispatcher where D: Dispatch< - PollItem = MessageHead, - PollBody = Bs, - RecvItem = MessageHead, - > + Unpin, + PollItem = MessageHead, + PollBody = Bs, + RecvItem = MessageHead, + > + Unpin, D::PollError: Into>, I: AsyncRead + AsyncWrite + Unpin, T: Http1Transaction + Unpin, @@ -405,10 +405,10 @@ where impl Future for Dispatcher where D: Dispatch< - PollItem = MessageHead, - PollBody = Bs, - RecvItem = MessageHead, - > + Unpin, + PollItem = MessageHead, + PollBody = Bs, + RecvItem = MessageHead, + > + Unpin, D::PollError: Into>, I: AsyncRead + AsyncWrite + Unpin, T: Http1Transaction + Unpin,