Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws_async_input_stream #573

Merged
merged 6 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
- 'main'

env:
BUILDER_VERSION: v0.9.43
BUILDER_VERSION: v0.9.46
BUILDER_SOURCE: releases
BUILDER_HOST: https://d19elf31gohf1l.cloudfront.net
PACKAGE_NAME: aws-c-io
Expand Down
5 changes: 3 additions & 2 deletions codebuild/linux-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ phases:
pre_build:
commands:
- export CC=gcc-7
- export BUILDER_VERSION=v0.9.29
- export BUILDER_SOURCE=releases
- export BUILDER_VERSION=$(cat .github/workflows/ci.yml | grep 'BUILDER_VERSION:' | sed 's/\s*BUILDER_VERSION:\s*\(.*\)/\1/')
- export BUILDER_SOURCE=$(cat .github/workflows/ci.yml | grep 'BUILDER_SOURCE:' | sed 's/\s*BUILDER_SOURCE:\s*\(.*\)/\1/')
- echo "Using builder version='${BUILDER_VERSION}' source='${BUILDER_SOURCE}'"
- export BUILDER_HOST=https://d19elf31gohf1l.cloudfront.net
build:
commands:
Expand Down
123 changes: 123 additions & 0 deletions include/aws/io/async_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#ifndef AWS_IO_ASYNC_STREAM_H
#define AWS_IO_ASYNC_STREAM_H

/**
* THIS IS AN EXPERIMENTAL AND UNSTABLE API
* TODO: logging
* TODO: modify API to return byte-bufs, instead of filling in the provided byte-buf?
* this would avoid a copy in the use-cases we know of, but it's more complex
* TODO: vtable acquire()/release()?
* TODO: protect against simultaneous reads?
* TODO: check results of vtable->read() (i.e. 0 byte reads not allowed)?
* this would require 1 or 2 additional allocations per read
*/

#include <aws/io/io.h>

#include <aws/common/ref_count.h>

AWS_PUSH_SANE_WARNING_LEVEL

struct aws_async_input_stream;
struct aws_byte_buf;
struct aws_future_bool;
struct aws_input_stream;

/**
 * Base "class" shared by every async input stream implementation.
 * Populate it with aws_async_input_stream_init_base().
 */
struct aws_async_input_stream {
/* Implementation's function table; destroy() and read() are dispatched through it. */
const struct aws_async_input_stream_vtable *vtable;
/* Allocator stored at init. Also used to allocate the futures returned by the read functions. */
struct aws_allocator *alloc;
/* Reference count. The vtable's destroy() runs on this stream once the count reaches zero. */
struct aws_ref_count ref_count;
/* Opaque pointer supplied by the implementation at init. */
void *impl;
};

/**
 * Functions that every async input stream implementation must provide.
 */
struct aws_async_input_stream_vtable {
/**
 * Destroy the stream. Called once its refcount has reached 0.
 */
void (*destroy)(struct aws_async_input_stream *stream);

/**
 * Read once into the buffer.
 * Complete the read when at least 1 byte is read, the buffer is full, or EOF is reached.
 * Do not resize the buffer (do not use "aws_byte_buf_xyz_dynamic()" functions).
 * Do not assume that buffer len starts at 0.
 * You may assume that read() won't be called again until the current one completes.
 * You may assume that the buffer has some space available
 * (aws_async_input_stream_read() rejects a full buffer before calling this).
 * Return a future, which will contain an error code if something went wrong,
 * or a result bool indicating whether EOF has been reached.
 */
struct aws_future_bool *(*read)(struct aws_async_input_stream *stream, struct aws_byte_buf *dest);
};

AWS_EXTERN_C_BEGIN

/**
 * Initialize the aws_async_input_stream "base class".
 * Zeroes `*stream`, stores `alloc`, `vtable` and `impl` in it, and initializes its
 * reference count so that the vtable's destroy() runs when the count reaches zero.
 *
 * @param stream  Base struct to initialize (typically embedded in the implementation).
 * @param alloc   Allocator to store on the stream.
 * @param vtable  Implementation's function table.
 * @param impl    Opaque implementation pointer, stored as stream->impl.
 */
