Skip to content

Commit

Permalink
Change device handle interfaces & others (#142)
Browse files Browse the repository at this point in the history
* Changed device handle interfaces
* Changed proxy service interfaces
* Move device code into separate files
* Fixed FIFO polling issues
* Add configuration arguments in several interface functions

---------

Co-authored-by: Changho Hwang <[email protected]>
Co-authored-by: Binyang Li <[email protected]>
Co-authored-by: root <root@a100-saemal0.qxveptpukjsuthqvv514inp03c.gx.internal.cloudapp.net>
  • Loading branch information
4 people authored Aug 16, 2023
1 parent 4865b20 commit 8d1b984
Show file tree
Hide file tree
Showing 59 changed files with 1,269 additions and 1,034 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -Xcompiler -Wall,-Wextra")

list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake)

# clang-format targets
include(${PROJECT_SOURCE_DIR}/cmake/AddClangFormatTargets.cmake)
# Format targets
include(${PROJECT_SOURCE_DIR}/cmake/AddFormatTargets.cmake)

# Options
option(ENABLE_TRACE "Enable tracing" OFF)
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mscclpp::Communicator comm(bootstrap);
// Setup connections here using `comm`
...
// Construct the default proxy
mscclpp::ProxyService proxyService(comm);
mscclpp::ProxyService proxyService;
// Start the proxy
proxyService.startProxy();
// Run the user application, i.e., launch GPU kernels here
Expand All @@ -80,7 +80,7 @@ While the default implementation already enables any kinds of communication, MSC
```cpp
// Proxy FIFO is obtained from mscclpp::Proxy on the host and copied to the device.
__device__ mscclpp::DeviceProxyFifo fifo;
__device__ mscclpp::FifoDeviceHandle fifo;
__global__ void gpuKernel() {
...
// Only one thread is needed for the followings
Expand Down
18 changes: 0 additions & 18 deletions cmake/AddClangFormatTargets.cmake

This file was deleted.

38 changes: 38 additions & 0 deletions cmake/AddFormatTargets.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

# Add targets to run clang-format and black

add_custom_target(check-format)
add_custom_target(format)

# C/C++/CUDA formatting: if clang-format is available, register a dry-run check target and an in-place
# format target, and hook them into the umbrella `check-format` / `format` targets.
find_program(CLANG_FORMAT clang-format)
if(CLANG_FORMAT)
message(STATUS "Found clang-format: ${CLANG_FORMAT}")
# Directories that are searched for source files to check/format.
set(FIND_DIRS ${PROJECT_SOURCE_DIR}/src ${PROJECT_SOURCE_DIR}/include ${PROJECT_SOURCE_DIR}/python ${PROJECT_SOURCE_DIR}/test)
# Dry-run check (no files are modified). `ALL` adds this target to the default build target.
# NOTE(review): in the `find` expression below the `-name` patterns are unquoted (the shell may expand them
# against the working directory) and `-type f` is not grouped with the `-o` alternatives, so it appears to
# bind only to the first `-name *.h` test -- confirm, and consider quoting the patterns and grouping them.
add_custom_target(check-format-cpp ALL
COMMAND ${CLANG_FORMAT} -style=file --dry-run `find ${FIND_DIRS} -type f -name *.h -o -name *.hpp -o -name *.c -o -name *.cc -o -name *.cpp -o -name *.cu`
)
add_dependencies(check-format check-format-cpp)
# In-place formatting (`-i`) of the same file set as the check target above.
add_custom_target(format-cpp
COMMAND ${CLANG_FORMAT} -style=file -i `find ${FIND_DIRS} -type f -name *.h -o -name *.hpp -o -name *.c -o -name *.cc -o -name *.cpp -o -name *.cu`
)
add_dependencies(format format-cpp)
else()
# Without clang-format no C/C++ targets are registered; `check-format` / `format` then skip C/C++ sources.
message(STATUS "clang-format not found.")
endif()

# Python formatting: if black is available, register a check target and an in-place format target,
# and hook them into the umbrella `check-format` / `format` targets.
find_program(BLACK black)
if(BLACK)
    message(STATUS "Found black: ${BLACK}")
    # `--check` only reports files that would be reformatted; nothing is rewritten.
    add_custom_target(check-format-py
        COMMAND ${BLACK} --config ${PROJECT_SOURCE_DIR}/pyproject.toml --check ${PROJECT_SOURCE_DIR}/python ${PROJECT_SOURCE_DIR}/test
    )
    add_dependencies(check-format check-format-py)
    # Reformat the Python sources in place using the project's black configuration.
    add_custom_target(format-py
        COMMAND ${BLACK} --config ${PROJECT_SOURCE_DIR}/pyproject.toml ${PROJECT_SOURCE_DIR}/python ${PROJECT_SOURCE_DIR}/test
    )
    add_dependencies(format format-py)
else()
    # `STATUS` is a mode keyword and must not be followed by a comma; with a comma it is treated as
    # ordinary message text instead of selecting the status mode.
    message(STATUS "black not found.")
endif()
27 changes: 0 additions & 27 deletions include/mscclpp/config.hpp

This file was deleted.

16 changes: 12 additions & 4 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ class TcpBootstrap : public Bootstrap {

/// Initialize the @ref TcpBootstrap with a given unique ID.
/// @param uniqueId The unique ID to initialize the @ref TcpBootstrap with.
void initialize(UniqueId uniqueId);
/// @param timeoutSec The connection timeout in seconds.
void initialize(UniqueId uniqueId, int64_t timeoutSec = 30);

/// Initialize the @ref TcpBootstrap with a string formatted as "ip:port" or "interface:ip:port".
/// @param ifIpPortTrio The string formatted as "ip:port" or "interface:ip:port".
void initialize(const std::string& ifIpPortTrio);
/// @param timeoutSec The connection timeout in seconds.
void initialize(const std::string& ifIpPortTrio, int64_t timeoutSec = 30);

/// Return the rank of the process.
int getRank() override;
Expand Down Expand Up @@ -384,7 +386,7 @@ class Connection {
virtual void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) = 0;

/// Flush any pending writes to the remote process.
virtual void flush() = 0;
virtual void flush(int64_t timeoutUsec = 3e7) = 0;

/// Get the rank of the remote process.
///
Expand Down Expand Up @@ -533,8 +535,14 @@ class Communicator {
/// @param remoteRank The rank of the remote process.
/// @param tag The tag of the connection for identifying it.
/// @param transport The type of transport to be used.
/// @param ibMaxCqSize The maximum number of completion queue entries for IB. Unused if transport is not IB.
/// @param ibMaxCqPollNum The maximum number of completion queue entries to poll for IB. Unused if transport is not
/// IB.
/// @param ibMaxSendWr The maximum number of outstanding send work requests for IB. Unused if transport is not IB.
/// @param ibMaxWrPerSend The maximum number of work requests per send for IB. Unused if transport is not IB.
/// @return std::shared_ptr<Connection> A shared pointer to the connection.
std::shared_ptr<Connection> connectOnSetup(int remoteRank, int tag, Transport transport);
std::shared_ptr<Connection> connectOnSetup(int remoteRank, int tag, Transport transport, int ibMaxCqSize = 1024,
int ibMaxCqPollNum = 1, int ibMaxSendWr = 8192, int ibMaxWrPerSend = 64);

/// Add a custom Setuppable object to a list of objects to be setup later, when @ref setup() is called.
///
Expand Down
93 changes: 16 additions & 77 deletions include/mscclpp/fifo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,90 +7,25 @@
#include <cstdint>
#include <functional>
#include <memory>
#include <mscclpp/fifo_device.hpp>
#include <mscclpp/poll.hpp>

#define MSCCLPP_PROXY_FIFO_SIZE 128

namespace mscclpp {

/// A struct representing a pair of 64-bit unsigned integers used as a trigger for the proxy.
///
/// This struct is used as a work element in the concurrent FIFO where multiple device threads can push
/// ProxyTrigger elements and a single host proxy thread consumes these work elements.
///
struct alignas(16) ProxyTrigger {
uint64_t fst, snd;
};

/// A concurrent FIFO where multiple device threads can push work elements and a single host proxy thread consumes them.
///
/// The FIFO has a head pointer allocated on the device which starts at 0 and goes up to 2^64-1, which is almost
/// infinity. There are two copies of the tail, one on the device, @ref DeviceProxyFifo::tailReplica, and another on the
/// host, namely, hostTail. The host always has the "true" tail and occasionally pushes it to the copy on the device.
/// Therefore, most of the time, the device has a stale version. The invariants are: tailReplica <= hostTail <= head.
/// The @ref push() function increments head, hostTail is updated in @ref HostProxyFifo::pop(), and it occasionally
/// flushes it to tailReplica via @ref HostProxyFifo::flushTail().
///
/// Duplicating the tail is a good idea because the FIFO is large enough, and we do not need frequent updates for the
/// tail as there is usually enough space for device threads to push their work into.
///
struct DeviceProxyFifo {
#ifdef __CUDACC__
/// Push a trigger to the FIFO.
///
/// @param trigger The trigger to push.
/// @return The new head of the FIFO.
__forceinline__ __device__ uint64_t push(ProxyTrigger trigger) {
uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->head, 1);

// Only one of two conditions need to be met to proceed. Either the tail has advanced enough or where we need to
// write to is 0. However, the first condition is faster to check since the tail is flushed periodically anyways but
// for the second condition we need to read CPU memory.
// As volatile access is slow, we first check using the bare pointer and then use the volatile pointer if the
// condition is not met.
if (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *(this->tailReplica)) {
OR_POLL_MAYBE_JAILBREAK(curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->tailReplica),
*(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0,
1000000);
}

ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]);
asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd));
return curFifoHead;
}

/// Wait until there is a place in the FIFO to push a trigger.
///
/// @param curFifoHead The current head of the FIFO.
__forceinline__ __device__ void sync(uint64_t curFifoHead) {
// Same as push but in this case checking the fist condition is probably faster since for tail to be pushed we need
// to wait for cudaMemcpy to be done.
OR_POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]) != 0,
*(volatile uint64_t*)(this->tailReplica) <= curFifoHead, 1000000);
}
#endif // __CUDACC__

/// The FIFO buffer that is allocated on the host via `cudaHostAlloc()`.
ProxyTrigger* triggers;
/// Replica of the FIFO tail that is allocated on device.
uint64_t* tailReplica;
/// The FIFO head. Allocated on the device and only accessed by the device.
uint64_t* head;
};

/// A class representing a host proxy FIFO that can consume work elements pushed by device threads.
class HostProxyFifo {
class Fifo {
public:
/// Constructs a new @ref HostProxyFifo object.
HostProxyFifo();
/// Constructs a new @ref Fifo object.
/// @param size The number of entries in the FIFO.
Fifo(int size = 128);

/// Destroys the @ref HostProxyFifo object.
~HostProxyFifo();
/// Destroys the @ref Fifo object.
~Fifo();

/// Polls the FIFO for a trigger.
///
/// @param trigger A pointer to the trigger to be filled.
void poll(ProxyTrigger* trigger);
/// Returns @ref ProxyTrigger which is the trigger at the head of fifo.
ProxyTrigger poll();

/// Pops a trigger from the FIFO.
void pop();
Expand All @@ -100,10 +35,14 @@ class HostProxyFifo {
/// @param sync If true, waits for the flush to complete before returning.
void flushTail(bool sync = false);

/// Returns a @ref DeviceProxyFifo object representing the device FIFO.
/// Return the FIFO size.
/// @return The FIFO size.
int size() const;

/// Returns a @ref FifoDeviceHandle object representing the device FIFO.
///
/// @return A @ref DeviceProxyFifo object representing the device FIFO.
DeviceProxyFifo deviceFifo();
/// @return A @ref FifoDeviceHandle object representing the device FIFO.
FifoDeviceHandle deviceHandle();

private:
struct Impl;
Expand Down
83 changes: 83 additions & 0 deletions include/mscclpp/fifo_device.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef MSCCLPP_FIFO_DEVICE_HPP_
#define MSCCLPP_FIFO_DEVICE_HPP_

#include "poll.hpp"

namespace mscclpp {

/// Work element exchanged through the proxy FIFO: a pair of 64-bit unsigned words, aligned to 16 bytes.
///
/// Multiple device threads push ProxyTrigger elements into the concurrent FIFO, and a single host proxy thread
/// consumes them.
///
/// The most significant bit of @ref snd is reserved for memory consistency purposes; callers must leave it unused.
struct alignas(16) ProxyTrigger {
  uint64_t fst;
  uint64_t snd;
};

/// Device-side handle of a concurrent FIFO where multiple device threads can push work elements and a single host
/// proxy thread consumes them.
///
/// The FIFO has a head pointer allocated on the device which starts at 0 and goes up to 2^64-1, which is almost
/// infinity. There are two copies of the tail, one on the device, @ref FifoDeviceHandle::tailReplica, and another on
/// the host, namely, hostTail. The host always has the "true" tail and occasionally pushes it to the copy on the
/// device. Therefore, most of the time, the device has a stale version. The invariants are: tailReplica <= hostTail <=
/// head. The @ref push() function increments head, hostTail is updated in @ref Fifo::pop(), and it occasionally flushes
/// it to tailReplica via @ref Fifo::flushTail().
///
/// Duplicating the tail is a good idea because the FIFO is large enough, and we do not need frequent updates for the
/// tail as there is usually enough space for device threads to push their work into.
///
/// NOTE(review): this header uses `uint64_t` but only includes "poll.hpp"; confirm that `<cstdint>` is reachable
/// through it.
struct FifoDeviceHandle {
#ifdef __CUDACC__
/// Push a trigger to the FIFO.
///
/// The most significant bit of `trigger.snd` is flipped before the trigger is stored (see below), so callers must
/// leave that bit clear.
///
/// @param trigger The trigger to push.
/// @return The FIFO position assigned to this trigger, i.e. the value of the head before it was incremented. Pass
/// it to @ref sync().
__forceinline__ __device__ uint64_t push(ProxyTrigger trigger) {
// Reserve a position: atomicAdd returns the head value from before the increment.
uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->head, 1);
// Flip the most significant bit (bit 63) of snd. Provided the caller left that bit clear, the stored element is
// guaranteed to be non-zero, so that it can be polled for safely. Per the original note, the host side flips the
// bit back.
trigger.snd ^= ((uint64_t)1 << (uint64_t)63);

// Only one of two conditions need to be met to proceed. Either the tail has advanced enough or where we need to
// write to is 0. However, the first condition is faster to check since the tail is flushed periodically anyways but
// for the second condition we need to read CPU memory.
// As volatile access is slow, we first check using the bare pointer and then use the volatile pointer if the
// condition is not met.
// NOTE(review): `size` is a signed int mixed into uint64_t arithmetic here and in the modulo below; a zero or
// negative size would misbehave -- presumably the host side guarantees size > 0, confirm.
if (curFifoHead >= size + *(this->tailReplica)) {
OR_POLL_MAYBE_JAILBREAK(curFifoHead >= size + *((volatile uint64_t*)this->tailReplica),
*(volatile uint64_t*)&this->triggers[curFifoHead % size] != 0, 1000000);
}

// The slot index is the reserved position modulo the FIFO size. Both 64-bit words are written with a single
// volatile 128-bit vector store.
ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % size]);
asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd));
return curFifoHead;
}

/// Wait on the FIFO slot that corresponds to a previously returned head value.
///
/// NOTE(review): the original description read "wait until there is a place in the FIFO to push a trigger", but the
/// polled conditions (slot at curFifoHead still non-zero, and tailReplica <= curFifoHead) suggest this waits until
/// the trigger at curFifoHead has been consumed by the host -- confirm against OR_POLL_MAYBE_JAILBREAK in poll.hpp.
///
/// @param curFifoHead The FIFO position to wait on, as returned by @ref push().
__forceinline__ __device__ void sync(uint64_t curFifoHead) {
// Same as push but in this case checking the first condition is probably faster since for tail to be pushed we need
// to wait for cudaMemcpy to be done.
OR_POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(this->triggers[curFifoHead % size]) != 0,
*(volatile uint64_t*)(this->tailReplica) <= curFifoHead, 1000000);
}
#endif // __CUDACC__

/// The FIFO buffer that is allocated on the host via `cudaHostAlloc()`. Indexed by `position % size`.
ProxyTrigger* triggers;
/// Replica of the FIFO tail that is allocated on device. May lag behind the host's tail.
uint64_t* tailReplica;
/// The FIFO head. Allocated on the device and only accessed by the device.
uint64_t* head;
/// The FIFO size, i.e. the number of trigger slots in @ref triggers.
int size;
};

} // namespace mscclpp

#endif // MSCCLPP_FIFO_DEVICE_HPP_
File renamed without changes.
Loading

0 comments on commit 8d1b984

Please sign in to comment.