diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index e3df8b3081..4e3500a389 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -57,6 +57,7 @@ add_library(citra_common STATIC detached_tasks.h bit_field.h bit_set.h + bounded_threadsafe_queue.h cityhash.cpp cityhash.h color.h diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h new file mode 100644 index 0000000000..04d1905727 --- /dev/null +++ b/src/common/bounded_threadsafe_queue.h @@ -0,0 +1,250 @@ +// Copyright 2023 yuzu Emulator Project +// Licensed under GPLv2 or any later version +// Refer to the license.txt file included. + +#pragma once + +#include +#include +#include +#include +#include + +#include "common/polyfill_thread.h" + +namespace Common { + +namespace detail { +constexpr size_t DefaultCapacity = 0x1000; +} // namespace detail + +template +class SPSCQueue { + static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); + +public: + template + bool TryEmplace(Args&&... args) { + return Emplace(std::forward(args)...); + } + + template + void EmplaceWait(Args&&... args) { + Emplace(std::forward(args)...); + } + + bool TryPop(T& t) { + return Pop(t); + } + + void PopWait(T& t) { + Pop(t); + } + + void PopWait(T& t, std::stop_token stop_token) { + Pop(t, stop_token); + } + + T PopWait() { + T t; + Pop(t); + return t; + } + + T PopWait(std::stop_token stop_token) { + T t; + Pop(t, stop_token); + return t; + } + +private: + enum class PushMode { + Try, + Wait, + Count, + }; + + enum class PopMode { + Try, + Wait, + WaitWithStopToken, + Count, + }; + + template + bool Emplace(Args&&... args) { + const size_t write_index = m_write_index.load(std::memory_order::relaxed); + + if constexpr (Mode == PushMode::Try) { + // Check if we have free slots to write to. + if ((write_index - m_read_index.load(std::memory_order::acquire)) == Capacity) { + return false; + } + } else if constexpr (Mode == PushMode::Wait) { + // Wait until we have free slots to write to. + std::unique_lock lock{producer_cv_mutex}; + producer_cv.wait(lock, [this, write_index] { + return (write_index - m_read_index.load(std::memory_order::acquire)) < Capacity; + }); + } else { + static_assert(Mode < PushMode::Count, "Invalid PushMode."); + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Emplace into the queue. + std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{consumer_cv_mutex}; + consumer_cv.notify_one(); + + return true; + } + + template + bool Pop(T& t, [[maybe_unused]] std::stop_token stop_token = {}) { + const size_t read_index = m_read_index.load(std::memory_order::relaxed); + + if constexpr (Mode == PopMode::Try) { + // Check if the queue is empty. + if (read_index == m_write_index.load(std::memory_order::acquire)) { + return false; + } + } else if constexpr (Mode == PopMode::Wait) { + // Wait until the queue is not empty. + std::unique_lock lock{consumer_cv_mutex}; + consumer_cv.wait(lock, [this, read_index] { + return read_index != m_write_index.load(std::memory_order::acquire); + }); + } else if constexpr (Mode == PopMode::WaitWithStopToken) { + // Wait until the queue is not empty. + std::unique_lock lock{consumer_cv_mutex}; + Common::CondvarWait(consumer_cv, lock, stop_token, [this, read_index] { + return read_index != m_write_index.load(std::memory_order::acquire); + }); + if (stop_token.stop_requested()) { + return false; + } + } else { + static_assert(Mode < PopMode::Count, "Invalid PopMode."); + } + + // Determine the position to read from. + const size_t pos = read_index % Capacity; + + // Pop the data off the queue, moving it. + t = std::move(m_data[pos]); + + // Increment the read index. + ++m_read_index; + + // Notify the producer that we have popped off the queue. + std::scoped_lock lock{producer_cv_mutex}; + producer_cv.notify_one(); + + return true; + } + + alignas(128) std::atomic_size_t m_read_index{0}; + alignas(128) std::atomic_size_t m_write_index{0}; + + std::array m_data; + + std::condition_variable_any producer_cv; + std::mutex producer_cv_mutex; + std::condition_variable_any consumer_cv; + std::mutex consumer_cv_mutex; +}; + +template +class MPSCQueue { +public: + template + bool TryEmplace(Args&&... args) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryEmplace(std::forward(args)...); + } + + template + void EmplaceWait(Args&&... args) { + std::scoped_lock lock{write_mutex}; + spsc_queue.EmplaceWait(std::forward(args)...); + } + + bool TryPop(T& t) { + return spsc_queue.TryPop(t); + } + + void PopWait(T& t) { + spsc_queue.PopWait(t); + } + + void PopWait(T& t, std::stop_token stop_token) { + spsc_queue.PopWait(t, stop_token); + } + + T PopWait() { + return spsc_queue.PopWait(); + } + + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); + } + +private: + SPSCQueue spsc_queue; + std::mutex write_mutex; +}; + +template +class MPMCQueue { +public: + template + bool TryEmplace(Args&&... args) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryEmplace(std::forward(args)...); + } + + template + void EmplaceWait(Args&&... args) { + std::scoped_lock lock{write_mutex}; + spsc_queue.EmplaceWait(std::forward(args)...); + } + + bool TryPop(T& t) { + std::scoped_lock lock{read_mutex}; + return spsc_queue.TryPop(t); + } + + void PopWait(T& t) { + std::scoped_lock lock{read_mutex}; + spsc_queue.PopWait(t); + } + + void PopWait(T& t, std::stop_token stop_token) { + std::scoped_lock lock{read_mutex}; + spsc_queue.PopWait(t, stop_token); + } + + T PopWait() { + std::scoped_lock lock{read_mutex}; + return spsc_queue.PopWait(); + } + + T PopWait(std::stop_token stop_token) { + std::scoped_lock lock{read_mutex}; + return spsc_queue.PopWait(stop_token); + } + +private: + SPSCQueue spsc_queue; + std::mutex write_mutex; + std::mutex read_mutex; +}; + +} // namespace Common diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index deb20bb3b5..36f4a3f243 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -21,6 +21,7 @@ #define CITRA_LINUX_GCC_BACKTRACE #endif +#include "common/bounded_threadsafe_queue.h" #include "common/common_paths.h" #include "common/file_util.h" #include "common/literals.h" @@ -32,7 +33,6 @@ #include "common/settings.h" #include "common/string_util.h" #include "common/thread.h" -#include "common/threadsafe_queue.h" namespace Common::Log { @@ -237,11 +237,11 @@ public: void PushEntry(Class log_class, Level log_level, const char* filename, unsigned int line_num, const char* function, std::string message) { - if (!filter.CheckMessage(log_class, log_level)) + if (!filter.CheckMessage(log_class, log_level)) { return; - const Entry& entry = - CreateEntry(log_class, log_level, filename, line_num, function, std::move(message)); - message_queue.Push(entry); + } + message_queue.EmplaceWait( + CreateEntry(log_class, log_level, filename, line_num, function, std::move(message))); } private: @@ -317,7 +317,7 @@ private: ForEachBackend([&entry](Backend& backend) { backend.Write(entry); }); }; while (!stop_token.stop_requested()) { - entry = message_queue.PopWait(stop_token); + message_queue.PopWait(entry, stop_token); if (entry.filename != nullptr) { write_logs(); } @@ -325,7 +325,7 @@ private: // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a // case where a system is repeatedly spamming logs even on close. int max_logs_to_write = filter.IsDebug() ? INT_MAX : 100; - while (max_logs_to_write-- && message_queue.Pop(entry)) { + while (max_logs_to_write-- && message_queue.TryPop(entry)) { write_logs(); } }); @@ -395,7 +395,7 @@ private: ColorConsoleBackend color_console_backend{}; FileBackend file_backend; - MPSCQueue message_queue{}; + MPSCQueue message_queue{}; std::chrono::steady_clock::time_point time_origin{std::chrono::steady_clock::now()}; std::jthread backend_thread;