Skip to content

Commit

Permalink
Revert "Enhanced RNG State Management with Index-Based Control for Gr…
Browse files Browse the repository at this point in the history
…aph-Safe Tensor Parallelism (PaddlePaddle#58859)" (PaddlePaddle#60148)

This reverts commit 3bcdeef.
  • Loading branch information
zyfncg authored Dec 21, 2023
1 parent 1115890 commit e032331
Show file tree
Hide file tree
Showing 15 changed files with 158 additions and 583 deletions.
23 changes: 11 additions & 12 deletions paddle/fluid/pybind/generator_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,37 @@ void BindGenerator(py::module* m_ptr) {
"GeneratorState")
.def("current_seed",
[](std::shared_ptr<phi::Generator::GeneratorState>& self) {
return self->seed;
return self->current_seed;
})
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) || \
defined(PADDLE_WITH_CUSTOM_DEVICE) || defined(PADDLE_WITH_XPU)
// NOTE(shenliang03): Due to the inability to serialize mt19937_64
// type, resulting in a problem with precision under the cpu.
.def(py::pickle(
[](const phi::Generator::GeneratorState& s) { // __getstate__
return py::make_tuple(s.device, s.seed, s.offset);
return py::make_tuple(s.device, s.current_seed, s.thread_offset);
},
[](py::tuple s) { // __setstate__
if (s.size() != 3)
throw std::runtime_error(
"Invalid Random state. Please check the format(device, "
"current_seed, thread_offset).");

int64_t device = s[0].cast<int64_t>();
int64_t seed = s[1].cast<int64_t>();
uint64_t offset = s[2].cast<uint64_t>();

phi::Generator::GeneratorState state(device, seed, offset);
phi::Generator::GeneratorState state;
state.device = s[0].cast<std::int64_t>();
state.current_seed = s[1].cast<std::uint64_t>();
state.thread_offset = s[2].cast<std::uint64_t>();

std::seed_seq seq({state.current_seed});
auto engine = std::make_shared<std::mt19937_64>(seq);
state.cpu_engine = *engine;
return state;
}))
#endif
.def("__str__", [](const phi::Generator::GeneratorState& self) {
std::stringstream ostr;
ostr << self.device << " " << self.seed << " " << self.offset << " "
<< self.cpu_engine;
ostr << self.device << " " << self.current_seed << " "
<< self.thread_offset << " " << self.cpu_engine;
return ostr.str();
});

