SPSCQueue: Add PopWait

Adds a condition var to SPSCQueue so when a new log is pushed it will
wake the consumer thread that is calling PopWait. This only applies to
to queues with NeedSize=true
This commit is contained in:
James Rowe 2018-02-23 00:30:43 -07:00 committed by Daniel Lim Wee Soong
parent 47f0185bcd
commit 9fdc89a456

View file

@ -9,6 +9,7 @@
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <chrono>
#include <cstddef> #include <cstddef>
#include <mutex> #include <mutex>
#include "common/common_types.h" #include "common/common_types.h"
@ -36,6 +37,10 @@ public:
T& Front() const { T& Front() const {
return read_ptr->current; return read_ptr->current;
} }
/**
* Push data to the queue. If NeedSize=True then Push will notify the waiting consumer thread
*/
template <typename Arg> template <typename Arg>
void Push(Arg&& t) { void Push(Arg&& t) {
// create the element, add it to the queue // create the element, add it to the queue
@ -45,8 +50,11 @@ public:
ElementPtr* new_ptr = new ElementPtr(); ElementPtr* new_ptr = new ElementPtr();
write_ptr->next.store(new_ptr, std::memory_order_release); write_ptr->next.store(new_ptr, std::memory_order_release);
write_ptr = new_ptr; write_ptr = new_ptr;
if (NeedSize) if (NeedSize) {
std::lock_guard<std::mutex> lock(size_lock);
size++; size++;
size_cv.notify_one();
}
} }
void Pop() { void Pop() {
@ -75,6 +83,25 @@ public:
return true; return true;
} }
/**
* Waits up to timeout for data to be Pushed to the queue. Push uses a condition variable to
* signal the waiting thread, but only if NeedSize = true. Returns false if the timeout is
* triggered. If the condition variable is signalled, returns the value from Pop
* @param T In parameter to store the value if this method returns true
* @param timeout Time in milliseconds to wait for a signal from a Push
*/
bool PopWait(T& t, u64 timeout = 500) {
if (NeedSize) {
std::unique_lock<std::mutex> lock(size_lock);
if (size_cv.wait_for(lock, std::chrono::milliseconds(timeout),
[& size = size] { return size > 0; })) {
return Pop(t);
}
return false;
}
return Pop(t);
}
// not thread-safe // not thread-safe
void Clear() { void Clear() {
size.store(0); size.store(0);
@ -102,6 +129,9 @@ private:
ElementPtr* write_ptr; ElementPtr* write_ptr;
ElementPtr* read_ptr; ElementPtr* read_ptr;
std::atomic<u32> size; std::atomic<u32> size;
std::mutex size_lock;
std::condition_variable size_cv;
}; };
// a simple thread-safe, // a simple thread-safe,