Skip to content

Commit

Permalink
logs improvements
Browse files Browse the repository at this point in the history
* Do not use unsafe pointer arithmetics exceeding existing memory. (exceeding m_fptr)
* Properly use the enire 32MB of the buffer, previously only 16MB were used for cuncurrent access.
* Fix a bug occurring after attempting to push 1TB (40 bits).
* Flush the log before hitting a debugging breakpoint.
  • Loading branch information
elad335 committed Nov 12, 2022
1 parent 324b103 commit 1a42bc5
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 22 deletions.
6 changes: 6 additions & 0 deletions Utilities/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,10 @@ const bool s_terminate_handler_set = []() -> bool
std::set_terminate([]()
{
if (IsDebuggerPresent())
{
logs::listener::sync_all();
utils::trap();
}

report_fatal_error("RPCS3 has abnormally terminated.");
});
Expand Down Expand Up @@ -2650,7 +2653,10 @@ void thread_base::exec()
sig_log.fatal("Thread terminated due to fatal error: %s", reason);

if (IsDebuggerPresent())
{
logs::listener::sync_all();
utils::trap();
}

if (const auto _this = g_tls_this_thread)
{
Expand Down
102 changes: 80 additions & 22 deletions rpcs3/util/logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ namespace logs

// Memory-mapped buffer size
constexpr u64 s_log_size = 32 * 1024 * 1024;
static_assert(s_log_size * s_log_size > s_log_size && (s_log_size & (s_log_size - 1)) == 0); // Assert on an overflowing value

class file_writer
{
Expand All @@ -87,8 +88,8 @@ namespace logs
z_stream m_zs{};
shared_mutex m_m{};

alignas(128) atomic_t<u64> m_buf{0}; // MSB (40 bit): push begin, LSB (24 bis): push size
alignas(128) atomic_t<u64> m_out{0}; // Amount of bytes written to file
atomic_t<u64, 64> m_buf{0}; // MSB (39 bits): push begin, LSB (25 bis): push size
atomic_t<u64, 64> m_out{0}; // Amount of bytes written to file

uchar m_zout[65536]{};

Expand All @@ -102,6 +103,9 @@ namespace logs

// Append raw data
void log(const char* text, usz size);

// Ensure written to disk
void sync();
};

struct file_listener final : file_writer, public listener
Expand All @@ -111,6 +115,11 @@ namespace logs
~file_listener() override = default;

void log(u64 stamp, const message& msg, const std::string& prefix, const std::string& text) override;

void sync() override
{
file_writer::sync();
}
};

struct root_listener final : public listener
Expand Down Expand Up @@ -321,6 +330,18 @@ void logs::listener::broadcast(const logs::stored_message& msg) const
}
}

void logs::listener::sync()
{
}

void logs::listener::sync_all()
{
for (listener* lis = get_logger(); lis; lis = lis->m_next)
{
lis->sync();
}
}

