C++ - Stack overflow when trying to implement a lock-free queue
I implemented a lock-free queue based on the algorithm specified in Maged M. Michael and Michael L. Scott's paper "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" (for the algorithm, jump to page 4).
I used the atomic operations on shared_ptr, such as std::atomic_load_explicit, etc.
When I use the queue from one thread only, everything is fine, but when I use it from several threads, I get a stack overflow exception.
Unfortunately, I could not trace the source of the problem. It seems that when one shared_ptr goes out of scope, it decrements the reference count on the next concurrentqueuenode, and that causes infinite recursion, but I can't see why.
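For readers unfamiliar with the shared_ptr atomic free functions mentioned above, here is a minimal sketch of how they behave. The helper names `load_acquire` and `try_replace` are hypothetical and are not part of the original code. Note that these overloads are deprecated since C++20 in favour of std::atomic<std::shared_ptr<T>>.

```cpp
#include <atomic>
#include <cassert>
#include <memory>

// Hypothetical helper (not from the original post): acquire-load of a
// shared_ptr that may be written by other threads.
inline std::shared_ptr<int> load_acquire(const std::shared_ptr<int>& slot) {
    return std::atomic_load_explicit(&slot, std::memory_order_acquire);
}

// Hypothetical helper: atomically replaces `slot` with `desired`, but only
// if `slot` still holds `expected`. Returns true on success.
inline bool try_replace(std::shared_ptr<int>& slot,
                        std::shared_ptr<int> expected,
                        std::shared_ptr<int> desired) {
    // On failure, the local copy `expected` is overwritten with the value
    // currently stored in `slot`; the caller's pointer is left untouched.
    return std::atomic_compare_exchange_strong(&slot, &expected, desired);
}
```

Every access to the shared pointer has to go through these functions for the operations to be atomic with respect to each other; a plain read or assignment of the same shared_ptr from another thread would be a data race.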
The code:
The queue node:
#include <memory>
#include <utility>

template<class t>
struct concurrentqueuenode {
    t m_data;
    std::shared_ptr<concurrentqueuenode> m_next;

    // Constructs the payload in place from the forwarded arguments.
    template<class ... Args>
    concurrentqueuenode(Args&& ... args) : m_data(std::forward<Args>(args)...) {}

    std::shared_ptr<concurrentqueuenode>& getnext() {
        return m_next;
    }

    t getvalue() {
        return std::move(m_data);
    }
};
The concurrent queue (note: not for the faint-hearted):
template<class t>
class concurrentqueue {
    std::shared_ptr<concurrentqueuenode<t>> m_head, m_tail;

public:
    concurrentqueue() {
        // Head and tail both start at a dummy node.
        m_head = m_tail = std::make_shared<concurrentqueuenode<t>>();
    }

    template<class ... Args>
    void push(Args&& ... args) {
        auto node = std::make_shared<concurrentqueuenode<t>>(std::forward<Args>(args)...);
        std::shared_ptr<concurrentqueuenode<t>> tail;
        for (;;) {
            tail = std::atomic_load_explicit(&m_tail, std::memory_order_acquire);
            std::shared_ptr<concurrentqueuenode<t>> next =
                std::atomic_load_explicit(&tail->getnext(), std::memory_order_acquire);
            if (tail == std::atomic_load_explicit(&m_tail, std::memory_order_acquire)) {
                if (next.get() == nullptr) {
                    auto currentnext = std::atomic_load_explicit(&m_tail, std::memory_order_acquire)->getnext();
                    // Try to link the new node after the current last node.
                    auto res = std::atomic_compare_exchange_weak(&tail->getnext(), &next, node);
                    if (res) {
                        break;
                    }
                } else {
                    // Tail is lagging behind; help advance it.
                    std::atomic_compare_exchange_weak(&m_tail, &tail, next);
                }
            }
        }
        // Try to swing the tail to the newly inserted node.
        std::atomic_compare_exchange_strong(&m_tail, &tail, node);
    }

    bool trypop(t& dest) {
        std::shared_ptr<concurrentqueuenode<t>> head;
        for (;;) {
            head = std::atomic_load_explicit(&m_head, std::memory_order_acquire);
            auto tail = std::atomic_load_explicit(&m_tail, std::memory_order_acquire);
            auto next = std::atomic_load_explicit(&head->getnext(), std::memory_order_acquire);
            if (head == std::atomic_load_explicit(&m_head, std::memory_order_acquire)) {
                if (head.get() == tail.get()) {
                    if (next.get() == nullptr) {
                        return false; // queue is empty
                    }
                    // Tail is lagging behind; help advance it.
                    std::atomic_compare_exchange_weak(&m_tail, &tail, next);
                } else {
                    dest = next->getvalue();
                    auto res = std::atomic_compare_exchange_weak(&m_head, &head, next);
                    if (res) {
                        break;
                    }
                }
            }
        }
        return true;
    }
};
Example usage that reproduces the problem:
#include <thread>

int main() {
    concurrentqueue<int> queue;
    std::thread threads[4];
    for (auto& thread : threads) {
        thread = std::thread([&queue] {
            for (auto i = 0; i < 100'000; i++) {
                queue.push(i);
                int y;
                queue.trypop(y);
            }
        });
    }
    for (auto& thread : threads) {
        thread.join();
    }
    return 0;
}
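The problem is that a race condition can lead to every node in the queue waiting to be freed at once. Freeing a node releases its m_next, which frees the next node, and so on; that destruction is recursive, and with a long enough chain it blows the stack.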
If you change your test to use only one thread and don't pop, you get the same stack overflow error every time.
for (auto i = 1; i < 100000; i++) {
    queue.push(i);
    // int y;
    // queue.trypop(y);
}
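To make the recursion visible without actually crashing, the following sketch measures how deeply destructors nest when a chain of shared_ptr-linked nodes is released. `probenode`, the two counters, and `max_destructor_depth` are hypothetical names introduced only for this illustration. The explicit `m_next.reset()` stands in for the implicit member destruction that happens in the original node.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>

static int g_depth = 0;     // destructors currently on the call stack
static int g_max_depth = 0; // deepest nesting observed so far

// Hypothetical probe node with the same linkage as the queue node.
struct probenode {
    std::shared_ptr<probenode> m_next;
    ~probenode() {
        ++g_depth;
        g_max_depth = std::max(g_max_depth, g_depth);
        m_next.reset(); // the next node is destroyed *inside* this destructor
        --g_depth;
    }
};

// Builds a chain of `length` nodes, releases it, and reports the deepest
// destructor nesting that occurred.
inline int max_destructor_depth(int length) {
    g_depth = g_max_depth = 0;
    auto head = std::make_shared<probenode>();
    auto tail = head;
    for (int i = 1; i < length; ++i) {
        tail->m_next = std::make_shared<probenode>();
        tail = tail->m_next;
    }
    tail.reset();
    head.reset(); // last owner of the first node: the whole chain unwinds here
    return g_max_depth;
}
```

The nesting depth equals the chain length, so a chain of 100,000 nodes needs 100,000 nested destructor frames, which is likely more than a default thread stack can hold.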
You need to make the deletion of the chain of nodes non-recursive:
__forceinline ~concurrentqueuenode() {
    // Nothing to unwind if there is no next node or someone else still owns it.
    if (!m_next || m_next.use_count() > 1)
        return;
    killchainofdeath();
}

void killchainofdeath() {
    auto pthis = this;
    std::shared_ptr<concurrentqueuenode> next, prev;
    while (1) {
        if (pthis->m_next.use_count() > 1)
            break;
        next.swap(pthis->m_next);   // unwire the node
        prev = nullptr;             // free the previous node that was unwired in the previous loop
        if (!(pthis = next.get()))  // move to the next node
            break;
        prev.swap(next);            // otherwise next.swap would free the node before it is unwired
    }
}
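The same idea can be written more compactly with moves. This is a hedged alternative sketch, not the answer's exact code: `flatnode`, `g_live_nodes`, and `build_and_destroy` are hypothetical names, and the `use_count() == 1` check is only reliable when no other thread is touching the chain, for example while the queue itself is being destroyed.

```cpp
#include <cassert>
#include <memory>
#include <utility>

static int g_live_nodes = 0; // nodes currently alive, used only for checking

// Hypothetical node whose destructor unlinks its successors in a loop.
struct flatnode {
    std::shared_ptr<flatnode> m_next;
    flatnode() { ++g_live_nodes; }
    ~flatnode() {
        // Detach the rest of the chain so the implicit member destructor
        // has nothing left to recurse into.
        auto next = std::move(m_next);
        while (next && next.use_count() == 1) {
            // Detach the successor first, then drop the current node.
            // Its destructor sees an empty m_next and returns at once.
            auto after = std::move(next->m_next);
            next = std::move(after);
        }
        --g_live_nodes;
    }
};

// Builds a chain of `length` nodes, lets it go out of scope, and returns
// how many nodes are still alive afterwards.
inline int build_and_destroy(int length) {
    {
        auto head = std::make_shared<flatnode>();
        auto tail = head;
        for (int i = 1; i < length; ++i) {
            tail->m_next = std::make_shared<flatnode>();
            tail = tail->m_next;
        }
    } // tail is released first, then head, which starts the iterative teardown
    return g_live_nodes;
}
```

With this destructor the nesting never goes deeper than two frames, whatever the chain length, because each node is released only after its successor has been detached from it.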
I have never used shared_ptr before, so I don't know if there is a faster way to do this. For the same reason, I don't know whether your algorithm will suffer from ABA issues. Unless there is something special in the shared_ptr implementation that prevents ABA, I worry that freed nodes could be reused, spoofing the CAS. I never seemed to hit that problem when I ran your code, though.
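On the ABA worry, one relevant property is that a thread holding a shared_ptr copy of a node keeps that node alive, so its address cannot be handed out to a new node while the copy exists. The sketch below illustrates only that property in a single thread; it is not a proof that the queue is ABA-free. The function name `stale_cas_succeeds` is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <memory>

// Hypothetical experiment: returns true if a CAS that uses an outdated
// snapshot of `head` succeeds.
inline bool stale_cas_succeeds() {
    std::shared_ptr<int> head = std::make_shared<int>(1);     // node A
    std::shared_ptr<int> snapshot = std::atomic_load(&head);  // "thread 1" reads A

    // "Thread 2" swings head from A to B, and then from B to a fresh node C.
    std::atomic_store(&head, std::make_shared<int>(2));
    std::atomic_store(&head, std::make_shared<int>(3));

    // A is still alive because `snapshot` owns it, so C cannot occupy A's address.
    assert(head.get() != snapshot.get());

    // The stale CAS therefore compares A against C and fails.
    return std::atomic_compare_exchange_strong(&head, &snapshot,
                                               std::make_shared<int>(4));
}
```

Separately, it may be worth checking std::atomic_is_lock_free on a shared_ptr in your standard library: these free functions are commonly implemented with internal locks, in which case the queue would be correct but not actually lock-free.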