AWS_IO_API
void aws_async_input_stream_init_base(
struct aws_async_input_stream *stream,
struct aws_allocator *alloc,
const struct aws_async_input_stream_vtable *vtable,
void *impl);

/**
 * Increment the stream's reference count.
 * You may pass in NULL (has no effect).
 *
 * @return Whatever pointer was passed in, so the call can be used inline in an assignment.
 */
AWS_IO_API
struct aws_async_input_stream *aws_async_input_stream_acquire(struct aws_async_input_stream *stream);

/**
 * Decrement the stream's reference count.
 * You may pass in NULL (has no effect).
 *
 * @return Always NULL, so callers can release and clear a pointer in one statement.
 */
AWS_IO_API
struct aws_async_input_stream *aws_async_input_stream_release(struct aws_async_input_stream *stream);

/**
 * Read once from the async stream into the buffer.
 * The read completes when at least 1 byte is read, the buffer is full, or EOF is reached.
 * Depending on implementation, the read could complete at any time.
 * It may complete synchronously. It may complete on another thread.
 * Returns a future, which will contain an error code if something went wrong,
 * or a result bool indicating whether EOF has been reached.
 *
 * If `dest` has no space available (len == capacity) the returned future
 * contains AWS_ERROR_SHORT_BUFFER and the implementation is not called.
 *
 * WARNING: Do not call read() again until the previous read() is done.
 */
AWS_IO_API
struct aws_future_bool *aws_async_input_stream_read(struct aws_async_input_stream *stream, struct aws_byte_buf *dest);

/**
 * Read repeatedly from the async stream until the buffer is full, or EOF is reached.
 * Depending on implementation, this could complete at any time.
 * It may complete synchronously. It may complete on another thread.
 * Returns a future, which will contain an error code if something went wrong,
 * or a result bool indicating whether EOF has been reached.
 *
 * If `dest` has no space available (len == capacity) the returned future
 * contains AWS_ERROR_SHORT_BUFFER.
 *
 * The operation holds a reference to `stream` and keeps using the `dest`
 * pointer until the returned future completes.
 */
AWS_IO_API
struct aws_future_bool *aws_async_input_stream_read_to_fill(
struct aws_async_input_stream *stream,
struct aws_byte_buf *dest);

/**
 * Create a new async stream, which wraps a synchronous aws_input_stream.
 * The new stream acquires a reference to the `source` stream,
 * and releases it when the async stream is destroyed.
 * `source` must not be NULL.
 * This function cannot fail.
 */
AWS_IO_API
struct aws_async_input_stream *aws_async_input_stream_new_from_synchronous(
struct aws_allocator *alloc,
struct aws_input_stream *source);

AWS_EXTERN_C_END
AWS_POP_SANE_WARNING_LEVEL

#endif /* AWS_IO_ASYNC_STREAM_H */
209 changes: 209 additions & 0 deletions source/async_stream.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/io/async_stream.h>

#include <aws/common/byte_buf.h>
#include <aws/io/future.h>
#include <aws/io/stream.h>

/* Ref-count "on zero" callback: forwards to the implementation's destroy().
 * A trampoline with the exact aws_simple_completion_callback signature is used
 * instead of casting vtable->destroy, because calling a function through a
 * pointer of an incompatible type is undefined behavior. */
static void s_async_input_stream_on_zero_ref(void *user_data) {
    struct aws_async_input_stream *stream = user_data;
    stream->vtable->destroy(stream);
}

/**
 * Initialize the "base class" portion of an async input stream.
 *
 * Zeroes `*stream`, records the allocator, vtable and implementation pointer,
 * and sets up the reference count so that vtable->destroy(stream) runs when
 * the count drops to zero.
 */
void aws_async_input_stream_init_base(
    struct aws_async_input_stream *stream,
    struct aws_allocator *alloc,
    const struct aws_async_input_stream_vtable *vtable,
    void *impl) {

    AWS_ZERO_STRUCT(*stream);
    stream->alloc = alloc;
    stream->vtable = vtable;
    stream->impl = impl;
    aws_ref_count_init(&stream->ref_count, stream, s_async_input_stream_on_zero_ref);
}

