• Re: portable proxy collector test...

    From Brett@21:1/5 to Chris M. Thomasson on Fri Dec 6 19:55:07 2024
    Chris M. Thomasson <chris.m.thomasson.1@gmail.com> wrote:
    I am wondering if anybody can try to compile and run this C++11 code of
    mine for a portable word-based proxy collector, a sort of poor mans RCU,
    on an ARM based system? I don't have access to one. I am interested in
    the resulting output.

    https://godbolt.org

    https://pastebin.com/raw/CYZ78gVj
    (raw text link, no ads... :^)

    _______________________________________

    // Chris M. Thomassons Poor Mans RCU... Example 456...


    #include <iostream>
    #include <atomic>
    #include <thread>
    #include <cstdlib>
    #include <cstdint>
    #include <climits>
    #include <functional>


    // Masks
    //
    // Bit layout shared by ct_proxy::m_current and collector::m_count:
    //   bits 0..3  (ct_proxy_mask)      : current collector index (0 or 1)
    //   bit  4     (ct_proxy_quiescent) : "quiescence in progress" flag
    //   bits 4..31 (ct_ref_mask)        : reference count, stepped in units
    //                                     of ct_ref_inc (0x20)
    static constexpr std::uint32_t ct_ref_mask = 0xFFFFFFF0U;
    // Prior value seen by the final release() of a retired epoch: one
    // outstanding reference (0x20) plus the quiescent flag (0x10).
    static constexpr std::uint32_t ct_ref_complete = 0x30U;
    // One acquire()/release() reference, kept above the index/flag bits.
    static constexpr std::uint32_t ct_ref_inc = 0x20U;
    static constexpr std::uint32_t ct_proxy_mask = 0xFU;
    static constexpr std::uint32_t ct_proxy_quiescent = 0x10U;


    // Iteration settings
    static constexpr unsigned long ct_reader_iters_n = 2000000;
    static constexpr unsigned long ct_writer_iters_n = 200000;


    // Thread counts
    static constexpr unsigned long ct_reader_threads_n = 53;
    static constexpr unsigned long ct_writer_threads_n = 11;


    // Some debug/sanity check things...
    // Need to make this conditional in compilation with some macros...
    static std::atomic<std::uint32_t> g_debug_node_allocations(0);   // ct_node ctors run
    static std::atomic<std::uint32_t> g_debug_node_deallocations(0); // ct_node dtors run
    static std::atomic<std::uint32_t> g_debug_dtor_collect(0);       // quiescence completed directly from prv_quiesce_begin
    static std::atomic<std::uint32_t> g_debug_release_collect(0);    // quiescence completed from release()
    static std::atomic<std::uint32_t> g_debug_quiesce_begin(0);      // quiescence cycles started
    static std::atomic<std::uint32_t> g_debug_quiesce_complete(0);   // quiescence cycles finished
    static std::atomic<std::uint32_t> g_debug_quiesce_complete_nodes(0); // nodes reclaimed by finished cycles

    // Need to align and pad data structures! To do...

    // A singly-linked node shared by the lock-free stack (m_next) and the
    // proxy collector's deferred-reclamation lists (m_defer_next).
    // The ctor/dtor bump global debug counters so main() can detect leaks.
    struct ct_node
    {
        std::atomic<ct_node*> m_next{nullptr}; // stack link
        ct_node* m_defer_next{nullptr};        // defer-list link

        ct_node()
        {
            g_debug_node_allocations.fetch_add(1, std::memory_order_relaxed);
        }

        ~ct_node()
        {
            g_debug_node_deallocations.fetch_add(1, std::memory_order_relaxed);
        }
    };


    // The proxy collector itself... :^)
    // Word-based proxy collector, a sort of "poor man's RCU".
    //
    // Encoding (see the ct_* constants): the 32-bit word m_current packs
    //   - low 4 bits (ct_proxy_mask): index of the current collector (0 or 1);
    //   - upper bits (ct_ref_mask): outstanding acquire()s, counted in steps
    //     of ct_ref_inc (0x20).
    // Each collector's m_count counts DOWN by ct_ref_inc per release(); when a
    // quiescence cycle begins, the pending acquires plus the ct_proxy_quiescent
    // flag (0x10) are folded into it, so whichever thread drops the count to
    // the flag value detects drop-to-zero and reclaims the deferred nodes.
    //
    // T_defer_limit: deferring roughly T_defer_limit / 2 nodes into a
    // collector triggers a quiescence cycle (see collect(collector&, ct_node*)).
    template<std::size_t T_defer_limit>
    class ct_proxy
    {
    // Delete a whole defer list (linked via m_defer_next).
    // Returns the number of nodes destroyed.
    static std::uint32_t prv_destroy(ct_node* n)
    {
    std::uint32_t count = 0;

    while (n)
    {
    ct_node* next = n->m_defer_next;
    delete n;
    count++;
    n = next;
    }

    return count;
    }


    public:
    // Per-epoch state: a deferred-node list plus reference bookkeeping.
    class collector
    {
    friend class ct_proxy;

    private:
    std::atomic<ct_node*> m_defer;            // LIFO list of deferred nodes
    std::atomic<std::uint32_t> m_defer_count; // approximate length of m_defer
    std::atomic<std::uint32_t> m_count;       // reference / quiescence word



    public:
    collector()
    : m_defer(nullptr),
    m_defer_count(0),
    m_count(0)
    {

    }


    ~collector()
    {
    // Reclaim anything still deferred when the proxy itself dies.
    prv_destroy(m_defer.load(std::memory_order_relaxed));
    }
    };


    private:
    std::atomic<std::uint32_t> m_current; // collector index + pending acquires
    std::atomic<bool> m_quiesce;          // true while a quiescence cycle runs
    ct_node* m_defer;                     // nodes held back from the prior cycle
    collector m_collectors[2];            // the two epochs we flip between


    public:
    ct_proxy()
    : m_current(0),
    m_quiesce(false),
    m_defer(nullptr)
    {

    }

    ~ct_proxy()
    {
    prv_destroy(m_defer);
    }


    private:
    void prv_quiesce_begin()
    {
    // Try to begin the quiescence process.
    if (! m_quiesce.exchange(true, std::memory_order_acquire))
    {
    g_debug_quiesce_begin.fetch_add(1, std::memory_order_relaxed);

    // advance the current collector and grab the old one.
    std::uint32_t old =
    m_current.load(std::memory_order_relaxed) & ct_proxy_mask;
    // the exchange also zeroes the pending-acquire bits; those references
    // are transferred into the old collector's m_count below.
    old = m_current.exchange((old + 1) & 1, std::memory_order_acq_rel);
    collector& c = m_collectors[old & ct_proxy_mask];

    // decode reference count.
    std::uint32_t refs = old & ct_ref_mask;

    // increment and generate an odd reference count.
    std::uint32_t old_refs = c.m_count.fetch_add(refs + ct_proxy_quiescent, std::memory_order_release);

    // m_count has been counting down (release() subtracts ct_ref_inc), so
    // it equals 0 - refs (unsigned wraparound intended) exactly when every
    // transferred reference was already released before the transfer.
    if (old_refs == 0 - refs)
    {
    g_debug_dtor_collect.fetch_add(1,
    std::memory_order_relaxed);

    // odd reference count and drop-to-zero condition detected!
    prv_quiesce_complete(c);
    }
    }
    }


    // Runs on whichever thread observed drop-to-zero: frees the PREVIOUS
    // cycle's nodes and re-arms collector `c' for reuse.
    void prv_quiesce_complete(collector& c)
    {
    g_debug_quiesce_complete.fetch_add(1, std::memory_order_relaxed);

    // the collector `c' is now in a quiescent state! :^)
    std::atomic_thread_fence(std::memory_order_acquire);

    // maintain the back link and obtain "fresh" objects from
    // this collection.
    ct_node* n = m_defer;
    m_defer = c.m_defer.load(std::memory_order_relaxed);
    c.m_defer.store(0, std::memory_order_relaxed);

    // reset the reference count.
    c.m_count.store(0, std::memory_order_relaxed);
    c.m_defer_count.store(0, std::memory_order_relaxed);

    // release the quiesce lock.
    m_quiesce.store(false, std::memory_order_release);

    // destroy nodes.
    std::uint32_t count = prv_destroy(n);

    g_debug_quiesce_complete_nodes.fetch_add(count, std::memory_order_relaxed);
    }


    public:
    // Enter a read-side critical section; pairs with release().
    collector& acquire()
    {
    // increment the master count _and_ obtain current collector.
    std::uint32_t current =
    m_current.fetch_add(ct_ref_inc, std::memory_order_acquire);

    // decode the collector index.
    return m_collectors[current & ct_proxy_mask];
    }

    // Leave a critical section entered via acquire() (or sync()).
    void release(collector& c)
    {
    // decrement the collector.
    std::uint32_t count =
    c.m_count.fetch_sub(ct_ref_inc, std::memory_order_release);

    // check for the completion of the quiescence process.
    // Prior value ct_ref_complete (0x30) == one outstanding reference
    // (0x20) plus the quiescent flag (0x10): we were the last reader out.
    if ((count & ct_ref_mask) == ct_ref_complete)
    {
    // odd reference count and drop-to-zero condition detected!
    g_debug_release_collect.fetch_add(1,
    std::memory_order_relaxed);

    prv_quiesce_complete(c);
    }
    }


    // Re-validate a held reference: if `c' belongs to a retired epoch,
    // swap it for the current collector.
    collector& sync(collector& c)
    {
    // check if the `c' is in the middle of a quiescence process.
    if (c.m_count.load(std::memory_order_relaxed) & ct_proxy_quiescent)
    {
    // drop `c' and get the next collector.
    release(c);

    return acquire();
    }

    return c;
    }


    // Manually kick off a quiescence cycle.
    void collect()
    {
    prv_quiesce_begin();
    }


    // Defer node `n' for reclamation under collector `c'.
    void collect(collector& c, ct_node* n)
    {
    if (! n) return;

    // link node into the defer list.
    ct_node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
    n->m_defer_next = prev;

    // bump the defer count and begin quiescence process if over
    // the limit.
    std::uint32_t count =
    c.m_defer_count.fetch_add(1, std::memory_order_relaxed) + 1;

    if (count >= (T_defer_limit / 2))
    {
    prv_quiesce_begin();
    }
    }
    };



    // Convenience alias: proxy collector with a defer limit of 10 nodes.
    typedef ct_proxy<10> ct_proxy_collector;


    // you're basic lock-free stack...
    // well, minus ABA counter and DWCAS of course! ;^)
    class ct_stack
    {
    std::atomic<ct_node*> m_head;


    public:
    ct_stack() : m_head(nullptr)
    {

    }


    public:
    void push(ct_node* n)
    {
    ct_node* head = m_head.load(std::memory_order_relaxed);

    do
    {
    n->m_next.store(head, std::memory_order_relaxed);
    }

    while (! m_head.compare_exchange_weak(
    head,
    n,
    std::memory_order_release));
    }


    ct_node* flush()
    {
    return m_head.exchange(nullptr, std::memory_order_acquire);
    }


    ct_node* get_head()
    {
    return m_head.load(std::memory_order_acquire);
    }


    ct_node* pop()
    {
    ct_node* head = m_head.load(std::memory_order_acquire);
    ct_node* xchg;

    do
    {
    if (! head) return nullptr;

    xchg = head->m_next.load(std::memory_order_relaxed);
    }

    while (!m_head.compare_exchange_weak(
    head,
    xchg,
    std::memory_order_acquire));

    return head;
    }
    };


    // The shared state
    struct ct_shared
    {
    ct_proxy<10> m_proxy_gc;
    ct_stack m_stack;
    };



    // Reader threads
    // Repeatedly walks the lock-free stack; each traversal happens inside an
    // acquire()/release() pair so deferred nodes cannot vanish underneath it.
    void ct_thread_reader(ct_shared& shared)
    {
        // iterate the lockfree stack
        for (unsigned long iter = 0; iter < ct_reader_iters_n; ++iter)
        {
            ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();

            for (ct_node* cursor = shared.m_stack.get_head();
                 cursor != nullptr;
                 cursor = cursor->m_next.load(std::memory_order_relaxed))
            {
                // need to add in some processing...
                // std::this_thread::yield();
            }

            shared.m_proxy_gc.release(c);
        }
    }



    // Writer threads
    // Mutates the lock-free stack: fills it with fresh nodes, then pops them
    // back off and hands each one to the proxy collector for deferred
    // reclamation, forcing quiescence cycles along the way.
    void ct_thread_writer(ct_shared& shared)
    {
        for (unsigned long round = 0; round < 42; ++round)
        {
            shared.m_proxy_gc.collect();

            // Fill the stack.
            for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
            {
                shared.m_stack.push(new ct_node());
            }

            //std::this_thread::yield();

            // Drain it, deferring each popped node (pop may yield nullptr,
            // which collect() ignores).
            {
                ct_proxy_collector::collector& region = shared.m_proxy_gc.acquire();

                for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
                {
                    shared.m_proxy_gc.collect(region, shared.m_stack.pop());
                }

                shared.m_proxy_gc.release(region);
            }

            // Force a batch of extra quiescence cycles.
            for (unsigned long i = 0; i < ct_writer_iters_n / 2; ++i)
            {
                shared.m_proxy_gc.collect();
            }

            // Drain whatever other writers pushed meanwhile, stopping early
            // once the stack is empty.
            {
                ct_proxy_collector::collector& region = shared.m_proxy_gc.acquire();

                for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
                {
                    ct_node* node = shared.m_stack.pop();
                    if (! node) break;

                    shared.m_proxy_gc.collect(region, node);
                }

                shared.m_proxy_gc.release(region);
            }

            if ((round % 3) == 0)
            {
                shared.m_proxy_gc.collect();
            }
        }
    }



    // Test harness: spin up reader and writer threads against one shared
    // proxy-collector/stack pair, join them all, then verify via the debug
    // counters that every allocated node was reclaimed.
    int main()
    {
        // NOTE(review): in the posted source this banner literal was split
        // across a physical newline (a compile error); re-joined here.
        std::cout << "Chris M. Thomassons Proxy Collector Port ver .0.0.2...\n";
        std::cout << "_______________________________________\n\n";

        // Scope ensures ct_shared (and its dtors) finish before the counters
        // are read below.
        {
            ct_shared shared;

            std::thread readers[ct_reader_threads_n];
            std::thread writers[ct_writer_threads_n];

            std::cout << "Booting threads...\n";

            for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
            {
                writers[i] = std::thread(ct_thread_writer, std::ref(shared));
            }

            for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
            {
                readers[i] = std::thread(ct_thread_reader, std::ref(shared));
            }

            std::cout << "Threads running...\n";

            for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
            {
                readers[i].join();
            }

            for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
            {
                writers[i].join();
            }
        }

        std::cout << "Threads completed!\n\n";


        // Sanity check! Allocations must balance deallocations once the
        // proxy (and its collectors) have been destroyed.
        {
            std::uint32_t node_allocations = g_debug_node_allocations.load(std::memory_order_relaxed);
            std::uint32_t node_deallocations = g_debug_node_deallocations.load(std::memory_order_relaxed);
            std::uint32_t dtor_collect = g_debug_dtor_collect.load(std::memory_order_relaxed);
            std::uint32_t release_collect = g_debug_release_collect.load(std::memory_order_relaxed);
            std::uint32_t quiesce_complete = g_debug_quiesce_complete.load(std::memory_order_relaxed);
            std::uint32_t quiesce_begin = g_debug_quiesce_begin.load(std::memory_order_relaxed);
            std::uint32_t quiesce_complete_nodes = g_debug_quiesce_complete_nodes.load(std::memory_order_relaxed);

            std::cout << "node_allocations = " << node_allocations << "\n";
            std::cout << "node_deallocations = " << node_deallocations <<
            "\n\n";
            std::cout << "dtor_collect = " << dtor_collect << "\n";
            std::cout << "release_collect = " << release_collect << "\n";
            std::cout << "quiesce_complete = " << quiesce_complete << "\n";
            std::cout << "quiesce_begin = " << quiesce_begin << "\n";
            std::cout << "quiesce_complete_nodes = " <<
            quiesce_complete_nodes << "\n";

            if (node_allocations != node_deallocations)
            {
                std::cout << "OH SHIT! NODE LEAK!!! SHIT! = " <<
                node_allocations - node_deallocations << "\n\n";
            }

        }

        std::cout << "\n\nTest Completed!\n\n";

        return 0;
    }

    _______________________________________


    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From jseigh@21:1/5 to Chris M. Thomasson on Fri Dec 6 17:43:33 2024
    On 12/6/24 16:12, Chris M. Thomasson wrote:
    On 12/6/2024 11:55 AM, Brett wrote:
    Chris M. Thomasson <chris.m.thomasson.1@gmail.com> wrote:
    I am wondering if anybody can try to compile and run this C++11 code of
    mine for a portable word-based proxy collector, a sort of poor mans RCU, >>> on an ARM based system? I don't have access to one. I am interested in
    the resulting output.

    https://godbolt.org

    https://pastebin.com/raw/CYZ78gVj
    (raw text link, no ads... :^)
    [...]

    It seems that all of the atomics are LDREX/STREX wrt fetch_add/sub .
    Even with relaxed memory order. Are the LDREX/STREX similar to the LOCK prefix on an x86/64?

    https://godbolt.org/z/EPGYWve71

    It has loops for this in the ASM code. Adding a loop in there can change things from wait-free to lock-free. Humm...

    Which compiler did you choose. armv8? Try ARM64.

    The newer arms have new atomics
    cas and atomic fetch ops.

    LDREX/STREX is the older load store reserved.
    On the newer stuff it's ldxr/stxr not ldrex/strex.

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From Scott Lurndal@21:1/5 to jseigh on Fri Dec 6 23:08:42 2024
    jseigh <jseigh_es00@xemaps.com> writes:
    On 12/6/24 16:12, Chris M. Thomasson wrote:
    On 12/6/2024 11:55 AM, Brett wrote:
    Chris M. Thomasson <chris.m.thomasson.1@gmail.com> wrote:
    I am wondering if anybody can try to compile and run this C++11 code of >>>> mine for a portable word-based proxy collector, a sort of poor mans RCU, >>>> on an ARM based system? I don't have access to one. I am interested in >>>> the resulting output.

    https://godbolt.org

    https://pastebin.com/raw/CYZ78gVj
    (raw text link, no ads... :^)
    [...]

    It seems that all of the atomics are LDREX/STREX wrt fetch_add/sub .
    Even with relaxed memory order. Are the LDREX/STREX similar to the LOCK
    prefix on an x86/64?

    https://godbolt.org/z/EPGYWve71

    It has loops for this in the ASM code. Adding a loop in there can change
    things from wait-free to lock-free. Humm...

    Which compiler did you choose. armv8? Try ARM64.

    GCC has specific options to specify which version of the
    armv8 architecture to use. The LSE (Large System Extension)
    was optional in ARMv8.1 and was made mandatory in later
    revisions. That's the extension that includes the
    atomic ops.

    The compiler will only generate them if the target arch
    is set to an appropriate architecture revision.

    use -march=armv8.1-a

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From aph@littlepinkcloud.invalid@21:1/5 to Chris M. Thomasson on Mon Dec 9 10:22:22 2024
    Chris M. Thomasson <chris.m.thomasson.1@gmail.com> wrote:

    Which compiler did you choose.? armv8?? Try ARM64.

    The newer arms have new atomics
    cas and atomic fetch ops.

    LDREX/STREX is the older load store reserved.
    On the newer stuff it's ldxr/stxr not ldrex/strex.

    Well, for a ARM64 gcc 14.2.0 a relaxed fetch_add I get __aarch64_ldadd8_acq_rel in the asm.

    For gcc 13.2.1 -O2 -march=armv8.1-a I see a mixture of acq/rel and
    relaxed. Are you sure you're looking at the right one?

    Andrew.

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From jseigh@21:1/5 to jseigh on Mon Dec 9 07:34:55 2024
    On 12/9/24 07:28, jseigh wrote:
    On 12/8/24 18:31, Chris M. Thomasson wrote:

    Well, for a ARM64 gcc 14.2.0 a relaxed fetch_add I get
    __aarch64_ldadd8_acq_rel in the asm.

    https://godbolt.org/z/YzPdM8j33

    acq_rel barrier for a relaxed membar? Well, that makes me go grrrrrr!

    It has to be akin to the LOCK prefix over on x86. I want it relaxed
    damn it! ;^)

    Apart from the memory ordering, if you are using atomic_fetch_add you
    are going to get an interlocked instruction which is probably
    overkill and has more overhead than you want.   Atomic ops
    assume other cpus might be trying atomic rmw ops on other
    cpus which is not the case for userspace rcu.  You want
    an atomic relaxed load, and atomic relaxed store of the
    incremented value.  It will be faster.


    I should add, you will need a store/load memory barrier
    after that unless you use an asymmetric memory barrier.
    I haven't worked that logic out yet since I'm not working
    on any userspace RCU (qsbr) implementations. Probably
    the same number of grace periods.

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From jseigh@21:1/5 to Chris M. Thomasson on Mon Dec 9 07:28:38 2024
    On 12/8/24 18:31, Chris M. Thomasson wrote:
    On 12/6/2024 2:43 PM, jseigh wrote:
    On 12/6/24 16:12, Chris M. Thomasson wrote:
    On 12/6/2024 11:55 AM, Brett wrote:
    Chris M. Thomasson <chris.m.thomasson.1@gmail.com> wrote:
    I am wondering if anybody can try to compile and run this C++11
    code of
    mine for a portable word-based proxy collector, a sort of poor mans
    RCU,
    on an ARM based system? I don't have access to one. I am interested in >>>>> the resulting output.

    https://godbolt.org

    https://pastebin.com/raw/CYZ78gVj
    (raw text link, no ads... :^)
    [...]

    It seems that all of the atomics are LDREX/STREX wrt fetch_add/sub .
    Even with relaxed memory order. Are the LDREX/STREX similar to the
    LOCK prefix on an x86/64?

    https://godbolt.org/z/EPGYWve71

    It has loops for this in the ASM code. Adding a loop in there can
    change things from wait-free to lock-free. Humm...

    Which compiler did you choose.  armv8?  Try ARM64.

    The newer arms have new atomics
    cas and atomic fetch ops.

    LDREX/STREX is the older load store reserved.
    On the newer stuff it's ldxr/stxr not ldrex/strex.

    Well, for a ARM64 gcc 14.2.0 a relaxed fetch_add I get __aarch64_ldadd8_acq_rel in the asm.

    https://godbolt.org/z/YzPdM8j33

    acq_rel barrier for a relaxed membar? Well, that makes me go grrrrrr!

    It has to be akin to the LOCK prefix over on x86. I want it relaxed damn
    it! ;^)

    Apart from the memory ordering, if you are using atomic_fetch_add you
    are going to get an interlocked instruction which is probably
    overkill and has more overhead than you want. Atomic ops
    assume other cpus might be trying atomic rmw ops on other
    cpus which is not the case for userspace rcu. You want
    an atomic relaxed load, and atomic relaxed store of the
    incremented value. It will be faster.

    Joe Seigh

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)