From 5f1e7e6fbc5c9ed54becae8709273c44e7c872b9 Mon Sep 17 00:00:00 2001 From: lawnjelly Date: Thu, 30 Mar 2023 20:06:43 +0100 Subject: [PATCH] Make MessageQueue growable * Uses simple vector resizing (po2) * Uses pair of read and write buffers --- core/local_vector.h | 6 +- core/message_queue.cpp | 280 +++++++++++++++++++++----------- core/message_queue.h | 24 ++- doc/classes/ProjectSettings.xml | 2 +- 4 files changed, 206 insertions(+), 106 deletions(-) diff --git a/core/local_vector.h b/core/local_vector.h index 18355c71b684..fe14dbb494ef 100644 --- a/core/local_vector.h +++ b/core/local_vector.h @@ -136,9 +136,11 @@ class LocalVector { } } _FORCE_INLINE_ bool empty() const { return count == 0; } - _FORCE_INLINE_ void reserve(U p_size) { + _FORCE_INLINE_ U get_capacity() const { return capacity; } + + _FORCE_INLINE_ void reserve(U p_size, bool p_allow_shrink = false) { p_size = nearest_power_of_2_templated(p_size); - if (p_size > capacity) { + if (!p_allow_shrink ? p_size > capacity : ((p_size >= count) && (p_size != capacity))) { capacity = p_size; data = (T *)memrealloc(data, capacity * sizeof(T)); CRASH_COND_MSG(!data, "Out of memory"); diff --git a/core/message_queue.cpp b/core/message_queue.cpp index adebc296946c..6e1bf25b8aea 100644 --- a/core/message_queue.cpp +++ b/core/message_queue.cpp @@ -44,17 +44,24 @@ Error MessageQueue::push_call(ObjectID p_id, const StringName &p_method, const V int room_needed = sizeof(Message) + sizeof(Variant) * p_argcount; - if ((buffer_end + room_needed) >= buffer_size) { - String type; - if (ObjectDB::get_instance(p_id)) { - type = ObjectDB::get_instance(p_id)->get_class(); + Buffer &buffer = buffers[write_buffer]; + + if ((buffer.end + room_needed) > buffer.data.size()) { + if ((buffer.end + room_needed) > max_allowed_buffer_size) { + String type; + if (ObjectDB::get_instance(p_id)) { + type = ObjectDB::get_instance(p_id)->get_class(); + } + print_line("Failed method: " + p_method); + statistics(); + ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings."); + } else { + buffer.data.resize(buffer.end + room_needed); } - print_line("Failed method: " + type + ":" + p_method + " target ID: " + itos(p_id)); - statistics(); - ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_kb' in project settings."); } - Message *msg = memnew_placement(&buffer[buffer_end], Message); + Message *msg = memnew_placement(&buffer.data[buffer.end], Message); + msg->args = p_argcount; msg->instance_id = p_id; msg->target = p_method; @@ -63,11 +70,11 @@ Error MessageQueue::push_call(ObjectID p_id, const StringName &p_method, const V msg->type |= FLAG_SHOW_ERROR; } - buffer_end += sizeof(Message); + buffer.end += sizeof(Message); for (int i = 0; i < p_argcount; i++) { - Variant *v = memnew_placement(&buffer[buffer_end], Variant); - buffer_end += sizeof(Variant); + Variant *v = memnew_placement(&buffer.data[buffer.end], Variant); + buffer.end += sizeof(Variant); *v = *p_args[i]; } @@ -94,26 +101,33 @@ Error MessageQueue::push_set(ObjectID p_id, const StringName &p_prop, const Vari uint8_t room_needed = sizeof(Message) + sizeof(Variant); - if ((buffer_end + room_needed) >= buffer_size) { - String type; - if (ObjectDB::get_instance(p_id)) { - type = ObjectDB::get_instance(p_id)->get_class(); + Buffer &buffer = buffers[write_buffer]; + + if ((buffer.end + room_needed) > buffer.data.size()) { + if ((buffer.end + room_needed) > max_allowed_buffer_size) { + String type; + if (ObjectDB::get_instance(p_id)) { + type = ObjectDB::get_instance(p_id)->get_class(); + } + print_line("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id)); + statistics(); + ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings."); + } else { + buffer.data.resize(buffer.end + room_needed); } - print_line("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id)); - statistics(); - ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_kb' in project settings."); } - Message *msg = memnew_placement(&buffer[buffer_end], Message); + Message *msg = memnew_placement(&buffer.data[buffer.end], Message); + msg->args = 1; msg->instance_id = p_id; msg->target = p_prop; msg->type = TYPE_SET; - buffer_end += sizeof(Message); + buffer.end += sizeof(Message); - Variant *v = memnew_placement(&buffer[buffer_end], Variant); - buffer_end += sizeof(Variant); + Variant *v = memnew_placement(&buffer.data[buffer.end], Variant); + buffer.end += sizeof(Variant); *v = p_value; return OK; @@ -126,20 +140,30 @@ Error MessageQueue::push_notification(ObjectID p_id, int p_notification) { uint8_t room_needed = sizeof(Message); - if ((buffer_end + room_needed) >= buffer_size) { - print_line("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id)); - statistics(); - ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_kb' in project settings."); + Buffer &buffer = buffers[write_buffer]; + + if ((buffer.end + room_needed) > buffer.data.size()) { + if ((buffer.end + room_needed) > max_allowed_buffer_size) { + String type; + if (ObjectDB::get_instance(p_id)) { + type = ObjectDB::get_instance(p_id)->get_class(); + } + print_line("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id)); + statistics(); + ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings."); + } else { + buffer.data.resize(buffer.end + room_needed); + } } - Message *msg = memnew_placement(&buffer[buffer_end], Message); + Message *msg = memnew_placement(&buffer.data[buffer.end], Message); msg->type = TYPE_NOTIFICATION; msg->instance_id = p_id; //msg->target; msg->notification = p_notification; - buffer_end += sizeof(Message); + buffer.end += sizeof(Message); return OK; } @@ -161,9 +185,11 @@ void MessageQueue::statistics() { Map call_count; int null_count = 0; + Buffer &buffer = buffers[write_buffer]; + uint32_t read_pos = 0; - while (read_pos < buffer_end) { - Message *message = (Message *)&buffer[read_pos]; + while (read_pos < buffer.end) { + Message *message = (Message *)&buffer.data[read_pos]; Object *target = ObjectDB::get_instance(message->instance_id); @@ -208,7 +234,7 @@ void MessageQueue::statistics() { } } - print_line("TOTAL BYTES: " + itos(buffer_end)); + print_line("TOTAL BYTES: " + itos(buffer.end)); print_line("NULL count: " + itos(null_count)); for (Map::Element *E = set_count.front(); E; E = E->next()) { @@ -225,7 +251,8 @@ void MessageQueue::statistics() { } int MessageQueue::get_max_buffer_usage() const { - return buffer_max_used; + // Note this may be better read_buffer, or a combination, depending when this is read. + return buffers[write_buffer].data.size(); } void MessageQueue::_call_function(Object *p_target, const StringName &p_func, const Variant *p_args, int p_argcount, bool p_show_error) { @@ -244,73 +271,135 @@ void MessageQueue::_call_function(Object *p_target, const StringName &p_func, co } } -void MessageQueue::flush() { - if (buffer_end > buffer_max_used) { - buffer_max_used = buffer_end; +void MessageQueue::_update_buffer_monitor() { + // The number of flushes is an approximate delay before + // considering shrinking. This is somewhat of a magic number, + // but only acts to prevent excessive oscillations. + if (++_buffer_size_monitor.flush_count == 8192) { + uint32_t max_size = _buffer_size_monitor.max_size; + + // Uncomment this define to log message queue sizes and + // auto-shrinking behaviour. + // #define GODOT_DEBUG_MESSAGE_QUEUE_SIZES +#ifdef GODOT_DEBUG_MESSAGE_QUEUE_SIZES + print_line("MessageQueue buffer max size " + itos(max_size) + " bytes."); +#endif + + // reset for next time + _buffer_size_monitor.flush_count = 0; + _buffer_size_monitor.max_size = 0; + + for (uint32_t n = 0; n < 2; n++) { + uint32_t cap = buffers[n].data.get_capacity(); + + // Only worry about reducing memory if the capacity is high + // (due to e.g. loading a level or something). + // The shrinking will only take place below 256K, to prevent + // excessive reallocating. + if (cap > (256 * 1024)) { + // Only shrink if we are routinely using a lot less than the capacity. + if ((max_size * 4) < cap) { + buffers[n].data.reserve(cap / 2, true); +#ifdef GODOT_DEBUG_MESSAGE_QUEUE_SIZES + print_line("MessageQueue reducing buffer[" + itos(n) + "] capacity from " + itos(cap) + " bytes to " + itos(cap / 2) + " bytes."); +#endif + } + } + } } +} - uint32_t read_pos = 0; - +void MessageQueue::flush() { //using reverse locking strategy _THREAD_SAFE_LOCK_ - ERR_FAIL_COND(flushing); //already flushing, you did something odd + if (flushing) { + _THREAD_SAFE_UNLOCK_ + ERR_FAIL_MSG("Already flushing"); //already flushing, you did something odd + } + + // first flip buffers, in preparation + SWAP(read_buffer, write_buffer); + flushing = true; + _update_buffer_monitor(); + _THREAD_SAFE_UNLOCK_ - while (read_pos < buffer_end) { - //lock on each iteration, so a call can re-add itself to the message queue + // This loop works by having a read buffer and write buffer. + // While we are reading from one buffer we can be filling another. + // This enables them to be independent, and not require locks per message. + // It also avoids pushing and resizing the write buffer corrupting the read buffer. + // The trade off is that it requires more memory. + // However the peak size of each can be lower, because they do not ADD + // to each other during transit. - Message *message = (Message *)&buffer[read_pos]; + while (buffers[read_buffer].data.size()) { + uint32_t read_pos = 0; + Buffer &buffer = buffers[read_buffer]; - uint32_t advance = sizeof(Message); - if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { - advance += sizeof(Variant) * message->args; - } + while (read_pos < buffer.end) { + Message *message = (Message *)&buffer.data[read_pos]; - //pre-advance so this function is reentrant - read_pos += advance; + uint32_t advance = sizeof(Message); + if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { + advance += sizeof(Variant) * message->args; + } - _THREAD_SAFE_UNLOCK_ + read_pos += advance; - Object *target = ObjectDB::get_instance(message->instance_id); + Object *target = ObjectDB::get_instance(message->instance_id); - if (target != nullptr) { - switch (message->type & FLAG_MASK) { - case TYPE_CALL: { - Variant *args = (Variant *)(message + 1); + if (target != nullptr) { + switch (message->type & FLAG_MASK) { + case TYPE_CALL: { + Variant *args = (Variant *)(message + 1); - // messages don't expect a return value + // messages don't expect a return value - _call_function(target, message->target, args, message->args, message->type & FLAG_SHOW_ERROR); + _call_function(target, message->target, args, message->args, message->type & FLAG_SHOW_ERROR); - } break; - case TYPE_NOTIFICATION: { - // messages don't expect a return value - target->notification(message->notification); + } break; + case TYPE_NOTIFICATION: { + // messages don't expect a return value + target->notification(message->notification); - } break; - case TYPE_SET: { - Variant *arg = (Variant *)(message + 1); - // messages don't expect a return value - target->set(message->target, *arg); + } break; + case TYPE_SET: { + Variant *arg = (Variant *)(message + 1); + // messages don't expect a return value + target->set(message->target, *arg); - } break; + } break; + } } - } - if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { - Variant *args = (Variant *)(message + 1); - for (int i = 0; i < message->args; i++) { - args[i].~Variant(); + if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { + Variant *args = (Variant *)(message + 1); + for (int i = 0; i < message->args; i++) { + args[i].~Variant(); + } } - } - message->~Message(); + message->~Message(); + + } // while going through buffer + + buffer.end = 0; // reset buffer + + uint32_t buffer_data_size = buffer.data.size(); + buffer.data.clear(); _THREAD_SAFE_LOCK_ - } + // keep track of the maximum used size, so we can downsize buffers when appropriate + _buffer_size_monitor.max_size = MAX(buffer_data_size, _buffer_size_monitor.max_size); - buffer_end = 0; // reset buffer + // flip buffers, this is the only part that requires a lock + SWAP(read_buffer, write_buffer); + _THREAD_SAFE_UNLOCK_ + + } // while read buffer not empty + + _THREAD_SAFE_LOCK_ flushing = false; _THREAD_SAFE_UNLOCK_ } @@ -324,34 +413,35 @@ MessageQueue::MessageQueue() { singleton = this; flushing = false; - buffer_end = 0; - buffer_max_used = 0; - buffer_size = GLOBAL_DEF_RST("memory/limits/message_queue/max_size_kb", DEFAULT_QUEUE_SIZE_KB); - ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/message_queue/max_size_kb", PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_kb", PROPERTY_HINT_RANGE, "1024,4096,1,or_greater")); - buffer_size *= 1024; - buffer = memnew_arr(uint8_t, buffer_size); + max_allowed_buffer_size = GLOBAL_DEF_RST("memory/limits/message_queue/max_size_mb", 32); + ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/message_queue/max_size_mb", PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_mb", PROPERTY_HINT_RANGE, "4,512,1,or_greater")); + + max_allowed_buffer_size *= 1024 * 1024; } MessageQueue::~MessageQueue() { - uint32_t read_pos = 0; + for (int which = 0; which < 2; which++) { + Buffer &buffer = buffers[which]; + uint32_t read_pos = 0; - while (read_pos < buffer_end) { - Message *message = (Message *)&buffer[read_pos]; - Variant *args = (Variant *)(message + 1); - int argc = message->args; - if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { - for (int i = 0; i < argc; i++) { - args[i].~Variant(); + while (read_pos < buffer.end) { + Message *message = (Message *)&buffer.data[read_pos]; + Variant *args = (Variant *)(message + 1); + int argc = message->args; + if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { + for (int i = 0; i < argc; i++) { + args[i].~Variant(); + } } - } - message->~Message(); + message->~Message(); - read_pos += sizeof(Message); - if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { - read_pos += sizeof(Variant) * message->args; + read_pos += sizeof(Message); + if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { + read_pos += sizeof(Variant) * message->args; + } } - } + + } // for which singleton = nullptr; - memdelete_arr(buffer); } diff --git a/core/message_queue.h b/core/message_queue.h index 58bf26e6be75..4d6307534d30 100644 --- a/core/message_queue.h +++ b/core/message_queue.h @@ -31,16 +31,13 @@ #ifndef MESSAGE_QUEUE_H #define MESSAGE_QUEUE_H +#include "core/local_vector.h" #include "core/object.h" #include "core/os/thread_safe.h" class MessageQueue { _THREAD_SAFE_CLASS_ - enum { - DEFAULT_QUEUE_SIZE_KB = 4096 - }; - enum { TYPE_CALL, TYPE_NOTIFICATION, @@ -60,12 +57,23 @@ class MessageQueue { }; }; - uint8_t *buffer; - uint32_t buffer_end; - uint32_t buffer_max_used; - uint32_t buffer_size; + struct Buffer { + LocalVector data; + uint64_t end = 0; + }; + + Buffer buffers[2]; + int read_buffer = 0; + int write_buffer = 1; + uint64_t max_allowed_buffer_size = 0; + + struct BufferSizeMonitor { + uint32_t max_size = 0; + uint32_t flush_count = 0; + } _buffer_size_monitor; void _call_function(Object *p_target, const StringName &p_func, const Variant *p_args, int p_argcount, bool p_show_error); + void _update_buffer_monitor(); static MessageQueue *singleton; diff --git a/doc/classes/ProjectSettings.xml b/doc/classes/ProjectSettings.xml index efd72957e803..474caf0db535 100644 --- a/doc/classes/ProjectSettings.xml +++ b/doc/classes/ProjectSettings.xml @@ -1190,7 +1190,7 @@ - + Godot uses a message queue to defer some function calls. If you run out of space on it (you will see an error), you can increase the size here.