Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

无法使用智能指针 #120

Open
pengrui2009 opened this issue Jun 13, 2024 · 0 comments
Open

无法使用智能指针 #120

pengrui2009 opened this issue Jun 13, 2024 · 0 comments

Comments

@pengrui2009
Copy link

测试系统:Ubuntu 20.04
测试代码:demo/msg_que/main.cpp(将 msg_que_t 改为 std::shared_ptr<msg_que_t>)
具体代码:


#include <signal.h>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include "libipc/ipc.h"
#include "capo/random.hpp"

namespace {

// Channel name handed to the queue constructor, followed by the argv[1]
// values that select the sender ("s") and receiver ("r") roles.
constexpr char name__  [] = "ipc-msg-que";
constexpr char mode_s__[] = "s";
constexpr char mode_r__[] = "r";

// Bounds passed to the random generator below; max_sz is also the size of the
// buffer that outgoing messages are sliced from.
constexpr std::size_t min_sz = 128;
constexpr std::size_t max_sz = 16 * 1024;

// Set by the signal handler (and by the sender when waiting for a receiver
// fails) to ask every loop to stop.
std::atomic<bool> is_quit__(false);
// Bytes accumulated since the counting thread last printed and reset it.
std::atomic<std::size_t> size_counter__(0);

using msg_que_t = ipc::chan<ipc::relat::single,
                            ipc::relat::multi,
                            ipc::trans::broadcast>;

// The queue is held through a shared_ptr and created in main(), instead of
// being a plain namespace-scope msg_que_t object; it stays null until then.
std::shared_ptr<msg_que_t> que_ptr__;
// Source bytes for outgoing messages (static storage, so zero-initialised).
ipc::byte_t buff__[max_sz];
// Generator constructed from the two size bounds.
// NOTE(review): presumably yields values within [min_sz, max_sz] - confirm
// against capo::random.
capo::random<> rand__{ static_cast<int>(min_sz), static_cast<int>(max_sz) };

/**
 * Formats a byte count as a short human-readable string.
 *
 * Counts of at least 1024 * 1024 are printed in whole "MB", counts of at
 * least 1024 in whole "KB" (integer division, the remainder is dropped), and
 * anything smaller in "bytes".
 *
 * @param sz  Number of bytes.
 * @return    Text such as "128 bytes", "16 KB" or "3 MB".
 *
 * The function is declared noexcept, so an allocation failure while building
 * the string terminates the program instead of propagating.
 */
inline std::string str_of_size(std::size_t sz) noexcept {
    constexpr std::size_t kb = 1024;
    constexpr std::size_t mb = kb * kb;
    // Inclusive comparisons: an exact multiple belongs to the larger unit, so
    // 1024 is "1 KB" (not "1024 bytes") and 1024 * 1024 is "1 MB" (not "1024 KB").
    if (sz >= mb) {
        return std::to_string(sz / mb) + " MB";
    }
    if (sz >= kb) {
        return std::to_string(sz / kb) + " KB";
    }
    return std::to_string(sz) + " bytes";
}

/**
 * Renders a per-second throughput figure: the text produced by
 * str_of_size(sz) with "/s" appended.
 */
inline std::string speed_of(std::size_t sz) noexcept {
    std::string text = str_of_size(sz);
    text += "/s";
    return text;
}

/**
 * Throughput reporter, run on its own thread by do_send() and do_recv().
 *
 * Sleeps in 100 ms steps until is_quit__ is set; on every tenth step it
 * atomically takes and clears size_counter__ and prints the value as a rate.
 */
void do_counting() {
    int ticks = 0;
    while (!is_quit__.load(std::memory_order_acquire)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 100 ms
        ++ticks;
        if (ticks < 10) {
            continue;
        }
        // Ten steps have elapsed: report the accumulated bytes and start over.
        ticks = 0;
        std::size_t const bytes = size_counter__.exchange(0, std::memory_order_relaxed);
        std::cout << speed_of(bytes) << std::endl;
    }
}

void do_send() {
    std::cout 
        << __func__ << ": start [" 
        << str_of_size(min_sz) << " - " << str_of_size(max_sz) 
        << "]...\n";
    if (!que_ptr__->reconnect(ipc::sender)) {
        std::cerr << __func__ << ": connect failed.\n";
    }
    else {
        std::thread counting{ do_counting };
        while (!is_quit__.load(std::memory_order_acquire)) {
            std::size_t sz = static_cast<std::size_t>(rand__());
            if (!que_ptr__->send(ipc::buff_t(buff__, sz))) {
                std::cerr << __func__ << ": send failed.\n";
                std::cout << __func__ << ": waiting for receiver...\n";
                if (!que_ptr__->wait_for_recv(1)) {
                    std::cerr << __func__ << ": wait receiver failed.\n";
                    is_quit__.store(true, std::memory_order_release);
                    break;
                }
            }
            size_counter__.fetch_add(sz, std::memory_order_relaxed);
            std::this_thread::yield();
        }
        counting.join();
    }
    std::cout << __func__ << ": quit...\n";
}

void do_recv() {
    std::cout
        << __func__ << ": start ["
        << str_of_size(min_sz) << " - " << str_of_size(max_sz)
        << "]...\n";
    if (!que_ptr__->reconnect(ipc::receiver)) {
        std::cerr << __func__ << ": connect failed.\n";
    }
    else {
        std::thread counting{ do_counting };
        while (!is_quit__.load(std::memory_order_acquire)) {
            auto msg = que_ptr__->recv();
            if (msg.empty()) break;
            size_counter__.fetch_add(msg.size(), std::memory_order_relaxed);
        }
        counting.join();
    }
    std::cout << __func__ << ": quit...\n";
}

} // namespace

int main(int argc, char ** argv) {
    if (argc < 2) return 0;

    auto exit = [](int) {
        is_quit__.store(true, std::memory_order_release);
        que_ptr__->disconnect();
    };
    ::signal(SIGINT  , exit);
    ::signal(SIGABRT , exit);
    ::signal(SIGSEGV , exit);
    ::signal(SIGTERM , exit);
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
    defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
    defined(WINCE) || defined(_WIN32_WCE)
    ::signal(SIGBREAK, exit);
#else
    ::signal(SIGHUP  , exit);
#endif
    
    que_ptr__ = std::make_shared<msg_que_t>(name__ );
    std::string mode {argv[1]};
    if (mode == mode_s__) {
        do_send();
    } else if (mode == mode_r__) {
        do_recv();
    }
    return 0;
}

运行结果:

do_recv: start [128 bytes - 16 KB]...
0 bytes/s
^Cdo_recv: quit...
Segmentation fault (core dumped)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant