Fleet  0.0.9
Inference in the LOT
ParallelTempering.h
Go to the documentation of this file.
1 #pragma once
2 
3 //#define PARALLEL_TEMPERING_SHOW_DETAIL
4 
5 
6 #include <signal.h>
7 #include <functional>
8 
9 #include "Errors.h"
10 #include "ChainPool.h"
11 #include "Timing.h"
12 
13 extern std::atomic<bool> CTRL_C; //volatile sig_atomic_t CTRL_C;
14 
25 template<typename HYP>
26 class ParallelTempering : public ChainPool<HYP> {
27 
28  // loops on the swapper and adapter threads wait this long before checking their time, to make them interruptible
29  // NOTE that this can throw off timing speeds for e.g. parallel tempering for very small amounts of data
30  const unsigned long time_resolution_ms = 25;
31 
32 public:
33 
34  time_ms swap_every = 250; // try a round of swaps this often
35  time_ms adapt_every = 5000; //
36  time_ms show_every = 10000;
37 
38  OrderedLock overall_mutex; // This mutex coordinates swappers, adapters, and printers, otherwise they can lock each other out
39 
40  std::vector<double> temperatures;
41 
42  // Swap history stores how often the i'th chain swaps with the (i-1)'st chain
43  std::vector<FiniteHistory<bool>> swap_history;
44 
45  bool is_temperature; // set for whether we initialize according to a temperature ladder (true) or data
46 
47  std::atomic<bool> terminate; // used to kill swapper and adapter
48 
49  ParallelTempering(HYP& h0, typename HYP::data_t* d, std::initializer_list<double> t) :
50  ChainPool<HYP>(h0, d, t.size()), temperatures(t), terminate(false) {
51 
52  swap_history.reserve(temperatures.size()); // reserve so we don't move
53 
54  for(size_t i=0;i<temperatures.size();i++) {
55  this->pool[i].temperature = temperatures[i]; // set its temperature
56  swap_history.emplace_back();
57  }
58  }
59 
60 
61  ParallelTempering(HYP& h0, typename HYP::data_t* d, unsigned long n, double maxT) :
62  ChainPool<HYP>(h0, d, n),terminate(false) {
63  assert(n != 0);
64  swap_history.reserve(n);
65 
66  if(n == 1) { // just enforce this as a hard cosntraint.
67  this->pool[0].temperature = 1.0;
68  }
69  else {
70  for(size_t i=0;i<n;i++) {
71  this->pool[i].temperature = exp(i * log(maxT)/(n-1));
72  swap_history.emplace_back();
73  //print("t=", double(this->pool[i].temperature));
74  }
75  }
76  }
77 
78  void __shower_thread() {
79 
80  while(not (terminate or CTRL_C) ) {
81 
82  // we will sleep for adapt_every but in tiny chunks so that
83  // we can be stopped with CTRL_C
84  auto last = now();
85  while(time_since(last) < show_every and (not CTRL_C) and (not terminate))
86  std::this_thread::sleep_for(std::chrono::milliseconds(time_resolution_ms));
87 
88  if(CTRL_C or terminate) return;
89 
91  }
92 
93  }
94 
96 
97  // runs a swapper every swap_every ms (double)
98  // NOTE: If we have 1 element in the pool, it is caught below
99  while(!(terminate or CTRL_C)) {
100 
101  // make this interruptible
102  auto last = now();
103  while(time_since(last) < swap_every and (not CTRL_C) and (not terminate))
104  std::this_thread::sleep_for(std::chrono::milliseconds(time_resolution_ms));
105 
106  // Here we are going to go through and swap each chain with its neighbor
107  std::lock_guard og(overall_mutex); // must get this so we don't lock by another thread stealing one of below
108  for(size_t k=this->pool.size()-1;k>=1;k--) { // swap k with k-1; starting at the bottom so stuff can percolate up
109  if(CTRL_C or terminate) break;
110 
111  // get both of these thread locks
112  std::lock_guard guard1(this->pool[k-1].current_mutex);
113  std::lock_guard guard2(this->pool[k ].current_mutex);
114 
115  // compute R based on data
116  double Tnow = this->pool[k-1].at_temperature(this->pool[k-1].temperature) + this->pool[k].at_temperature(this->pool[k].temperature);
117  double Tswp = this->pool[k-1].at_temperature(this->pool[k].temperature) + this->pool[k].at_temperature(this->pool[k-1].temperature);
118  double R = Tswp-Tnow;
119 
120  //DEBUG("Swap p: ", k, R, this->pool[k-1].samples, this->pool[k-1].getCurrent().posterior, this->pool[k-1].getCurrent());
121 
122  if(R >= 0 or flip(exp(R))) {
123 
124  #ifdef PARALLEL_TEMPERING_SHOW_DETAIL
125  COUT "# Swapping " <<k<< " and " <<(k-1)<<"." TAB Tnow TAB Tswp TAB this->pool[k].current.likelihood TAB this->pool[k-1].current.likelihood ENDL;
126  COUT "# " TAB this->pool[k].current.string() ENDL;
127  COUT "# " TAB this->pool[k-1].current.string() ENDL;
128  #endif
129 
130  // swap the chains
131  std::swap(this->pool[k].getCurrent(), this->pool[k-1].getCurrent());
132 
133  // and let's swap their steps since improvement so that the steps-since-improvement
134  // stays with a chain
135  std::swap(this->pool[k].steps_since_improvement, this->pool[k-1].steps_since_improvement);
136 
137  swap_history.at(k) << true;
138  }
139  else {
140  swap_history.at(k) << false;
141  }
142 
143  }
144  }
145  }
146 
148 
149  while(not (terminate or CTRL_C) ) {
150 
151  // we will sleep for adapt_every but in tiny chunks so that
152  // we can be stopped with CTRL_C
153  auto last = now();
154  while(time_since(last) < adapt_every and (not CTRL_C) and (not terminate))
155  std::this_thread::sleep_for(std::chrono::milliseconds(time_resolution_ms));
156 
157  if(CTRL_C or terminate) return;
158 
159  // This is not interruptible with CTRL_C: std::this_thread::sleep_for(std::chrono::milliseconds(adapt_every));
160 
161  #ifdef PARALLEL_TEMPERING_SHOW_DETAIL
162  show_statistics();
163  #endif
164 
165  adapt(); // TOOD: Check what counts as t
166  }
167  }
168 
169 
171 
172 // print("Starting swapper thread");
173 
174  // Start a swapper and adapter thread
175  std::thread swapper(&ParallelTempering<HYP>::__swapper_thread, this); // pass in the non-static mebers like this:
176  std::thread adapter(&ParallelTempering<HYP>::__adapter_thread, this);
177 
178  #ifdef PARALLEL_TEMPERING_SHOW_DETAIL
179  std::thread shower(&ParallelTempering<HYP>::__shower_thread, this);
180  #endif
181 
182  // run normal pool run
183  for(auto& h : ChainPool<HYP>::run(ctl)) {
184  co_yield h;
185  }
186 
187  // kill everything else once that thread is done
188  terminate = true;
189 
190  swapper.join();
191  adapter.join();
192 
193  #ifdef PARALLEL_TEMPERING_SHOW_DETAIL
194  shower.join();
195  #endif
196 
197 
198  }
199 
201 
202  // TODO: This raelly needs a lock, but for some reason there is a lock problem that eventually hangs
203  // when we do it. Not sure why, very hard to debug/understand. Might be that the current_mutex is held
204  // by the generator so when we try to get to this inside the generator loop, its a disaster?
205 
206  //std::lock_guard og(overall_mutex);
207 
208  COUT "# Pool info: \n";
209  for(size_t i=0;i<this->pool.size();i++) {
210 
211  // ugh locking doesn't work here..
212  //std::unique_lock guard(this->pool[i].current_mutex);
213  auto cpy = this->pool[i].current; // otherwise, without a mutex, it can change between accessing string and posterior, which is gnarly
214  //guard.unlock();
215 
216  print(i,
217  double(this->pool[i].temperature),
218  cpy.posterior,
219  this->pool[i].acceptance_ratio(),
220  swap_history[i].mean(),
221  int(swap_history[i].N),
222  this->pool[i].samples,
223  cpy.string()
224  );
225  }
226  }
227 
228  double k(unsigned long t, double v, double t0) {
229  return (1.0/v) * t0/(t+t0);
230  }
231 
232  void adapt(double v=3, double t0=1000000) {
233  std::lock_guard og(overall_mutex);
234  // no total lock necessary since we don't modify anything except the (atomic) temperature
235 
236  std::vector<double> sw(this->pool.size());
237 
238  for(size_t i=1;i<this->pool.size()-1;i++) { // never adjust i=0 (T=1) or the max temperature
239  sw[i] = log(this->pool[i].temperature - this->pool[i-1].temperature);
240 
241  if( swap_history[i].N>0 && swap_history[i+1].N>0 ) { // only adjust if there are samples
242  sw[i] += k(this->pool[i].samples, v, t0) * (swap_history[i].mean()-swap_history[i+1].mean());
243  }
244 
245  }
246 
247  // Reset all of the swap histories (otherwise we keep adapting in a bad way)
248  // As of Jan 2023 we don't reset the histories since these are finite_histories
249  //for(size_t i=1;i<this->pool.size();i++) {
250  // swap_history[i].reset();
251  // swap_history[i] << true; swap_history[i] << false; // this is a little +1 smoothing
252  // }
253 
254  // and then convert S to temperatures again
255  // but never adjust i=0 (T=1) OR the last one
256  for(size_t i=1;i<this->pool.size();i++) {
257  this->pool[i].temperature = this->pool[i-1].temperature + exp(sw[i]);
258  }
259  }
260 
261 };
262 
263 
264 
272 template<typename HYP>
273 class DataTempering : public ParallelTempering<HYP> {
274 
275 public:
276  std::vector<FiniteHistory<bool>> swap_history;
277  std::atomic<bool> terminate; // used to kill swapper and adapter
278 
279  DataTempering(HYP& h0, std::vector<typename HYP::data_t>& datas) : terminate(false) {
280 
281  swap_history.reserve(datas.size());
282 
283  // This version anneals on data, giving each chain a different amount in datas order
284  for(size_t i=0;i<datas.size();i++) {
285  this->pool.push_back(ChainPool<HYP>(i==0?h0:h0.restart(), &(datas[i])));
286  this->pool[i].temperature = 1.0;
287  swap_history.emplace_back();
288  }
289  }
290 
291 
292  // Same as ParallelTempering, but no adapter
294 
295  // Start a swapper and adapter thread
296  std::thread swapper(&ParallelTempering<HYP>::__swapper_thread, this, swap_every); // pass in the non-static mebers like this:
297 
298  // run normal pool run
299  for(auto& h : ChainPool<HYP>::run(ctl)) {
300  co_yield h;
301  }
302 
303  // kill everything else once that thread is done
304  terminate = true;
305 
306  swapper.join();
307  }
308 
309  void adapt(double v=3, double t0=1000000) { assert(false && "*** You should not call adapt for DataTempering");}
310 };
311 
Definition: OrderedLock.h:16
generator< HYP & > run(Control ctl)
Definition: ParallelTempering.h:170
A ChainPool stores a bunch of MCMCChains and allows you to run them serially or in parallel...
ParallelTempering(HYP &h0, typename HYP::data_t *d, unsigned long n, double maxT)
Definition: ParallelTempering.h:61
std::atomic< bool > CTRL_C
void __swapper_thread()
Definition: ParallelTempering.h:95
void __adapter_thread()
Definition: ParallelTempering.h:147
#define TAB
Definition: IO.h:19
std::vector< MCMCChain< HYP > > pool
Definition: ChainPool.h:29
time_ms adapt_every
Definition: ParallelTempering.h:35
std::atomic< bool > terminate
Definition: ParallelTempering.h:277
time_ms swap_every
Definition: ParallelTempering.h:34
void __shower_thread()
Definition: ParallelTempering.h:78
Definition: ParallelTempering.h:273
bool flip(float p=0.5)
Definition: Random.h:25
Definition: ChainPool.h:25
DataTempering(HYP &h0, std::vector< typename HYP::data_t > &datas)
Definition: ParallelTempering.h:279
double k(unsigned long t, double v, double t0)
Definition: ParallelTempering.h:228
OrderedLock overall_mutex
Definition: ParallelTempering.h:38
void print(FIRST f, ARGS... args)
Lock output_lock and print to std:cout.
Definition: IO.h:53
Definition: Control.h:23
Definition: ParallelTempering.h:26
std::atomic< bool > terminate
Definition: ParallelTempering.h:47
Definition: generator.hpp:21
void adapt(double v=3, double t0=1000000)
Definition: ParallelTempering.h:309
void show_statistics()
Definition: ParallelTempering.h:200
std::vector< double > temperatures
Definition: ParallelTempering.h:40
std::vector< FiniteHistory< bool > > swap_history
Definition: ParallelTempering.h:43
void swap(generator< T > &a, generator< T > &b)
Definition: generator.hpp:238
std::vector< FiniteHistory< bool > > swap_history
Definition: ParallelTempering.h:276
#define ENDL
Definition: IO.h:21
generator< HYP & > run(Control ctl, time_ms swap_every, time_ms adapt_every)
Definition: ParallelTempering.h:293
time_ms show_every
Definition: ParallelTempering.h:36
void adapt(double v=3, double t0=1000000)
Definition: ParallelTempering.h:232
#define COUT
Definition: IO.h:24
ParallelTempering(HYP &h0, typename HYP::data_t *d, std::initializer_list< double > t)
Definition: ParallelTempering.h:49
bool is_temperature
Definition: ParallelTempering.h:45