
Commit b066627
[Object manager] don't abort entire pull request on race condition from concurrent chunk receive - #2 (ray-project#19216)
mwtian authored Oct 8, 2021
1 parent fa047c0 commit b066627
Showing 5 changed files with 202 additions and 73 deletions.
48 changes: 46 additions & 2 deletions python/ray/tests/test_object_manager.py
@@ -296,8 +296,6 @@ def driver():
ray.get(driver.remote())


-# TODO(ekl) this sometimes takes much longer (10+s) due to a higher level
-# pull retry. We should try to resolve these hangs in the chunk transfer logic.
def test_pull_bundles_admission_control(shutdown_only):
cluster = Cluster()
object_size = int(6e6)
@@ -605,6 +603,52 @@ def task(x):
ray.get(t, timeout=10)


+@pytest.mark.parametrize(
+    "ray_start_cluster_head", [{
+        "num_cpus": 0,
+        "object_store_memory": 75 * 1024 * 1024,
+        "_system_config": {
+            "worker_lease_timeout_milliseconds": 0,
+            "object_manager_pull_timeout_ms": 20000,
+            "object_spilling_threshold": 1.0,
+        }
+    }],
+    indirect=True)
+def test_maximize_concurrent_pull_race_condition(ray_start_cluster_head):
+    # Test if https://github.com/ray-project/ray/issues/18062 is mitigated
+    cluster = ray_start_cluster_head
+    cluster.add_node(num_cpus=8, object_store_memory=75 * 1024 * 1024)
+
+    @ray.remote
+    class RemoteObjectCreator:
+        def put(self, i):
+            return np.random.rand(i * 1024 * 1024)  # 8 MB data
+
+        def idle(self):
+            pass
+
+    @ray.remote
+    def f(x):
+        print(f"timestamp={time.time()} pulled {len(x)*8} bytes")
+        time.sleep(1)
+        return
+
+    remote_obj_creator = RemoteObjectCreator.remote()
+    remote_refs = [remote_obj_creator.put.remote(1) for _ in range(7)]
+    print(remote_refs)
+    # Make sure all objects are created.
+    ray.get(remote_obj_creator.idle.remote())
+
+    local_refs = [ray.put(np.random.rand(1 * 1024 * 1024)) for _ in range(20)]
+    remote_tasks = [f.remote(x) for x in local_refs]
+
+    start = time.time()
+    ray.get(remote_tasks)
+    end = time.time()
+    assert end - start < 20, "Too much time spent in pulling objects, " \
+        "check the amount of time in retries"


if __name__ == "__main__":
import pytest
import sys
3 changes: 1 addition & 2 deletions src/ray/common/ray_config_def.h
@@ -183,8 +183,7 @@ RAY_CONFIG(int64_t, worker_register_timeout_seconds, 30)
RAY_CONFIG(int64_t, redis_db_connect_retries, 50)
RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100)

-/// Timeout, in milliseconds, to wait before retrying a failed pull in the
-/// ObjectManager.
+/// The object manager's global timer interval in milliseconds.
RAY_CONFIG(int, object_manager_timer_freq_ms, 100)

/// Timeout, in milliseconds, to wait before retrying a failed pull in the
165 changes: 114 additions & 51 deletions src/ray/object_manager/object_buffer_pool.cc
@@ -14,33 +14,57 @@

#include "ray/object_manager/object_buffer_pool.h"

#include "absl/time/time.h"
#include "ray/common/status.h"
#include "ray/util/logging.h"

namespace ray {

ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
uint64_t chunk_size)
-    : default_chunk_size_(chunk_size) {
-  store_socket_name_ = store_socket_name;
+    : store_socket_name_(store_socket_name), default_chunk_size_(chunk_size) {
RAY_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", 0, 300));
}

ObjectBufferPool::~ObjectBufferPool() {
-  // Abort everything in progress.
-  auto create_buf_state_copy = create_buffer_state_;
-  for (const auto &pair : create_buf_state_copy) {
-    AbortCreate(pair.first);
-  }
+  absl::MutexLock lock(&pool_mutex_);
+  auto inflight_ops = create_buffer_ops_;
+  pool_mutex_.Unlock();
+
+  for (const auto &[id, cond_var] : inflight_ops) {
+    cond_var->SignalAll();
+  }
+  auto no_inflight = [this]() {
+    pool_mutex_.AssertReaderHeld();
+    return create_buffer_ops_.empty();
+  };
+  // Assume no request would arrive, acquire pool_mutex_ when there is no inflight
+  // operation. Otherwise print an error.
+  if (!pool_mutex_.LockWhenWithTimeout(absl::Condition(&no_inflight), absl::Seconds(5))) {
+    RAY_LOG(ERROR)
+        << create_buffer_ops_.size() << " remaining inflight create buffer operations "
+        << "during ObjectBufferPool destruction. Either abort these operations before "
+        << "destroying ObjectBufferPool, or refactor ObjectBufferPool to make it "
+           "unnecessary to wait for the operations' completion.";
+  }
+
+  // Abort unfinished buffers in progress.
+  for (auto it = create_buffer_state_.begin(); it != create_buffer_state_.end(); it++) {
+    RAY_CHECK_OK(store_client_.Release(it->first));
+    RAY_CHECK_OK(store_client_.Abort(it->first));
+    create_buffer_state_.erase(it);
+  }

RAY_CHECK(create_buffer_state_.empty());
RAY_CHECK_OK(store_client_.Disconnect());
}
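The new destructor drains in-flight create operations before teardown: it wakes every waiter, then reacquires the pool lock only once create_buffer_ops_ is empty, or a 5-second timeout fires. Below is a minimal self-contained sketch of that drain pattern, with a hypothetical InflightRegistry standing in for ObjectBufferPool:

#include <iostream>
#include <memory>
#include <string>

#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"

// Hypothetical stand-in for ObjectBufferPool: tracks in-flight operations
// keyed by id, each with a shared CondVar that waiters block on.
class InflightRegistry {
 public:
  ~InflightRegistry() {
    mu_.Lock();
    auto inflight = ops_;  // Copy under the lock, signal outside the map.
    mu_.Unlock();
    for (const auto &[id, cond_var] : inflight) {
      cond_var->SignalAll();  // Wake waiters so they can observe shutdown.
    }
    auto no_inflight = [this]() {
      mu_.AssertReaderHeld();
      return ops_.empty();
    };
    // Re-acquire the lock only once nothing is in flight, or after 5s.
    // LockWhenWithTimeout holds the lock on return even when it times out.
    if (!mu_.LockWhenWithTimeout(absl::Condition(&no_inflight),
                                 absl::Seconds(5))) {
      std::cerr << ops_.size() << " operations still in flight\n";
    }
    mu_.Unlock();
  }

 private:
  absl::Mutex mu_;
  absl::flat_hash_map<std::string, std::shared_ptr<absl::CondVar>> ops_;
};

int main() { InflightRegistry registry; }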

-uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
+uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) const {
return (data_size + default_chunk_size_ - 1) / default_chunk_size_;
}

-uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) {
+uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index,
+                                           uint64_t data_size) const {
return (chunk_index + 1) * default_chunk_size_ > data_size
? data_size % default_chunk_size_
: default_chunk_size_;
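For reference, the two helpers above (now const) are a ceiling division and a trailing-remainder computation. A standalone sanity check with illustrative values (not Ray's actual chunk size):

#include <cassert>
#include <cstdint>

int main() {
  const uint64_t chunk = 5;       // stand-in for default_chunk_size_
  const uint64_t data_size = 12;  // object size in bytes

  // GetNumChunks: ceil(data_size / chunk) in integer arithmetic.
  const uint64_t num_chunks = (data_size + chunk - 1) / chunk;
  assert(num_chunks == 3);  // chunks of 5, 5, and 2 bytes

  // GetBufferLength: full-sized everywhere except the trailing remainder.
  auto buffer_length = [&](uint64_t chunk_index) {
    return (chunk_index + 1) * chunk > data_size ? data_size % chunk : chunk;
  };
  assert(buffer_length(0) == 5);
  assert(buffer_length(1) == 5);
  assert(buffer_length(2) == 2);

  // Exact multiples still get a full last chunk, because
  // (chunk_index + 1) * chunk == data_size is not strictly greater.
  const uint64_t exact = 10;
  assert(((1 + 1) * chunk > exact ? exact % chunk : chunk) == 5);
  return 0;
}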
@@ -49,7 +73,7 @@ uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_s
std::pair<std::shared_ptr<MemoryObjectReader>, ray::Status>
ObjectBufferPool::CreateObjectReader(const ObjectID &object_id,
rpc::Address owner_address) {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);

std::vector<ObjectID> object_ids{object_id};
std::vector<plasma::ObjectBuffer> object_buffers(1);
@@ -76,53 +100,21 @@ ray::Status ObjectBufferPool::CreateChunk(const ObjectID &object_id,
const rpc::Address &owner_address,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index) {
-  std::unique_lock<std::mutex> lock(pool_mutex_);
-  if (create_buffer_state_.count(object_id) == 0) {
-    int64_t object_size = data_size - metadata_size;
-    // Try to create shared buffer.
-    std::shared_ptr<Buffer> data;
-
-    // Release the buffer pool lock during the blocking create call.
-    lock.unlock();
-    Status s = store_client_.CreateAndSpillIfNeeded(
-        object_id, owner_address, object_size, nullptr, metadata_size, &data,
-        plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
-    lock.lock();
-
-    // Another thread may have succeeded in creating the chunk while the lock
-    // was released. In that case skip the remainder of the creation block.
-    if (create_buffer_state_.count(object_id) == 0) {
-      std::vector<boost::asio::mutable_buffer> buffer;
-      if (!s.ok()) {
-        // Create failed. The object may already exist locally. If something else went
-        // wrong, another chunk will succeed in creating the buffer, and this
-        // chunk will eventually make it here via pull requests.
-        return ray::Status::IOError(s.message());
-      }
-      // Read object into store.
-      uint8_t *mutable_data = data->Data();
-      uint64_t num_chunks = GetNumChunks(data_size);
-      create_buffer_state_.emplace(
-          std::piecewise_construct, std::forward_as_tuple(object_id),
-          std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data)));
-      RAY_LOG(DEBUG) << "Created object " << object_id
-                     << " in plasma store, number of chunks: " << num_chunks
-                     << ", chunk index: " << chunk_index;
-      RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks);
-    }
-  }
-  if (create_buffer_state_[object_id].chunk_state[chunk_index] !=
-      CreateChunkState::AVAILABLE) {
+  absl::MutexLock lock(&pool_mutex_);
+  RAY_RETURN_NOT_OK(EnsureBufferExists(object_id, owner_address, data_size, metadata_size,
+                                       chunk_index));
+  auto &state = create_buffer_state_.at(object_id);
+  if (state.chunk_state[chunk_index] != CreateChunkState::AVAILABLE) {
// There can be only one reference to this chunk at any given time.
return ray::Status::IOError("Chunk already received by a different thread.");
}
-  create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::REFERENCED;
+  state.chunk_state[chunk_index] = CreateChunkState::REFERENCED;
return ray::Status::OK();
}
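With buffer creation factored out into EnsureBufferExists, CreateChunk only claims a chunk, and a duplicate receive now fails with a per-chunk IOError rather than aborting the whole pull. A simplified sketch of the claim step, using bare stand-in types (only the two chunk states visible in this diff are modeled):

#include <cstdint>
#include <string>
#include <vector>

// Only the two chunk states visible in this diff are modeled.
enum class CreateChunkState { AVAILABLE, REFERENCED };

struct Status {  // Bare stand-in for ray::Status.
  bool ok;
  std::string message;
};

// The first receiver flips the chunk AVAILABLE -> REFERENCED; a concurrent
// duplicate gets a per-chunk error instead of a fatal abort.
Status ClaimChunk(std::vector<CreateChunkState> &chunk_state,
                  uint64_t chunk_index) {
  if (chunk_state[chunk_index] != CreateChunkState::AVAILABLE) {
    return {false, "Chunk already received by a different thread."};
  }
  chunk_state[chunk_index] = CreateChunkState::REFERENCED;
  return {true, ""};
}

int main() {
  std::vector<CreateChunkState> state(3, CreateChunkState::AVAILABLE);
  Status first = ClaimChunk(state, 1);   // succeeds
  Status second = ClaimChunk(state, 1);  // rejected duplicate
  return first.ok && !second.ok ? 0 : 1;
}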

void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chunk_index,
const std::string &data) {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);
auto it = create_buffer_state_.find(object_id);
if (it == create_buffer_state_.end() ||
it->second.chunk_state.at(chunk_index) != CreateChunkState::REFERENCED) {
@@ -148,7 +140,7 @@ void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chun
}

void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);
auto it = create_buffer_state_.find(object_id);
if (it != create_buffer_state_.end()) {
RAY_LOG(INFO) << "Not enough memory to create requested object " << object_id
@@ -179,13 +171,84 @@ std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
return chunks;
}

+ray::Status ObjectBufferPool::EnsureBufferExists(const ObjectID &object_id,
+                                                 const rpc::Address &owner_address,
+                                                 uint64_t data_size,
+                                                 uint64_t metadata_size,
+                                                 uint64_t chunk_index) {
+  while (true) {
+    // Buffer for object_id already exists.
+    if (create_buffer_state_.contains(object_id)) {
+      return ray::Status::OK();
+    }
+
+    auto it = create_buffer_ops_.find(object_id);
+    if (it == create_buffer_ops_.end()) {
+      // No inflight create buffer operation, proceed to start one.
+      break;
+    }
+
+    auto cond_var = it->second;
+    // Release pool_mutex_ while waiting, until the current inflight create buffer
+    // operation finishes.
+    cond_var->Wait(&pool_mutex_);
+  }
+
+  // Indicate that there is an inflight create buffer operation, by inserting into
+  // create_buffer_ops_.
+  RAY_CHECK(
+      create_buffer_ops_.insert({object_id, std::make_shared<absl::CondVar>()}).second);
+  const int64_t object_size =
+      static_cast<int64_t>(data_size) - static_cast<int64_t>(metadata_size);
+  std::shared_ptr<Buffer> data;
+
+  // Release pool_mutex_ during the blocking create call.
+  pool_mutex_.Unlock();
+  Status s = store_client_.CreateAndSpillIfNeeded(
+      object_id, owner_address, static_cast<int64_t>(object_size), nullptr,
+      static_cast<int64_t>(metadata_size), &data,
+      plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
+  pool_mutex_.Lock();
+
+  // No other thread could have created the buffer.
+  RAY_CHECK(!create_buffer_state_.contains(object_id));
+
+  // Remove object_id from create_buffer_ops_ to indicate to the waiting ops that the
+  // inflight operation has finished. Wake up waiters so they can either start another
+  // create buffer op, or proceed after the buffer has been created.
+  {
+    auto it = create_buffer_ops_.find(object_id);
+    it->second->SignalAll();
+    create_buffer_ops_.erase(it);
+  }
+
+  if (!s.ok()) {
+    // Create failed. Buffer creation will be tried by another chunk.
+    // And this chunk will eventually make it here via retried pull requests.
+    return ray::Status::IOError(s.message());
+  }
+
+  // Read object into store.
+  uint8_t *mutable_data = data->Data();
+  uint64_t num_chunks = GetNumChunks(data_size);
+  create_buffer_state_.emplace(
+      std::piecewise_construct, std::forward_as_tuple(object_id),
+      std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data)));
+  RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks);
+  RAY_LOG(DEBUG) << "Created object " << object_id
+                 << " in plasma store, number of chunks: " << num_chunks
+                 << ", chunk index: " << chunk_index;
+
+  return ray::Status::OK();
+}
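EnsureBufferExists is the core of the fix: a single-flight guard in which the first caller for an object runs the blocking plasma create with the pool lock released, while concurrent callers wait on a shared absl::CondVar and re-check after waking. A self-contained sketch of the pattern, with a hypothetical SingleFlightCreator and the store call stubbed out:

#include <memory>
#include <string>

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"

class SingleFlightCreator {
 public:
  // Returns once a buffer for `id` exists (or creation failed); at most one
  // thread per id runs the blocking create at a time.
  bool EnsureExists(const std::string &id) {
    absl::MutexLock lock(&mu_);
    while (true) {
      if (created_.contains(id)) return true;  // Someone else finished it.
      auto it = inflight_.find(id);
      if (it == inflight_.end()) break;        // We become the creator.
      auto cond_var = it->second;              // Keep the CondVar alive.
      cond_var->Wait(&mu_);                    // Releases mu_ while waiting.
    }
    auto cond_var = std::make_shared<absl::CondVar>();
    inflight_.insert({id, cond_var});

    mu_.Unlock();                  // Drop the lock for the blocking call,
    bool ok = BlockingCreate(id);  // e.g. plasma CreateAndSpillIfNeeded.
    mu_.Lock();

    inflight_.erase(id);
    cond_var->SignalAll();  // Waiters re-check: proceed, or retry the create.
    if (ok) created_.insert(id);
    return ok;
  }

 private:
  // Stub for the blocking store call; always succeeds in this sketch.
  bool BlockingCreate(const std::string & /*id*/) { return true; }

  absl::Mutex mu_;
  absl::flat_hash_set<std::string> created_;
  absl::flat_hash_map<std::string, std::shared_ptr<absl::CondVar>> inflight_;
};

int main() {
  SingleFlightCreator creator;
  return creator.EnsureExists("obj-1") ? 0 : 1;
}

If the creator's blocking call fails, awakened waiters find neither a created buffer nor an inflight entry, so one of them becomes the next creator, which is how a failed create is retried by the next chunk's receive in the real code.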

void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);
RAY_CHECK_OK(store_client_.Delete(object_ids));
}

std::string ObjectBufferPool::DebugString() const {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);
std::stringstream result;
result << "BufferPool:";
result << "\n- create buffer state map size: " << create_buffer_state_.size();
