From 491dc725869d104de354a82707e3f6af59d123b4 Mon Sep 17 00:00:00 2001 From: Olli Saarikivi Date: Tue, 22 Aug 2023 11:02:23 -0700 Subject: [PATCH] Implementation of API changes compiles Tests are broken still --- include/mscclpp/core.hpp | 80 ++++++++++++++------------- python/core_py.cpp | 5 +- src/communicator.cc | 102 ++++++++++++++++------------------- src/connection.cc | 23 ++++---- src/context.cc | 11 ++-- src/include/communicator.hpp | 19 ++++--- src/include/connection.hpp | 9 ++-- src/include/context.hpp | 6 +-- src/include/endpoint.hpp | 2 +- src/registered_memory.cc | 3 +- src/semaphore.cc | 10 ++-- 11 files changed, 127 insertions(+), 143 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 4ebc4591d..ebd65ca15 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -301,15 +301,11 @@ std::string getIBDeviceName(Transport ibTransport); /// @return The InfiniBand transport associated with the specified device name. Transport getIBTransportByDeviceName(const std::string& ibDeviceName); -class Communicator; -class Connection; - /// Represents a block of memory that has been registered to a @ref Communicator. class RegisteredMemory { - protected: + public: struct Impl; - public: /// Default constructor. RegisteredMemory() = default; @@ -347,14 +343,7 @@ class RegisteredMemory { /// @return A deserialized RegisteredMemory object. static RegisteredMemory deserialize(const std::vector& data); - friend class Connection; - friend class Context; - friend class IBConnection; - friend class Communicator; - - private: - // A shared_ptr is used since RegisteredMemory is functionally immutable, although internally some state is populated - // lazily. + /// Pointer to the internal implementation of the RegisteredMemory class. A shared_ptr is used since RegisteredMemory is immutable. std::shared_ptr pimpl; }; @@ -391,21 +380,17 @@ class Connection { /// /// @return The transport used by the remote process. virtual Transport remoteTransport() = 0; - - protected: - /// Get the implementation object associated with a @ref RegisteredMemory object. - /// - /// @param memory The @ref RegisteredMemory object. - /// @return A shared pointer to the implementation object. - static std::shared_ptr getRegisteredMemoryImpl(RegisteredMemory& memory); }; /// Represents one end of a connection. class Endpoint { - protected: - struct Impl; - public: + /// Default constructor. + Endpoint() = default; + + /// Destructor. + ~Endpoint(); + /// Get the transport used. /// /// @return The transport used. @@ -421,16 +406,20 @@ class Endpoint { /// @param data A vector of characters representing a serialized Endpoint object. /// @return A deserialized Endpoint object. static Endpoint deserialize(const std::vector& data); + + /// The interal implementation of the Endpoint class. + struct Impl; - private: - // A shared_ptr is used since Endpoint is immutable. + /// Constructor that takes a shared pointer to an implementation object. + /// + /// @param pimpl A shared pointer to an implementation object. + Endpoint(std::shared_ptr pimpl); + + /// Pointer to the internal implementation of the Endpoint class. A shared_ptr is used since Endpoint is immutable. std::shared_ptr pimpl; }; class Context { - protected: - struct Impl; - public: /// Create a context. /// @@ -461,10 +450,10 @@ class Context { /// @return std::shared_ptr A shared pointer to the connection. std::shared_ptr connect(Endpoint localEndpoint, Endpoint remoteEndpoint); - friend class RegisteredMemory::Impl; + /// The interal implementation of the Context class. + struct Impl; - private: - /// Unique pointer to the implementation of the Communicator class. + /// Pointer to the internal implementation of the Context class. std::unique_ptr pimpl; }; @@ -531,9 +520,6 @@ class NonblockingFuture { /// 6. All done; use connections and registered memories to build channels. /// class Communicator { - protected: - struct Impl; - public: /// Initializes the communicator with a given bootstrap implementation. /// @@ -553,6 +539,14 @@ class Communicator { /// @return std::shared_ptr The context held by this communicator. std::shared_ptr context(); + /// Register a region of GPU memory for use in this communicator's context. + /// + /// @param ptr Base pointer to the memory. + /// @param size Size of the memory region in bytes. + /// @param transports Transport flags. + /// @return RegisteredMemory A handle to the buffer. + RegisteredMemory registerMemory(void* ptr, size_t size, TransportFlags transports); + /// Send information of a registered memory to the remote side on setup. /// /// This function registers a send to a remote process that will happen by a following call of @ref setup(). The send @@ -587,6 +581,18 @@ class Communicator { /// @return NonblockingFuture> A non-blocking future of shared pointer to the connection. NonblockingFuture> connectOnSetup(int remoteRank, int tag, Transport transport); + /// Get the remote rank a connection is connected to. + /// + /// @param connection The connection to get the remote rank for. + /// @return The remote rank the connection is connected to. + int remoteRankOf(const Connection& connection); + + /// Get the tag a connection was made with. + /// + /// @param connection The connection to get the tag for. + /// @return The tag the connection was made with. + int tagOf(const Connection& connection); + /// Add a custom Setuppable object to a list of objects to be setup later, when @ref setup() is called. /// /// @param setuppable A shared pointer to the Setuppable object. @@ -599,10 +605,10 @@ class Communicator { /// that have been registered after the (n-1)-th call. void setup(); - friend class IBConnection; + /// The interal implementation of the Communicator class. + struct Impl; - private: - /// Unique pointer to the implementation of the Communicator class. + /// Pointer to the internal implementation of the Communicator class. std::unique_ptr pimpl; }; diff --git a/python/core_py.cpp b/python/core_py.cpp index fb5052887..e009121ad 100644 --- a/python/core_py.cpp +++ b/python/core_py.cpp @@ -104,7 +104,6 @@ void register_core(nb::module_& m) { .def(nb::init<>()) .def("data", &RegisteredMemory::data) .def("size", &RegisteredMemory::size) - .def("rank", &RegisteredMemory::rank) .def("transports", &RegisteredMemory::transports) .def("serialize", &RegisteredMemory::serialize) .def_static("deserialize", &RegisteredMemory::deserialize, nb::arg("data")); @@ -119,8 +118,6 @@ void register_core(nb::module_& m) { }, nb::arg("dst"), nb::arg("dstOffset"), nb::arg("src"), nb::arg("newValue")) .def("flush", &Connection::flush) - .def("remote_rank", &Connection::remoteRank) - .def("tag", &Connection::tag) .def("transport", &Connection::transport) .def("remote_transport", &Connection::remoteTransport); @@ -140,6 +137,8 @@ void register_core(nb::module_& m) { .def("recv_memory_on_setup", &Communicator::recvMemoryOnSetup, nb::arg("remoteRank"), nb::arg("tag")) .def("connect_on_setup", &Communicator::connectOnSetup, nb::arg("remoteRank"), nb::arg("tag"), nb::arg("transport")) + .def("remote_rank_of", &Communicator::remoteRankOf) + .def("tag_of", &Communicator::tagOf) .def("setup", &Communicator::setup); } diff --git a/src/communicator.cc b/src/communicator.cc index 0480f0231..d5c750f08 100644 --- a/src/communicator.cc +++ b/src/communicator.cc @@ -15,46 +15,27 @@ namespace mscclpp { -Communicator::Impl::Impl(std::shared_ptr bootstrap) : bootstrap_(bootstrap) { - rankToHash_.resize(bootstrap->getNranks()); - auto hostHash = getHostHash(); - INFO(MSCCLPP_INIT, "Host hash: %lx", hostHash); - rankToHash_[bootstrap->getRank()] = hostHash; - bootstrap->allGather(rankToHash_.data(), sizeof(uint64_t)); - - MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&ipcStream_, cudaStreamNonBlocking)); -} - -Communicator::Impl::~Impl() { - ibContexts_.clear(); - - cudaStreamDestroy(ipcStream_); -} - -IbCtx* Communicator::Impl::getIbContext(Transport ibTransport) { - // Find IB context or create it - auto it = ibContexts_.find(ibTransport); - if (it == ibContexts_.end()) { - auto ibDev = getIBDeviceName(ibTransport); - ibContexts_[ibTransport] = std::make_unique(ibDev); - return ibContexts_[ibTransport].get(); +Communicator::Impl::Impl(std::shared_ptr bootstrap, std::shared_ptr context) : bootstrap_(bootstrap) { + if (!context) { + context_ = std::make_shared(); } else { - return it->second.get(); + context_ = context; } -} -cudaStream_t Communicator::Impl::getIpcStream() { return ipcStream_; } + rankToHash_.resize(bootstrap->getNranks()); + rankToHash_[bootstrap->getRank()] = context_->pimpl->hostHash_; + bootstrap->allGather(rankToHash_.data(), sizeof(uint64_t)); +} MSCCLPP_API_CPP Communicator::~Communicator() = default; -MSCCLPP_API_CPP Communicator::Communicator(std::shared_ptr bootstrap) - : pimpl(std::make_unique(bootstrap)) {} +MSCCLPP_API_CPP Communicator::Communicator(std::shared_ptr bootstrap, std::shared_ptr context) + : pimpl(std::make_unique(bootstrap, context)) {} MSCCLPP_API_CPP std::shared_ptr Communicator::bootstrap() { return pimpl->bootstrap_; } MSCCLPP_API_CPP RegisteredMemory Communicator::registerMemory(void* ptr, size_t size, TransportFlags transports) { - return RegisteredMemory( - std::make_shared(ptr, size, pimpl->bootstrap_->getRank(), transports, *pimpl)); + return context()->registerMemory(ptr, size, transports); } struct MemorySender : public Setuppable { @@ -94,34 +75,41 @@ MSCCLPP_API_CPP NonblockingFuture Communicator::recvMemoryOnSe return NonblockingFuture(memoryReceiver->memoryPromise_.get_future()); } -MSCCLPP_API_CPP std::shared_ptr Communicator::connectOnSetup(int remoteRank, int tag, Transport transport) { - std::shared_ptr conn; - if (transport == Transport::CudaIpc) { - // sanity check: make sure the IPC connection is being made within a node - if (pimpl->rankToHash_[remoteRank] != pimpl->rankToHash_[pimpl->bootstrap_->getRank()]) { - std::stringstream ss; - ss << "Cuda IPC connection can only be made within a node: " << remoteRank << "(" << std::hex - << pimpl->rankToHash_[remoteRank] << ") != " << pimpl->bootstrap_->getRank() << "(" << std::hex - << pimpl->rankToHash_[pimpl->bootstrap_->getRank()] << ")"; - throw mscclpp::Error(ss.str(), ErrorCode::InvalidUsage); - } - auto cudaIpcConn = std::make_shared(remoteRank, tag, pimpl->getIpcStream()); - conn = cudaIpcConn; - INFO(MSCCLPP_P2P, "Cuda IPC connection between rank %d(%lx) and remoteRank %d(%lx) created", - pimpl->bootstrap_->getRank(), pimpl->rankToHash_[pimpl->bootstrap_->getRank()], remoteRank, - pimpl->rankToHash_[remoteRank]); - } else if (AllIBTransports.has(transport)) { - auto ibConn = std::make_shared(remoteRank, tag, transport, *pimpl); - conn = ibConn; - INFO(MSCCLPP_NET, "IB connection between rank %d(%lx) via %s and remoteRank %d(%lx) created", - pimpl->bootstrap_->getRank(), pimpl->rankToHash_[pimpl->bootstrap_->getRank()], - getIBDeviceName(transport).c_str(), remoteRank, pimpl->rankToHash_[remoteRank]); - } else { - throw mscclpp::Error("Unsupported transport", ErrorCode::InternalError); +struct Connector : public Setuppable { + Connector(Communicator& comm, int remoteRank, int tag, Transport transport) : comm_(comm), remoteRank_(remoteRank), tag_(tag), localEndpoint_(comm.context()->createEndpoint(transport)) {} + + void beginSetup(std::shared_ptr bootstrap) override { + bootstrap->send(localEndpoint_.serialize(), remoteRank_, tag_); } - pimpl->connections_.push_back(conn); - onSetup(conn); - return conn; + + void endSetup(std::shared_ptr bootstrap) override { + std::vector data; + bootstrap->recv(data, remoteRank_, tag_); + auto remoteEndpoint = Endpoint::deserialize(data); + auto connection = comm_.context()->connect(localEndpoint_, remoteEndpoint); + comm_.pimpl->connectionInfos_[connection.get()] = {remoteRank_, tag_}; + connectionPromise_.set_value(connection); + } + + std::promise> connectionPromise_; + Communicator& comm_; + int remoteRank_; + int tag_; + Endpoint localEndpoint_; +}; + +MSCCLPP_API_CPP NonblockingFuture> Communicator::connectOnSetup(int remoteRank, int tag, Transport transport) { + auto connector = std::make_shared(*this, remoteRank, tag, transport); + onSetup(connector); + return NonblockingFuture>(connector->connectionPromise_.get_future()); +} + +MSCCLPP_API_CPP int Communicator::remoteRankOf(const Connection& connection) { + return pimpl->connectionInfos_.at(&connection).remoteRank; +} + +MSCCLPP_API_CPP int Communicator::tagOf(const Connection& connection) { + return pimpl->connectionInfos_.at(&connection).tag; } MSCCLPP_API_CPP void Communicator::onSetup(std::shared_ptr setuppable) { diff --git a/src/connection.cc b/src/connection.cc index ec967dcec..1db24f57b 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -3,8 +3,10 @@ #include "connection.hpp" +#include #include +#include "endpoint.hpp" #include "debug.h" #include "infiniband/verbs.h" #include "npkit/npkit.h" @@ -18,16 +20,10 @@ void validateTransport(RegisteredMemory mem, Transport transport) { } } -// Connection - -std::shared_ptr Connection::getRegisteredMemoryImpl(RegisteredMemory& memory) { - return memory.pimpl; -} - // CudaIpcConnection -CudaIpcConnection::CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, Context::Impl& contextImpl) - : stream_(contextImpl->getIpcStream()) { +CudaIpcConnection::CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, cudaStream_t stream) + : stream_(stream) { if (localEndpoint.transport() != Transport::CudaIpc) { throw mscclpp::Error("Cuda IPC connection can only be made from a Cuda IPC endpoint", ErrorCode::InvalidUsage); } @@ -83,7 +79,7 @@ void CudaIpcConnection::flush() { // IBConnection -IBConnection::IBConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, Context::Impl& contextImpl) +IBConnection::IBConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, Context& context) : transport_(localEndpoint.transport()), remoteTransport_(remoteEndpoint.transport()), numSignaledSends(0), @@ -91,8 +87,7 @@ IBConnection::IBConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, Cont qp = localEndpoint.pimpl->ibQp_; qp->rtr(remoteEndpoint.pimpl->ibQpInfo_); qp->rts(); - dummyAtomicSourceMem_ = RegisteredMemory(std::make_shared( - dummyAtomicSource_.get(), sizeof(uint64_t), transport_, contextImpl)); + dummyAtomicSourceMem_ = context.registerMemory(dummyAtomicSource_.get(), sizeof(uint64_t), transport_); INFO(MSCCLPP_NET, "IB connection via %s created", getIBDeviceName(transport_).c_str()); } @@ -105,11 +100,11 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem validateTransport(dst, remoteTransport()); validateTransport(src, transport()); - auto dstTransportInfo = getRegisteredMemoryImpl(dst)->getTransportInfo(remoteTransport()); + auto dstTransportInfo = dst.pimpl->getTransportInfo(remoteTransport()); if (dstTransportInfo.ibLocal) { throw Error("dst is local, which is not supported", ErrorCode::InvalidUsage); } - auto srcTransportInfo = getRegisteredMemoryImpl(src)->getTransportInfo(transport()); + auto srcTransportInfo = src.pimpl->getTransportInfo(transport()); if (!srcTransportInfo.ibLocal) { throw Error("src is remote, which is not supported", ErrorCode::InvalidUsage); } @@ -129,7 +124,7 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { validateTransport(dst, remoteTransport()); - auto dstTransportInfo = getRegisteredMemoryImpl(dst)->getTransportInfo(remoteTransport()); + auto dstTransportInfo = dst.pimpl->getTransportInfo(remoteTransport()); if (dstTransportInfo.ibLocal) { throw Error("dst is local, which is not supported", ErrorCode::InvalidUsage); } diff --git a/src/context.cc b/src/context.cc index 4e89cb609..4cafffc95 100644 --- a/src/context.cc +++ b/src/context.cc @@ -10,11 +10,12 @@ #include "debug.h" #include "registered_memory.hpp" #include "utils_internal.hpp" +#include "endpoint.hpp" namespace mscclpp { Context::Impl::Impl() : ipcStream_(cudaStreamNonBlocking), hostHash_(getHostHash()) { - INFO(MSCCLPP_INIT, "Host hash: %lx", hostHash); + INFO(MSCCLPP_INIT, "Host hash: %lx", hostHash_); } IbCtx* Context::Impl::getIbContext(Transport ibTransport) { @@ -38,16 +39,16 @@ MSCCLPP_API_CPP RegisteredMemory Context::registerMemory(void* ptr, size_t size, std::make_shared(ptr, size, -1, transports, *pimpl)); } -MSCCLPP_API_CPP Endpoint createEndpoint(Transport transport) { +MSCCLPP_API_CPP Endpoint Context::createEndpoint(Transport transport) { return Endpoint(std::make_shared(transport, *pimpl)); } MSCCLPP_API_CPP std::shared_ptr Context::connect(Endpoint localEndpoint, Endpoint remoteEndpoint) { - std::shared_ptr conn; + std::shared_ptr conn; if (localEndpoint.transport() == Transport::CudaIpc) { - conn = std::make_shared(localEndpoint, remoteEndpoint, *pimpl); + conn = std::make_shared(localEndpoint, remoteEndpoint, pimpl->ipcStream_); } else if (AllIBTransports.has(localEndpoint.transport())) { - conn = std::make_shared(localEndpoint, remoteEndpoint, *pimpl); + conn = std::make_shared(localEndpoint, remoteEndpoint, *this); } else { throw mscclpp::Error("Unsupported transport", ErrorCode::InternalError); } diff --git a/src/include/communicator.hpp b/src/include/communicator.hpp index 858a77cec..9c7d06e01 100644 --- a/src/include/communicator.hpp +++ b/src/include/communicator.hpp @@ -17,20 +17,19 @@ namespace mscclpp { class ConnectionBase; +struct ConnectionInfo { + int remoteRank; + int tag; +}; + struct Communicator::Impl { - std::vector> connections_; - std::vector> toSetup_; - std::unordered_map> ibContexts_; - cudaStream_t ipcStream_; std::shared_ptr bootstrap_; + std::shared_ptr context_; + std::unordered_map connectionInfos_; + std::vector> toSetup_; std::vector rankToHash_; - Impl(std::shared_ptr bootstrap); - - ~Impl(); - - IbCtx* getIbContext(Transport ibTransport); - cudaStream_t getIpcStream(); + Impl(std::shared_ptr bootstrap, std::shared_ptr context); }; } // namespace mscclpp diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 346a6b8c2..a939744f6 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -12,6 +12,7 @@ #include #include "communicator.hpp" +#include "context.hpp" #include "ib.hpp" #include "registered_memory.hpp" @@ -21,7 +22,7 @@ class CudaIpcConnection : public Connection { cudaStream_t stream_; public: - CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, Context::Impl& contextImpl); + CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, cudaStream_t stream); Transport transport() override; @@ -44,7 +45,7 @@ class IBConnection : public Connection { mscclpp::TransportInfo dstTransportInfo_; public: - IBConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, Context::Impl& contextImpl); + IBConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, Context& context); Transport transport() override; @@ -55,10 +56,6 @@ class IBConnection : public Connection { void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; void flush() override; - - void beginSetup(std::shared_ptr bootstrap) override; - - void endSetup(std::shared_ptr bootstrap) override; }; } // namespace mscclpp diff --git a/src/include/context.hpp b/src/include/context.hpp index 5ed46ef57..c55a78360 100644 --- a/src/include/context.hpp +++ b/src/include/context.hpp @@ -14,10 +14,8 @@ namespace mscclpp { -class ConnectionBase; - -struct Communicator::Impl { - std::vector> connections_; +struct Context::Impl { + std::vector> connections_; std::unordered_map> ibContexts_; CudaStreamWithFlags ipcStream_; uint64_t hostHash_; diff --git a/src/include/endpoint.hpp b/src/include/endpoint.hpp index 94143bd07..48573b881 100644 --- a/src/include/endpoint.hpp +++ b/src/include/endpoint.hpp @@ -21,7 +21,7 @@ struct Endpoint::Impl { // The following are only used for IB and are undefined for other transports. bool ibLocal_; - const IbQp* ibQp_; + IbQp* ibQp_; IbQpInfo ibQpInfo_; }; diff --git a/src/registered_memory.cc b/src/registered_memory.cc index 2a3aab665..53bdc2025 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -8,6 +8,7 @@ #include #include +#include "context.hpp" #include "api.h" #include "debug.h" #include "utils_internal.hpp" @@ -60,8 +61,6 @@ MSCCLPP_API_CPP void* RegisteredMemory::data() { return pimpl->data; } MSCCLPP_API_CPP size_t RegisteredMemory::size() { return pimpl->size; } -MSCCLPP_API_CPP int RegisteredMemory::rank() { return pimpl->rank; } - MSCCLPP_API_CPP TransportFlags RegisteredMemory::transports() { return pimpl->transports; } MSCCLPP_API_CPP std::vector RegisteredMemory::serialize() { diff --git a/src/semaphore.cc b/src/semaphore.cc index 0b5269cab..921d32aac 100644 --- a/src/semaphore.cc +++ b/src/semaphore.cc @@ -12,8 +12,10 @@ static NonblockingFuture setupInboundSemaphoreId(Communicator& void* localInboundSemaphoreId) { auto localInboundSemaphoreIdsRegMem = communicator.registerMemory(localInboundSemaphoreId, sizeof(uint64_t), connection->transport()); - communicator.sendMemoryOnSetup(localInboundSemaphoreIdsRegMem, connection->remoteRank(), connection->tag()); - return communicator.recvMemoryOnSetup(connection->remoteRank(), connection->tag()); + int remoteRank = communicator.remoteRankOf(*connection); + int tag = communicator.tagOf(*connection); + communicator.sendMemoryOnSetup(localInboundSemaphoreIdsRegMem, remoteRank, tag); + return communicator.recvMemoryOnSetup(remoteRank, tag); } MSCCLPP_API_CPP Host2DeviceSemaphore::Host2DeviceSemaphore(Communicator& communicator, @@ -69,12 +71,12 @@ MSCCLPP_API_CPP SmDevice2DeviceSemaphore::SmDevice2DeviceSemaphore(Communicator& remoteInboundSemaphoreIdsRegMem_ = setupInboundSemaphoreId(communicator, connection.get(), localInboundSemaphore_.get()); INFO(MSCCLPP_INIT, "Creating a direct semaphore for CudaIPC transport from %d to %d", - communicator.bootstrap()->getRank(), connection->remoteRank()); + communicator.bootstrap()->getRank(), communicator.remoteRankOf(*connection)); isRemoteInboundSemaphoreIdSet_ = true; } else if (AllIBTransports.has(connection->transport())) { // We don't need to really with any of the IB transports, since the values will be local INFO(MSCCLPP_INIT, "Creating a direct semaphore for IB transport from %d to %d", - communicator.bootstrap()->getRank(), connection->remoteRank()); + communicator.bootstrap()->getRank(), communicator.remoteRankOf(*connection)); isRemoteInboundSemaphoreIdSet_ = false; } }