Crombie Tools
ParallelRunner.h
Go to the documentation of this file.
1 /**
2  @file ParallelRunner.h
3 
4  Defines the ParallelRunner class
5 
6  @author Daniel Abercrombie <dabercro@mit.edu>
7 */
8 
9 #ifndef CROMBIETOOLS_COMMONTOOLS_PARALLELRUNNER_H
10 #define CROMBIETOOLS_COMMONTOOLS_PARALLELRUNNER_H
11 
12 #include <vector>
13 #include <queue>
14 
15 #include "TMutex.h"
16 #include "TThread.h"
17 
18 #include "FileConfigReader.h"
19 
20 TMutex queue_lock; // Held while a worker checks and pops ParallelRunner::file_queue
21 TMutex output_lock; // Held while a worker decrements the progress counter and prints to std::cout
22 TMutex root_lock; // For doing sketchy ROOT things (held while joined TThread objects are deleted)
23 
24 /**
25  @ingroup commongroup
26  @class ParallelRunner
27  Class for running over FileInfos in parallel
28 */
29 
31  public:
32  /// Set the number of worker threads to use; a value of 0 is bumped to 1 before threads are started
33  void SetNumThreads ( UInt_t nthreads ) { fNumThreads = nthreads; }
34 
35  protected:
36  /// Run over all of the filled FileInfos, sharing them between fNumThreads worker threads
37  void RunThreads ();
38 
39  /// Pure virtual hook that must be implemented in the inheriting class.
39  /// RunThread() calls it once per queued FileInfo, from a worker thread, with no lock held.
40  virtual void RunFile (FileInfo& info) = 0;
41 
42  private:
43  /// Number of worker threads to start (defaults to 1)
44  UInt_t fNumThreads {1};
45 
46  /// Files still waiting to be processed; workers pop from it while holding queue_lock
47  std::priority_queue<FileInfo> file_queue;
48 
49  /// Thread entry point: `prep` is cast to the ParallelRunner to work for; loops until file_queue is empty
50  static void* RunThread (void* prep);
51 
52  static unsigned numfiles; ///< Files not yet finished; decremented by a worker after each RunFile() call
53  static unsigned maxfiles; ///< Printed as the total in the progress message (its assignment is not visible in this listing)
54 };
55 
56 unsigned ParallelRunner::numfiles = 0; // Definition of the static remaining-files counter, starting at zero
57 unsigned ParallelRunner::maxfiles = 0; // Definition of the static total printed in progress messages, starting at zero
58 
60  for (auto type : gFileTypes) { // Visit every file type listed in gFileTypes
61  const auto& infos = *(GetFileInfo(type));
62  numfiles += infos.size(); // Count the files so workers can report how many remain
63  for (auto info : infos)
64  file_queue.push(*info); // Copy each FileInfo into the shared work queue
65  }
66 // NOTE(review): the function signature line and original line 67 are absent from this listing - confirm against the real header
68 
69  if (not fNumThreads) // A thread count of zero would start no workers at all,
70  fNumThreads++; // so always run with at least one
71 
72  std::vector<TThread*> threads;
73  for (decltype(fNumThreads) i_thread = 0; i_thread < fNumThreads; ++i_thread) {
74  TThread* temp = new TThread(RunThread, this); // Every worker gets this runner as its argument
75  threads.push_back(temp);
76  temp->Run(this); // Start the worker, again passing this runner
77  }
78 
79  for (auto thread : threads) {
80  thread->Join(); // Wait for the worker to finish before destroying it
81  root_lock.Lock(); // TThread objects are deleted one at a time under root_lock
82  delete thread;
83  root_lock.UnLock();
84  }
85 }
86 
87 void* ParallelRunner::RunThread(void* prep) {
88  auto* runner = reinterpret_cast<ParallelRunner*>(prep);
89  bool running = true;
90  FileInfo info;
91  while(true) {
92  queue_lock.Lock();
93  running = !runner->file_queue.empty();
94  if (running) {
95  info = runner->file_queue.top();
96  runner->file_queue.pop();
97  }
98  queue_lock.UnLock();
99 
100  if (not running)
101  break;
102 
103  runner->RunFile(info);
104  output_lock.Lock();
105  std::cout << "Files remaining: " << --numfiles << "/" << maxfiles
106  << " Finished: " << info.fFileName << std::endl;
107  output_lock.UnLock();
108  }
109  return nullptr;
110 }
111 
112 #endif