Condition variable and mutex block after one process crashes and restarts #200

Open · lioriz opened this issue Aug 24, 2023 · 1 comment


@lioriz

lioriz commented Aug 24, 2023

See: Stack Overflow question

I have an interprocess condition variable to synchronize my processes: one notifier calls notify, and two waiters call timed_wait, returning on either a notification or a timeout.

If one of the waiters crashes and restarts, all three processes block.

To recover, I have to kill all three, delete the shared memory, and restart them.
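A minimal sketch of one crash-tolerant alternative, assuming it is acceptable to trade a little latency for robustness. This is a swapped-in technique, not something Boost.Interprocess provides and not the reporter's code: the interprocess condition variable is replaced by a generation counter in shared memory. The notifier only increments the counter, so it can never block, and each waiter polls until the counter differs from the last value it saw or the timeout expires. No lock is held while waiting, so a waiter killed mid-wait leaves nothing behind for the others. `SyncCounter`, `Notify`, and `WaitForChange` are hypothetical names. The sketch assumes `std::atomic<std::uint64_t>` is lock-free on the target (the `static_assert` checks this); the C++ standard only recommends, rather than guarantees, that lock-free atomics also work between processes.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical replacement for the interprocess_condition. Meant to live in
// shared memory, e.g. constructed with find_or_construct in the segment.
struct SyncCounter {
    // Sharing an atomic between processes only makes sense if it is lock-free.
    static_assert(std::atomic<std::uint64_t>::is_always_lock_free,
                  "SyncCounter needs a lock-free 64-bit atomic");

    std::atomic<std::uint64_t> generation{0};

    // Notifier side: never blocks, whatever state the waiters are in.
    void Notify() { generation.fetch_add(1, std::memory_order_release); }

    // Waiter side: returns true once `generation` differs from `seen`
    // (and updates `seen`), or false if `timeout` expires first.
    bool WaitForChange(std::uint64_t& seen, std::chrono::milliseconds timeout,
                       std::chrono::milliseconds poll = std::chrono::milliseconds(10)) {
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        for (;;) {
            const std::uint64_t current = generation.load(std::memory_order_acquire);
            if (current != seen) {
                seen = current;
                return true;
            }
            if (std::chrono::steady_clock::now() >= deadline) return false;
            std::this_thread::sleep_for(poll);  // no lock held while sleeping
        }
    }
};
```

A waiter would initialize `seen` from `generation.load()` once after opening the segment, then call `WaitForChange(seen, std::chrono::seconds(CV_WAIT_TIMEOUT))` in its loop. The costs are up to one `poll` interval of latency and periodic wakeups. Several notifications between two polls collapse into one, which is still stronger than a condition variable, where a notification sent while no waiter is waiting is simply lost.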

The shared-memory class used to synchronize the processes:

```cpp
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread_time.hpp>  // boost::get_system_time()
#include <exception>
#include <string>
#include <unistd.h>  // sleep()

namespace bip = boost::interprocess;
namespace bpt = boost::posix_time;

// OBJECT_NAME, LOCK, CV_WAIT_TIMEOUT and LOG() are defined elsewhere in the
// reporter's project and were not posted.

class ProcessSync {
public:
    // Creates or opens the segment and the ProcessSync object inside it, guarded
    // by a named mutex. On failure, removes both and retries every 10 s.
    static ProcessSync *OpenOrCreate(bip::managed_shared_memory &sync_segment, const std::string& sync_name) {
        while (true) {
            std::string object_name = sync_name + OBJECT_NAME;
            std::string lock_name = sync_name + LOCK;
            try {
                bip::named_mutex sync_lock{bip::open_or_create, lock_name.c_str()};
                bip::scoped_lock<bip::named_mutex> lock(sync_lock, boost::get_system_time() + boost::posix_time::seconds(1));
                if (lock) {
                    sync_segment = bip::managed_shared_memory(bip::open_or_create, sync_name.c_str(),
                                                              sizeof(ProcessSync) * 10);
                    return sync_segment.find_or_construct<ProcessSync>(object_name.c_str())();
                } else {
                    LOG(WARNING) << "can't acquire lock for opening the " + sync_name;
                    throw std::exception();
                }
            } catch (std::exception &e) {
                LOG(WARNING) << "ProcessSync could not be created.";
                LOG(WARNING) << e.what();
                RemoveShmByName(sync_name);
                RemoveLockByName(lock_name);
                sleep(10);
            }
        }
    }

    // Opens an existing segment and finds the ProcessSync object; retries every 5 s.
    static ProcessSync *Open(bip::managed_shared_memory &sync_segment, const std::string& sync_name) {
        while (true) {
            std::string object_name = sync_name + OBJECT_NAME;
            std::string lock_name = sync_name + LOCK;
            try {
                bip::named_mutex sync_lock{bip::open_only, lock_name.c_str()};
                bip::scoped_lock<bip::named_mutex> lock(sync_lock, bip::try_to_lock);
                if (lock) {
                    sync_segment = bip::managed_shared_memory(bip::open_only, sync_name.c_str());
                    return sync_segment.find<ProcessSync>(object_name.c_str()).first;
                } else {
                    LOG(WARNING) << "can't acquire lock for opening the shm " + sync_name;
                    throw std::exception();
                }
            } catch (std::exception &e) {
                LOG(WARNING) << sync_name + " shm not set yet.";
                LOG(WARNING) << e.what();
                sleep(5);
            }
        }
    }

    static void RemoveLockByName(const std::string & lock_name) {
        try {
            bip::named_mutex::remove(lock_name.c_str());
            LOG(INFO) << lock_name + " deleted.";
        } catch (std::exception &e) {
            LOG(WARNING) << lock_name + " NOT deleted.";
            LOG(WARNING) << e.what();
        }
    }

    static void RemoveShmByName(const std::string & sync_name) {
        try {
            bip::shared_memory_object::remove(sync_name.c_str());
            LOG(INFO) << sync_name + " deleted.";
        } catch (std::exception &e) {
            LOG(WARNING) << sync_name + " NOT deleted.";
            LOG(WARNING) << e.what();
        }
    }

    // Waits until notified or until `timeout` seconds have elapsed.
    void WaitForSyncNotification(int timeout = CV_WAIT_TIMEOUT) {
        bip::scoped_lock<bip::interprocess_mutex> lock(process_sync_cv_mutex_);
        // Boost.Interprocess treats absolute deadlines as UTC; the original
        // second_clock::local_time() is off by the local UTC offset.
        bpt::ptime wait_timeout = bpt::microsec_clock::universal_time() + bpt::seconds(timeout);
        process_sync_cv_.timed_wait(lock, wait_timeout);  // returns false on timeout
    }

    void NotifySync() {
        process_sync_cv_.notify_all();
    }

private:
    bip::interprocess_mutex process_sync_cv_mutex_;
    bip::interprocess_condition process_sync_cv_;
};
```

First process (the notifier):

```cpp
bip::managed_shared_memory sync_segment;
ProcessSync *sync = ProcessSync::OpenOrCreate(sync_segment, "sync");
while (true) {
    // do something
    sync->NotifySync();
}
```

Second and third processes (the waiters):

```cpp
bip::managed_shared_memory sync_segment;
ProcessSync *sync = ProcessSync::Open(sync_segment, "sync");
while (true) {
    sync->WaitForSyncNotification();
    // do something
}
```

Boost version 1.78.0

Linux Ubuntu 18.04.5 LTS

Arch aarch64

On a normal run everything works as expected. But when one waiter crashes (for reasons unrelated to Boost) and restarts, all three processes block on the condition variable: the notifier blocks in notify, the first waiter blocks in timed_wait, and the restarted waiter blocks acquiring the mutex.

If the notifier is running but has not yet entered its while loop, the waiters can crash and restart and still wait until timeout as usual. As soon as the notifier enters the loop and calls notify, all three get stuck in the same way: the notifier in notify, the first waiter in timed_wait, and the restarted waiter on the mutex lock.

This only happens when I crash a waiter while it is blocked in timed_wait; crashing the notifier does not trigger it.
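For the symptom where the restarted waiter hangs acquiring the mutex, POSIX offers robust mutexes: if the owner dies while holding one, the next `pthread_mutex_lock` returns `EOWNERDEAD` instead of blocking forever, and the new owner calls `pthread_mutex_consistent` after repairing any shared state. The sketch below is a hedged illustration using raw pthreads, not Boost.Interprocess; `RobustMutex` and `SimulateCrashedOwner` are hypothetical names. A thread that exits while holding the lock stands in for a crashed process, since on Linux/glibc that triggers the same `EOWNERDEAD` path. Note that POSIX defines no robust condition variable, so this does not repair a condition variable whose waiter died inside a wait.

```cpp
#include <cerrno>
#include <pthread.h>
#include <thread>

// Hypothetical helper. Place the object in shared memory and call Init()
// exactly once, from the process that creates the segment.
struct RobustMutex {
    pthread_mutex_t m;

    int Init() {
        pthread_mutexattr_t attr;
        int rc = pthread_mutexattr_init(&attr);
        if (rc != 0) return rc;
        // Usable from every process that maps the memory.
        rc = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
        // Report owner death instead of blocking forever.
        if (rc == 0) rc = pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
        if (rc == 0) rc = pthread_mutex_init(&m, &attr);
        pthread_mutexattr_destroy(&attr);
        return rc;
    }

    // Returns 0 on a normal lock. Returns EOWNERDEAD if the previous owner
    // died holding the lock; the caller then owns the mutex, which has been
    // marked consistent. Any other value is an error.
    int Lock() {
        int rc = pthread_mutex_lock(&m);
        if (rc == EOWNERDEAD) {
            // Real code should repair whatever shared state the dead owner
            // may have left half-updated before marking the mutex consistent.
            pthread_mutex_consistent(&m);
        }
        return rc;
    }

    int Unlock() { return pthread_mutex_unlock(&m); }
};

// Illustration only: a thread that locks and exits without unlocking stands
// in for a waiter process that crashed while holding the mutex. Returns the
// result of the next Lock() call.
int SimulateCrashedOwner(RobustMutex& rm) {
    std::thread owner([&rm] { rm.Lock(); });  // exits still holding rm
    owner.join();
    return rm.Lock();
}
```

After recovery the mutex behaves normally again: the caller unlocks it, and later `Lock()` calls return 0.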

@wangbinio

I have the same problem with this simple code:

```cpp
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <chrono>
#include <thread>
#include <exception>
#include <iostream>

using namespace std;
using namespace boost::interprocess;

// Run with no arguments to start the notifier, with any argument to start a waiter.
int main(int argc, char *argv[])
{
    try {
        if (argc == 1) {
            // Notifier: recreate the named objects, then notify once per second.
            named_condition::remove("my_named_condition");
            named_mutex::remove("my_named_mutex");
            named_mutex mutex(create_only, "my_named_mutex");
            named_condition condition(create_only, "my_named_condition");

            while (true) {
                cout << "work..." << endl;
                this_thread::sleep_for(chrono::seconds(1));
                condition.notify_all();
            }
        } else {
            // Waiter: block on the condition (no timeout) until notified.
            named_mutex mutex(open_only, "my_named_mutex");
            named_condition condition(open_only, "my_named_condition");
            while (true) {
                cout << "wait..." << endl;
                // Qualified to avoid clashing with std::scoped_lock (C++17).
                boost::interprocess::scoped_lock<named_mutex> lock(mutex);
                condition.wait(lock);
                cout << "notified" << endl;
            }
        }
    } catch (std::exception &e) {
        std::cout << e.what() << std::endl;
    }
    return 0;
}
```

With one notifier and several waiters, terminating one waiter leaves the notifier and all the other waiters blocked.
I want to use named_condition to synchronize processes, but I ran into this problem and don't know how to solve it.
