I am wondering if anybody can try to compile and run this C++11 code of
mine for a portable word-based proxy collector, a sort of poor man's RCU,
on an ARM based system? I don't have access to one. I am interested in
the resulting output.
https://pastebin.com/raw/CYZ78gVj
(raw text link, no ads... :^)
_______________________________________
// Chris M. Thomasson's Poor Man's RCU... Example 456...
#include <iostream>
#include <atomic>
#include <thread>
#include <cstdlib>
#include <cstdint>
#include <climits>
#include <functional>
// Masks
static constexpr std::uint32_t ct_ref_mask = 0xFFFFFFF0U;
static constexpr std::uint32_t ct_ref_complete = 0x30U;
static constexpr std::uint32_t ct_ref_inc = 0x20U;
static constexpr std::uint32_t ct_proxy_mask = 0xFU;
static constexpr std::uint32_t ct_proxy_quiescent = 0x10U;
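// Word layout, as I read the masks: bits 3:0 select the current
// collector index in m_current; bit 4 (ct_proxy_quiescent) flags a
// quiescing collector in its m_count; the bits above that hold a
// reference count advancing in steps of ct_ref_inc (0x20).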
// Iteration settings
static constexpr unsigned long ct_reader_iters_n = 2000000;
static constexpr unsigned long ct_writer_iters_n = 200000;
// Thread counts
static constexpr unsigned long ct_reader_threads_n = 53;
static constexpr unsigned long ct_writer_threads_n = 11;
// Some debug/sanity check things...
// Need to make this conditional in compilation with some macros...
static std::atomic<std::uint32_t> g_debug_node_allocations(0);
static std::atomic<std::uint32_t> g_debug_node_deallocations(0);
static std::atomic<std::uint32_t> g_debug_dtor_collect(0);
static std::atomic<std::uint32_t> g_debug_release_collect(0);
static std::atomic<std::uint32_t> g_debug_quiesce_begin(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete_nodes(0);
// Need to align and pad data structures! To do...
struct ct_node
{
std::atomic<ct_node*> m_next;
ct_node* m_defer_next;
ct_node() : m_next(nullptr), m_defer_next(nullptr)
{
g_debug_node_allocations.fetch_add(1, std::memory_order_relaxed);
}
~ct_node()
{
g_debug_node_deallocations.fetch_add(1, std::memory_order_relaxed);
}
};
// The proxy collector itself... :^)
template<std::size_t T_defer_limit>
class ct_proxy
{
static std::uint32_t prv_destroy(ct_node* n)
{
std::uint32_t count = 0;
while (n)
{
ct_node* next = n->m_defer_next;
delete n;
count++;
n = next;
}
return count;
}
public:
class collector
{
friend class ct_proxy;
private:
std::atomic<ct_node*> m_defer;
std::atomic<std::uint32_t> m_defer_count;
std::atomic<std::uint32_t> m_count;
public:
collector()
: m_defer(nullptr),
m_defer_count(0),
m_count(0)
{
}
~collector()
{
prv_destroy(m_defer.load(std::memory_order_relaxed));
}
};
private:
std::atomic<std::uint32_t> m_current;
std::atomic<bool> m_quiesce;
ct_node* m_defer;
collector m_collectors[2];
public:
ct_proxy()
: m_current(0),
m_quiesce(false),
m_defer(nullptr)
{
}
~ct_proxy()
{
prv_destroy(m_defer);
}
private:
void prv_quiesce_begin()
{
// Try to begin the quiescence process.
if (! m_quiesce.exchange(true, std::memory_order_acquire))
{
g_debug_quiesce_begin.fetch_add(1, std::memory_order_relaxed);
// advance the current collector and grab the old one.
std::uint32_t old =
m_current.load(std::memory_order_relaxed) & ct_proxy_mask;
old = m_current.exchange((old + 1) & 1, std::memory_order_acq_rel);
collector& c = m_collectors[old & ct_proxy_mask];
// decode reference count.
std::uint32_t refs = old & ct_ref_mask;
// increment and generate an odd reference count.
std::uint32_t old_refs = c.m_count.fetch_add(refs + ct_proxy_quiescent, std::memory_order_release);
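// note: each release() subtracts ct_ref_inc, driving m_count negative;
// if the previous value exactly cancels the transferred count `refs',
// then every acquire() on this collector has already been released.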
if (old_refs == 0 - refs)
{
g_debug_dtor_collect.fetch_add(1,
std::memory_order_relaxed);
// odd reference count and drop-to-zero condition detected!
prv_quiesce_complete(c);
}
}
}
void prv_quiesce_complete(collector& c)
{
g_debug_quiesce_complete.fetch_add(1, std::memory_order_relaxed);
// the collector `c' is now in a quiescent state! :^)
std::atomic_thread_fence(std::memory_order_acquire);
// maintain the back link and obtain "fresh" objects from
// this collection.
ct_node* n = m_defer;
m_defer = c.m_defer.load(std::memory_order_relaxed);
c.m_defer.store(0, std::memory_order_relaxed);
// reset the reference count.
c.m_count.store(0, std::memory_order_relaxed);
c.m_defer_count.store(0, std::memory_order_relaxed);
// release the quiesce lock.
m_quiesce.store(false, std::memory_order_release);
// destroy nodes.
std::uint32_t count = prv_destroy(n);
g_debug_quiesce_complete_nodes.fetch_add(count, std::memory_order_relaxed);
}
public:
collector& acquire()
{
// increment the master count _and_ obtain current collector.
std::uint32_t current =
m_current.fetch_add(ct_ref_inc, std::memory_order_acquire);
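// note: a single fetch_add atomically snapshots the collector index
// (low bits) and bumps the reference count (high bits) in one word;
// this is the word-based trick the post refers to.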
// decode the collector index.
return m_collectors[current & ct_proxy_mask];
}
void release(collector& c)
{
// decrement the collector.
std::uint32_t count =
c.m_count.fetch_sub(ct_ref_inc, std::memory_order_release);
// check for the completion of the quiescence process.
if ((count & ct_ref_mask) == ct_ref_complete)
{
// odd reference count and drop-to-zero condition detected!
g_debug_release_collect.fetch_add(1,
std::memory_order_relaxed);
prv_quiesce_complete(c);
}
}
collector& sync(collector& c)
{
// check if the `c' is in the middle of a quiescence process.
if (c.m_count.load(std::memory_order_relaxed) & ct_proxy_quiescent)
{
// drop `c' and get the next collector.
release(c);
return acquire();
}
return c;
}
void collect()
{
prv_quiesce_begin();
}
void collect(collector& c, ct_node* n)
{
if (! n) return;
// link node into the defer list.
ct_node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
n->m_defer_next = prev;
// bump the defer count and begin quiescence process if over
// the limit.
std::uint32_t count =
c.m_defer_count.fetch_add(1, std::memory_order_relaxed) + 1;
if (count >= (T_defer_limit / 2))
{
prv_quiesce_begin();
}
}
};
typedef ct_proxy<10> ct_proxy_collector;
// your basic lock-free stack...
// well, minus ABA counter and DWCAS of course! ;^)
class ct_stack
{
std::atomic<ct_node*> m_head;
public:
ct_stack() : m_head(nullptr)
{
}
public:
void push(ct_node* n)
{
ct_node* head = m_head.load(std::memory_order_relaxed);
do
{
n->m_next.store(head, std::memory_order_relaxed);
}
while (! m_head.compare_exchange_weak(
head,
n,
std::memory_order_release));
}
ct_node* flush()
{
return m_head.exchange(nullptr, std::memory_order_acquire);
}
ct_node* get_head()
{
return m_head.load(std::memory_order_acquire);
}
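// note: pop() is only safe against ABA/use-after-free here because
// threads touch nodes while holding a proxy collector reference, and
// popped nodes are retired through collect() rather than deleted.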
ct_node* pop()
{
ct_node* head = m_head.load(std::memory_order_acquire);
ct_node* xchg;
do
{
if (! head) return nullptr;
xchg = head->m_next.load(std::memory_order_relaxed);
}
while (!m_head.compare_exchange_weak(
head,
xchg,
std::memory_order_acquire));
return head;
}
};
// The shared state
struct ct_shared
{
ct_proxy<10> m_proxy_gc;
ct_stack m_stack;
};
// Reader threads
// Iterates through the lock free stack
void ct_thread_reader(ct_shared& shared)
{
// iterate the lockfree stack
for (unsigned long i = 0; i < ct_reader_iters_n; ++i)
{
ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
ct_node* n = shared.m_stack.get_head();
while (n)
{
// need to add in some processing...
// std::this_thread::yield();
n = n->m_next.load(std::memory_order_relaxed);
}
shared.m_proxy_gc.release(c);
}
}
// Writer threads
// Mutates the lock free stack
void ct_thread_writer(ct_shared& shared)
{
for (unsigned long wloop = 0; wloop < 42; ++wloop)
{
shared.m_proxy_gc.collect();
for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
{
shared.m_stack.push(new ct_node());
}
//std::this_thread::yield();
ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
{
shared.m_proxy_gc.collect(c, shared.m_stack.pop());
}
shared.m_proxy_gc.release(c);
for (unsigned long i = 0; i < ct_writer_iters_n / 2; ++i)
{
shared.m_proxy_gc.collect();
}
{
ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
{
ct_node* n = shared.m_stack.pop();
if (! n) break;
shared.m_proxy_gc.collect(c, n);
}
shared.m_proxy_gc.release(c);
}
if ((wloop % 3) == 0)
{
shared.m_proxy_gc.collect();
}
}
}
int main()
{
std::cout << "Chris M. Thomassons Proxy Collector Port ver
.0.0.2...\n";
std::cout << "_______________________________________\n\n";
{
ct_shared shared;
std::thread readers[ct_reader_threads_n];
std::thread writers[ct_writer_threads_n];
std::cout << "Booting threads...\n";
for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
{
writers[i] = std::thread(ct_thread_writer, std::ref(shared));
}
for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
{
readers[i] = std::thread(ct_thread_reader, std::ref(shared));
}
std::cout << "Threads running...\n";
for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
{
readers[i].join();
}
for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
{
writers[i].join();
}
}
std::cout << "Threads completed!\n\n";
// Sanity check!
{
std::uint32_t node_allocations = g_debug_node_allocations.load(std::memory_order_relaxed);
std::uint32_t node_deallocations = g_debug_node_deallocations.load(std::memory_order_relaxed);
std::uint32_t dtor_collect = g_debug_dtor_collect.load(std::memory_order_relaxed);
std::uint32_t release_collect = g_debug_release_collect.load(std::memory_order_relaxed);
std::uint32_t quiesce_complete = g_debug_quiesce_complete.load(std::memory_order_relaxed);
std::uint32_t quiesce_begin = g_debug_quiesce_begin.load(std::memory_order_relaxed);
std::uint32_t quiesce_complete_nodes = g_debug_quiesce_complete_nodes.load(std::memory_order_relaxed);
std::cout << "node_allocations = " << node_allocations << "\n";
std::cout << "node_deallocations = " << node_deallocations <<
"\n\n";
std::cout << "dtor_collect = " << dtor_collect << "\n";
std::cout << "release_collect = " << release_collect << "\n";
std::cout << "quiesce_complete = " << quiesce_complete << "\n";
std::cout << "quiesce_begin = " << quiesce_begin << "\n";
std::cout << "quiesce_complete_nodes = " <<
quiesce_complete_nodes << "\n";
if (node_allocations != node_deallocations)
{
std::cout << "OH SHIT! NODE LEAK!!! SHIT! = " <<
node_allocations - node_deallocations << "\n\n";
}
}
std::cout << "\n\nTest Completed!\n\n";
return 0;
}
_______________________________________
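For anyone who wants to try it: the post doesn't give a build line, but
saving the code as, say, proxy.cpp (the file name is my choice, not the
post's), something like this should work with any C++11 compiler:

g++ -std=c++11 -O2 -pthread proxy.cpp -o proxy
./proxy
_______________________________________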
On 12/6/2024 11:55 AM, Brett wrote:
> Chris M. Thomasson <chris.m.thomasson.1@gmail.com> wrote: [...]
>> I am wondering if anybody can try to compile and run this C++11 code of
>> mine for a portable word-based proxy collector, a sort of poor man's RCU,
>> on an ARM based system? I don't have access to one. I am interested in
>> the resulting output.
>> https://pastebin.com/raw/CYZ78gVj
>> (raw text link, no ads... :^)
> https://godbolt.org
> It seems that all of the atomics are LDREX/STREX wrt fetch_add/sub,
> even with relaxed memory order. Are LDREX/STREX similar to the LOCK
> prefix on an x86/64?

https://godbolt.org/z/EPGYWve71
It has loops for this in the ASM code. Adding a loop in there can change
things from wait-free to lock-free. Humm...
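A stripped-down snippet (my own reduction, not from the thread) that
reproduces the codegen under discussion: on ARMv7 a relaxed fetch_add
compiles to an LDREX/STREX retry loop, and with gcc's outline atomics
on AArch64 it becomes a call into an __aarch64_ldadd* libgcc helper.

#include <atomic>
#include <cstdint>

std::atomic<std::uint64_t> g_count(0);

std::uint64_t bump()
{
    // relaxed ordering, yet still a full atomic read-modify-write...
    return g_count.fetch_add(1, std::memory_order_relaxed);
}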
On 12/6/24 16:12, Chris M. Thomasson wrote:
> [...]
> https://godbolt.org/z/EPGYWve71
> It has loops for this in the ASM code. Adding a loop in there can change
> things from wait-free to lock-free. Humm...

Which compiler did you choose? armv8? Try ARM64.
The newer arms have new atomics: cas and atomic fetch ops.
LDREX/STREX is the older load store reserved.
On the newer stuff it's ldxr/stxr, not ldrex/strex.
On 12/6/2024 2:43 PM, jseigh wrote:
> The newer arms have new atomics: cas and atomic fetch ops.
> LDREX/STREX is the older load store reserved.
> On the newer stuff it's ldxr/stxr, not ldrex/strex.

Well, for ARM64 gcc 14.2.0, for a relaxed fetch_add I get
__aarch64_ldadd8_acq_rel in the asm.
https://godbolt.org/z/YzPdM8j33
acq_rel barrier for a relaxed membar? Well, that makes me go grrrrr!
It has to be akin to the LOCK prefix over on x86. I want it relaxed
damn it! ;^)
On 12/8/24 18:31, Chris M. Thomasson wrote:
> Well, for ARM64 gcc 14.2.0, for a relaxed fetch_add I get
> __aarch64_ldadd8_acq_rel in the asm.
> https://godbolt.org/z/YzPdM8j33
> acq_rel barrier for a relaxed membar? Well, that makes me go grrrrr!
> It has to be akin to the LOCK prefix over on x86. I want it relaxed
> damn it! ;^)

Apart from the memory ordering, if you are using atomic_fetch_add you
are going to get an interlocked instruction, which is probably
overkill and has more overhead than you want. Atomic RMW ops assume
other cpus might be hitting the same location, which is not the case
for userspace rcu. You want an atomic relaxed load, and an atomic
relaxed store of the incremented value. It will be faster.
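A minimal sketch of what I take jseigh to mean here: when a counter is
only ever written by its owning thread (as with per-thread counters in
userspace RCU), the read-modify-write can be split into a relaxed load
plus a relaxed store of the incremented value, avoiding the interlocked
instruction entirely. The type and names below are mine, not his.

#include <atomic>
#include <cstdint>

struct per_thread_counter
{
    std::atomic<std::uint64_t> m_count{0};

    // owner thread only: no other thread ever stores to m_count, so a
    // plain load/store pair is race-free and cheaper than fetch_add.
    void increment()
    {
        std::uint64_t c = m_count.load(std::memory_order_relaxed);
        m_count.store(c + 1, std::memory_order_relaxed);
    }

    // other threads may observe the count concurrently.
    std::uint64_t read() const
    {
        return m_count.load(std::memory_order_relaxed);
    }
};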