Expand All @@ -76,9 +78,6 @@ void BindGenerator(py::module* m_ptr) {
[](phi::Generator& self) { new (&self) phi::Generator(); })
.def("get_state", &phi::Generator::GetState)
.def("set_state", &phi::Generator::SetState)
.def("get_state_index", &phi::Generator::GetStateIndex)
.def("set_state_index", &phi::Generator::SetStateIndex)
.def("register_state_index", &phi::Generator::RegisterStateIndex)
.def("manual_seed",
[](std::shared_ptr<phi::Generator>& self, uint64_t seed) {
self->SetCurrentSeed(seed);
Expand Down
164 changes: 86 additions & 78 deletions paddle/phi/core/generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ limitations under the License. */

#include <glog/logging.h>

#include <cstdint>
#include <memory>
#include <utility>

Expand Down Expand Up @@ -158,125 +157,134 @@ const std::shared_ptr<Generator>& GetRandomSeedGenerator(
// RandomGenerator.
std::shared_ptr<std::mt19937_64> GetCPURandomEngine(uint64_t seed) {
if (seed == 0) {
VLOG(4) << "Use random cpu_engine from generator";
VLOG(4) << "Use random engine from generator";
return DefaultCPUGenerator()->GetCPUEngine();
} else {
// NOTE(zhiqiu): creating an cpu_engine instance everytime instead of using
// NOTE(zhiqiu): creating an engine instance everytime instead of using
// OpDefaultCPUEngine(), this is the legacy behavior of random operators.
// The benefit is that when runing PE with fixed-seed in multiple thrads,
// each thread has their own cpu_engine, and doesn't affect each other.
// each thread has their own engine, and doesn't affect each other.
//
// And we need to measure the determinacy of Generator in PE.
auto cpu_engine = std::make_shared<std::mt19937_64>();
auto engine = std::make_shared<std::mt19937_64>();
static std::mutex mu_;
{
std::lock_guard<std::mutex> lock(mu_);
cpu_engine->seed(seed);
engine->seed(seed);
}
return cpu_engine;
return engine;
}
}

inline void Generator::print_state_info() {
VLOG(4) << "Generator Random state "
<< "device id: " << state().device << ", seed: " << state().seed
<< ", offset: " << state().offset << ", cpu_engine: " << cpu_engine();
}

Generator::Generator() {
auto seed = GetRandomSeed();
current_index = states_.size();
states_.emplace_back(seed);
print_state_info();
std::seed_seq seq({seed});
auto engine = std::make_shared<std::mt19937_64>(seq);
this->state_.cpu_engine = *engine;
this->state_.device = -1;
this->state_.current_seed = seed;
this->state_.thread_offset = 0;
this->engine_ = engine;
VLOG(4) << "initial seed: " << this->state_.current_seed
<< ", cpu engine: " << &this->state_.cpu_engine;
}

Generator::Generator(uint64_t seed) {
current_index = states_.size();
states_.emplace_back(-1, seed);
print_state_info();
}

Generator::Generator(uint64_t seed, int64_t device_id) {
current_index = states_.size();
// device id first, then seed
states_.emplace_back(device_id, seed);
print_state_info();
}

phi::Generator::GeneratorState Generator::GetState() { return state().clone(); }

void Generator::SetState(const phi::Generator::GeneratorState& state) {
std::lock_guard<std::mutex> lock(mu_);
if (current_index < states_.size())
states_[current_index] = state.clone();
else
PADDLE_THROW(phi::errors::NotFound("Generator index is not found"));
print_state_info();
}

uint64_t Generator::GetStateIndex() { return current_index; }

void Generator::SetStateIndex(uint64_t StateIndex) {
std::lock_guard<std::mutex> lock(mu_);
if (current_index < states_.size())
current_index = StateIndex;
else
PADDLE_THROW(phi::errors::NotFound("Generator index is not found"));
std::seed_seq seq({seed});
auto engine = std::make_shared<std::mt19937_64>(seq);
this->state_.cpu_engine = *engine;
this->state_.device = -1;
this->state_.current_seed = seed;
this->state_.thread_offset = 0;
this->engine_ = engine;
VLOG(4) << "initial seed: " << this->state_.current_seed
<< ", cpu engine: " << &this->state_.cpu_engine;
}

uint64_t Generator::RegisterStateIndex(const GeneratorState& state) {
std::lock_guard<std::mutex> lock(mu_);
auto new_index = states_.size();
states_.push_back(state);
current_index = new_index;
return new_index;
Generator::Generator(uint64_t seed, uint64_t device_id) {
std::seed_seq seq({seed});
auto engine = std::make_shared<std::mt19937_64>(seq);
this->state_.cpu_engine = *engine;
this->state_.device = static_cast<int64_t>(device_id);
this->state_.current_seed = seed;
this->state_.thread_offset = 0;
this->engine_ = engine;
VLOG(4) << "initial seed: " << this->state_.current_seed
<< ", cpu engine: " << &this->state_.cpu_engine;
}

inline Generator::GeneratorState& Generator::state() {
if (current_index < states_.size())
return states_[current_index];
else
PADDLE_THROW(phi::errors::NotFound("Generator index is not found"));
phi::Generator::GeneratorState Generator::GetState() {
std::lock_guard<std::mutex> lock(this->mu_);
state_.cpu_engine = *engine_;
VLOG(4) << "Get Random state: "
<< "device id: " << (uint64_t)(this->state_.device)
<< ", current_seed: " << this->state_.current_seed
<< ", thread_offset: " << this->state_.thread_offset
<< ", cpu engine: " << *(this->engine_);
return this->state_;
}

inline std::shared_ptr<std::mt19937_64> Generator::cpu_engine() {
return state().cpu_engine;
void Generator::SetState(const phi::Generator::GeneratorState& state) {
std::lock_guard<std::mutex> lock(this->mu_);
this->state_ = state;
this->engine_ = std::make_shared<std::mt19937_64>(state.cpu_engine);
VLOG(4) << "Set Random state: "
<< "device id: " << (uint64_t)(this->state_.device)
<< ", current_seed: " << this->state_.current_seed
<< ", thread_offset: " << this->state_.thread_offset
<< ", cpu engine: " << *(this->engine_);
}

uint64_t Generator::GetCurrentSeed() {
std::lock_guard<std::mutex> lock(mu_);
return state().seed;
std::lock_guard<std::mutex> lock(this->mu_);
return this->state_.current_seed;
}

uint64_t Generator::Seed() {
std::lock_guard<std::mutex> lock(mu_);
uint64_t seed = GetRandomSeed();
state().reset(seed);
return seed;
std::lock_guard<std::mutex> lock(this->mu_);
uint64_t seed = 0;
std::random_device de;
seed = ((((uint64_t)de()) << 32) + de()) & 0x1FFFFFFFFFFFFF;
this->state_.current_seed = seed;
std::seed_seq seq({seed});
this->engine_->seed(seq);

return this->state_.current_seed;
}

void Generator::SetCurrentSeed(uint64_t seed) {
std::lock_guard<std::mutex> lock(mu_);
state().reset(seed);
std::lock_guard<std::mutex> lock(this->mu_);
this->state_.current_seed = seed;
this->state_.thread_offset = 0;
std::seed_seq seq({seed});
this->engine_->seed(seq);
}

std::shared_ptr<std::mt19937_64> Generator::GetCPUEngine() {
return cpu_engine();
std::lock_guard<std::mutex> lock(this->mu_);
return this->engine_;
}

void Generator::SetCPUEngine(std::shared_ptr<std::mt19937_64> engine) {
std::lock_guard<std::mutex> lock(this->mu_);
this->engine_ = engine;
}

uint64_t Generator::Random64() {
std::lock_guard<std::mutex> lock(mu_);
auto current_engine = cpu_engine();
return (*current_engine)();
std::lock_guard<std::mutex> lock(this->mu_);
auto engine = this->engine_;
return (*engine)();
}

std::pair<uint64_t, uint64_t> Generator::IncrementOffset(uint64_t increment) {
std::pair<uint64_t, uint64_t> Generator::IncrementOffset(
uint64_t increment_offset) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
std::lock_guard<std::mutex> lock(mu_);
uint64_t offset = state().offset;
state().offset = offset + increment;
print_state_info();
return std::make_pair(state().seed, offset);
std::lock_guard<std::mutex> lock(this->mu_);
uint64_t cur_offset = this->state_.thread_offset;
VLOG(10) << "cur_offset = " << cur_offset
<< " increment_offset = " << increment_offset;
this->state_.thread_offset += increment_offset;
return std::make_pair(this->state_.current_seed, cur_offset);
#else
PADDLE_THROW(phi::errors::PermissionDenied(
"Increment Offset only support in CUDA place"));
Expand Down
Loading

0 comments on commit e032331

Please sign in to comment.