File size: 3,819 Bytes
9c774f1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
#pragma once
#include <algorithm>
#include <atomic>
#include <concepts>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <functional>
#include <future>
#include <iterator>
#include <memory>
#include <mutex>
#include <queue>
#include <ranges>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
/// Number of hardware threads usable by this process, never less than 1.
///
/// std::thread::hardware_concurrency() is allowed to return 0 when the value
/// cannot be determined. Callers subtract a reserve from this result, so a 0
/// would wrap around to a huge unsigned value; report a single thread instead.
static inline size_t avail_threads(){
    const unsigned int detected = std::thread::hardware_concurrency();
    return detected != 0 ? static_cast<size_t>(detected) : size_t(1);
}
/// Shorthand for the type produced by invoking an `F` with arguments `Args...`.
template <typename F, typename... Args>
using irt = typename std::invoke_result<F, Args...>::type;
/// Binds `funk` to `args`, producing a nullary callable that runs
/// `funk(args...)` when invoked.
///
/// @param funk  callable to bind; a copy is stored in the result.
/// @param args  arguments to bind; taken by value and handed on to std::bind.
/// @return the std::bind expression wrapping `funk` and its arguments.
template <typename F, typename... Args>
static inline auto taskify(const F& funk, Args... args){
    // Forward each argument with its OWN type. Expanding the whole pack inside
    // the template argument list (std::forward<Args...>) only compiles when
    // there are fewer than two bound arguments.
    return std::bind(funk, std::forward<Args>(args)...);
}
static inline auto promitask(const std::invocable<> auto& funk){
typedef irt<decltype(funk)> ret;
auto pr = std::make_shared<std::promise<ret>>();
if constexpr (std::same_as<irt<decltype(funk)>, void>)
return make_pair([=](){ funk(), pr->set_value(); }, pr);
else
return make_pair([=](){ pr->set_value(funk()); }, pr);
}
/// Blocks until every element of `fu` has finished, by calling `wait()` on
/// each element in iteration order.
static inline auto await_many(const std::ranges::range auto& fu){
    for (const auto& pending : fu)
        pending.wait();
}
/// Type-erased unit of work: a callable taking no arguments and returning nothing.
using tasque = std::function<void()>;
template <typename Thrd = std::jthread>
class threadpool {
/// If true the queue thread should exit
std::atomic<bool> done;
/// The thread object associated with this queue
std::vector<Thrd> queue_threads;
/// A queue of functions that will be executed on the queue thread
std::queue<tasque> work_queue;
/// The mutex used in the condition variable
std::mutex queue_mutex;
/// The condition variable that waits for a new function to be inserted in the
/// queue
std::condition_variable cond;
/// This funciton executes on the queue_thread
void queue_runner() {
while (!done) {
tasque func;
{
std::unique_lock<std::mutex> lock(queue_mutex);
cond.wait( lock
, [this]() { return work_queue.empty() == false || done; });
if (!done){
swap(func, work_queue.front());
work_queue.pop();
}
}
if (func) func();
}
}
void qup(const std::invocable<> auto& f){
std::lock_guard<std::mutex> lock(queue_mutex);
work_queue.push(f);
cond.notify_one();
}
public:
template <typename F, typename... Args>
void enqueue(const F& func, Args... args) requires std::invocable<F, Args...> {
qup(taskify(func, args...));
}
template <typename F, typename... Args>
auto inquire(const F& func, Args... args) requires std::invocable<F, Args...> {
auto [t, pr] = promitask(taskify(func, args...));
auto fut = pr->get_future();
enqueue(t);
return fut;
}
void clear() {
{
std::lock_guard<std::mutex> lock(queue_mutex);
while(!work_queue.empty())
work_queue.pop();
}
sync();
}
void sync(){
std::atomic<size_t> n(0);
const size_t m = queue_threads.size();
auto present = [&](){ ++n; size_t l = n.load(); while(l < m) l = n.load(); };
std::vector<std::future<void>> fu;
std::ranges::generate_n(std::back_inserter(fu), m, [=, this](){ return inquire(present); });
await_many(fu);
}
threadpool(size_t n, size_t res) : done(false)
, queue_threads(n ? std::clamp(n, size_t(1), avail_threads() - res)
: std::max(size_t(1), avail_threads() - res)) {
for(auto& i:queue_threads){
Thrd tmp(&threadpool::queue_runner, this);
std::swap(i, tmp);
}
}
threadpool(size_t n) : threadpool(n, 0) {}
threadpool() : threadpool(0, 1) {}
~threadpool() {
sync();
done.store(true);
cond.notify_all();
}
threadpool(const threadpool& other) : work_queue(other.work_queue), done(false) {
for(auto& i:queue_threads){
Thrd tmp(&threadpool::queue_runner, this);
std::swap(i, tmp);
}
}
threadpool& operator=(const threadpool& other){
clear();
work_queue = other.work_queue;
return *this;
}
size_t size() const { return queue_threads.size(); }
threadpool& operator=(threadpool&& other) = default;
threadpool(threadpool&& other) = default;
};
|