| #pragma once |
|
|
| #include <atomic> |
| #include <utility> |
| #include <cstring> |
| #include <type_traits> |
| #include <cstdint> |
|
|
| #include "libipc/def.h" |
|
|
| #include "libipc/platform/detail.h" |
| #include "libipc/circ/elem_def.h" |
| #include "libipc/utility/log.h" |
| #include "libipc/utility/utility.h" |
|
|
| namespace ipc { |
|
|
| |
| |
| |
|
|
/// \brief Producer-consumer transmission policy.
/// Specialized below on wr<producer-relation, consumer-relation, transmission-mode>
/// (declared in libipc/def.h) — each specialization implements push / force_push /
/// pop / cursor for one combination of single/multi producers & consumers and
/// unicast/broadcast delivery.
template <typename Flag>
struct prod_cons_impl;
|
|
/// Single-producer / single-consumer, unicast: a classic lock-free SPSC
/// ring buffer driven by two monotonically increasing cursors.
template <>
struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {

    /// Ring slot: raw aligned payload storage only — no per-slot
    /// synchronization state is needed in the SPSC case.
    template <std::size_t DataSize, std::size_t AlignSize>
    struct elem_t {
        std::aligned_storage_t<DataSize, AlignSize> data_ {};
    };

    // Read/write cursors on separate cache lines so the producer and the
    // consumer do not false-share.
    alignas(cache_line_size) std::atomic<circ::u2_t> rd_; // read cursor
    alignas(cache_line_size) std::atomic<circ::u2_t> wt_; // write cursor

    /// Unicast consumers do not keep a per-reader position (consumption is
    /// tracked by the shared rd_), so the wrapper-visible cursor is fixed at 0.
    constexpr circ::u2_t cursor() const noexcept {
        return 0;
    }

    /// Try to append one element.
    /// \param f invoked with a pointer to the slot's storage to construct/copy
    ///          the payload in place.
    /// \return false when the ring is full. One slot is deliberately kept
    ///         free (the rd_ - 1 test) so "full" is distinguishable from "empty".
    template <typename W, typename F, typename E>
    bool push(W* /*wrapper, unused*/, F&& f, E* elems) {
        auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
        if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) {
            return false; // full
        }
        std::forward<F>(f)(&(elems[cur_wt].data_));
        // Publish the filled slot; release pairs with the acquire load of
        // wt_ in pop().
        wt_.fetch_add(1, std::memory_order_release);
        return true;
    }

    /// Forced push is not supported by this policy: it drops every receiver
    /// (all-ones connection mask) and reports failure.
    /// NOTE(review): disconnect_receiver is declared elsewhere (circ/elem_def.h);
    /// presumably it detaches the receivers named by the mask — confirm there.
    template <typename W, typename F, typename E>
    bool force_push(W* wrapper, F&&, E*) {
        wrapper->elems()->disconnect_receiver(~static_cast<circ::cc_t>(0u));
        return false;
    }

    /// Try to consume one element.
    /// \param f   receives a pointer to the slot's payload.
    /// \param out acknowledgement callback, called with true once the element
    ///            has been handed to f (it is the only consumer, so the read
    ///            always "empties" the slot).
    template <typename W, typename F, typename R, typename E>
    bool pop(W* /*wrapper, unused*/, circ::u2_t& /*cur, unused*/, F&& f, R&& out, E* elems) {
        auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
        if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
            return false; // empty: read cursor caught up with write cursor
        }
        std::forward<F>(f)(&(elems[cur_rd].data_));
        std::forward<R>(out)(true);
        // Free the slot for the producer; release pairs with the acquire
        // load of rd_ in push().
        rd_.fetch_add(1, std::memory_order_release);
        return true;
    }
};
|
|
/// Single-producer / multi-consumer, unicast: reuses the SPSC layout, but
/// consumers now *compete* for elements by CAS-ing the shared read cursor.
template <>
struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
     : prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {

    /// Forced push drops a single receiver (mask 1) and fails.
    /// NOTE(review): compare with the base policy which drops all receivers —
    /// the mask semantics live in circ/elem_def.h; confirm there.
    template <typename W, typename F, typename E>
    bool force_push(W* wrapper, F&&, E*) {
        wrapper->elems()->disconnect_receiver(1);
        return false;
    }

    /// Try to consume one element, racing against sibling consumers.
    /// The payload is copied into a local buffer *before* the CAS on rd_:
    /// once the CAS succeeds the producer may immediately overwrite the slot,
    /// so the copy must be secured first, and it is only delivered to f after
    /// this consumer has won the race.
    template <typename W, typename F, typename R,
              template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
    bool pop(W* /*wrapper, unused*/, circ::u2_t& /*cur, unused*/, F&& f, R&& out, E<DS, AS>* elems) {
        byte_t buff[DS];
        for (unsigned k = 0;;) {
            auto cur_rd = rd_.load(std::memory_order_relaxed);
            if (circ::index_of(cur_rd) ==
                circ::index_of(wt_.load(std::memory_order_acquire))) {
                return false; // empty
            }
            // Tentatively read the slot; only valid if we win the CAS below.
            std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
            if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
                std::forward<F>(f)(buff);
                std::forward<R>(out)(true);
                return true;
            }
            ipc::yield(k); // lost the race (or spurious failure): back off and retry
        }
    }
};
|
|
/// Multi-producer / multi-consumer, unicast. Producers first *claim* a slot
/// by CAS on the claim cursor ct_, fill it, then *commit* it by stamping the
/// slot's f_ct_ flag; the publish cursor wt_ (inherited) is advanced over
/// contiguous committed slots — by whichever producer (or consumer, see pop)
/// finds the commit flag first.
template <>
struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
     : prod_cons_impl<wr<relat::single, relat::multi, trans::unicast>> {

    using flag_t = std::uint64_t;

    /// Ring slot: payload plus a commit flag. f_ct_ holds ~ticket (bitwise
    /// NOT of the claiming producer's cursor value) while the slot is
    /// committed-but-unpublished, and 0 otherwise.
    template <std::size_t DataSize, std::size_t AlignSize>
    struct elem_t {
        std::aligned_storage_t<DataSize, AlignSize> data_ {};
        std::atomic<flag_t> f_ct_ { 0 };
    };

    // Claim cursor: producers race on this to reserve slots ahead of wt_.
    alignas(cache_line_size) std::atomic<circ::u2_t> ct_;

    /// Try to append one element.
    /// Phase 1: claim a ticket (CAS on ct_); fails when the ring is full.
    /// Phase 2: write the payload, commit via f_ct_, then try to publish by
    /// sweeping wt_ forward across committed slots.
    template <typename W, typename F, typename E>
    bool push(W* /*wrapper, unused*/, F&& f, E* elems) {
        circ::u2_t cur_ct, nxt_ct;
        for (unsigned k = 0;;) {
            cur_ct = ct_.load(std::memory_order_relaxed);
            if (circ::index_of(nxt_ct = cur_ct + 1) ==
                circ::index_of(rd_.load(std::memory_order_acquire))) {
                return false; // full
            }
            if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_acq_rel)) {
                break; // claimed ticket cur_ct
            }
            ipc::yield(k);
        }
        auto* el = elems + circ::index_of(cur_ct);
        std::forward<F>(f)(&(el->data_));
        // Commit: stamp the slot with ~ticket (cannot be confused with the
        // 0 "unclaimed" state), release-publishing the payload write.
        el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
        while (1) {
            auto cac_ct = el->f_ct_.load(std::memory_order_acquire);
            if (cur_ct != wt_.load(std::memory_order_relaxed)) {
                return true; // an earlier ticket is still pending; its owner will publish
            }
            if ((~cac_ct) != cur_ct) {
                return true; // slot not (or no longer) committed for this ticket
            }
            if (!el->f_ct_.compare_exchange_strong(cac_ct, 0, std::memory_order_relaxed)) {
                return true; // someone else consumed the commit flag
            }
            wt_.store(nxt_ct, std::memory_order_release); // publish
            // Sweep forward: try to publish the next slot too, if already committed.
            cur_ct = nxt_ct;
            nxt_ct = cur_ct + 1;
            el = elems + circ::index_of(cur_ct);
        }
        return true; // unreachable: the while(1) loop always returns
    }

    /// Forced push drops a single receiver and fails (same as the base policy).
    template <typename W, typename F, typename E>
    bool force_push(W* wrapper, F&&, E*) {
        wrapper->elems()->disconnect_receiver(1);
        return false;
    }

    /// Try to consume one element.
    /// When rd_ has caught up with wt_, the ring *looks* empty but a slot may
    /// be committed and not yet published — the consumer then helps advance
    /// wt_ (same flag protocol as push) instead of failing immediately.
    /// Otherwise: copy-then-CAS on rd_, as in the single-producer variant.
    template <typename W, typename F, typename R,
              template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
    bool pop(W* /*wrapper, unused*/, circ::u2_t& /*cur, unused*/, F&& f, R&& out, E<DS, AS>* elems) {
        byte_t buff[DS];
        for (unsigned k = 0;;) {
            auto cur_rd = rd_.load(std::memory_order_relaxed);
            auto cur_wt = wt_.load(std::memory_order_acquire);
            auto id_rd  = circ::index_of(cur_rd);
            auto id_wt  = circ::index_of(cur_wt);
            if (id_rd == id_wt) {
                auto* el = elems + id_wt;
                auto cac_ct = el->f_ct_.load(std::memory_order_acquire);
                if ((~cac_ct) != cur_wt) {
                    return false; // truly empty: next slot is not committed
                }
                // Help the stalled producer: claim the commit flag and publish.
                if (el->f_ct_.compare_exchange_weak(cac_ct, 0, std::memory_order_relaxed)) {
                    wt_.store(cur_wt + 1, std::memory_order_release);
                }
                k = 0; // made progress; reset the backoff
            }
            else {
                // Secure the data before racing for the read cursor (see the
                // single-producer multi-consumer pop for why).
                std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
                if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
                    std::forward<F>(f)(buff);
                    std::forward<R>(out)(true);
                    return true;
                }
                ipc::yield(k);
            }
        }
    }
};
|
|
/// Single-producer / multi-consumer, broadcast: every connected receiver sees
/// every element. Each slot carries a reference count rc_ whose low 32 bits
/// are a bitmask of receivers that still have to read it, and whose high
/// 32 bits are an "epoch" that force_push bumps to invalidate stale counts.
template <>
struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {

    using rc_t = std::uint64_t;

    enum : rc_t {
        ep_mask = 0x00000000ffffffffull, // low 32 bits: remaining-receiver mask
        ep_incr = 0x0000000100000000ull  // +1 in the epoch field (high 32 bits)
    };

    /// Ring slot: payload plus per-slot receiver refcount / epoch word.
    template <std::size_t DataSize, std::size_t AlignSize>
    struct elem_t {
        std::aligned_storage_t<DataSize, AlignSize> data_ {};
        std::atomic<rc_t> rc_ { 0 };
    };

    alignas(cache_line_size) std::atomic<circ::u2_t> wt_; // write cursor
    // Not atomic: only the single producer reads/writes epoch_.
    alignas(cache_line_size) rc_t epoch_ { 0 };

    /// Broadcast readers each keep their own cursor; a new reader starts at
    /// the current write position.
    circ::u2_t cursor() const noexcept {
        return wt_.load(std::memory_order_acquire);
    }

    /// Try to append one element for all currently connected receivers.
    /// \return false when no receiver is connected, or when some still-connected
    ///         receiver has not yet consumed this slot in the current epoch
    ///         (i.e. the slowest reader is a full lap behind).
    template <typename W, typename F, typename E>
    bool push(W* wrapper, F&& f, E* elems) {
        E* el;
        for (unsigned k = 0;;) {
            circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
            if (cc == 0) return false; // no receiver at all
            el = elems + circ::index_of(wt_.load(std::memory_order_relaxed));

            auto cur_rc = el->rc_.load(std::memory_order_acquire);
            circ::cc_t rem_cc = cur_rc & ep_mask;
            if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch_)) {
                return false; // a live receiver still owns this slot in this epoch
            }
            // Arm the slot: every currently connected receiver must read it.
            if (el->rc_.compare_exchange_weak(
                        cur_rc, epoch_ | static_cast<rc_t>(cc), std::memory_order_release)) {
                break;
            }
            ipc::yield(k); // a reader modified rc_ concurrently; retry
        }
        std::forward<F>(f)(&(el->data_));
        wt_.fetch_add(1, std::memory_order_release); // publish to readers
        return true;
    }

    /// Push that refuses to fail on laggards: bump the epoch (invalidating all
    /// outstanding refcounts) and forcibly disconnect the receivers that still
    /// hold the slot, then write as usual.
    template <typename W, typename F, typename E>
    bool force_push(W* wrapper, F&& f, E* elems) {
        E* el;
        epoch_ += ep_incr; // new epoch: stale rc_ words no longer block push
        for (unsigned k = 0;;) {
            circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
            if (cc == 0) return false;
            el = elems + circ::index_of(wt_.load(std::memory_order_relaxed));

            auto cur_rc = el->rc_.load(std::memory_order_acquire);
            circ::cc_t rem_cc = cur_rc & ep_mask;
            if (cc & rem_cc) {
                ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc);
                // Kick the laggards; cc becomes the surviving connection mask.
                cc = wrapper->elems()->disconnect_receiver(rem_cc);
                if (cc == 0) return false; // nobody left to receive
            }

            if (el->rc_.compare_exchange_weak(
                        cur_rc, epoch_ | static_cast<rc_t>(cc), std::memory_order_release)) {
                break;
            }
            ipc::yield(k);
        }
        std::forward<F>(f)(&(el->data_));
        wt_.fetch_add(1, std::memory_order_release);
        return true;
    }

    /// Consume the element at this reader's cursor (each broadcast reader has
    /// its own `cur`). The reader clears its connection bit from the slot's
    /// refcount; `out` is told whether it was the last reader of the slot.
    template <typename W, typename F, typename R, typename E>
    bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) {
        if (cur == cursor()) return false; // nothing new for this reader
        auto* el = elems + circ::index_of(cur++);
        std::forward<F>(f)(&(el->data_));
        for (unsigned k = 0;;) {
            auto cur_rc = el->rc_.load(std::memory_order_acquire);
            if ((cur_rc & ep_mask) == 0) {
                // Refcount already empty (e.g. this reader was force-disconnected).
                std::forward<R>(out)(true);
                return true;
            }
            // Drop our own connection bit.
            auto nxt_rc = cur_rc & ~static_cast<rc_t>(wrapper->connected_id());
            if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
                std::forward<R>(out)((nxt_rc & ep_mask) == 0); // true iff last reader
                return true;
            }
            ipc::yield(k);
        }
    }
};
|
|
/// Multi-producer / multi-consumer, broadcast. The per-slot rc_ word is split
/// into three fields (see the masks below): a receiver bitmask, an epoch, and
/// a middle counter that inc_rc() bumps on every reader CAS — presumably an
/// ABA guard for concurrent rc_ updates (TODO confirm against project docs).
/// A separate per-slot flag f_ct_ marks when a slot's payload is ready
/// (~ticket) and when it has been fully released by its last reader.
template <>
struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {

    using rc_t   = std::uint64_t;
    using flag_t = std::uint64_t;

    enum : rc_t {
        rc_mask = 0x00000000ffffffffull, // low 32 bits: remaining-receiver mask
        ep_mask = 0x00ffffffffffffffull, // everything except the epoch byte
        ep_incr = 0x0100000000000000ull, // +1 in the epoch field (top 8 bits)
        ic_mask = 0xff000000ffffffffull, // everything except the middle counter
        ic_incr = 0x0000000100000000ull  // +1 in the middle counter (bits 32..55)
    };

    /// Ring slot: payload + receiver refcount word + ready/released flag.
    template <std::size_t DataSize, std::size_t AlignSize>
    struct elem_t {
        std::aligned_storage_t<DataSize, AlignSize> data_ {};
        std::atomic<rc_t  > rc_   { 0 };
        std::atomic<flag_t> f_ct_ { 0 };
    };

    alignas(cache_line_size) std::atomic<circ::u2_t> ct_;    // write/claim cursor
    alignas(cache_line_size) std::atomic<rc_t>       epoch_ { 0 }; // atomic: multiple producers

    /// Broadcast readers each keep their own cursor, starting at ct_.
    circ::u2_t cursor() const noexcept {
        return ct_.load(std::memory_order_acquire);
    }

    /// Bump the middle counter field of rc, wrapping within its 24 bits and
    /// leaving the epoch byte and the receiver mask untouched.
    constexpr static rc_t inc_rc(rc_t rc) noexcept {
        return (rc & ic_mask) | ((rc + ic_incr) & ~ic_mask);
    }

    /// Same bump, but with the receiver mask cleared (used by producers when
    /// re-arming a slot with a fresh mask).
    constexpr static rc_t inc_mask(rc_t rc) noexcept {
        return inc_rc(rc) & ~rc_mask;
    }

    /// Try to append one element for all connected receivers.
    /// Fails when: no receivers; a live receiver still holds the slot in the
    /// current epoch; or the slot's last reader has not yet stamped f_ct_ as
    /// released for this lap (f_ct_ must equal this ticket, or 0 if never used).
    template <typename W, typename F, typename E>
    bool push(W* wrapper, F&& f, E* elems) {
        E* el;
        circ::u2_t cur_ct;
        rc_t epoch = epoch_.load(std::memory_order_acquire);
        for (unsigned k = 0;;) {
            circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
            if (cc == 0) return false;
            el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));

            auto cur_rc = el->rc_.load(std::memory_order_relaxed);
            circ::cc_t rem_cc = cur_rc & rc_mask;
            if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch)) {
                return false; // a live receiver still owns this slot in this epoch
            }
            else if (!rem_cc) {
                // Mask empty: slot must also have been released by its last
                // reader (pop stamps f_ct_ with exactly this lap's ticket).
                auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
                if ((cur_fl != cur_ct) && cur_fl) {
                    return false; // slot not yet released for reuse
                }
            }
            // Arm the slot (counter bumped, fresh receiver mask), then confirm
            // the epoch did not change concurrently; the no-op CAS on epoch_
            // also supplies acq_rel ordering for the pair of updates.
            if (el->rc_.compare_exchange_weak(
                        cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed) &&
                epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) {
                break;
            }
            ipc::yield(k);
        }
        // Advance the cursor first, then fill the slot; readers gate on the
        // f_ct_ ready-stamp below, not on ct_ alone.
        ct_.store(cur_ct + 1, std::memory_order_release);
        std::forward<F>(f)(&(el->data_));
        // Ready-stamp: readers accept the slot only when f_ct_ == ~ticket.
        el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
        return true;
    }

    /// Push that refuses to fail on laggards: open a new epoch and forcibly
    /// disconnect the receivers still holding the slot. If another producer
    /// moved the epoch concurrently, fall back to a normal push before
    /// bumping the epoch again.
    template <typename W, typename F, typename E>
    bool force_push(W* wrapper, F&& f, E* elems) {
        E* el;
        circ::u2_t cur_ct;
        rc_t epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr;
        for (unsigned k = 0;;) {
            circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
            if (cc == 0) return false;
            el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));

            auto cur_rc = el->rc_.load(std::memory_order_acquire);
            circ::cc_t rem_cc = cur_rc & rc_mask;
            if (cc & rem_cc) {
                ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc);
                cc = wrapper->elems()->disconnect_receiver(rem_cc); // kick laggards
                if (cc == 0) return false; // nobody left to receive
            }

            if (el->rc_.compare_exchange_weak(
                        cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed)) {
                if (epoch == epoch_.load(std::memory_order_acquire)) {
                    break; // our epoch is still current: slot is armed
                }
                else if (push(wrapper, std::forward<F>(f), elems)) {
                    return true; // epoch moved on; a normal push succeeded
                }
                // Normal push failed too: take yet another epoch and retry.
                epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr;
            }
            ipc::yield(k);
        }
        // Same publish sequence as push().
        ct_.store(cur_ct + 1, std::memory_order_release);
        std::forward<F>(f)(&(el->data_));
        el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
        return true;
    }

    /// Consume the element at this reader's own cursor.
    /// The slot is accepted only when its ready-stamp matches (~cur); the last
    /// reader to clear its bit stamps f_ct_ with the ticket value at which the
    /// producer may reuse the slot on its next lap (orig_cur + N, written here
    /// as cur + N - 1 because cur has already been incremented).
    template <typename W, typename F, typename R, typename E, std::size_t N>
    bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) {
        auto* el = elems + circ::index_of(cur);
        auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
        if (cur_fl != ~static_cast<flag_t>(cur)) {
            return false; // slot not ready (or already recycled)
        }
        ++cur;
        std::forward<F>(f)(&(el->data_));
        for (unsigned k = 0;;) {
            auto cur_rc = el->rc_.load(std::memory_order_acquire);
            if ((cur_rc & rc_mask) == 0) {
                // Mask already empty (e.g. this reader was force-disconnected):
                // treat as last reader and release the slot.
                std::forward<R>(out)(true);
                el->f_ct_.store(cur + N - 1, std::memory_order_release);
                return true;
            }
            // Drop our bit; inc_rc bumps the counter field so concurrent CASes
            // on rc_ cannot ABA past each other.
            auto nxt_rc = inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id());
            bool last_one = false;
            if ((last_one = (nxt_rc & rc_mask) == 0)) {
                // We are about to become the last reader: release the slot.
                el->f_ct_.store(cur + N - 1, std::memory_order_release);
            }
            if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
                std::forward<R>(out)(last_one);
                return true;
            }
            ipc::yield(k);
        }
    }
};
|
|
| } |
|
|