Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bind out cross-process lock with unit tests. #519

Merged
merged 12 commits into from
Nov 7, 2023
31 changes: 31 additions & 0 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,37 @@
from enum import IntEnum


class CrossProcessLock(NativeResource):
"""
Class representing an exclusive cross-process lock, scoped by `lock_scope_name`

Recommended usage is to either explicitly call acquire() followed by release() when the lock is no longer required, or use this in a 'with' statement.

acquire() will throw a RuntimeError with AWS_MUTEX_CALLER_NOT_OWNER as the error code, if the lock could not be acquired.

If the lock has not been explicitly released when the process exits, it will be released by the operating system.

Keyword Args:
lock_scope_name (str): Unique string identifying the caller holding the lock.
"""

def __init__(self, lock_scope_name):
super().__init__()
self._binding = _awscrt.s3_cross_process_lock_new(lock_scope_name)

def acquire(self):
_awscrt.s3_cross_process_lock_acquire(self._binding)

def __enter__(self):
self.acquire()

def release(self):
_awscrt.s3_cross_process_lock_release(self._binding)

def __exit__(self, exc_type, exc_value, exc_tb):
self.release()


class S3RequestType(IntEnum):
"""The type of the AWS S3 request"""

Expand Down
3 changes: 3 additions & 0 deletions source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,9 @@ static PyMethodDef s_module_methods[] = {
AWS_PY_METHOD_DEF(s3_meta_request_cancel, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_get_ec2_instance_type, METH_NOARGS),
AWS_PY_METHOD_DEF(s3_is_crt_s3_optimized_for_system, METH_NOARGS),
AWS_PY_METHOD_DEF(s3_cross_process_lock_new, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_cross_process_lock_acquire, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_cross_process_lock_release, METH_VARARGS),

/* WebSocket */
AWS_PY_METHOD_DEF(websocket_client_connect, METH_VARARGS),
Expand Down
4 changes: 4 additions & 0 deletions source/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args);

PyObject *aws_py_s3_meta_request_cancel(PyObject *self, PyObject *args);

PyObject *aws_py_s3_cross_process_lock_new(PyObject *self, PyObject *args);
PyObject *aws_py_s3_cross_process_lock_acquire(PyObject *self, PyObject *args);
PyObject *aws_py_s3_cross_process_lock_release(PyObject *self, PyObject *args);

struct aws_s3_client *aws_py_get_s3_client(PyObject *s3_client);
struct aws_s3_meta_request *aws_py_get_s3_meta_request(PyObject *s3_client);

Expand Down
99 changes: 99 additions & 0 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

#include "auth.h"
#include "io.h"
#include <aws/common/cross_process_lock.h>
#include <aws/s3/s3_client.h>

static const char *s_capsule_name_s3_client = "aws_s3_client";
static const char *s_capsule_name_s3_instance_lock = "aws_cross_process_lock";

PyObject *aws_py_s3_get_ec2_instance_type(PyObject *self, PyObject *args) {
(void)self;
Expand Down Expand Up @@ -37,6 +39,103 @@ PyObject *aws_py_s3_is_crt_s3_optimized_for_system(PyObject *self, PyObject *arg
Py_RETURN_FALSE;
}

struct cross_process_lock_binding {
struct aws_cross_process_lock *lock;
struct aws_string *name;
};

/* Invoked when the python object gets cleaned up */
static void s_s3_cross_process_lock_destructor(PyObject *capsule) {
struct cross_process_lock_binding *lock_binding = PyCapsule_GetPointer(capsule, s_capsule_name_s3_instance_lock);

if (lock_binding->lock) {
aws_cross_process_lock_release(lock_binding->lock);
lock_binding->lock = NULL;
}

if (lock_binding->name) {
aws_string_destroy(lock_binding->name);
}

aws_mem_release(aws_py_get_allocator(), lock_binding);
}

PyObject *aws_py_s3_cross_process_lock_new(PyObject *self, PyObject *args) {
(void)self;

struct aws_allocator *allocator = aws_py_get_allocator();

struct aws_byte_cursor lock_name; /* s# */

if (!PyArg_ParseTuple(args, "s#", &lock_name.ptr, &lock_name.len)) {
return NULL;
}

struct cross_process_lock_binding *binding =
aws_mem_calloc(allocator, 1, sizeof(struct cross_process_lock_binding));
binding->name = aws_string_new_from_cursor(allocator, &lock_name);

PyObject *capsule = PyCapsule_New(binding, s_capsule_name_s3_instance_lock, s_s3_cross_process_lock_destructor);
if (!capsule) {
aws_string_destroy(binding->name);
aws_mem_release(allocator, binding);
return PyErr_AwsLastError();
}

return capsule;
}

PyObject *aws_py_s3_cross_process_lock_acquire(PyObject *self, PyObject *args) {
(void)self;

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *lock_capsule; /* O */

if (!PyArg_ParseTuple(args, "O", &lock_capsule)) {
return NULL;
}

struct cross_process_lock_binding *lock_binding =
PyCapsule_GetPointer(lock_capsule, s_capsule_name_s3_instance_lock);
if (!lock_binding) {
return NULL;
}

if (!lock_binding->lock) {
struct aws_cross_process_lock *lock =
aws_cross_process_lock_try_acquire(allocator, aws_byte_cursor_from_string(lock_binding->name));

if (!lock) {
return PyErr_AwsLastError();
}
lock_binding->lock = lock;
}

Py_RETURN_NONE;
}

PyObject *aws_py_s3_cross_process_lock_release(PyObject *self, PyObject *args) {
PyObject *lock_capsule; /* O */

if (!PyArg_ParseTuple(args, "O", &lock_capsule)) {
return NULL;
}

struct cross_process_lock_binding *lock_binding =
PyCapsule_GetPointer(lock_capsule, s_capsule_name_s3_instance_lock);
if (!lock_binding) {
return NULL;
}

if (lock_binding->lock) {
aws_cross_process_lock_release(lock_binding->lock);
lock_binding->lock = NULL;
}

Py_RETURN_NONE;
}

struct s3_client_binding {
struct aws_s3_client *native;

Expand Down
54 changes: 54 additions & 0 deletions test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import tempfile
import math
import shutil
import time
from test import NativeResourceTest
from concurrent.futures import Future
from multiprocessing import Process

from awscrt.http import HttpHeaders, HttpRequest
from awscrt.s3 import (
Expand All @@ -18,6 +20,7 @@
S3ChecksumLocation,
S3Client,
S3RequestType,
CrossProcessLock,
create_default_s3_signing_config,
)
from awscrt.io import (
Expand All @@ -41,6 +44,57 @@
MB = 1024 ** 2
GB = 1024 ** 3

cross_process_lock_name = "instance_lock_test"


def cross_proc_task():
try:
lock = CrossProcessLock(cross_process_lock_name)
lock.acquire()
lock.release()
exit(0)
except RuntimeError as e:
exit(-1)


class CrossProcessLockTest(NativeResourceTest):
def setUp(self):
self.nonce = time.time()
super().setUp()

def test_with_statement(self):
nonce_str = f'lock_a_{self.nonce}'
with CrossProcessLock(nonce_str) as lock:
try:
new_lock = CrossProcessLock(nonce_str)
new_lock.acquire()
self.fail("Acquiring a lock by the same nonce should fail when it's already held")
except RuntimeError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the CRT have any formalized error hierarchy in Python? I haven't dug too deeply yet. My only concern here is RuntimeError is very generic so it will be difficult to do control flow handling with this without message introspection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@graebm any thoughts here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not great. It's the same problem we have in every language binding. We didn't code-gen error classes, so catching specific CRT errors means users must catch a RuntimeError and introspect it.

In this specific case, we could dance around the issue and just return None or False if the C function returns NULL, since we do often expect it to fail 🤷‍♀️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not happy about this. I really want to do a mildly-breaking-change to all our bindings and address this issue. But it's non-trivial :|

unique_nonce_str = f'lock_b{self.nonce}'
new_lock = CrossProcessLock(unique_nonce_str)
new_lock.acquire()
new_lock.release()

lock_after_with_same_nonce = CrossProcessLock(nonce_str)
lock_after_with_same_nonce.acquire()
lock_after_with_same_nonce.release()

def test_cross_proc(self):
with CrossProcessLock(cross_process_lock_name) as lock:
process = Process(target=cross_proc_task)
process.start()
process.join()
# aquiring this lock in a sub-process should fail since we
# already hold the lock in this process.
self.assertNotEqual(0, process.exitcode)

# now that we've released the lock above, the same sub-process path
# should now succeed.
unlocked_process = Process(target=cross_proc_task)
unlocked_process.start()
unlocked_process.join()
self.assertEqual(0, unlocked_process.exitcode)


class FileCreator(object):
def __init__(self):
Expand Down
Loading