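After creating a bthread with bthread_start_background(...), is it well-defined behavior to start pthreads inside it? If such a pthread uses CountdownEvent, which involves butex_wait, does it follow the pthread logic or the bthread logic? #2818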

Open
imdouyu opened this issue Nov 8, 2024 · 5 comments

Comments

@imdouyu
Contributor

imdouyu commented Nov 8, 2024

Describe the bug

boost::lockfree::queue queue;

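class Producer {
public:
  Producer(int worker_size) {
    // init workers first, so the bthread never sees an empty `workers`
    if (bthread_start_background(&bid_, NULL, call_back, this) != 0) {
      cout << "Fail to create bthread" << endl;
    }
  }

  // A bthread entry point must have the signature void* (*)(void*), so this
  // is a static trampoline that receives `this` through `arg`.
  static void *call_back(void *arg) {
    static_cast<Producer *>(arg)->foo();
    return nullptr;
  }

  void foo() {
    boost::asio::thread_pool pool(3); // 3 plain pthreads, not bthreads
    while (!bthread_stopped(bid_)) {
      DataPtr data = kafka->consume();
      // capture `this` so the member function bar() can be called
      boost::asio::post(pool, [this, data]() { bar(data); });
    }
    pool.join(); // joins the pool's pthreads from inside the bthread
  }

  // Tries the workers in turn, from a random start, until one accepts `data`.
  void bar(DataPtr data) {
    auto idx = rand();
    while (true) {
      auto &worker = workers[idx % workers.size()];
      if (worker->Submit(data)) {
        break;
      } else {
        idx++;
      }
    }
  }
  bthread_t bid_;
  vector<WorkerPtr> workers;
};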

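class Worker {
public:
  Worker(int batch_size) {
    // init members; batch_count_down_event has to start at batch_size
  }
  // Returns true if `data` was stored in the current batch. The caller whose
  // idx equals batch_size flushes the batch and returns false, so its own
  // `data` is resubmitted by the caller.
  bool Submit(DataPtr data) {
    auto idx = index.fetch_add(1);
    if (idx < batch_size) {
      extractData(data, idx);
      return true;
    } else if (idx == batch_size) {
      batch_count_down_event.wait(); // wait until every slot is written
      queue.push(std::move(batch_data));
      batch_data.resize(batch_size);
      // Re-arm the event before reopening the batch; otherwise a signal()
      // from a new Submit() could arrive before reset() and be lost.
      batch_count_down_event.reset(batch_size);
      index = 0;
      return false;
    }
    return false;
  }

  // Writes slot `idx` of the batch, then counts the event down by one.
  void extractData(DataPtr data, int idx) {
    batch_data[idx] = do_sth(data);
    batch_count_down_event.signal(1);
  }

  int batch_size;
  vector<DataProcessed> batch_data;
  bthread::CountdownEvent batch_count_down_event;
  std::atomic_uint64_t index{0};
};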

int main() { producer = new Producer(); }
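A note on the callback in the snippet above: bthread_start_background takes a plain function of type void* (*)(void*), the same shape that pthread_create expects, so a non-static member function cannot be passed directly. The usual workaround is a static trampoline that receives the object through the void* argument. The sketch below illustrates the pattern with pthread_create as a stand-in so that it builds without brpc; the Runner class and run_once() helper are hypothetical names, not part of the original post or of brpc.

```cpp
#include <pthread.h>
#include <cassert>

// Hypothetical stand-in for the Producer in the post.
class Runner {
public:
    // Starts the thread; returns 0 on success, like pthread_create.
    int Start() {
        return pthread_create(&tid_, nullptr, &Runner::Trampoline, this);
    }
    void Join() { pthread_join(tid_, nullptr); }
    int runs() const { return runs_; }

private:
    // Static, so it matches void* (*)(void*); `arg` carries `this`.
    static void* Trampoline(void* arg) {
        static_cast<Runner*>(arg)->Run();
        return nullptr;
    }
    void Run() { ++runs_; }

    pthread_t tid_{};
    int runs_ = 0;
};

// Starts one Runner, waits for it, and returns how often Run() executed.
int run_once() {
    Runner r;
    if (r.Start() != 0) return -1;
    r.Join();  // the join makes the write to runs_ visible here
    return r.runs();
}
```

With bthread the same trampoline would presumably be passed as the third argument of bthread_start_background, with `this` as the fourth.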

To Reproduce

Expected behavior

Versions
OS:
Compiler:
brpc:
protobuf:

Additional context/screenshots

@Huixxi
Contributor

Huixxi commented Nov 9, 2024

Could you please post the actual code?

@imdouyu
Contributor Author

imdouyu commented Nov 11, 2024

Could you please post the actual code?

Updated.

@chenBright
Contributor

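  1. You can start a pthread from inside a bthread. However, pthread_join blocks the worker thread, so use it with caution.
  2. The bthread synchronization primitives support both bthreads and pthreads. When called from a pthread, they execute the pthread synchronization logic.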
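To picture the second point: when wait() is called from a plain pthread, the calling OS thread itself is blocked until the count reaches zero, much like a conventional latch. The sketch below models only that behavior with std::mutex and std::condition_variable. It is not brpc's implementation (which is built on butex), and SimpleCountdown and fill_batch are hypothetical names chosen for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical latch that mimics the wait/signal/reset interface shape.
class SimpleCountdown {
public:
    explicit SimpleCountdown(int initial) : count_(initial) {}

    // Counts down by n and wakes waiters once the count reaches zero.
    void signal(int n = 1) {
        std::lock_guard<std::mutex> lk(mu_);
        count_ -= n;
        if (count_ <= 0) cv_.notify_all();
    }

    // Blocks the calling OS thread until the count reaches zero.
    void wait() {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return count_ <= 0; });
    }

    // Re-arms the latch for the next batch.
    void reset(int n) {
        std::lock_guard<std::mutex> lk(mu_);
        count_ = n;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    int count_;
};

// Fills a batch from `batch_size` plain threads and returns the number of
// slots that were written by the time wait() returned.
int fill_batch(int batch_size) {
    std::vector<int> batch(batch_size, 0);
    SimpleCountdown done(batch_size);
    std::vector<std::thread> threads;
    for (int i = 0; i < batch_size; ++i) {
        threads.emplace_back([&batch, &done, i] {
            batch[i] = 1;   // each thread writes only its own slot
            done.signal();  // then counts the latch down by one
        });
    }
    done.wait();  // this thread sleeps here until every slot is written
    int filled = 0;
    for (int v : batch) filled += v;
    for (auto& t : threads) t.join();
    return filled;
}
```

In the Worker from the original post, the threads of the boost::asio pool play the role of the plain threads here, so by the answer above their wait() calls would block those pool threads rather than yield a bthread.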

@imdouyu
Contributor Author

imdouyu commented Nov 12, 2024

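  1. You can start a pthread from inside a bthread. However, pthread_join blocks the worker thread, so use it with caution.
  2. The bthread synchronization primitives support both bthreads and pthreads. When called from a pthread, they execute the pthread synchronization logic.

Does "worker thread" here mean the thread that the bthread runs on?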

@chenBright
Contributor

Yes.
