Skip to content

Commit

Permalink
[experimental] rgw/sfs: add DBConn::lock
Browse files Browse the repository at this point in the history
This is for use around transactions so we don't accidentally end up with
transactions inside transations across multiple threads.

Signed-off-by: Tim Serong <[email protected]>
  • Loading branch information
tserong committed Oct 13, 2023
1 parent 08f5a25 commit 2567cee
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/rgw/driver/sfs/sqlite/dbconn.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ class DBConn {
sqlite3* first_sqlite_conn;
CephContext* const cct;
const bool profile_enabled;
ceph::mutex lock = ceph::make_mutex("sfs_db_lock");

DBConn(CephContext* _cct);
virtual ~DBConn() = default;
Expand Down
3 changes: 3 additions & 0 deletions src/rgw/driver/sfs/sqlite/sqlite_buckets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ std::optional<DBDeletedObjectItems> SQLiteBuckets::delete_bucket_transact(
RetrySQLite<DBDeletedObjectItems> retry([&]() {
bucket_deleted = false;
DBDeletedObjectItems ret_values;
std::lock_guard l(conn->lock);
// FIXME: looks like this transaction will never rollback/commit if anything below throws.
// FIXME: I assume this wants to be storage.transaction_guard()?
storage.begin_transaction();
// first get all the objects and versions for that bucket
ret_values = storage.select(
Expand Down
10 changes: 10 additions & 0 deletions src/rgw/driver/sfs/sqlite/sqlite_multipart.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ int SQLiteMultipart::abort_multiparts_by_bucket_id(const std::string& bucket_id
) const {
auto& storage = conn->get_storage();
uint64_t num_changes = 0;
std::lock_guard l(conn->lock);
storage.transaction([&]() mutable {
storage.update_all(
set(c(&DBMultipart::state) = MultipartState::ABORTED,
Expand Down Expand Up @@ -223,6 +224,7 @@ std::optional<DBMultipartPart> SQLiteMultipart::create_or_reset_part(
auto& storage = conn->get_storage();

RetrySQLite<std::optional<DBMultipartPart>> retry([&]() {
std::lock_guard l(conn->lock);
auto transaction = storage.transaction_guard();
std::optional<DBMultipartPart> entry = std::nullopt;
auto cnt = storage.count<DBMultipart>(where(
Expand Down Expand Up @@ -302,6 +304,7 @@ bool SQLiteMultipart::finish_part(
uint64_t bytes_written
) const {
auto& storage = conn->get_storage();
std::lock_guard l(conn->lock);
bool committed = storage.transaction([&]() mutable {
storage.update_all(
set(c(&DBMultipartPart::etag) = etag,
Expand All @@ -323,6 +326,7 @@ bool SQLiteMultipart::finish_part(

bool SQLiteMultipart::abort(const std::string& upload_id) const {
auto& storage = conn->get_storage();
std::lock_guard l(conn->lock);
auto committed = storage.transaction([&]() mutable {
storage.update_all(
set(c(&DBMultipart::state) = MultipartState::ABORTED,
Expand Down Expand Up @@ -361,6 +365,7 @@ static int _mark_complete(

bool SQLiteMultipart::mark_complete(const std::string& upload_id) const {
auto& storage = conn->get_storage();
std::lock_guard l(conn->lock);
auto committed = storage.transaction([&]() mutable {
auto num_complete = _mark_complete(storage, upload_id);
if (num_complete == 0) {
Expand All @@ -378,6 +383,7 @@ bool SQLiteMultipart::mark_complete(
) const {
ceph_assert(duplicate != nullptr);
auto& storage = conn->get_storage();
std::lock_guard l(conn->lock);
auto committed = storage.transaction([&]() mutable {
auto entries = storage.get_all<DBMultipart>(
where(is_equal(&DBMultipart::upload_id, upload_id))
Expand All @@ -404,6 +410,7 @@ bool SQLiteMultipart::mark_complete(

bool SQLiteMultipart::mark_aggregating(const std::string& upload_id) const {
auto& storage = conn->get_storage();
std::lock_guard l(conn->lock);
auto committed = storage.transaction([&]() mutable {
storage.update_all(
set(c(&DBMultipart::state) = MultipartState::AGGREGATING,
Expand All @@ -426,6 +433,7 @@ bool SQLiteMultipart::mark_aggregating(const std::string& upload_id) const {

bool SQLiteMultipart::mark_done(const std::string& upload_id) const {
auto& storage = conn->get_storage();
std::lock_guard l(conn->lock);
auto committed = storage.transaction([&]() mutable {
storage.update_all(
set(c(&DBMultipart::state) = MultipartState::DONE,
Expand Down Expand Up @@ -467,6 +475,7 @@ SQLiteMultipart::remove_multiparts_by_bucket_id_transact(
DBDeletedMultipartItems ret_parts;
auto& storage = conn->get_storage();
RetrySQLite<DBDeletedMultipartItems> retry([&]() {
std::lock_guard l(conn->lock);
auto transaction = storage.transaction_guard();
// get first the list of parts to be deleted up to max_items
ret_parts = storage.select(
Expand Down Expand Up @@ -533,6 +542,7 @@ SQLiteMultipart::remove_done_or_aborted_multiparts_transact(uint max_items
DBDeletedMultipartItems ret_parts;
auto& storage = conn->get_storage();
RetrySQLite<DBDeletedMultipartItems> retry([&]() {
std::lock_guard l(conn->lock);
auto transaction = storage.transaction_guard();
// get first the list of parts to be deleted up to max_items
ret_parts = storage.select(
Expand Down
6 changes: 6 additions & 0 deletions src/rgw/driver/sfs/sqlite/sqlite_versioned_objects.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ bool SQLiteVersionedObjects::store_versioned_object_if_state(
const DBVersionedObject& object, std::vector<ObjectState> allowed_states
) const {
auto& storage = conn->get_storage();
std::lock_guard l(conn->lock);
auto transaction = storage.transaction_guard();
transaction.commit_on_destroy = true;
storage.update_all(
Expand Down Expand Up @@ -148,6 +149,7 @@ bool SQLiteVersionedObjects::
) const {
auto& storage = conn->get_storage();
RetrySQLite<bool> retry([&]() {
std::lock_guard l(conn->lock);
auto transaction = storage.transaction_guard();
storage.update_all(
set(c(&DBVersionedObject::object_id) = object.object_id,
Expand Down Expand Up @@ -299,6 +301,7 @@ SQLiteVersionedObjects::delete_version_and_get_previous_transact(
) const {
try {
auto& storage = conn->get_storage();
std::lock_guard l(conn->lock);
auto transaction = storage.transaction_guard();
std::optional<DBVersionedObject> ret_value = std::nullopt;
storage.remove<DBVersionedObject>(id);
Expand Down Expand Up @@ -337,6 +340,7 @@ uint SQLiteVersionedObjects::add_delete_marker_transact(
added = false;
try {
auto& storage = conn->get_storage();
std::lock_guard l(conn->lock);
auto transaction = storage.transaction_guard();
auto last_version_select = storage.get_all<DBVersionedObject>(
where(
Expand Down Expand Up @@ -450,6 +454,7 @@ SQLiteVersionedObjects::create_new_versioned_object_transact(
) const {
auto& storage = conn->get_storage();
RetrySQLite<DBVersionedObject> retry([&]() {
std::lock_guard l(conn->lock);
auto transaction = storage.transaction_guard();
auto objs = storage.select(
columns(&DBObject::uuid),
Expand Down Expand Up @@ -492,6 +497,7 @@ SQLiteVersionedObjects::remove_deleted_versions_transact(uint max_objects
DBDeletedObjectItems ret_objs;
auto& storage = conn->get_storage();
RetrySQLite<DBDeletedObjectItems> retry([&]() {
std::lock_guard l(conn->lock);
auto transaction = storage.transaction_guard();
// get first the list of objects to be deleted up to max_objects
// order by size so when we delete the versions data we are more efficient
Expand Down

0 comments on commit 2567cee

Please sign in to comment.