/* Add a reference to the stream. NULL is tolerated and passed straight back. */
struct aws_async_input_stream *aws_async_input_stream_acquire(struct aws_async_input_stream *stream) {
    if (stream == NULL) {
        return NULL;
    }

    aws_ref_count_acquire(&stream->ref_count);
    return stream;
}

/* Drop a reference to the stream. NULL is tolerated. Always hands back NULL. */
struct aws_async_input_stream *aws_async_input_stream_release(struct aws_async_input_stream *stream) {
    if (stream == NULL) {
        return NULL;
    }

    aws_ref_count_release(&stream->ref_count);
    return NULL;
}

struct aws_future_bool *aws_async_input_stream_read(struct aws_async_input_stream *stream, struct aws_byte_buf *dest) {
/* Deal with this edge case here, instead of relying on every implementation to do it right. */
graebm marked this conversation as resolved.
Show resolved Hide resolved
if (dest->len == dest->capacity) {
struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
aws_future_bool_set_error(future, AWS_ERROR_SHORT_BUFFER);
return future;
}

return stream->vtable->read(stream, dest);
}

/* Data to perform the aws_async_input_stream_read_to_fill() job */
struct aws_async_input_stream_fill_job {
    /* Allocator the job itself was allocated from (used to free it on completion) */
    struct aws_allocator *alloc;
    /* Stream being read; the job holds a reference until it completes */
    struct aws_async_input_stream *stream;
    /* Caller's buffer being filled */
    struct aws_byte_buf *dest;
    /* Future for each read() step */
    struct aws_future_bool *read_future;
    /* Future to set when this job completes */
    struct aws_future_bool *my_future;
};

static void s_async_stream_fill_job_complete(struct aws_async_input_stream_fill_job *job, bool eof, int error_code) {
if (error_code) {
aws_future_bool_set_error(job->my_future, error_code);
} else {
aws_future_bool_set_result(job->my_future, eof);
}
aws_future_bool_release(job->my_future);
aws_async_input_stream_release(job->stream);
aws_mem_release(job->alloc, job);
}

/* Call read() in a loop.
* It would be simpler to set a completion callback for each read() call,
* but this risks our call stack growing large if there are many small, synchronous, reads.
* So be complicated and loop until a read() ) call is actually async,
* and only then set the completion callback (which is this same function, where we resume looping). */
static void s_async_stream_fill_job_loop(void *user_data) {
struct aws_async_input_stream_fill_job *job = user_data;

while (true) {
waahm7 marked this conversation as resolved.
Show resolved Hide resolved
/* Process read_future from previous iteration of loop.
* It's NULL the first time the job ever enters the loop.
* But it's set in subsequent runs of the loop, and when this is a read_future completion callback. */
if (job->read_future) {
if (aws_future_bool_register_callback_if_not_done(job->read_future, s_async_stream_fill_job_loop, job)) {
/* not done, we'll resume this loop when callback fires */
return;
}

/* read_future is done */
int error_code = aws_future_bool_get_error(job->read_future);
bool eof = error_code ? false : aws_future_bool_get_result(job->read_future);
bool reached_capacity = job->dest->len == job->dest->capacity;
job->read_future = aws_future_bool_release(job->read_future); /* release and NULL */

if (error_code || eof || reached_capacity) {
/* job complete! */
s_async_stream_fill_job_complete(job, eof, error_code);
return;
}
waahm7 marked this conversation as resolved.
Show resolved Hide resolved
}

/* Kick off a read, which may or may not complete async */
job->read_future = aws_async_input_stream_read(job->stream, job->dest);
}
}

