Skip to content

Commit

Permalink
Use WorkerThreadPool for Server threads
Browse files Browse the repository at this point in the history
* Servers now use WorkerThreadPool for background computation.
* This helps keep the number of threads used fixed at all times.
* It also ensures everything works on HTML5 with threads.
* And makes it easier to support disabling threads for also HTML5.

CommandQueueMT now syncs with the servers via the WorkerThreadPool
yielding mechanism, which makes its classic main sync semaphore
superfluous.

Also, some warnings about calls that kill performance when using
threaded rendering are removed because there's a mechanism that
warns about that in a more general fashion.

Co-authored-by: Pedro J. Estébanez <[email protected]>
  • Loading branch information
reduz and RandomShaper committed Apr 10, 2024
1 parent c28f590 commit 65686de
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 204 deletions.
16 changes: 16 additions & 0 deletions core/config/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ int Engine::get_audio_output_latency() const {
return _audio_output_latency;
}

void Engine::increment_frames_drawn() {
if (frame_server_synced) {
server_syncs++;
} else {
server_syncs = 0;
}
frame_server_synced = false;

frames_drawn++;
}

uint64_t Engine::get_frames_drawn() {
return frames_drawn;
}
Expand Down Expand Up @@ -364,6 +375,11 @@ Engine *Engine::get_singleton() {
return singleton;
}

bool Engine::notify_frame_server_synced() {
frame_server_synced = true;
return server_syncs > SERVER_SYNC_FRAME_COUNT_WARNING;
}

Engine::Engine() {
singleton = this;
}
Expand Down
7 changes: 7 additions & 0 deletions core/config/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class Engine {
String write_movie_path;
String shader_cache_path;

static constexpr int SERVER_SYNC_FRAME_COUNT_WARNING = 5;
int server_syncs = 0;
bool frame_server_synced = false;

public:
static Engine *get_singleton();

Expand Down Expand Up @@ -179,6 +183,9 @@ class Engine {
bool is_generate_spirv_debug_info_enabled() const;
int32_t get_gpu_index() const;

void increment_frames_drawn();
bool notify_frame_server_synced();

Engine();
virtual ~Engine() {}
};
Expand Down
8 changes: 1 addition & 7 deletions core/templates/command_queue_mt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,8 @@ CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() {
return &sync_sems[idx];
}

CommandQueueMT::CommandQueueMT(bool p_sync) {
if (p_sync) {
sync = memnew(Semaphore);
}
CommandQueueMT::CommandQueueMT() {
}

CommandQueueMT::~CommandQueueMT() {
if (sync) {
memdelete(sync);
}
}
43 changes: 25 additions & 18 deletions core/templates/command_queue_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,17 @@
#define CMD_TYPE(N) Command##N<T, M COMMA(N) COMMA_SEP_LIST(TYPE_ARG, N)>
#define CMD_ASSIGN_PARAM(N) cmd->p##N = p##N

#define DECL_PUSH(N) \
template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \
cmd->instance = p_instance; \
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
unlock(); \
if (sync) \
sync->post(); \
#define DECL_PUSH(N) \
template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \
cmd->instance = p_instance; \
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
} \
unlock(); \
}

#define CMD_RET_TYPE(N) CommandRet##N<T, M, COMMA_SEP_LIST(TYPE_ARG, N) COMMA(N) R>
Expand All @@ -272,9 +273,10 @@
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
cmd->ret = r_ret; \
cmd->sync_sem = ss; \
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
} \
unlock(); \
if (sync) \
sync->post(); \
ss->sem.wait(); \
ss->in_use = false; \
}
Expand All @@ -290,9 +292,10 @@
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
cmd->sync_sem = ss; \
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
} \
unlock(); \
if (sync) \
sync->post(); \
ss->sem.wait(); \
ss->in_use = false; \
}
Expand Down Expand Up @@ -340,7 +343,7 @@ class CommandQueueMT {
LocalVector<uint8_t> command_mem;
SyncSemaphore sync_sems[SYNC_SEMAPHORES];
Mutex mutex;
Semaphore *sync = nullptr;
WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
uint64_t flush_read_ptr = 0;

template <typename T>
Expand Down Expand Up @@ -421,12 +424,16 @@ class CommandQueueMT {
}

void wait_and_flush() {
ERR_FAIL_NULL(sync);
sync->wait();
ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
_flush();
}

CommandQueueMT(bool p_sync);
void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
pump_task_id = p_task_id;
}

CommandQueueMT();
~CommandQueueMT();
};

Expand Down
6 changes: 0 additions & 6 deletions drivers/gles3/storage/particles_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ bool ParticlesStorage::particles_get_emitting(RID p_particles) {
return false;
}

ERR_FAIL_COND_V_MSG(RSG::threaded, false, "This function should never be used with threaded rendering, as it stalls the renderer.");
Particles *particles = particles_owner.get_or_null(p_particles);
ERR_FAIL_NULL_V(particles, false);

Expand Down Expand Up @@ -380,10 +379,6 @@ void ParticlesStorage::particles_request_process(RID p_particles) {
}

