Skip to content

Commit

Permalink
Support machines with multiple NICs (#576)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Graeb <[email protected]>
  • Loading branch information
waahm7 and graebm authored Jul 11, 2024
1 parent 717e4d2 commit 97b7189
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 37 deletions.
19 changes: 16 additions & 3 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
from awscrt import NativeResource
from awscrt.http import HttpRequest
from awscrt.io import ClientBootstrap, TlsConnectionOptions
from awscrt.auth import AwsCredentials, AwsCredentialsProvider, AwsSignatureType, AwsSignedBodyHeaderType, AwsSignedBodyValue, AwsSigningAlgorithm, AwsSigningConfig
from awscrt.auth import AwsCredentialsProvider, AwsSignatureType, AwsSignedBodyHeaderType, AwsSignedBodyValue, \
AwsSigningAlgorithm, AwsSigningConfig
import awscrt.exceptions
import threading
from dataclasses import dataclass
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Sequence
from enum import IntEnum


Expand Down Expand Up @@ -205,6 +204,13 @@ class S3Client(NativeResource):
client can use for buffering data for requests.
Default values scale with target throughput and are currently
between 2GiB and 8GiB (may change in future)
network_interface_names: (Optional[Sequence(str)]) A sequence of network interface names. The client will distribute the
connections across network interfaces. If any interface name is invalid, goes down,
or has any issues like network access, you will see connection failures.
This option is only supported on Linux, MacOS, and platforms that have either SO_BINDTODEVICE or IP_BOUND_IF. It
is not supported on Windows. `AWS_ERROR_PLATFORM_NOT_SUPPORTED` will be raised on unsupported platforms. On
Linux, SO_BINDTODEVICE is used and requires kernel version >= 5.7 or root privileges.
"""

__slots__ = ('shutdown_event', '_region')
Expand All @@ -222,7 +228,8 @@ def __init__(
multipart_upload_threshold=None,
throughput_target_gbps=None,
enable_s3express=False,
memory_limit=None):
memory_limit=None,
network_interface_names: Optional[Sequence[str]] = None):
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(region, str)
assert isinstance(signing_config, AwsSigningConfig) or signing_config is None
Expand All @@ -235,6 +242,7 @@ def __init__(
throughput_target_gbps,
float) or throughput_target_gbps is None
assert isinstance(enable_s3express, bool) or enable_s3express is None
assert isinstance(network_interface_names, Sequence) or network_interface_names is None

if credential_provider and signing_config:
raise ValueError("'credential_provider' has been deprecated in favor of 'signing_config'. "
Expand Down Expand Up @@ -270,6 +278,10 @@ def on_shutdown():
throughput_target_gbps = 0
if memory_limit is None:
memory_limit = 0
if network_interface_names is not None:
# ensure this is a list, so it's simpler to process in C
if not isinstance(network_interface_names, list):
network_interface_names = list(network_interface_names)

self._binding = _awscrt.s3_client_new(
bootstrap,
Expand All @@ -284,6 +296,7 @@ def on_shutdown():
throughput_target_gbps,
enable_s3express,
memory_limit,
network_interface_names,
s3_client_core)

def make_request(
Expand Down
88 changes: 62 additions & 26 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,24 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
int enable_s3express; /* p */
uint64_t mem_limit; /* K */
PyObject *py_core; /* O */
PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
int enable_s3express; /* p */
uint64_t mem_limit; /* K */
PyObject *network_interface_names_py; /* O */
PyObject *py_core; /* O */

if (!PyArg_ParseTuple(
args,
"OOOOOs#iKKdpKO",
"OOOOOs#iKKdpKOO",
&bootstrap_py,
&signing_config_py,
&credential_provider_py,
Expand All @@ -274,6 +276,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
&throughput_target_gbps,
&enable_s3express,
&mem_limit,
&network_interface_names_py,
&py_core)) {
return NULL;
}
Expand Down Expand Up @@ -304,10 +307,16 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

struct aws_signing_config_aws *signing_config = NULL;
struct aws_credentials *anonymous_credentials = NULL;
struct aws_byte_cursor *network_interface_names = NULL;
size_t num_network_interface_names = 0;
PyObject *capsule = NULL;
/* From hereon, we need to clean up if errors occur */
bool success = false;

if (signing_config_py != Py_None) {
signing_config = aws_py_get_signing_config(signing_config_py);
if (!signing_config) {
return NULL;
goto cleanup;
}
} else if (credential_provider) {
aws_s3_init_default_signing_config(&default_signing_config, region, credential_provider);
Expand All @@ -321,13 +330,10 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

struct s3_client_binding *s3_client = aws_mem_calloc(allocator, 1, sizeof(struct s3_client_binding));

/* From hereon, we need to clean up if errors occur */

PyObject *capsule = PyCapsule_New(s3_client, s_capsule_name_s3_client, s_s3_client_capsule_destructor);
capsule = PyCapsule_New(s3_client, s_capsule_name_s3_client, s_s3_client_capsule_destructor);
if (!capsule) {
aws_credentials_release(anonymous_credentials);
aws_mem_release(allocator, s3_client);
return NULL;
goto cleanup;
}

s3_client->on_shutdown = on_shutdown_py;
Expand All @@ -336,6 +342,31 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
s3_client->py_core = py_core;
Py_INCREF(s3_client->py_core);

if (network_interface_names_py != Py_None) {
if (!PyList_Check(network_interface_names_py)) {
PyErr_SetString(PyExc_TypeError, "Expected network_interface_names to be a sequence.");
goto cleanup;
}
Py_ssize_t list_size = PyList_Size(network_interface_names_py);
if (list_size < 0) {
goto cleanup;
}
num_network_interface_names = (size_t)list_size;
network_interface_names =
aws_mem_calloc(allocator, num_network_interface_names, sizeof(struct aws_byte_cursor));
for (size_t i = 0; i < num_network_interface_names; ++i) {
PyObject *str_obj = PyList_GetItem(network_interface_names_py, i); /* Borrowed reference */
if (!str_obj) {
goto cleanup;
}
network_interface_names[i] = aws_byte_cursor_from_pyunicode(str_obj);
if (network_interface_names[i].ptr == NULL) {
PyErr_SetString(PyExc_TypeError, "Expected all network_interface_names elements to be strings.");
goto cleanup;
}
}
}

struct aws_s3_client_config s3_config = {
.region = region,
.client_bootstrap = bootstrap,
Expand All @@ -349,18 +380,23 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
.shutdown_callback = s_s3_client_shutdown,
.shutdown_callback_user_data = s3_client,
.enable_s3express = enable_s3express,
.network_interface_names_array = network_interface_names,
.num_network_interface_names = num_network_interface_names,
};

s3_client->native = aws_s3_client_new(allocator, &s3_config);
if (s3_client->native == NULL) {
PyErr_SetAwsLastError();
goto error;
goto cleanup;
}
aws_credentials_release(anonymous_credentials);
return capsule;
success = true;

error:
cleanup:
aws_credentials_release(anonymous_credentials);
Py_DECREF(capsule);
return NULL;
aws_mem_release(allocator, network_interface_names);
if (!success) {
Py_XDECREF(capsule);
return NULL;
}
return capsule;
}
14 changes: 9 additions & 5 deletions test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
from multiprocessing import Process

from awscrt.http import HttpHeaders, HttpRequest
from awscrt.s3 import S3Client, S3RequestType, create_default_s3_signing_config
from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, TlsConnectionOptions, TlsContextOptions
from awscrt.auth import AwsCredentials, AwsCredentialsProvider, AwsSignatureType, AwsSignedBodyHeaderType, AwsSignedBodyValue, AwsSigningAlgorithm, AwsSigningConfig
from awscrt.auth import AwsCredentials
from awscrt.s3 import (
S3ChecksumAlgorithm,
S3ChecksumConfig,
Expand Down Expand Up @@ -158,7 +156,8 @@ def s3_client_new(
part_size=0,
is_cancel_test=False,
enable_s3express=False,
mem_limit=None):
mem_limit=None,
network_interface_names=None):

if is_cancel_test:
# for cancellation tests, make things slow, so it's less likely that
Expand Down Expand Up @@ -189,7 +188,8 @@ def s3_client_new(
part_size=part_size,
throughput_target_gbps=throughput_target_gbps,
enable_s3express=enable_s3express,
memory_limit=mem_limit)
memory_limit=mem_limit,
network_interface_names=network_interface_names)

return s3_client

Expand Down Expand Up @@ -221,6 +221,10 @@ def test_sanity_secure(self):
s3_client = s3_client_new(True, self.region)
self.assertIsNotNone(s3_client)

def test_sanity_network_interface_names(self):
s3_client = s3_client_new(True, self.region, network_interface_names=("eth0", "eth1"))
self.assertIsNotNone(s3_client)

def test_wait_shutdown(self):
s3_client = s3_client_new(False, self.region)
self.assertIsNotNone(s3_client)
Expand Down

0 comments on commit 97b7189

Please sign in to comment.