struct aws_future_bool *aws_async_input_stream_read_to_fill(
struct aws_async_input_stream *stream,
struct aws_byte_buf *dest) {

struct aws_future_bool *future = aws_future_bool_new(stream->alloc);

/* Deal with this edge case here, instead of relying on every implementation to do it right. */
if (dest->len == dest->capacity) {
aws_future_bool_set_error(future, AWS_ERROR_SHORT_BUFFER);
return future;
}

struct aws_async_input_stream_fill_job *job =
aws_mem_calloc(stream->alloc, 1, sizeof(struct aws_async_input_stream_fill_job));
job->alloc = stream->alloc;
job->stream = aws_async_input_stream_acquire(stream);
job->dest = dest;
job->my_future = aws_future_bool_acquire(future);

/* Kick off work */
s_async_stream_fill_job_loop(job);

return future;
}

/*******************************************************************************
* aws_async_stream_wrapping_synchronous
******************************************************************************/

/* Async stream implementation that reads from a synchronous aws_input_stream. */
struct aws_async_stream_wrapping_synchronous {
/* Base "class"; a pointer to this member is what the constructor returns. */
struct aws_async_input_stream base;
/* Wrapped synchronous stream. A reference is acquired at creation and released on destroy. */
struct aws_input_stream *source;
};

/* vtable destroy(): release the wrapped synchronous stream, then free the wrapper. */
static void s_async_wrapping_synchronous_stream_destroy(struct aws_async_input_stream *async_stream) {
    struct aws_async_stream_wrapping_synchronous *wrapper = async_stream->impl;
    struct aws_allocator *allocator = async_stream->alloc;

    aws_input_stream_release(wrapper->source);
    aws_mem_release(allocator, wrapper);
}

/* vtable read(): read from the wrapped synchronous stream and return an
 * already-completed future holding either an error code or the EOF flag. */
static struct aws_future_bool *s_async_wrapping_synchronous_stream_read(
    struct aws_async_input_stream *async_stream,
    struct aws_byte_buf *dest) {

    struct aws_async_stream_wrapping_synchronous *wrapper = async_stream->impl;
    struct aws_input_stream *source = wrapper->source;

    struct aws_future_bool *future = aws_future_bool_new(async_stream->alloc);

    /* Retry the synchronous read() until some data arrives or EOF is hit.
     * The synchronous aws_input_stream API permits 0 byte reads,
     * whereas the aws_async_input_stream API does not.
     *
     * 0 byte reads are permitted by the synchronous API because, before an
     * async API existed, they were the way to report "data not available yet".
     *
     * Spinning on read() wastes CPU, but workarounds (sleeping between
     * retries, retrying from a thread, etc) may not be worth the complexity.
     * The real fix for an aws_input_stream that does 0 byte reads is to
     * replace it with an actual async stream. */
    const size_t len_before = dest->len;
    struct aws_stream_status status = {.is_end_of_stream = false, .is_valid = true};
    do {
        /* pull data from the wrapped stream */
        if (aws_input_stream_read(source, dest) != AWS_OP_SUCCESS) {
            aws_future_bool_set_error(future, aws_last_error());
            return future;
        }

        /* find out whether the wrapped stream has ended */
        if (aws_input_stream_get_status(source, &status) != AWS_OP_SUCCESS) {
            aws_future_bool_set_error(future, aws_last_error());
            return future;
        }
    } while (!status.is_end_of_stream && (dest->len == len_before));

    aws_future_bool_set_result(future, status.is_end_of_stream);
    return future;
}

/* Function table for the async stream that wraps a synchronous aws_input_stream. */
static const struct aws_async_input_stream_vtable s_async_stream_wrapping_input_stream_vtable = {
    .read = s_async_wrapping_synchronous_stream_read,
    .destroy = s_async_wrapping_synchronous_stream_destroy,
};

struct aws_async_input_stream *aws_async_input_stream_new_from_synchronous(
struct aws_allocator *alloc,
struct aws_input_stream *source) {

AWS_PRECONDITION(source);
graebm marked this conversation as resolved.
Show resolved Hide resolved

struct aws_async_stream_wrapping_synchronous *async_impl =
aws_mem_calloc(alloc, 1, sizeof(struct aws_async_stream_wrapping_synchronous));

aws_async_input_stream_init_base(
&async_impl->base, alloc, &s_async_stream_wrapping_input_stream_vtable, async_impl);

async_impl->source = aws_input_stream_acquire(source);

return &async_impl->base;
}