logs::registerer::registerer(channel& _ch)
{
std::lock_guard lock(g_mutex);
Expand Down Expand Up @@ -441,7 +462,7 @@ logs::file_writer::file_writer(const std::string& name, u64 max_size)
{
const u64 bufv = m_buf;

if (bufv & 0xffffff)
if (bufv % s_log_size)
{
// Wait if threads are writing logs
std::this_thread::yield();
Expand Down Expand Up @@ -469,10 +490,7 @@ logs::file_writer::~file_writer()
}

// Stop writer thread
while (m_out << 24 < m_buf)
{
std::this_thread::yield();
}
file_writer::sync();

m_out = -1;
m_writer.join();
Expand Down Expand Up @@ -513,21 +531,21 @@ bool logs::file_writer::flush(u64 bufv)
std::lock_guard lock(m_m);

const u64 st = +m_out;
const u64 end = std::min<u64>((st + s_log_size) & ~(s_log_size - 1), bufv >> 24);
const u64 end = std::min<u64>({(st + s_log_size) & ~(s_log_size - 1), bufv / s_log_size, m_max_size});

if (end > st)
{
// Avoid writing too big fragments
const u64 size = std::min<u64>(end - st, sizeof(m_zout) / 2);

// Write uncompressed
if (m_fout && st < m_max_size && m_fout.write(m_fptr.get() + st % s_log_size, size) != size)
if (m_fout && m_fout.write(m_fptr.get() + st % s_log_size, size) != size)
{
m_fout.close();
}

// Write compressed
if (m_fout2 && st < m_max_size)
if (m_fout2)
{
m_zs.avail_in = static_cast<uInt>(size);
m_zs.next_in = m_fptr.get() + st % s_log_size;
Expand Down Expand Up @@ -562,18 +580,16 @@ void logs::file_writer::log(const char* text, usz size)
}

// TODO: write bigger fragment directly in blocking manner
while (size && size <= 0xffffff)
while (size && size < s_log_size)
{
u64 bufv = 0;

const auto pos = m_buf.atomic_op([&](u64& v) -> uchar*
const auto [bufv, pos] = m_buf.fetch_op([&](u64& v) -> uchar*
{
const u64 v1 = v >> 24;
const u64 v2 = v & 0xffffff;
const u64 out = m_out % s_log_size;
const u64 v1 = (v / s_log_size) % s_log_size;
const u64 v2 = v % s_log_size;

if (v2 + size > 0xffffff || v1 + v2 + size >= m_out + s_log_size) [[unlikely]]
if (v1 + v2 + size > (out < v1 ? out + s_log_size : out)) [[unlikely]]
{
bufv = v;
return nullptr;
}

Expand All @@ -583,22 +599,34 @@ void logs::file_writer::log(const char* text, usz size)

if (!pos) [[unlikely]]
{
if ((bufv & 0xffffff) + size > 0xffffff || bufv & 0xffffff)
if (m_out >= m_max_size || (!m_fout && !m_fout2))
{
// Logging is inactive
return;
}

if ((bufv % s_log_size) + size >= s_log_size || bufv % s_logs_size)
{
// Concurrency limit reached
std::this_thread::yield();
}
else if (!m_m.is_free())
{
// Wait for another flush call to complete
m_m.lock_unlock();
}
else
{
// Queue is full, need to write out
flush(bufv);
}

continue;
}

if (pos + size > m_fptr.get() + s_log_size)
if (pos - m_fptr.get() + size > s_log_size)
{
const auto frag = m_fptr.get() + s_log_size - pos;
const auto frag = s_log_size - (pos - m_fptr.get());
std::memcpy(pos, text, frag);
std::memcpy(m_fptr.get(), text + frag, size - frag);
}
Expand All @@ -607,11 +635,41 @@ void logs::file_writer::log(const char* text, usz size)
std::memcpy(pos, text, size);
}

m_buf += (u64{size} << 24) - size;
m_buf += (size * s_log_size) - size;
break;
}
}

void logs::file_writer::sync()
{
if (!m_fptr || (!m_fout && !m_fout2))
{
return;
}

// Wait for the writer thread
while ((m_out % s_log_size) * s_log_size < m_buf)
{
if (m_out >= m_max_size)
{
break;
}

std::this_thread::yield();
}

// Ensure written to disk
if (m_fout)
{
m_fout.sync();
}

if (m_fout2)
{
m_fout2.sync();
}
}

logs::file_listener::file_listener(const std::string& path, u64 max_size)
: file_writer(path, max_size)
, listener()
Expand Down
6 changes: 6 additions & 0 deletions rpcs3/util/logs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,17 @@ namespace logs
// Process log message
virtual void log(u64 stamp, const message& msg, const std::string& prefix, const std::string& text) = 0;

// Flush contents (file writer)
virtual void sync();

// Add new listener
static void add(listener*);

// Special purpose
void broadcast(const stored_message&) const;

// Flush log to disk
static void sync_all();
};

struct alignas(16) channel : private message
Expand Down

0 comments on commit 1a42bc5

Please sign in to comment.