From 77dcd2f3ec79f4d3ac77f768eeea7e1a6b556606 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 2 Apr 2024 01:23:04 +0000 Subject: [PATCH 01/15] EthernetFeature v0 --- include/mscclpp/core.hpp | 12 +++- src/bootstrap/socket.cc | 32 +++++++++ src/connection.cc | 101 +++++++++++++++++++++++++++++ src/context.cc | 9 ++- src/core.cc | 2 +- src/endpoint.cc | 18 +++++ src/include/connection.hpp | 24 +++++++ src/include/endpoint.hpp | 9 +++ src/include/socket.h | 1 + src/registered_memory.cc | 4 +- test/mp_unit/communicator_tests.cu | 41 ++++++++++-- test/mp_unit/mp_unit_tests.hpp | 2 +- 12 files changed, 242 insertions(+), 13 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 02c277a3e..bd971cd3d 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -138,14 +138,15 @@ enum class Transport { IB5, // InfiniBand device 5 transport type. IB6, // InfiniBand device 6 transport type. IB7, // InfiniBand device 7 transport type. - NumTransports // The number of transports. + Ethernet, // Ethernet transport type. + NumTransports, // The number of transports. }; const std::string TransportNames[] = {"UNK", "IPC", "NVLS", "IB0", "IB1", "IB2", - "IB3", "IB4", "IB5", "IB6", "IB7", "NUM"}; + "IB3", "IB4", "IB5", "IB6", "IB7", "ETH", "NUM"}; namespace detail { -const size_t TransportFlagsSize = 11; +const size_t TransportFlagsSize = 12; static_assert(TransportFlagsSize == static_cast(Transport::NumTransports), "TransportFlagsSize must match the number of transports"); /// Bitset for storing transport flags. @@ -333,6 +334,11 @@ class RegisteredMemory { /// @return A pointer to the memory block. void* data() const; + /// Get a pointer to the original memory block. + /// + /// @return A pointer to the original memory block. + void* originalDataPtr() const; + /// Get the size of the memory block. /// /// @return The size of the memory block. diff --git a/src/bootstrap/socket.cc b/src/bootstrap/socket.cc index 2267af9b3..38d697b4c 100644 --- a/src/bootstrap/socket.cc +++ b/src/bootstrap/socket.cc @@ -539,6 +539,38 @@ void Socket::recv(void* ptr, int size) { socketWait(MSCCLPP_SOCKET_RECV, ptr, size, &offset); } +void Socket::recvUntilEnd(void *ptr, int size, int* closed){ + int offset = 0; + *closed = 0; + if (state_ != SocketStateReady) { + std::stringstream ss; + ss << "socket state (" << state_ << ") is not ready"; + throw Error(ss.str(), ErrorCode::InternalError); + } + + int bytes = 0; + char* data = (char*)ptr; + + do { + bytes = ::recv(fd_, data + (offset), size - (offset), 0); + if (bytes == 0) { + *closed = 1; + return; + } + if (bytes == -1) { + if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN && state_ != SocketStateClosed) { + throw SysError("recv until end failed", errno); + } else { + bytes = 0; + } + } + (offset) += bytes; + if (abortFlag_ && *abortFlag_ != 0) { + throw Error("aborted", ErrorCode::Aborted); + } + } while (bytes > 0 && (offset) < size); +} + void Socket::close() { if (fd_ >= 0) ::close(fd_); state_ = SocketStateClosed; diff --git a/src/connection.cc b/src/connection.cc index 65b76b33f..a79439d7c 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -5,6 +5,7 @@ #include #include +#include #include "debug.h" #include "endpoint.hpp" @@ -180,4 +181,104 @@ void IBConnection::flush(int64_t timeoutUsec) { // npkitCollectExitEvents(conn, NPKIT_EVENT_IB_SEND_EXIT); } +// EthernetConnection + +EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint) : stopRcvMessages(false), abortFlag_(0) { + // Validating Transport Protocol + if (localEndpoint.transport() != Transport::Ethernet || remoteEndpoint.transport() != Transport::Ethernet) { + throw mscclpp::Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage); + } + + // Creating Thread to Accept the Connection + auto parameter = (getImpl(localEndpoint)->socket_).get(); + std::thread t([this, parameter]() { + rcvSocket = std::make_unique(nullptr, MSCCLPP_SOCKET_MAGIC, SocketTypeUnknown, abortFlag_); + rcvSocket->accept(parameter); + }); + + // Starting Connection + sendSocket = std::make_unique(&(getImpl(remoteEndpoint)->socketAddress_), 0xdeadbeef, SocketTypeBootstrap, abortFlag_); + sendSocket->connect(); + + // Ensure the Connection was Established + t.join(); + + // Starting Thread to Receive Messages + threadRcvMessages = std::thread([this]() { + int* response = (int*)malloc(sizeof(int)); + uint64_t size; + int* buffer; + int closed = 0; + bool received; + + while (!stopRcvMessages) { + received = true; + + if(closed == 0) rcvSocket->recvUntilEnd(&response, sizeof(int*), &closed); + received &= !closed; + + if(closed == 0) rcvSocket->recvUntilEnd(&size, sizeof(uint64_t), &closed); + received &= !closed; + + buffer = (int*)malloc(size); + if(closed == 0) rcvSocket->recvUntilEnd(buffer, size, &closed); + received &= !closed; + + if(received) mscclpp::memcpyCuda(response, buffer, size/sizeof(int), cudaMemcpyHostToDevice); + } + }); + + INFO(MSCCLPP_NET, "Ethernet connection created"); +} + +EthernetConnection::~EthernetConnection(){ + sendSocket->close(); + stopRcvMessages = true; + rcvSocket->close(); + threadRcvMessages.join(); +} + +Transport EthernetConnection::transport() { return Transport::Ethernet; } + +Transport EthernetConnection::remoteTransport() { return Transport::Ethernet; } + +void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, + uint64_t size) { + // Validating Transport Protocol + validateTransport(dst, remoteTransport()); + validateTransport(src, transport()); + + // Initalizing Variables + int* srcPtr = reinterpret_cast(src.data()) + srcOffset/sizeof(int); + int* dstPtr = reinterpret_cast(dst.originalDataPtr()) + dstOffset/sizeof(int); + int* data; + + // Getting Data From GPU + data = (int*)malloc(size); + mscclpp::memcpyCuda(data, (int*)srcPtr, size/sizeof(int), cudaMemcpyDeviceToHost); + + // Sending Data + sendSocket->send(&dstPtr, sizeof(int*)); + sendSocket->send(&size, sizeof(uint64_t)); + sendSocket->send(data, size); + + INFO(MSCCLPP_NET, "EthernetConnection write: from %p to %p, size %lu", srcPtr, dstPtr, size); +} + +void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { + validateTransport(dst, remoteTransport()); + uint64_t oldValue = *src; + *src = newValue; + uint64_t* dstPtr = reinterpret_cast(reinterpret_cast(dst.data()) + dstOffset); + + *dstPtr = newValue; + INFO(MSCCLPP_NET, "EthernetConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue, + newValue); +} + +void EthernetConnection::flush(int64_t timeoutUsec) { + + INFO(MSCCLPP_NET, "EthernetConnection flushing connection"); +} + } // namespace mscclpp diff --git a/src/context.cc b/src/context.cc index d04a8e32c..3a8ff4aa6 100644 --- a/src/context.cc +++ b/src/context.cc @@ -49,9 +49,16 @@ MSCCLPP_API_CPP std::shared_ptr Context::connect(Endpoint localEndpo throw mscclpp::Error("Local transport is IB but remote is not", ErrorCode::InvalidUsage); } conn = std::make_shared(localEndpoint, remoteEndpoint, *this); - } else { + } else if(localEndpoint.transport() == Transport::Ethernet) { + if (remoteEndpoint.transport() != Transport::Ethernet) { + throw mscclpp::Error("Local transport is Ethernet but remote is not", ErrorCode::InvalidUsage); + } + conn = std::make_shared(localEndpoint, remoteEndpoint); + } + else { throw mscclpp::Error("Unsupported transport", ErrorCode::InternalError); } + pimpl_->connections_.push_back(conn); return conn; } diff --git a/src/core.cc b/src/core.cc index 4d89250d0..32b67c3d2 100644 --- a/src/core.cc +++ b/src/core.cc @@ -87,7 +87,7 @@ const TransportFlags NoTransports = TransportFlags(); const TransportFlags AllIBTransports = Transport::IB0 | Transport::IB1 | Transport::IB2 | Transport::IB3 | Transport::IB4 | Transport::IB5 | Transport::IB6 | Transport::IB7; -const TransportFlags AllTransports = AllIBTransports | Transport::CudaIpc; +const TransportFlags AllTransports = AllIBTransports | Transport::CudaIpc | Transport::Ethernet; void Setuppable::beginSetup(std::shared_ptr) {} diff --git a/src/endpoint.cc b/src/endpoint.cc index dbc773898..f53bf622c 100644 --- a/src/endpoint.cc +++ b/src/endpoint.cc @@ -16,6 +16,17 @@ Endpoint::Impl::Impl(EndpointConfig config, Context::Impl& contextImpl) ->createQp(config.ibMaxCqSize, config.ibMaxCqPollNum, config.ibMaxSendWr, 0, config.ibMaxWrPerSend); ibQpInfo_ = ibQp_->getInfo(); } + else if(transport_ == Transport::Ethernet) { + // Configuring Ethernet Interfaces + abortFlag_ = 0; + int ret = FindInterfaces(netIfName_, &socketAddress_, MAX_IF_NAME_SIZE, 1, ""); + if (ret <= 0) throw Error("NET/Socket", ErrorCode::InternalError); + + // Starting Server Socket + socket_ = std::make_unique(&socketAddress_, 0xdeadbeef, SocketTypeBootstrap, abortFlag_); + socket_->listen(); + socketAddress_ = socket_->getAddr(); + } } MSCCLPP_API_CPP Transport Endpoint::transport() { return pimpl_->transport_; } @@ -27,6 +38,9 @@ MSCCLPP_API_CPP std::vector Endpoint::serialize() { if (AllIBTransports.has(pimpl_->transport_)) { std::copy_n(reinterpret_cast(&pimpl_->ibQpInfo_), sizeof(pimpl_->ibQpInfo_), std::back_inserter(data)); } + if ((pimpl_->transport_) == Transport::Ethernet) { + std::copy_n(reinterpret_cast(&pimpl_->socketAddress_), sizeof(pimpl_->socketAddress_), std::back_inserter(data)); + } return data; } @@ -45,6 +59,10 @@ Endpoint::Impl::Impl(const std::vector& serialization) { std::copy_n(it, sizeof(ibQpInfo_), reinterpret_cast(&ibQpInfo_)); it += sizeof(ibQpInfo_); } + if (transport_ == Transport::Ethernet){ + std::copy_n(it, sizeof(socketAddress_), reinterpret_cast(&socketAddress_)); + it += sizeof(socketAddress_); + } } MSCCLPP_API_CPP Endpoint::Endpoint(std::shared_ptr pimpl) : pimpl_(pimpl) {} diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 47b154758..61f99139a 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -11,6 +11,7 @@ #include "context.hpp" #include "ib.hpp" #include "registered_memory.hpp" +#include "socket.h" namespace mscclpp { @@ -53,6 +54,29 @@ class IBConnection : public Connection { void flush(int64_t timeoutUsec) override; }; +class EthernetConnection : public Connection { + std::unique_ptr sendSocket; + std::unique_ptr rcvSocket; + std::thread threadRcvMessages; + bool stopRcvMessages; + volatile uint32_t* abortFlag_; + + public: + EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint); + + ~EthernetConnection(); + + Transport transport() override; + + Transport remoteTransport() override; + + void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, + uint64_t size) override; + void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; + + void flush(int64_t timeoutUsec) override; +}; + } // namespace mscclpp #endif // MSCCLPP_CONNECTION_HPP_ diff --git a/src/include/endpoint.hpp b/src/include/endpoint.hpp index 311fa9982..734a6c1bd 100644 --- a/src/include/endpoint.hpp +++ b/src/include/endpoint.hpp @@ -8,6 +8,9 @@ #include #include "ib.hpp" +#include "socket.h" + +#define MAX_IF_NAME_SIZE 16 namespace mscclpp { @@ -22,6 +25,12 @@ struct Endpoint::Impl { bool ibLocal_; IbQp* ibQp_; IbQpInfo ibQpInfo_; + + // The following are only used for Ethernet and are undefined for other transports. + std::unique_ptr socket_; + SocketAddress socketAddress_; + volatile uint32_t* abortFlag_; + char netIfName_[MAX_IF_NAME_SIZE + 1]; }; } // namespace mscclpp diff --git a/src/include/socket.h b/src/include/socket.h index 9f043414e..742a1cfe6 100644 --- a/src/include/socket.h +++ b/src/include/socket.h @@ -67,6 +67,7 @@ class Socket { void accept(const Socket* listenSocket, int64_t timeout = -1); void send(void* ptr, int size); void recv(void* ptr, int size); + void recvUntilEnd(void *ptr, int size, int* closed); void close(); int getFd() const { return fd_; } diff --git a/src/registered_memory.cc b/src/registered_memory.cc index 6d5fd79f5..737385214 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -62,6 +62,8 @@ MSCCLPP_API_CPP RegisteredMemory::~RegisteredMemory() = default; MSCCLPP_API_CPP void* RegisteredMemory::data() const { return pimpl_->data; } +MSCCLPP_API_CPP void* RegisteredMemory::originalDataPtr() const { return pimpl_->originalDataPtr; } + MSCCLPP_API_CPP size_t RegisteredMemory::size() { return pimpl_->size; } MSCCLPP_API_CPP TransportFlags RegisteredMemory::transports() { return pimpl_->transports; } @@ -139,7 +141,7 @@ RegisteredMemory::Impl::Impl(const std::vector& serialization) { } // Next decide how to set this->data - if (getHostHash() == this->hostHash && getPidHash() == this->pidHash) { + if ((getHostHash() == this->hostHash && getPidHash() == this->pidHash)) { // The memory is local to the process, so originalDataPtr is valid as is this->data = this->originalDataPtr; } else if (transports.has(Transport::CudaIpc) && getHostHash() == this->hostHash) { diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index 30727667d..6d4e75954 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -42,15 +42,19 @@ void CommunicatorTestBase::TearDown() { void CommunicatorTestBase::setNumRanksToUse(int num) { numRanksToUse = num; } -void CommunicatorTestBase::connectMesh(bool useIbOnly) { +void CommunicatorTestBase::connectMesh(bool useIpc, bool useIb, bool useEthernet) { std::vector>> connectionFutures(numRanksToUse); for (int i = 0; i < numRanksToUse; i++) { if (i != gEnv->rank) { - if ((rankToNode(i) == rankToNode(gEnv->rank)) && !useIbOnly) { + if ((rankToNode(i) == rankToNode(gEnv->rank)) && useIpc) { connectionFutures[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::CudaIpc); - } else { + } + else if(useIb) { connectionFutures[i] = communicator->connectOnSetup(i, 0, ibTransport); } + else if(useEthernet) { + connectionFutures[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::Ethernet); + } } } communicator->setup(); @@ -97,7 +101,7 @@ void CommunicatorTest::SetUp() { ASSERT_EQ((deviceBufferSize / sizeof(int)) % gEnv->worldSize, 0); - connectMesh(); + connectMesh(false, false, true); devicePtr.resize(numBuffers); localMemory.resize(numBuffers); @@ -112,8 +116,10 @@ void CommunicatorTest::SetUp() { for (size_t n = 0; n < numBuffers; n++) { devicePtr[n] = mscclpp::allocSharedCuda(deviceBufferSize / sizeof(int)); - registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, - localMemory[n], remoteMemory[n]); + //registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, + // localMemory[n], remoteMemory[n]); + registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks, + localMemory[n], remoteMemory[n]); } } @@ -282,3 +288,26 @@ TEST_F(CommunicatorTest, WriteWithHostSemaphores) { ASSERT_TRUE(testWriteCorrectness()); communicator->bootstrap()->barrier(); } + +TEST_F(CommunicatorTest, TestEthernetConnection) { + if (gEnv->rank >= numRanksToUse) return; + + deviceBufferInit(); + communicator->bootstrap()->barrier(); + + writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize); + communicator->bootstrap()->barrier(); + + // polling until it becomes ready + bool ready = false; + int niter = 0; + do { + ready = testWriteCorrectness(); + niter++; + if (niter == 10000) { + FAIL() << "Polling is stuck."; + } + } while (!ready); + communicator->bootstrap()->barrier(); + +} \ No newline at end of file diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index e934dee49..ec89f8111 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -92,7 +92,7 @@ class CommunicatorTestBase : public MultiProcessTest { void TearDown() override; void setNumRanksToUse(int num); - void connectMesh(bool useIbOnly = false); + void connectMesh(bool useIpc = true, bool useIb = true, bool useEthernet = false); // Register a local memory and receive corresponding remote memories void registerMemoryPairs(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, From 9913b7ae9b8a58fc0fcfa6eccf0953c3bd50ad9c Mon Sep 17 00:00:00 2001 From: root Date: Tue, 2 Apr 2024 01:36:56 +0000 Subject: [PATCH 02/15] merging main --- src/endpoint.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/endpoint.cc b/src/endpoint.cc index f53bf622c..c5462ac98 100644 --- a/src/endpoint.cc +++ b/src/endpoint.cc @@ -5,6 +5,7 @@ #include "api.h" #include "context.hpp" #include "utils_internal.hpp" +#include "socket.h" namespace mscclpp { @@ -24,7 +25,7 @@ Endpoint::Impl::Impl(EndpointConfig config, Context::Impl& contextImpl) // Starting Server Socket socket_ = std::make_unique(&socketAddress_, 0xdeadbeef, SocketTypeBootstrap, abortFlag_); - socket_->listen(); + socket_->bindAndListen(); socketAddress_ = socket_->getAddr(); } } From 9f3e1af85382eee2083d4e9875fd6395f61cbab2 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 2 Apr 2024 22:21:12 +0000 Subject: [PATCH 03/15] organizing the code --- src/connection.cc | 88 +++++++++++++++++++++----------------- src/include/connection.hpp | 12 ++++-- 2 files changed, 56 insertions(+), 44 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index a79439d7c..f392a8172 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -183,7 +183,7 @@ void IBConnection::flush(int64_t timeoutUsec) { // EthernetConnection -EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint) : stopRcvMessages(false), abortFlag_(0) { +EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint) : stopRcvMessages_(false), abortFlag_(0) { // Validating Transport Protocol if (localEndpoint.transport() != Transport::Ethernet || remoteEndpoint.transport() != Transport::Ethernet) { throw mscclpp::Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage); @@ -192,50 +192,28 @@ EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEn // Creating Thread to Accept the Connection auto parameter = (getImpl(localEndpoint)->socket_).get(); std::thread t([this, parameter]() { - rcvSocket = std::make_unique(nullptr, MSCCLPP_SOCKET_MAGIC, SocketTypeUnknown, abortFlag_); - rcvSocket->accept(parameter); + rcvSocket_ = std::make_unique(nullptr, MSCCLPP_SOCKET_MAGIC, SocketTypeUnknown, abortFlag_); + rcvSocket_->accept(parameter); }); // Starting Connection - sendSocket = std::make_unique(&(getImpl(remoteEndpoint)->socketAddress_), 0xdeadbeef, SocketTypeBootstrap, abortFlag_); - sendSocket->connect(); + sendSocket_ = std::make_unique(&(getImpl(remoteEndpoint)->socketAddress_), 0xdeadbeef, SocketTypeBootstrap, abortFlag_); + sendSocket_->connect(); // Ensure the Connection was Established t.join(); // Starting Thread to Receive Messages - threadRcvMessages = std::thread([this]() { - int* response = (int*)malloc(sizeof(int)); - uint64_t size; - int* buffer; - int closed = 0; - bool received; - - while (!stopRcvMessages) { - received = true; - - if(closed == 0) rcvSocket->recvUntilEnd(&response, sizeof(int*), &closed); - received &= !closed; - - if(closed == 0) rcvSocket->recvUntilEnd(&size, sizeof(uint64_t), &closed); - received &= !closed; - - buffer = (int*)malloc(size); - if(closed == 0) rcvSocket->recvUntilEnd(buffer, size, &closed); - received &= !closed; - - if(received) mscclpp::memcpyCuda(response, buffer, size/sizeof(int), cudaMemcpyHostToDevice); - } - }); + threadRcvMessages_ = std::thread(&EthernetConnection::rcvMessages, this); INFO(MSCCLPP_NET, "Ethernet connection created"); } EthernetConnection::~EthernetConnection(){ - sendSocket->close(); - stopRcvMessages = true; - rcvSocket->close(); - threadRcvMessages.join(); + sendSocket_->close(); + stopRcvMessages_ = true; + rcvSocket_->close(); + threadRcvMessages_.join(); } Transport EthernetConnection::transport() { return Transport::Ethernet; } @@ -249,18 +227,18 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe validateTransport(src, transport()); // Initalizing Variables - int* srcPtr = reinterpret_cast(src.data()) + srcOffset/sizeof(int); - int* dstPtr = reinterpret_cast(dst.originalDataPtr()) + dstOffset/sizeof(int); - int* data; + char* srcPtr = reinterpret_cast(src.data()) + srcOffset/sizeof(char); + char* dstPtr = reinterpret_cast(dst.originalDataPtr()) + dstOffset/sizeof(char); + char* data; // Getting Data From GPU - data = (int*)malloc(size); - mscclpp::memcpyCuda(data, (int*)srcPtr, size/sizeof(int), cudaMemcpyDeviceToHost); + data = (char*)malloc(size); + mscclpp::memcpyCuda(data, (char*)srcPtr, size/sizeof(char), cudaMemcpyDeviceToHost); // Sending Data - sendSocket->send(&dstPtr, sizeof(int*)); - sendSocket->send(&size, sizeof(uint64_t)); - sendSocket->send(data, size); + sendSocket_->send(&dstPtr, sizeof(char*)); + sendSocket_->send(&size, sizeof(uint64_t)); + sendSocket_->send(data, size); INFO(MSCCLPP_NET, "EthernetConnection write: from %p to %p, size %lu", srcPtr, dstPtr, size); } @@ -281,4 +259,34 @@ void EthernetConnection::flush(int64_t timeoutUsec) { INFO(MSCCLPP_NET, "EthernetConnection flushing connection"); } +void EthernetConnection::rcvMessages(){ + // Declarating Variables + char* ptr = (char*)malloc(sizeof(char)); + char* buffer; + uint64_t size; + int closed = 0; + bool received; + + // Receiving Messages Until Connection is Closed + while (!stopRcvMessages_) { + received = true; + + // Receiving Data Address + if(closed == 0) rcvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed); + received &= !closed; + + // Receiving data size + if(closed == 0) rcvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed); + received &= !closed; + + // Receiving Data + buffer = (char*)malloc(size); + if(closed == 0) rcvSocket_->recvUntilEnd(buffer, size, &closed); + received &= !closed; + + // Copying Data to GPU + if(received) mscclpp::memcpyCuda(ptr, buffer, size/sizeof(char), cudaMemcpyHostToDevice); + } +} + } // namespace mscclpp diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 61f99139a..e928af62b 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -55,10 +55,11 @@ class IBConnection : public Connection { }; class EthernetConnection : public Connection { - std::unique_ptr sendSocket; - std::unique_ptr rcvSocket; - std::thread threadRcvMessages; - bool stopRcvMessages; + std::unique_ptr sendSocket_; + std::unique_ptr rcvSocket_; + cudaStream_t stream_; + std::thread threadRcvMessages_; + bool stopRcvMessages_; volatile uint32_t* abortFlag_; public: @@ -75,6 +76,9 @@ class EthernetConnection : public Connection { void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; void flush(int64_t timeoutUsec) override; + + private: + void rcvMessages(); }; } // namespace mscclpp From 1c9a9f3379aa609afb5b86b1bee7b78ab4ee39f0 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 5 Apr 2024 01:19:55 +0000 Subject: [PATCH 04/15] implementing and proxytest of updateAndSync in EthernetConnection --- src/bootstrap/socket.cc | 2 +- src/connection.cc | 28 ++++++++++-------- test/mp_unit/mp_unit_tests.hpp | 8 +++--- test/mp_unit/proxy_channel_tests.cu | 44 ++++++++++++++++++----------- 4 files changed, 50 insertions(+), 32 deletions(-) diff --git a/src/bootstrap/socket.cc b/src/bootstrap/socket.cc index 0c92652ea..2dbc30103 100644 --- a/src/bootstrap/socket.cc +++ b/src/bootstrap/socket.cc @@ -548,7 +548,7 @@ void Socket::recvUntilEnd(void *ptr, int size, int* closed){ *closed = 0; if (state_ != SocketStateReady) { std::stringstream ss; - ss << "socket state (" << state_ << ") is not ready"; + ss << "socket state (" << state_ << ") is not ready in recvUntilEnd"; throw Error(ss.str(), ErrorCode::InternalError); } diff --git a/src/connection.cc b/src/connection.cc index f392a8172..8c04c1b7e 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -244,12 +244,20 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe } void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { + // Validating Transport Protocol validateTransport(dst, remoteTransport()); + + // Initalizing Variables uint64_t oldValue = *src; + uint64_t* dstPtr = reinterpret_cast(reinterpret_cast(dst.originalDataPtr()) + dstOffset); + uint64_t size = sizeof(uint64_t); *src = newValue; - uint64_t* dstPtr = reinterpret_cast(reinterpret_cast(dst.data()) + dstOffset); - *dstPtr = newValue; + // Sending Data + sendSocket_->send(&dstPtr, sizeof(char*)); + sendSocket_->send(&size, sizeof(uint64_t)); + sendSocket_->send(src, size); + INFO(MSCCLPP_NET, "EthernetConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue, newValue); } @@ -260,17 +268,15 @@ void EthernetConnection::flush(int64_t timeoutUsec) { } void EthernetConnection::rcvMessages(){ - // Declarating Variables - char* ptr = (char*)malloc(sizeof(char)); - char* buffer; - uint64_t size; - int closed = 0; - bool received; - // Receiving Messages Until Connection is Closed while (!stopRcvMessages_) { - received = true; - + // Declarating Variables + char* ptr; + char* buffer; + uint64_t size; + int closed = 0; + bool received = true; + // Receiving Data Address if(closed == 0) rcvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed); received &= !closed; diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index ec89f8111..8a3946d00 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -132,10 +132,10 @@ class ProxyChannelOneToOneTest : public CommunicatorTestBase { void SetUp() override; void TearDown() override; - void setupMeshConnections(std::vector& proxyChannels, bool useIbOnly, void* sendBuff, - size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0); - void testPingPong(bool useIbOnly, bool waitWithPoll); - void testPingPongPerf(bool useIbOnly, bool waitWithPoll); + void setupMeshConnections(std::vector& proxyChannels, bool useIPC, bool useIb, bool useEthernet, + void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0); + void testPingPong(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll); + void testPingPongPerf(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll); void testPacketPingPong(bool useIbOnly); void testPacketPingPongPerf(bool useIbOnly); diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index 796a565d4..8724e07a8 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -16,12 +16,16 @@ void ProxyChannelOneToOneTest::SetUp() { void ProxyChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); } void ProxyChannelOneToOneTest::setupMeshConnections(std::vector& proxyChannels, - bool useIbOnly, void* sendBuff, size_t sendBuffBytes, + bool useIPC, bool useIb, bool useEthernet, void* sendBuff, size_t sendBuffBytes, void* recvBuff, size_t recvBuffBytes) { const int rank = communicator->bootstrap()->getRank(); const int worldSize = communicator->bootstrap()->getNranks(); const bool isInPlace = (recvBuff == nullptr); - mscclpp::TransportFlags transport = (useIbOnly) ? ibTransport : (mscclpp::Transport::CudaIpc | ibTransport); + mscclpp::TransportFlags transport; + + if(useIPC) transport |= mscclpp::Transport::CudaIpc; + if(useIb) transport |= ibTransport; + if(useEthernet) transport |= mscclpp::Transport::Ethernet; std::vector>> connectionFutures(worldSize); std::vector> remoteMemFutures(worldSize); @@ -36,11 +40,14 @@ void ProxyChannelOneToOneTest::setupMeshConnections(std::vectorrank)) && !useIbOnly) { + if ((rankToNode(r) == rankToNode(gEnv->rank)) && useIPC) { connectionFutures[r] = communicator->connectOnSetup(r, 0, mscclpp::Transport::CudaIpc); - } else { + } else if(useIb) { connectionFutures[r] = communicator->connectOnSetup(r, 0, ibTransport); } + else if(useEthernet) { + connectionFutures[r] = communicator->connectOnSetup(r, 0, mscclpp::Transport::Ethernet); + } if (isInPlace) { communicator->sendMemoryOnSetup(sendBufRegMem, r, 0); @@ -145,14 +152,14 @@ __global__ void kernelProxyPingPong(int* buff, int rank, int nElem, bool waitWit } } -void ProxyChannelOneToOneTest::testPingPong(bool useIbOnly, bool waitWithPoll) { +void ProxyChannelOneToOneTest::testPingPong(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll) { if (gEnv->rank >= numRanksToUse) return; const int nElem = 4 * 1024 * 1024; std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocExtSharedCuda(nElem); - setupMeshConnections(proxyChannels, useIbOnly, buff.get(), nElem * sizeof(int)); + setupMeshConnections(proxyChannels, useIPC, useIB, useEthernet, buff.get(), nElem * sizeof(int)); std::vector> proxyChannelHandles; for (auto& ch : proxyChannels) proxyChannelHandles.push_back(ch.deviceHandle()); @@ -190,14 +197,15 @@ void ProxyChannelOneToOneTest::testPingPong(bool useIbOnly, bool waitWithPoll) { proxyService->stopProxy(); } -void ProxyChannelOneToOneTest::testPingPongPerf(bool useIbOnly, bool waitWithPoll) { +void ProxyChannelOneToOneTest::testPingPongPerf(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll) { if (gEnv->rank >= numRanksToUse) return; const int nElem = 4 * 1024 * 1024; + //const int nElem = 256000000; std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocExtSharedCuda(nElem); - setupMeshConnections(proxyChannels, useIbOnly, buff.get(), nElem * sizeof(int)); + setupMeshConnections(proxyChannels, useIPC, useIB, useEthernet, buff.get(), nElem * sizeof(int)); std::vector> proxyChannelHandles; for (auto& ch : proxyChannels) proxyChannelHandles.push_back(ch.deviceHandle()); @@ -234,17 +242,21 @@ void ProxyChannelOneToOneTest::testPingPongPerf(bool useIbOnly, bool waitWithPol proxyService->stopProxy(); } -TEST_F(ProxyChannelOneToOneTest, PingPong) { testPingPong(false, false); } +TEST_F(ProxyChannelOneToOneTest, PingPong) { testPingPong(true, true, false, false); } + +TEST_F(ProxyChannelOneToOneTest, PingPongIb) { testPingPong(false, true , false, false); } + +TEST_F(ProxyChannelOneToOneTest, PingPongEthernet) { testPingPong(false, false, true, false); } -TEST_F(ProxyChannelOneToOneTest, PingPongIb) { testPingPong(true, false); } +TEST_F(ProxyChannelOneToOneTest, PingPongWithPoll) { testPingPong(true, true, false, true); } -TEST_F(ProxyChannelOneToOneTest, PingPongWithPoll) { testPingPong(false, true); } +TEST_F(ProxyChannelOneToOneTest, PingPongIbWithPoll) { testPingPong(false, true, false, true); } -TEST_F(ProxyChannelOneToOneTest, PingPongIbWithPoll) { testPingPong(true, true); } +TEST_F(ProxyChannelOneToOneTest, PingPongPerf) { testPingPongPerf(true, true, false, false); } -TEST_F(ProxyChannelOneToOneTest, PingPongPerf) { testPingPongPerf(false, false); } +TEST_F(ProxyChannelOneToOneTest, PingPongPerfIb) { testPingPongPerf(false, true, false, false); } -TEST_F(ProxyChannelOneToOneTest, PingPongPerfIb) { testPingPongPerf(true, false); } +TEST_F(ProxyChannelOneToOneTest, PingPongPerfEthernet) { testPingPongPerf(false, false, true, false); } __device__ mscclpp::DeviceSyncer gChannelOneToOneTestProxyChansSyncer; @@ -324,7 +336,7 @@ void ProxyChannelOneToOneTest::testPacketPingPong(bool useIbOnly) { auto putPacketBuffer = mscclpp::allocExtSharedCuda(nPacket); auto getPacketBuffer = mscclpp::allocExtSharedCuda(nPacket); - setupMeshConnections(proxyChannels, useIbOnly, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket), + setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket), getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket)); ASSERT_EQ(proxyChannels.size(), 1); @@ -391,7 +403,7 @@ void ProxyChannelOneToOneTest::testPacketPingPongPerf(bool useIbOnly) { auto putPacketBuffer = mscclpp::allocExtSharedCuda(nPacket); auto getPacketBuffer = mscclpp::allocExtSharedCuda(nPacket); - setupMeshConnections(proxyChannels, useIbOnly, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket), + setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket), getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket)); ASSERT_EQ(proxyChannels.size(), 1); From db263e7507c712a945cde76cb79f53824fcc438d Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Apr 2024 21:41:49 +0000 Subject: [PATCH 05/15] Implemented message chunking for large data transfers --- src/connection.cc | 39 ++++++++++++++++++----------- src/include/connection.hpp | 9 +++++-- test/mp_unit/communicator_tests.cu | 1 - test/mp_unit/proxy_channel_tests.cu | 1 - 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index 8c04c1b7e..71f940054 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -189,6 +189,10 @@ EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEn throw mscclpp::Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage); } + // Instanciating Buffers + sendBuffer_ = new char[sendBufferSize_]; + rcvBuffer_ = new char[rcvBufferSize_]; + // Creating Thread to Accept the Connection auto parameter = (getImpl(localEndpoint)->socket_).get(); std::thread t([this, parameter]() { @@ -229,16 +233,19 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe // Initalizing Variables char* srcPtr = reinterpret_cast(src.data()) + srcOffset/sizeof(char); char* dstPtr = reinterpret_cast(dst.originalDataPtr()) + dstOffset/sizeof(char); - char* data; - - // Getting Data From GPU - data = (char*)malloc(size); - mscclpp::memcpyCuda(data, (char*)srcPtr, size/sizeof(char), cudaMemcpyDeviceToHost); + uint64_t sendSize = 0; - // Sending Data + // Sending Info Data sendSocket_->send(&dstPtr, sizeof(char*)); sendSocket_->send(&size, sizeof(uint64_t)); - sendSocket_->send(data, size); + + // Getting Data From GPU and Sending Data + while(sendSize < size){ + uint64_t messageSize = std::min(sendBufferSize_, (size - sendSize)/sizeof(char)) * sizeof(char); + mscclpp::memcpyCuda(sendBuffer_, (char*)srcPtr + (sendSize/sizeof(char)), messageSize, cudaMemcpyDeviceToHost); + sendSocket_->send(sendBuffer_, messageSize); + sendSize += messageSize; + } INFO(MSCCLPP_NET, "EthernetConnection write: from %p to %p, size %lu", srcPtr, dstPtr, size); } @@ -272,8 +279,8 @@ void EthernetConnection::rcvMessages(){ while (!stopRcvMessages_) { // Declarating Variables char* ptr; - char* buffer; uint64_t size; + uint64_t rcvSize = 0; int closed = 0; bool received = true; @@ -285,13 +292,15 @@ void EthernetConnection::rcvMessages(){ if(closed == 0) rcvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed); received &= !closed; - // Receiving Data - buffer = (char*)malloc(size); - if(closed == 0) rcvSocket_->recvUntilEnd(buffer, size, &closed); - received &= !closed; - - // Copying Data to GPU - if(received) mscclpp::memcpyCuda(ptr, buffer, size/sizeof(char), cudaMemcpyHostToDevice); + // Receiving Data and Copying Data yo GPU + while(rcvSize < size && closed == 0){ + uint64_t messageSize = std::min(rcvBufferSize_, (size - rcvSize)/sizeof(char)) * sizeof(char); + rcvSocket_->recvUntilEnd(rcvBuffer_, messageSize, &closed); + received &= !closed; + + if(received) mscclpp::memcpyCuda((char*)ptr + (rcvSize/sizeof(char)), rcvBuffer_, messageSize, cudaMemcpyHostToDevice); + rcvSize += messageSize; + } } } diff --git a/src/include/connection.hpp b/src/include/connection.hpp index e928af62b..297e155b4 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -57,11 +57,14 @@ class IBConnection : public Connection { class EthernetConnection : public Connection { std::unique_ptr sendSocket_; std::unique_ptr rcvSocket_; - cudaStream_t stream_; std::thread threadRcvMessages_; bool stopRcvMessages_; volatile uint32_t* abortFlag_; - + static const uint64_t sendBufferSize_ = 256000000; + static const uint64_t rcvBufferSize_ = 256000000; + char *sendBuffer_; + char *rcvBuffer_; + public: EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint); @@ -79,6 +82,8 @@ class EthernetConnection : public Connection { private: void rcvMessages(); + + void sendMessage(); }; } // namespace mscclpp diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index 6d4e75954..10526b0ed 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -309,5 +309,4 @@ TEST_F(CommunicatorTest, TestEthernetConnection) { } } while (!ready); communicator->bootstrap()->barrier(); - } \ No newline at end of file diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index 8724e07a8..7bf52308b 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -201,7 +201,6 @@ void ProxyChannelOneToOneTest::testPingPongPerf(bool useIPC, bool useIB, bool us if (gEnv->rank >= numRanksToUse) return; const int nElem = 4 * 1024 * 1024; - //const int nElem = 256000000; std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocExtSharedCuda(nElem); From 249ad84d3a46dc1b7fd4259ac3a8ab2fae2caf11 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Apr 2024 22:13:10 +0000 Subject: [PATCH 06/15] adjusting some tests --- test/mp_unit/communicator_tests.cu | 14 +++++++------- test/mp_unit/proxy_channel_tests.cu | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index 10526b0ed..c06977a87 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -101,7 +101,7 @@ void CommunicatorTest::SetUp() { ASSERT_EQ((deviceBufferSize / sizeof(int)) % gEnv->worldSize, 0); - connectMesh(false, false, true); + connectMesh(true, true, false); devicePtr.resize(numBuffers); localMemory.resize(numBuffers); @@ -116,10 +116,10 @@ void CommunicatorTest::SetUp() { for (size_t n = 0; n < numBuffers; n++) { devicePtr[n] = mscclpp::allocSharedCuda(deviceBufferSize / sizeof(int)); - //registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, - // localMemory[n], remoteMemory[n]); - registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks, - localMemory[n], remoteMemory[n]); + registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, + localMemory[n], remoteMemory[n]); + //registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks, + // localMemory[n], remoteMemory[n]); } } @@ -289,7 +289,7 @@ TEST_F(CommunicatorTest, WriteWithHostSemaphores) { communicator->bootstrap()->barrier(); } -TEST_F(CommunicatorTest, TestEthernetConnection) { +/*TEST_F(CommunicatorTest, TestEthernetConnection) { if (gEnv->rank >= numRanksToUse) return; deviceBufferInit(); @@ -309,4 +309,4 @@ TEST_F(CommunicatorTest, TestEthernetConnection) { } } while (!ready); communicator->bootstrap()->barrier(); -} \ No newline at end of file +}*/ \ No newline at end of file diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index 7bf52308b..ce13db580 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -219,7 +219,7 @@ void ProxyChannelOneToOneTest::testPingPongPerf(bool useIPC, bool useIB, bool us auto* testInfo = ::testing::UnitTest::GetInstance()->current_test_info(); const std::string testName = std::string(testInfo->test_suite_name()) + "." + std::string(testInfo->name()); - const int nTries = 1000000; + const int nTries = 1000; // Warm-up kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, waitWithPoll, nTries, ret.get()); From 63c3ec66ddb58bd5b78fd9be018d9605e967f412 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Apr 2024 22:37:02 +0000 Subject: [PATCH 07/15] adjust EthernetConnections --- src/include/connection.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 297e155b4..e57ecd348 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -60,8 +60,8 @@ class EthernetConnection : public Connection { std::thread threadRcvMessages_; bool stopRcvMessages_; volatile uint32_t* abortFlag_; - static const uint64_t sendBufferSize_ = 256000000; - static const uint64_t rcvBufferSize_ = 256000000; + const uint64_t sendBufferSize_ = 256000000; + const uint64_t rcvBufferSize_ = 256000000; char *sendBuffer_; char *rcvBuffer_; From 9448acd145add92661e9cd12c2f3565d628e9230 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Apr 2024 22:55:51 +0000 Subject: [PATCH 08/15] adjust format --- src/connection.cc | 4 ++-- src/include/connection.hpp | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index 71f940054..71c220a29 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -230,7 +230,7 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe validateTransport(dst, remoteTransport()); validateTransport(src, transport()); - // Initalizing Variables + // Initializing Variables char* srcPtr = reinterpret_cast(src.data()) + srcOffset/sizeof(char); char* dstPtr = reinterpret_cast(dst.originalDataPtr()) + dstOffset/sizeof(char); uint64_t sendSize = 0; @@ -254,7 +254,7 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, // Validating Transport Protocol validateTransport(dst, remoteTransport()); - // Initalizing Variables + // Initializing Variables uint64_t oldValue = *src; uint64_t* dstPtr = reinterpret_cast(reinterpret_cast(dst.originalDataPtr()) + dstOffset); uint64_t size = sizeof(uint64_t); diff --git a/src/include/connection.hpp b/src/include/connection.hpp index e57ecd348..43dfbd69e 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -62,10 +62,10 @@ class EthernetConnection : public Connection { volatile uint32_t* abortFlag_; const uint64_t sendBufferSize_ = 256000000; const uint64_t rcvBufferSize_ = 256000000; - char *sendBuffer_; - char *rcvBuffer_; + char* sendBuffer_; + char* rcvBuffer_; - public: + public: EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint); ~EthernetConnection(); @@ -80,7 +80,7 @@ class EthernetConnection : public Connection { void flush(int64_t timeoutUsec) override; - private: + private: void rcvMessages(); void sendMessage(); From 4966faab55ddfe4eb618b1c0ae5867772c2f8d59 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Apr 2024 23:15:58 +0000 Subject: [PATCH 09/15] fixing format --- include/mscclpp/core.hpp | 28 +++++++++--------- src/bootstrap/socket.cc | 2 +- src/connection.cc | 44 +++++++++++++++-------------- src/context.cc | 5 ++-- src/endpoint.cc | 10 +++---- src/include/socket.h | 2 +- test/mp_unit/communicator_tests.cu | 10 +++---- test/mp_unit/mp_unit_tests.hpp | 5 ++-- test/mp_unit/proxy_channel_tests.cu | 27 +++++++++--------- 9 files changed, 66 insertions(+), 67 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 4fccdfce4..257d0ad35 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -127,23 +127,23 @@ class TcpBootstrap : public Bootstrap { /// Enumerates the available transport types. enum class Transport { - Unknown, // Unknown transport type. - CudaIpc, // CUDA IPC transport type. - Nvls, // NVLS transport type. - IB0, // InfiniBand device 0 transport type. - IB1, // InfiniBand device 1 transport type. - IB2, // InfiniBand device 2 transport type. - IB3, // InfiniBand device 3 transport type. - IB4, // InfiniBand device 4 transport type. - IB5, // InfiniBand device 5 transport type. - IB6, // InfiniBand device 6 transport type. - IB7, // InfiniBand device 7 transport type. - Ethernet, // Ethernet transport type. + Unknown, // Unknown transport type. + CudaIpc, // CUDA IPC transport type. + Nvls, // NVLS transport type. + IB0, // InfiniBand device 0 transport type. + IB1, // InfiniBand device 1 transport type. + IB2, // InfiniBand device 2 transport type. + IB3, // InfiniBand device 3 transport type. + IB4, // InfiniBand device 4 transport type. + IB5, // InfiniBand device 5 transport type. + IB6, // InfiniBand device 6 transport type. + IB7, // InfiniBand device 7 transport type. + Ethernet, // Ethernet transport type. NumTransports, // The number of transports. }; -const std::string TransportNames[] = {"UNK", "IPC", "NVLS", "IB0", "IB1", "IB2", - "IB3", "IB4", "IB5", "IB6", "IB7", "ETH", "NUM"}; +const std::string TransportNames[] = {"UNK", "IPC", "NVLS", "IB0", "IB1", "IB2", "IB3", + "IB4", "IB5", "IB6", "IB7", "ETH", "NUM"}; namespace detail { const size_t TransportFlagsSize = 12; diff --git a/src/bootstrap/socket.cc b/src/bootstrap/socket.cc index 2dbc30103..9e5913403 100644 --- a/src/bootstrap/socket.cc +++ b/src/bootstrap/socket.cc @@ -543,7 +543,7 @@ void Socket::recv(void* ptr, int size) { socketWait(MSCCLPP_SOCKET_RECV, ptr, size, &offset); } -void Socket::recvUntilEnd(void *ptr, int size, int* closed){ +void Socket::recvUntilEnd(void* ptr, int size, int* closed) { int offset = 0; *closed = 0; if (state_ != SocketStateReady) { diff --git a/src/connection.cc b/src/connection.cc index 71c220a29..0b3ff3c93 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -183,7 +183,8 @@ void IBConnection::flush(int64_t timeoutUsec) { // EthernetConnection -EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint) : stopRcvMessages_(false), abortFlag_(0) { +EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint) + : stopRcvMessages_(false), abortFlag_(0) { // Validating Transport Protocol if (localEndpoint.transport() != Transport::Ethernet || remoteEndpoint.transport() != Transport::Ethernet) { throw mscclpp::Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage); @@ -201,7 +202,8 @@ EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEn }); // Starting Connection - sendSocket_ = std::make_unique(&(getImpl(remoteEndpoint)->socketAddress_), 0xdeadbeef, SocketTypeBootstrap, abortFlag_); + sendSocket_ = + std::make_unique(&(getImpl(remoteEndpoint)->socketAddress_), 0xdeadbeef, SocketTypeBootstrap, abortFlag_); sendSocket_->connect(); // Ensure the Connection was Established @@ -213,7 +215,7 @@ EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEn INFO(MSCCLPP_NET, "Ethernet connection created"); } -EthernetConnection::~EthernetConnection(){ +EthernetConnection::~EthernetConnection() { sendSocket_->close(); stopRcvMessages_ = true; rcvSocket_->close(); @@ -231,8 +233,8 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe validateTransport(src, transport()); // Initializing Variables - char* srcPtr = reinterpret_cast(src.data()) + srcOffset/sizeof(char); - char* dstPtr = reinterpret_cast(dst.originalDataPtr()) + dstOffset/sizeof(char); + char* srcPtr = reinterpret_cast(src.data()) + srcOffset / sizeof(char); + char* dstPtr = reinterpret_cast(dst.originalDataPtr()) + dstOffset / sizeof(char); uint64_t sendSize = 0; // Sending Info Data @@ -240,9 +242,10 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe sendSocket_->send(&size, sizeof(uint64_t)); // Getting Data From GPU and Sending Data - while(sendSize < size){ - uint64_t messageSize = std::min(sendBufferSize_, (size - sendSize)/sizeof(char)) * sizeof(char); - mscclpp::memcpyCuda(sendBuffer_, (char*)srcPtr + (sendSize/sizeof(char)), messageSize, cudaMemcpyDeviceToHost); + while (sendSize < size) { + uint64_t messageSize = std::min(sendBufferSize_, (size - sendSize) / sizeof(char)) * sizeof(char); + mscclpp::memcpyCuda(sendBuffer_, (char*)srcPtr + (sendSize / sizeof(char)), messageSize, + cudaMemcpyDeviceToHost); sendSocket_->send(sendBuffer_, messageSize); sendSize += messageSize; } @@ -264,17 +267,14 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, sendSocket_->send(&dstPtr, sizeof(char*)); sendSocket_->send(&size, sizeof(uint64_t)); sendSocket_->send(src, size); - + INFO(MSCCLPP_NET, "EthernetConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue, newValue); } -void EthernetConnection::flush(int64_t timeoutUsec) { - - INFO(MSCCLPP_NET, "EthernetConnection flushing connection"); -} +void EthernetConnection::flush(int64_t timeoutUsec) { INFO(MSCCLPP_NET, "EthernetConnection flushing connection"); } -void EthernetConnection::rcvMessages(){ +void EthernetConnection::rcvMessages() { // Receiving Messages Until Connection is Closed while (!stopRcvMessages_) { // Declarating Variables @@ -285,20 +285,22 @@ void EthernetConnection::rcvMessages(){ bool received = true; // Receiving Data Address - if(closed == 0) rcvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed); + if (closed == 0) rcvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed); received &= !closed; - + // Receiving data size - if(closed == 0) rcvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed); + if (closed == 0) rcvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed); received &= !closed; - + // Receiving Data and Copying Data yo GPU - while(rcvSize < size && closed == 0){ - uint64_t messageSize = std::min(rcvBufferSize_, (size - rcvSize)/sizeof(char)) * sizeof(char); + while (rcvSize < size && closed == 0) { + uint64_t messageSize = std::min(rcvBufferSize_, (size - rcvSize) / sizeof(char)) * sizeof(char); rcvSocket_->recvUntilEnd(rcvBuffer_, messageSize, &closed); received &= !closed; - if(received) mscclpp::memcpyCuda((char*)ptr + (rcvSize/sizeof(char)), rcvBuffer_, messageSize, cudaMemcpyHostToDevice); + if (received) + mscclpp::memcpyCuda((char*)ptr + (rcvSize / sizeof(char)), rcvBuffer_, messageSize, + cudaMemcpyHostToDevice); rcvSize += messageSize; } } diff --git a/src/context.cc b/src/context.cc index 3a8ff4aa6..f8bb3ec83 100644 --- a/src/context.cc +++ b/src/context.cc @@ -49,13 +49,12 @@ MSCCLPP_API_CPP std::shared_ptr Context::connect(Endpoint localEndpo throw mscclpp::Error("Local transport is IB but remote is not", ErrorCode::InvalidUsage); } conn = std::make_shared(localEndpoint, remoteEndpoint, *this); - } else if(localEndpoint.transport() == Transport::Ethernet) { + } else if (localEndpoint.transport() == Transport::Ethernet) { if (remoteEndpoint.transport() != Transport::Ethernet) { throw mscclpp::Error("Local transport is Ethernet but remote is not", ErrorCode::InvalidUsage); } conn = std::make_shared(localEndpoint, remoteEndpoint); - } - else { + } else { throw mscclpp::Error("Unsupported transport", ErrorCode::InternalError); } diff --git a/src/endpoint.cc b/src/endpoint.cc index c5462ac98..e0c596533 100644 --- a/src/endpoint.cc +++ b/src/endpoint.cc @@ -4,8 +4,8 @@ #include "api.h" #include "context.hpp" -#include "utils_internal.hpp" #include "socket.h" +#include "utils_internal.hpp" namespace mscclpp { @@ -16,8 +16,7 @@ Endpoint::Impl::Impl(EndpointConfig config, Context::Impl& contextImpl) ibQp_ = contextImpl.getIbContext(transport_) ->createQp(config.ibMaxCqSize, config.ibMaxCqPollNum, config.ibMaxSendWr, 0, config.ibMaxWrPerSend); ibQpInfo_ = ibQp_->getInfo(); - } - else if(transport_ == Transport::Ethernet) { + } else if (transport_ == Transport::Ethernet) { // Configuring Ethernet Interfaces abortFlag_ = 0; int ret = FindInterfaces(netIfName_, &socketAddress_, MAX_IF_NAME_SIZE, 1, ""); @@ -40,7 +39,8 @@ MSCCLPP_API_CPP std::vector Endpoint::serialize() { std::copy_n(reinterpret_cast(&pimpl_->ibQpInfo_), sizeof(pimpl_->ibQpInfo_), std::back_inserter(data)); } if ((pimpl_->transport_) == Transport::Ethernet) { - std::copy_n(reinterpret_cast(&pimpl_->socketAddress_), sizeof(pimpl_->socketAddress_), std::back_inserter(data)); + std::copy_n(reinterpret_cast(&pimpl_->socketAddress_), sizeof(pimpl_->socketAddress_), + std::back_inserter(data)); } return data; } @@ -60,7 +60,7 @@ Endpoint::Impl::Impl(const std::vector& serialization) { std::copy_n(it, sizeof(ibQpInfo_), reinterpret_cast(&ibQpInfo_)); it += sizeof(ibQpInfo_); } - if (transport_ == Transport::Ethernet){ + if (transport_ == Transport::Ethernet) { std::copy_n(it, sizeof(socketAddress_), reinterpret_cast(&socketAddress_)); it += sizeof(socketAddress_); } diff --git a/src/include/socket.h b/src/include/socket.h index 9f53d8bfe..77cdfa61a 100644 --- a/src/include/socket.h +++ b/src/include/socket.h @@ -69,7 +69,7 @@ class Socket { void accept(const Socket* listenSocket, int64_t timeout = -1); void send(void* ptr, int size); void recv(void* ptr, int size); - void recvUntilEnd(void *ptr, int size, int* closed); + void recvUntilEnd(void* ptr, int size, int* closed); void close(); int getFd() const { return fd_; } diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index c06977a87..d1f226149 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -48,11 +48,9 @@ void CommunicatorTestBase::connectMesh(bool useIpc, bool useIb, bool useEthernet if (i != gEnv->rank) { if ((rankToNode(i) == rankToNode(gEnv->rank)) && useIpc) { connectionFutures[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::CudaIpc); - } - else if(useIb) { + } else if (useIb) { connectionFutures[i] = communicator->connectOnSetup(i, 0, ibTransport); - } - else if(useEthernet) { + } else if (useEthernet) { connectionFutures[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::Ethernet); } } @@ -118,8 +116,8 @@ void CommunicatorTest::SetUp() { devicePtr[n] = mscclpp::allocSharedCuda(deviceBufferSize / sizeof(int)); registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, localMemory[n], remoteMemory[n]); - //registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks, - // localMemory[n], remoteMemory[n]); + // registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks, + // localMemory[n], remoteMemory[n]); } } diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index 8a3946d00..2abb43d0e 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -132,8 +132,9 @@ class ProxyChannelOneToOneTest : public CommunicatorTestBase { void SetUp() override; void TearDown() override; - void setupMeshConnections(std::vector& proxyChannels, bool useIPC, bool useIb, bool useEthernet, - void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0); + void setupMeshConnections(std::vector& proxyChannels, bool useIPC, bool useIb, + bool useEthernet, void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, + size_t recvBuffBytes = 0); void testPingPong(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll); void testPingPongPerf(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll); void testPacketPingPong(bool useIbOnly); diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index ce13db580..2975097f4 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -16,16 +16,16 @@ void ProxyChannelOneToOneTest::SetUp() { void ProxyChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); } void ProxyChannelOneToOneTest::setupMeshConnections(std::vector& proxyChannels, - bool useIPC, bool useIb, bool useEthernet, void* sendBuff, size_t sendBuffBytes, - void* recvBuff, size_t recvBuffBytes) { + bool useIPC, bool useIb, bool useEthernet, void* sendBuff, + size_t sendBuffBytes, void* recvBuff, size_t recvBuffBytes) { const int rank = communicator->bootstrap()->getRank(); const int worldSize = communicator->bootstrap()->getNranks(); const bool isInPlace = (recvBuff == nullptr); - mscclpp::TransportFlags transport; + mscclpp::TransportFlags transport; - if(useIPC) transport |= mscclpp::Transport::CudaIpc; - if(useIb) transport |= ibTransport; - if(useEthernet) transport |= mscclpp::Transport::Ethernet; + if (useIPC) transport |= mscclpp::Transport::CudaIpc; + if (useIb) transport |= ibTransport; + if (useEthernet) transport |= mscclpp::Transport::Ethernet; std::vector>> connectionFutures(worldSize); std::vector> remoteMemFutures(worldSize); @@ -42,10 +42,9 @@ void ProxyChannelOneToOneTest::setupMeshConnections(std::vectorrank)) && useIPC) { connectionFutures[r] = communicator->connectOnSetup(r, 0, mscclpp::Transport::CudaIpc); - } else if(useIb) { + } else if (useIb) { connectionFutures[r] = communicator->connectOnSetup(r, 0, ibTransport); - } - else if(useEthernet) { + } else if (useEthernet) { connectionFutures[r] = communicator->connectOnSetup(r, 0, mscclpp::Transport::Ethernet); } @@ -243,7 +242,7 @@ void ProxyChannelOneToOneTest::testPingPongPerf(bool useIPC, bool useIB, bool us TEST_F(ProxyChannelOneToOneTest, PingPong) { testPingPong(true, true, false, false); } -TEST_F(ProxyChannelOneToOneTest, PingPongIb) { testPingPong(false, true , false, false); } +TEST_F(ProxyChannelOneToOneTest, PingPongIb) { testPingPong(false, true, false, false); } TEST_F(ProxyChannelOneToOneTest, PingPongEthernet) { testPingPong(false, false, true, false); } @@ -335,8 +334,8 @@ void ProxyChannelOneToOneTest::testPacketPingPong(bool useIbOnly) { auto putPacketBuffer = mscclpp::allocExtSharedCuda(nPacket); auto getPacketBuffer = mscclpp::allocExtSharedCuda(nPacket); - setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket), - getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket)); + setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), + nPacket * sizeof(mscclpp::LLPacket), getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket)); ASSERT_EQ(proxyChannels.size(), 1); @@ -402,8 +401,8 @@ void ProxyChannelOneToOneTest::testPacketPingPongPerf(bool useIbOnly) { auto putPacketBuffer = mscclpp::allocExtSharedCuda(nPacket); auto getPacketBuffer = mscclpp::allocExtSharedCuda(nPacket); - setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket), - getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket)); + setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), + nPacket * sizeof(mscclpp::LLPacket), getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket)); ASSERT_EQ(proxyChannels.size(), 1); From cd34080865c8e57fda036722941653aa284afcd2 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 22 Apr 2024 23:20:54 +0000 Subject: [PATCH 10/15] general adjusts --- src/connection.cc | 55 ++++++++++++++--------------- src/include/connection.hpp | 11 +++--- src/registered_memory.cc | 2 +- test/mp_unit/mp_unit_tests.hpp | 11 ++++-- test/mp_unit/proxy_channel_tests.cu | 52 +++++++++++++++++---------- 5 files changed, 76 insertions(+), 55 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index 0b3ff3c93..ac0df6edf 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -183,22 +183,21 @@ void IBConnection::flush(int64_t timeoutUsec) { // EthernetConnection -EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint) - : stopRcvMessages_(false), abortFlag_(0) { +EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint) : abortFlag_(0) { // Validating Transport Protocol if (localEndpoint.transport() != Transport::Ethernet || remoteEndpoint.transport() != Transport::Ethernet) { throw mscclpp::Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage); } // Instanciating Buffers - sendBuffer_ = new char[sendBufferSize_]; - rcvBuffer_ = new char[rcvBufferSize_]; + sendBuffer_.resize(sendBufferSize_); + recvBuffer_.resize(rcvBufferSize_); // Creating Thread to Accept the Connection auto parameter = (getImpl(localEndpoint)->socket_).get(); std::thread t([this, parameter]() { - rcvSocket_ = std::make_unique(nullptr, MSCCLPP_SOCKET_MAGIC, SocketTypeUnknown, abortFlag_); - rcvSocket_->accept(parameter); + recvSocket_ = std::make_unique(nullptr, MSCCLPP_SOCKET_MAGIC, SocketTypeUnknown, abortFlag_); + recvSocket_->accept(parameter); }); // Starting Connection @@ -210,16 +209,15 @@ EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEn t.join(); // Starting Thread to Receive Messages - threadRcvMessages_ = std::thread(&EthernetConnection::rcvMessages, this); + threadRecvMessages_ = std::thread(&EthernetConnection::recvMessages, this); INFO(MSCCLPP_NET, "Ethernet connection created"); } EthernetConnection::~EthernetConnection() { sendSocket_->close(); - stopRcvMessages_ = true; - rcvSocket_->close(); - threadRcvMessages_.join(); + recvSocket_->close(); + threadRecvMessages_.join(); } Transport EthernetConnection::transport() { return Transport::Ethernet; } @@ -244,9 +242,9 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe // Getting Data From GPU and Sending Data while (sendSize < size) { uint64_t messageSize = std::min(sendBufferSize_, (size - sendSize) / sizeof(char)) * sizeof(char); - mscclpp::memcpyCuda(sendBuffer_, (char*)srcPtr + (sendSize / sizeof(char)), messageSize, + mscclpp::memcpyCuda(sendBuffer_.data(), (char*)srcPtr + (sendSize / sizeof(char)), messageSize, cudaMemcpyDeviceToHost); - sendSocket_->send(sendBuffer_, messageSize); + sendSocket_->send(sendBuffer_.data(), messageSize); sendSize += messageSize; } @@ -274,34 +272,35 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, void EthernetConnection::flush(int64_t timeoutUsec) { INFO(MSCCLPP_NET, "EthernetConnection flushing connection"); } -void EthernetConnection::rcvMessages() { - // Receiving Messages Until Connection is Closed - while (!stopRcvMessages_) { - // Declarating Variables - char* ptr; - uint64_t size; - uint64_t rcvSize = 0; - int closed = 0; - bool received = true; +void EthernetConnection::recvMessages() { + // Declarating Variables + char* ptr; + uint64_t size; + uint64_t recvSize; + int closed = 0; + bool received = true; + // Receiving Messages Until Connection is Closed + while (recvSocket_->getState() != SocketStateClosed) { // Receiving Data Address - if (closed == 0) rcvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed); + if (closed == 0) recvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed); received &= !closed; // Receiving data size - if (closed == 0) rcvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed); + if (closed == 0) recvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed); received &= !closed; // Receiving Data and Copying Data yo GPU - while (rcvSize < size && closed == 0) { - uint64_t messageSize = std::min(rcvBufferSize_, (size - rcvSize) / sizeof(char)) * sizeof(char); - rcvSocket_->recvUntilEnd(rcvBuffer_, messageSize, &closed); + recvSize = 0; + while (recvSize < size && closed == 0) { + uint64_t messageSize = std::min(rcvBufferSize_, (size - recvSize) / sizeof(char)) * sizeof(char); + recvSocket_->recvUntilEnd(recvBuffer_.data(), messageSize, &closed); received &= !closed; if (received) - mscclpp::memcpyCuda((char*)ptr + (rcvSize / sizeof(char)), rcvBuffer_, messageSize, + mscclpp::memcpyCuda((char*)ptr + (recvSize / sizeof(char)), recvBuffer_.data(), messageSize, cudaMemcpyHostToDevice); - rcvSize += messageSize; + recvSize += messageSize; } } } diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 43dfbd69e..34fdcb24f 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -56,14 +56,13 @@ class IBConnection : public Connection { class EthernetConnection : public Connection { std::unique_ptr sendSocket_; - std::unique_ptr rcvSocket_; - std::thread threadRcvMessages_; - bool stopRcvMessages_; + std::unique_ptr recvSocket_; + std::thread threadRecvMessages_; volatile uint32_t* abortFlag_; const uint64_t sendBufferSize_ = 256000000; const uint64_t rcvBufferSize_ = 256000000; - char* sendBuffer_; - char* rcvBuffer_; + std::vector sendBuffer_; + std::vector recvBuffer_; public: EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint); @@ -81,7 +80,7 @@ class EthernetConnection : public Connection { void flush(int64_t timeoutUsec) override; private: - void rcvMessages(); + void recvMessages(); void sendMessage(); }; diff --git a/src/registered_memory.cc b/src/registered_memory.cc index 737385214..0702c497b 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -141,7 +141,7 @@ RegisteredMemory::Impl::Impl(const std::vector& serialization) { } // Next decide how to set this->data - if ((getHostHash() == this->hostHash && getPidHash() == this->pidHash)) { + if (getHostHash() == this->hostHash && getPidHash() == this->pidHash) { // The memory is local to the process, so originalDataPtr is valid as is this->data = this->originalDataPtr; } else if (transports.has(Transport::CudaIpc) && getHostHash() == this->hostHash) { diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index 2abb43d0e..abd8ccff2 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -129,14 +129,21 @@ using DeviceHandle = mscclpp::DeviceHandle; class ProxyChannelOneToOneTest : public CommunicatorTestBase { protected: + struct PingPongTestParams { + bool useIPC; + bool useIB; + bool useEthernet; + bool waitWithPoll; + }; + void SetUp() override; void TearDown() override; void setupMeshConnections(std::vector& proxyChannels, bool useIPC, bool useIb, bool useEthernet, void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0); - void testPingPong(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll); - void testPingPongPerf(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll); + void testPingPong(PingPongTestParams params); + void testPingPongPerf(PingPongTestParams params); void testPacketPingPong(bool useIbOnly); void testPacketPingPongPerf(bool useIbOnly); diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index 2975097f4..75858b631 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -151,14 +151,14 @@ __global__ void kernelProxyPingPong(int* buff, int rank, int nElem, bool waitWit } } -void ProxyChannelOneToOneTest::testPingPong(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll) { +void ProxyChannelOneToOneTest::testPingPong(PingPongTestParams params) { if (gEnv->rank >= numRanksToUse) return; const int nElem = 4 * 1024 * 1024; std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocExtSharedCuda(nElem); - setupMeshConnections(proxyChannels, useIPC, useIB, useEthernet, buff.get(), nElem * sizeof(int)); + setupMeshConnections(proxyChannels, params.useIPC, params.useIB, params.useEthernet, buff.get(), nElem * sizeof(int)); std::vector> proxyChannelHandles; for (auto& ch : proxyChannels) proxyChannelHandles.push_back(ch.deviceHandle()); @@ -173,22 +173,22 @@ void ProxyChannelOneToOneTest::testPingPong(bool useIPC, bool useIB, bool useEth const int nTries = 1000; - kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, waitWithPoll, nTries, ret.get()); + kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, params.waitWithPoll, nTries, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); - kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, waitWithPoll, nTries, ret.get()); + kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, params.waitWithPoll, nTries, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); - kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, waitWithPoll, nTries, ret.get()); + kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, params.waitWithPoll, nTries, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); - kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, waitWithPoll, nTries, ret.get()); + kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, params.waitWithPoll, nTries, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); @@ -196,14 +196,14 @@ void ProxyChannelOneToOneTest::testPingPong(bool useIPC, bool useIB, bool useEth proxyService->stopProxy(); } -void ProxyChannelOneToOneTest::testPingPongPerf(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll) { +void ProxyChannelOneToOneTest::testPingPongPerf(PingPongTestParams params) { if (gEnv->rank >= numRanksToUse) return; const int nElem = 4 * 1024 * 1024; std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocExtSharedCuda(nElem); - setupMeshConnections(proxyChannels, useIPC, useIB, useEthernet, buff.get(), nElem * sizeof(int)); + setupMeshConnections(proxyChannels, params.useIPC, params.useIB, params.useEthernet, buff.get(), nElem * sizeof(int)); std::vector> proxyChannelHandles; for (auto& ch : proxyChannels) proxyChannelHandles.push_back(ch.deviceHandle()); @@ -221,14 +221,14 @@ void ProxyChannelOneToOneTest::testPingPongPerf(bool useIPC, bool useIB, bool us const int nTries = 1000; // Warm-up - kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, waitWithPoll, nTries, ret.get()); + kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, params.waitWithPoll, nTries, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); communicator->bootstrap()->barrier(); // Measure latency mscclpp::Timer timer; - kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, waitWithPoll, nTries, ret.get()); + kernelProxyPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, params.waitWithPoll, nTries, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); communicator->bootstrap()->barrier(); @@ -240,21 +240,37 @@ void ProxyChannelOneToOneTest::testPingPongPerf(bool useIPC, bool useIB, bool us proxyService->stopProxy(); } -TEST_F(ProxyChannelOneToOneTest, PingPong) { testPingPong(true, true, false, false); } +TEST_F(ProxyChannelOneToOneTest, PingPong) { + testPingPong(PingPongTestParams{.useIPC = true, .useIB = true, .useEthernet = false, .waitWithPoll = false}); +} -TEST_F(ProxyChannelOneToOneTest, PingPongIb) { testPingPong(false, true, false, false); } +TEST_F(ProxyChannelOneToOneTest, PingPongIb) { + testPingPong(PingPongTestParams{.useIPC = false, .useIB = true, .useEthernet = false, .waitWithPoll = false}); +} -TEST_F(ProxyChannelOneToOneTest, PingPongEthernet) { testPingPong(false, false, true, false); } +TEST_F(ProxyChannelOneToOneTest, PingPongEthernet) { + testPingPong(PingPongTestParams{.useIPC = false, .useIB = false, .useEthernet = true, .waitWithPoll = false}); +} -TEST_F(ProxyChannelOneToOneTest, PingPongWithPoll) { testPingPong(true, true, false, true); } +TEST_F(ProxyChannelOneToOneTest, PingPongWithPoll) { + testPingPong(PingPongTestParams{.useIPC = true, .useIB = true, .useEthernet = false, .waitWithPoll = true}); +} -TEST_F(ProxyChannelOneToOneTest, PingPongIbWithPoll) { testPingPong(false, true, false, true); } +TEST_F(ProxyChannelOneToOneTest, PingPongIbWithPoll) { + testPingPong(PingPongTestParams{.useIPC = false, .useIB = true, .useEthernet = false, .waitWithPoll = true}); +} -TEST_F(ProxyChannelOneToOneTest, PingPongPerf) { testPingPongPerf(true, true, false, false); } +TEST_F(ProxyChannelOneToOneTest, PingPongPerf) { + testPingPongPerf(PingPongTestParams{.useIPC = true, .useIB = true, .useEthernet = false, .waitWithPoll = false}); +} -TEST_F(ProxyChannelOneToOneTest, PingPongPerfIb) { testPingPongPerf(false, true, false, false); } +TEST_F(ProxyChannelOneToOneTest, PingPongPerfIb) { + testPingPongPerf(PingPongTestParams{.useIPC = false, .useIB = true, .useEthernet = false, .waitWithPoll = false}); +} -TEST_F(ProxyChannelOneToOneTest, PingPongPerfEthernet) { testPingPongPerf(false, false, true, false); } +TEST_F(ProxyChannelOneToOneTest, PingPongPerfEthernet) { + testPingPongPerf(PingPongTestParams{.useIPC = false, .useIB = false, .useEthernet = true, .waitWithPoll = false}); +} __device__ mscclpp::DeviceSyncer gChannelOneToOneTestProxyChansSyncer; From 1982fecc5b31caec0d05bd6daa2413191cd81a5d Mon Sep 17 00:00:00 2001 From: root Date: Tue, 23 Apr 2024 21:59:16 +0000 Subject: [PATCH 11/15] packing ethernet message in one message --- src/connection.cc | 56 ++++++++++++++++++++---------- src/endpoint.cc | 2 +- test/mp_unit/communicator_tests.cu | 14 ++++---- 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index ac0df6edf..64498506b 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -201,8 +201,8 @@ EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEn }); // Starting Connection - sendSocket_ = - std::make_unique(&(getImpl(remoteEndpoint)->socketAddress_), 0xdeadbeef, SocketTypeBootstrap, abortFlag_); + sendSocket_ = std::make_unique(&(getImpl(remoteEndpoint)->socketAddress_), MSCCLPP_SOCKET_MAGIC, + SocketTypeBootstrap, abortFlag_); sendSocket_->connect(); // Ensure the Connection was Established @@ -233,19 +233,27 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe // Initializing Variables char* srcPtr = reinterpret_cast(src.data()) + srcOffset / sizeof(char); char* dstPtr = reinterpret_cast(dst.originalDataPtr()) + dstOffset / sizeof(char); - uint64_t sendSize = 0; - - // Sending Info Data - sendSocket_->send(&dstPtr, sizeof(char*)); - sendSocket_->send(&size, sizeof(uint64_t)); - - // Getting Data From GPU and Sending Data - while (sendSize < size) { - uint64_t messageSize = std::min(sendBufferSize_, (size - sendSize) / sizeof(char)) * sizeof(char); - mscclpp::memcpyCuda(sendBuffer_.data(), (char*)srcPtr + (sendSize / sizeof(char)), messageSize, - cudaMemcpyDeviceToHost); + uint64_t sentDataSize = 0; + uint64_t headerSize = 0; + + // Copying Meta Data to Send Buffer + char* dstPtrBytes = reinterpret_cast(&dstPtr); + std::copy(dstPtrBytes, dstPtrBytes + sizeof(dstPtr), sendBuffer_.data() + headerSize / sizeof(char)); + headerSize += sizeof(dstPtr); + char* sizeBytes = reinterpret_cast(&size); + std::copy(sizeBytes, sizeBytes + sizeof(size), sendBuffer_.data() + headerSize / sizeof(char)); + headerSize += sizeof(size); + + // Getting Data From GPU and Sending Message + while (sentDataSize < size) { + uint64_t dataSize = + std::min(sendBufferSize_ - headerSize / sizeof(char), (size - sentDataSize) / sizeof(char)) * sizeof(char); + uint64_t messageSize = dataSize + headerSize; + mscclpp::memcpyCuda(sendBuffer_.data() + headerSize / sizeof(char), + (char*)srcPtr + (sentDataSize / sizeof(char)), dataSize, cudaMemcpyDeviceToHost); sendSocket_->send(sendBuffer_.data(), messageSize); - sendSize += messageSize; + sentDataSize += messageSize; + headerSize = 0; } INFO(MSCCLPP_NET, "EthernetConnection write: from %p to %p, size %lu", srcPtr, dstPtr, size); @@ -258,13 +266,23 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, // Initializing Variables uint64_t oldValue = *src; uint64_t* dstPtr = reinterpret_cast(reinterpret_cast(dst.originalDataPtr()) + dstOffset); - uint64_t size = sizeof(uint64_t); + uint64_t dataSize = sizeof(uint64_t); + uint64_t messageSize = 0; *src = newValue; - // Sending Data - sendSocket_->send(&dstPtr, sizeof(char*)); - sendSocket_->send(&size, sizeof(uint64_t)); - sendSocket_->send(src, size); + // Copying Data to Send Buffer + char* dstPtrBytes = reinterpret_cast(&dstPtr); + std::copy(dstPtrBytes, dstPtrBytes + sizeof(dstPtr), sendBuffer_.data() + messageSize / sizeof(char)); + messageSize += sizeof(dstPtr); + char* sizeBytes = reinterpret_cast(&dataSize); + std::copy(sizeBytes, sizeBytes + sizeof(dataSize), sendBuffer_.data() + messageSize / sizeof(char)); + messageSize += sizeof(dataSize); + char* dataBytes = reinterpret_cast(src); + std::copy(dataBytes, dataBytes + dataSize, sendBuffer_.data() + messageSize / sizeof(char)); + messageSize += dataSize; + + // Sending Message + sendSocket_->send(sendBuffer_.data(), messageSize); INFO(MSCCLPP_NET, "EthernetConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue, newValue); diff --git a/src/endpoint.cc b/src/endpoint.cc index e0c596533..68c2726de 100644 --- a/src/endpoint.cc +++ b/src/endpoint.cc @@ -23,7 +23,7 @@ Endpoint::Impl::Impl(EndpointConfig config, Context::Impl& contextImpl) if (ret <= 0) throw Error("NET/Socket", ErrorCode::InternalError); // Starting Server Socket - socket_ = std::make_unique(&socketAddress_, 0xdeadbeef, SocketTypeBootstrap, abortFlag_); + socket_ = std::make_unique(&socketAddress_, MSCCLPP_SOCKET_MAGIC, SocketTypeBootstrap, abortFlag_); socket_->bindAndListen(); socketAddress_ = socket_->getAddr(); } diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index d1f226149..47da380d2 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -99,7 +99,7 @@ void CommunicatorTest::SetUp() { ASSERT_EQ((deviceBufferSize / sizeof(int)) % gEnv->worldSize, 0); - connectMesh(true, true, false); + connectMesh(false, false, true); devicePtr.resize(numBuffers); localMemory.resize(numBuffers); @@ -114,10 +114,10 @@ void CommunicatorTest::SetUp() { for (size_t n = 0; n < numBuffers; n++) { devicePtr[n] = mscclpp::allocSharedCuda(deviceBufferSize / sizeof(int)); - registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, - localMemory[n], remoteMemory[n]); - // registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks, - // localMemory[n], remoteMemory[n]); + //registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, + // localMemory[n], remoteMemory[n]); + registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks, + localMemory[n], remoteMemory[n]); } } @@ -287,7 +287,7 @@ TEST_F(CommunicatorTest, WriteWithHostSemaphores) { communicator->bootstrap()->barrier(); } -/*TEST_F(CommunicatorTest, TestEthernetConnection) { +TEST_F(CommunicatorTest, TestEthernetConnection) { if (gEnv->rank >= numRanksToUse) return; deviceBufferInit(); @@ -307,4 +307,4 @@ TEST_F(CommunicatorTest, WriteWithHostSemaphores) { } } while (!ready); communicator->bootstrap()->barrier(); -}*/ \ No newline at end of file +} \ No newline at end of file From 3b7cd65fee2d30e3ee824a47fd03800d8fe6465b Mon Sep 17 00:00:00 2001 From: root Date: Tue, 23 Apr 2024 22:03:54 +0000 Subject: [PATCH 12/15] deleting comments --- test/mp_unit/communicator_tests.cu | 30 +++--------------------------- 1 file changed, 3 insertions(+), 27 deletions(-) diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index 47da380d2..adb6b5df6 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -99,7 +99,7 @@ void CommunicatorTest::SetUp() { ASSERT_EQ((deviceBufferSize / sizeof(int)) % gEnv->worldSize, 0); - connectMesh(false, false, true); + connectMesh(true, true, false); devicePtr.resize(numBuffers); localMemory.resize(numBuffers); @@ -114,10 +114,8 @@ void CommunicatorTest::SetUp() { for (size_t n = 0; n < numBuffers; n++) { devicePtr[n] = mscclpp::allocSharedCuda(deviceBufferSize / sizeof(int)); - //registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, - // localMemory[n], remoteMemory[n]); - registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks, - localMemory[n], remoteMemory[n]); + registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, + localMemory[n], remoteMemory[n]); } } @@ -285,26 +283,4 @@ TEST_F(CommunicatorTest, WriteWithHostSemaphores) { ASSERT_TRUE(testWriteCorrectness()); communicator->bootstrap()->barrier(); -} - -TEST_F(CommunicatorTest, TestEthernetConnection) { - if (gEnv->rank >= numRanksToUse) return; - - deviceBufferInit(); - communicator->bootstrap()->barrier(); - - writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize); - communicator->bootstrap()->barrier(); - - // polling until it becomes ready - bool ready = false; - int niter = 0; - do { - ready = testWriteCorrectness(); - niter++; - if (niter == 10000) { - FAIL() << "Polling is stuck."; - } - } while (!ready); - communicator->bootstrap()->barrier(); } \ No newline at end of file From c626b02cbb0f7ad77c2497e06223c093bc54be4c Mon Sep 17 00:00:00 2001 From: root Date: Thu, 25 Apr 2024 01:21:32 +0000 Subject: [PATCH 13/15] adjust buffers sizes --- src/connection.cc | 6 +++--- src/include/connection.hpp | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index 64498506b..4c35873d3 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -183,7 +183,7 @@ void IBConnection::flush(int64_t timeoutUsec) { // EthernetConnection -EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint) : abortFlag_(0) { +EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize, uint64_t recvBufferSize ) : abortFlag_(0), sendBufferSize_(sendBufferSize), recvBufferSize_(recvBufferSize) { // Validating Transport Protocol if (localEndpoint.transport() != Transport::Ethernet || remoteEndpoint.transport() != Transport::Ethernet) { throw mscclpp::Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage); @@ -191,7 +191,7 @@ EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEn // Instanciating Buffers sendBuffer_.resize(sendBufferSize_); - recvBuffer_.resize(rcvBufferSize_); + recvBuffer_.resize(recvBufferSize_); // Creating Thread to Accept the Connection auto parameter = (getImpl(localEndpoint)->socket_).get(); @@ -311,7 +311,7 @@ void EthernetConnection::recvMessages() { // Receiving Data and Copying Data yo GPU recvSize = 0; while (recvSize < size && closed == 0) { - uint64_t messageSize = std::min(rcvBufferSize_, (size - recvSize) / sizeof(char)) * sizeof(char); + uint64_t messageSize = std::min(recvBufferSize_, (size - recvSize) / sizeof(char)) * sizeof(char); recvSocket_->recvUntilEnd(recvBuffer_.data(), messageSize, &closed); received &= !closed; diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 34fdcb24f..2793c4f3a 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -59,13 +59,13 @@ class EthernetConnection : public Connection { std::unique_ptr recvSocket_; std::thread threadRecvMessages_; volatile uint32_t* abortFlag_; - const uint64_t sendBufferSize_ = 256000000; - const uint64_t rcvBufferSize_ = 256000000; + const uint64_t sendBufferSize_; + const uint64_t recvBufferSize_; std::vector sendBuffer_; std::vector recvBuffer_; public: - EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint); + EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize = 256 * 1024 * 1024, uint64_t recvBufferSize = 256 * 1024 * 1024); ~EthernetConnection(); From df2b1bcfd5e0bc0ca0236ee7a9c14b69b3f440a6 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 25 Apr 2024 02:04:00 +0000 Subject: [PATCH 14/15] adjust formatation --- src/connection.cc | 4 +++- src/include/connection.hpp | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index 4c35873d3..6e01367f6 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -183,7 +183,9 @@ void IBConnection::flush(int64_t timeoutUsec) { // EthernetConnection -EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize, uint64_t recvBufferSize ) : abortFlag_(0), sendBufferSize_(sendBufferSize), recvBufferSize_(recvBufferSize) { +EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize, + uint64_t recvBufferSize) + : abortFlag_(0), sendBufferSize_(sendBufferSize), recvBufferSize_(recvBufferSize) { // Validating Transport Protocol if (localEndpoint.transport() != Transport::Ethernet || remoteEndpoint.transport() != Transport::Ethernet) { throw mscclpp::Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage); diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 2793c4f3a..283bb8d07 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -65,7 +65,8 @@ class EthernetConnection : public Connection { std::vector recvBuffer_; public: - EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize = 256 * 1024 * 1024, uint64_t recvBufferSize = 256 * 1024 * 1024); + EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize = 256 * 1024 * 1024, + uint64_t recvBufferSize = 256 * 1024 * 1024); ~EthernetConnection(); From bf557122b5c0dec35277c316b873f2247eeea036 Mon Sep 17 00:00:00 2001 From: Caio Rocha Date: Thu, 25 Apr 2024 02:04:00 +0000 Subject: [PATCH 15/15] adjust formatation --- src/connection.cc | 4 +++- src/include/connection.hpp | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index 4c35873d3..6e01367f6 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -183,7 +183,9 @@ void IBConnection::flush(int64_t timeoutUsec) { // EthernetConnection -EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize, uint64_t recvBufferSize ) : abortFlag_(0), sendBufferSize_(sendBufferSize), recvBufferSize_(recvBufferSize) { +EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize, + uint64_t recvBufferSize) + : abortFlag_(0), sendBufferSize_(sendBufferSize), recvBufferSize_(recvBufferSize) { // Validating Transport Protocol if (localEndpoint.transport() != Transport::Ethernet || remoteEndpoint.transport() != Transport::Ethernet) { throw mscclpp::Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage); diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 2793c4f3a..283bb8d07 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -65,7 +65,8 @@ class EthernetConnection : public Connection { std::vector recvBuffer_; public: - EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize = 256 * 1024 * 1024, uint64_t recvBufferSize = 256 * 1024 * 1024); + EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, uint64_t sendBufferSize = 256 * 1024 * 1024, + uint64_t recvBufferSize = 256 * 1024 * 1024); ~EthernetConnection();