AABB ParticlesStorage::particles_get_current_aabb(RID p_particles) {
if (RSG::threaded) {
WARN_PRINT_ONCE("Calling this function with threaded rendering enabled stalls the renderer, use with care.");
}

const Particles *particles = particles_owner.get_or_null(p_particles);
ERR_FAIL_NULL_V(particles, AABB());

Expand Down Expand Up @@ -1207,7 +1202,6 @@ Dependency *ParticlesStorage::particles_get_dependency(RID p_particles) const {
}

bool ParticlesStorage::particles_is_inactive(RID p_particles) const {
ERR_FAIL_COND_V_MSG(RSG::threaded, false, "This function should never be used with threaded rendering, as it stalls the renderer.");
const Particles *particles = particles_owner.get_or_null(p_particles);
ERR_FAIL_NULL_V(particles, false);
return !particles->emitting && particles->inactive;
Expand Down
11 changes: 8 additions & 3 deletions main/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2274,6 +2274,9 @@ Error Main::setup(const char *execpath, int argc, char *argv[], bool p_second_ph
// Editor and project manager cannot run with rendering in a separate thread (they will crash on startup).
rtm = OS::RENDER_THREAD_SAFE;
}
#if !defined(THREADS_ENABLED)
rtm = OS::RENDER_THREAD_SAFE;
#endif
OS::get_singleton()->_render_thread_mode = OS::RenderThreadMode(rtm);
}

Expand Down Expand Up @@ -2717,7 +2720,9 @@ Error Main::setup2() {
}

if (OS::get_singleton()->_render_thread_mode == OS::RENDER_SEPARATE_THREAD) {
WARN_PRINT("The Multi-Threaded rendering thread model is experimental, and has known issues which can lead to project crashes. Use the Single-Safe option in the project settings instead.");
WARN_PRINT("The Multi-Threaded rendering thread model is experimental. Feel free to try it since it will eventually become a stable feature.\n"
"However, bear in mind that at the moment it can lead to project crashes or instability.\n"
"So, unless you want to test the engine, use the Single-Safe option in the project settings instead.");
}

/* Initialize Pen Tablet Driver */
Expand Down Expand Up @@ -4025,11 +4030,11 @@ bool Main::iteration() {
if ((!force_redraw_requested) && OS::get_singleton()->is_in_low_processor_usage_mode()) {
if (RenderingServer::get_singleton()->has_changed()) {
RenderingServer::get_singleton()->draw(true, scaled_step); // flush visual commands
Engine::get_singleton()->frames_drawn++;
Engine::get_singleton()->increment_frames_drawn();
}
} else {
RenderingServer::get_singleton()->draw(true, scaled_step); // flush visual commands
Engine::get_singleton()->frames_drawn++;
Engine::get_singleton()->increment_frames_drawn();
force_redraw_requested = false;
}
}
Expand Down
54 changes: 19 additions & 35 deletions servers/physics_server_2d_wrap_mt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,26 @@
#include "core/os/os.h"

void PhysicsServer2DWrapMT::thread_exit() {
exit.set();
exit = true;
}

void PhysicsServer2DWrapMT::thread_step(real_t p_delta) {
physics_server_2d->step(p_delta);
step_sem.post();
}

void PhysicsServer2DWrapMT::_thread_callback(void *_instance) {
PhysicsServer2DWrapMT *vsmt = reinterpret_cast<PhysicsServer2DWrapMT *>(_instance);

vsmt->thread_loop();
}

void PhysicsServer2DWrapMT::thread_loop() {
server_thread = Thread::get_caller_id();

physics_server_2d->init();

exit.clear();
step_thread_up.set();
while (!exit.is_set()) {
// flush commands one by one, until exit is requested
command_queue.wait_and_flush();
command_queue.set_pump_task_id(server_task_id);
while (!exit) {
WorkerThreadPool::get_singleton()->yield();
command_queue.flush_all();
}

command_queue.flush_all(); // flush all
command_queue.flush_all();

physics_server_2d->finish();
}
Expand All @@ -70,18 +63,14 @@ void PhysicsServer2DWrapMT::step(real_t p_step) {
if (create_thread) {
command_queue.push(this, &PhysicsServer2DWrapMT::thread_step, p_step);
} else {
command_queue.flush_all(); //flush all pending from other threads
command_queue.flush_all(); // Flush all pending from other threads.
physics_server_2d->step(p_step);
}
}

void PhysicsServer2DWrapMT::sync() {
if (create_thread) {
if (first_frame) {
first_frame = false;
} else {
step_sem.wait(); //must not wait if a step was not issued
}
step_sem.wait();
}
physics_server_2d->sync();
}
Expand All @@ -96,39 +85,34 @@ void PhysicsServer2DWrapMT::end_sync() {

void PhysicsServer2DWrapMT::init() {
if (create_thread) {
thread.start(_thread_callback, this);
while (!step_thread_up.is_set()) {
OS::get_singleton()->delay_usec(1000);
}
exit = false;
server_task_id = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer2DWrapMT::thread_loop), true);
step_sem.post();
} else {
physics_server_2d->init();
}
}

void PhysicsServer2DWrapMT::finish() {
if (thread.is_started()) {
if (create_thread) {
command_queue.push(this, &PhysicsServer2DWrapMT::thread_exit);
thread.wait_to_finish();
if (server_task_id != WorkerThreadPool::INVALID_TASK_ID) {
WorkerThreadPool::get_singleton()->wait_for_task_completion(server_task_id);
server_task_id = WorkerThreadPool::INVALID_TASK_ID;
}
} else {
physics_server_2d->finish();
}
}

PhysicsServer2DWrapMT::PhysicsServer2DWrapMT(PhysicsServer2D *p_contained, bool p_create_thread) :
command_queue(p_create_thread) {
PhysicsServer2DWrapMT::PhysicsServer2DWrapMT(PhysicsServer2D *p_contained, bool p_create_thread) {
physics_server_2d = p_contained;
create_thread = p_create_thread;

if (!p_create_thread) {
server_thread = Thread::get_caller_id();
} else {
server_thread = 0;
if (!create_thread) {
server_thread = Thread::MAIN_ID;
}

main_thread = Thread::get_caller_id();
}

PhysicsServer2DWrapMT::~PhysicsServer2DWrapMT() {
memdelete(physics_server_2d);
//finish();
}
Loading

0 comments on commit 65686de

Please sign in to comment.