24 template<
typename HYP,
typename Chain_t=MCMCChain<HYP>>
51 ChainPool(HYP& h0,
typename HYP::data_t* d,
size_t n) {
52 assert(n>=1 &&
"*** You probably shouldn't have a chain pool with 0 elements");
53 for(
size_t i=0;i<n;i++) {
63 void set_data(
typename HYP::data_t* d,
bool recompute=
true) {
65 c.set_data(d, recompute);
74 template<
typename... ARGS>
77 std::lock_guard guard(running_lock);
79 pool.emplace_back(args...);
88 void show(std::string prefix)
const {
89 for(
size_t i=0;i<
nchains();i++) {
90 std::lock_guard guard(this->pool[i].current_mutex);
91 print(prefix, i, (
double)this->pool[i].temperature, this->pool[i].getCurrent().posterior, this->pool[i].getCurrent());
100 assert(pool.size() > 0 &&
"*** Cannot run on an empty ChainPool");
101 assert(this->
nthreads() <= pool.size() &&
"*** Cannot have more threads than pool items");
116 std::lock_guard guard(running_lock);
127 for(
auto x : pool[idx].
run(c)) {
128 x.born_chain_idx = idx;
134 std::lock_guard guard(running_lock);
147 assert(ctl.
runtime == 0 &&
"*** Cannot have both time and steps specified in ChainPool (not implemented yet).");
155 unsigned long to_run_steps=0;
157 std::lock_guard guard(running_lock);
165 to_run_steps = std::min(ctl.
steps-ctl.
done_steps, this->steps_before_change);
168 if(to_run_steps <= 0) {
184 for(
auto& x : pool[idx].
run(c)) {
185 x.born_chain_idx = idx;
189 #ifdef DEBUG_CHAINPOOL 190 COUT "# Thread " <<std::this_thread::get_id() <<
" stopping chain "<< idx
TAB "at " TAB chain.current.posterior
TAB chain.current.string()
ENDL;
195 std::lock_guard guard(running_lock);
198 if(to_run_steps == steps_before_change) {
Definition: OrderedLock.h:16
unsigned long steps
Definition: Control.h:24
RunningState
Definition: ChainPool.h:43
#define TAB
Definition: IO.h:19
std::vector< Chain_t > pool
Definition: ChainPool.h:29
ChainPool(HYP &h0, typename HYP::data_t *d, size_t n)
Definition: ChainPool.h:51
time_ms runtime
Definition: Control.h:25
ChainPool()
Definition: ChainPool.h:49
size_t nthreads()
How many threads are currently running in this interface?
Definition: ThreadedInferenceInterface.h:51
Definition: ChainPool.h:25
void show(std::string prefix) const
Definition: ChainPool.h:88
volatile sig_atomic_t CTRL_C
This manages multiple threads for running inference. This requires a subclass to define run_thread...
void add_chain(ARGS... args)
Lock and modify the pool.
Definition: ChainPool.h:75
void print(FIRST f, ARGS... args)
Lock output_lock and print to std::cout.
Definition: IO.h:53
A FIFO mutex (from stackoverflow) https://stackoverflow.com/questions/14792016/creating-a-lock-that-p...
Definition: generator.hpp:21
unsigned long next_index()
Return the next index to operate on (in a thread-safe way).
Definition: ThreadedInferenceInterface.h:45
void set_data(typename HYP::data_t *d, bool recompute=true)
Set this data.
Definition: ChainPool.h:63
std::vector< RunningState > running
Definition: ChainPool.h:46
size_t nchains() const
Definition: ChainPool.h:84
size_t nthreads
Definition: Control.h:26
#define ENDL
Definition: IO.h:21
OrderedLock running_lock
Definition: ChainPool.h:47
Definition: ThreadedInferenceInterface.h:22
std::atomic< unsigned long > done_steps
Definition: Control.h:32
bool running()
Definition: Control.h:63
unsigned long steps_before_change
Definition: ChainPool.h:40
#define COUT
Definition: IO.h:24
generator< HYP &> run(Control ctl, Args... args)
Set up the multiple threads and actually run, calling run_thread_generator_wrapper.
Definition: ThreadedInferenceInterface.h:82
generator< HYP & > run_thread(Control &ctl) override
This run helper is called internally by multiple different threads, and runs a given pool...
Definition: ChainPool.h:99
This represents an MCMC chain on a hypothesis of type HYP. It uses HYP::propose and HYP::compute_poste...