Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
parallel_for_mutex_pool.cpp
Go to the documentation of this file.
4#ifndef NO_MULTITHREADING
5#include "log.hpp"
6#include "thread.hpp"
7#include <atomic>
8#include <condition_variable>
9#include <functional>
10#include <mutex>
11#include <queue>
12#include <thread>
13#include <vector>
14
16
17// Fix for https://github.com/AztecProtocol/aztec-packages/issues/19769
18// Zig's Mach-O linker (https://codeberg.org/ziglang/zig/issues/31461) misaligns
19// __thread_bss TLS template offsets when __thread_data is also present (from Rust
20// static libraries), causing x86_64-macos segfaults on any thread_local requiring
21// 16-byte alignment (e.g. std::mutex). Adding an alignas(16) initialized
22// thread_local forces __thread_data alignment to 16, ensuring __thread_bss starts
23// at a correctly aligned TLS template offset.
24// NOLINTBEGIN
25alignas(16) thread_local char tls_alignment_pad[16] __attribute__((used)) = { 1 };
26// NOLINTEND
27
28namespace {
29
30class ThreadPool {
31 public:
32 ThreadPool(size_t num_threads);
33 ThreadPool(const ThreadPool& other) = delete;
34 ThreadPool(ThreadPool&& other) = delete;
35 ~ThreadPool();
36
37 ThreadPool& operator=(const ThreadPool& other) = delete;
38 ThreadPool& operator=(ThreadPool&& other) = delete;
39
40 void start_tasks(size_t num_iterations, const std::function<void(size_t)>& func)
41 {
43 {
44 std::unique_lock<std::mutex> lock(tasks_mutex);
45 task_ = func;
46 num_iterations_ = num_iterations;
47 iteration_ = 0;
48 complete_ = 0;
49 }
50 condition.notify_all();
51
52 do_iterations();
53
54 {
55 // BB_BENCH_NAME("spinning main thread");
56 std::unique_lock<std::mutex> lock(tasks_mutex);
57 complete_condition_.wait(lock, [this] { return complete_ == num_iterations_; });
58 }
59 }
60
61 private:
63 std::vector<std::thread> workers;
64 std::mutex tasks_mutex;
65 std::function<void(size_t)> task_;
66 size_t num_iterations_ = 0;
67 size_t iteration_ = 0;
68 size_t complete_ = 0;
70 std::condition_variable complete_condition_;
71 bool stop = false;
72
73 BB_NO_PROFILE void worker_loop(size_t thread_index);
74
75 void do_iterations()
76 {
77 while (true) {
78 size_t iteration = 0;
79 {
80 std::unique_lock<std::mutex> lock(tasks_mutex);
81 if (iteration_ == num_iterations_) {
82 return;
83 }
84 iteration = iteration_++;
85 }
86 // BB_BENCH_NAME("do_iterations()");
87 task_(iteration);
88 {
89 std::unique_lock<std::mutex> lock(tasks_mutex);
90 if (++complete_ == num_iterations_) {
91 complete_condition_.notify_one();
92 return;
93 }
94 }
95 }
96 }
97};
98
99ThreadPool::ThreadPool(size_t num_threads)
100{
101 workers.reserve(num_threads);
102 for (size_t i = 0; i < num_threads; ++i) {
103 workers.emplace_back(&ThreadPool::worker_loop, this, i);
104 }
105}
106
107ThreadPool::~ThreadPool()
108{
109 {
110 std::unique_lock<std::mutex> lock(tasks_mutex);
111 stop = true;
112 }
113 condition.notify_all();
114 for (auto& worker : workers) {
115 worker.join();
116 }
117}
118
119void ThreadPool::worker_loop(size_t /*unused*/)
120{
121 // info("created worker ", worker_num);
122 while (true) {
123 {
124 std::unique_lock<std::mutex> lock(tasks_mutex);
125 condition.wait(lock, [this] { return (iteration_ < num_iterations_) || stop; });
126
127 if (stop) {
128 break;
129 }
130 }
131 // Make sure nested stats accounting works under multithreading
132 // Note: parent is a thread-local variable.
134 do_iterations();
135 }
136 // info("worker exit ", worker_num);
137}
138} // namespace
139
140namespace bb {
145void parallel_for_mutex_pool(size_t num_iterations, const std::function<void(size_t)>& func)
146{
147#ifdef __wasm__
148#define THREAD_LOCAL_MAYBE
149#else
150#define THREAD_LOCAL_MAYBE thread_local
151#endif
152
153 static THREAD_LOCAL_MAYBE ThreadPool pool(get_num_cpus() - 1);
154 static THREAD_LOCAL_MAYBE bool nested = false;
155
156 // If nested, fall back to serial execution
157 if (nested) {
158 for (size_t i = 0; i < num_iterations; ++i) {
159 func(i);
160 }
161 return;
162 }
163
164 nested = true;
165 pool.start_tasks(num_iterations, func);
166 nested = false;
167}
168} // namespace bb
169#endif
#define BB_NO_PROFILE
Entry point for Barretenberg command-line interface.
Definition api.hpp:5
void parallel_for_mutex_pool(size_t num_iterations, const std::function< void(size_t)> &func)
size_t get_num_cpus()
Definition thread.cpp:33
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
thread_local char tls_alignment_pad[16] __attribute__((used))
#define THREAD_LOCAL_MAYBE
static thread_local TimeStatsEntry * parent
Definition bb_bench.hpp:83