Crombie Tools
FileConfig.h
Go to the documentation of this file.
1 #ifndef CROMBIE_CONFIGREADER_H
2 #define CROMBIE_CONFIGREADER_H
3 
4 
/**
   @file FileConfig.h

   Defines functions for reading configuration files
*/
10 
11 #include <unistd.h>
12 
13 #include <iostream>
14 #include <functional>
15 #include <vector>
16 #include <string>
17 #include <queue>
18 #include <fstream>
19 #include <map>
20 #include <mutex>
21 #include <algorithm>
22 #include <thread>
23 #include <chrono>
24 #include <list>
25 
26 #include <sys/stat.h>
27 
28 #include "crombie/Types.h"
29 #include "crombie/Misc.h"
30 #include "crombie/Parse.h"
31 #include "crombie/FileSystem.h"
32 
33 namespace crombie {
34  namespace FileConfig {
35 
36  namespace {
/// Returns `dir` with exactly one trailing '/' appended when it is non-empty
/// and does not already end in one; otherwise returns `dir` unchanged.
std::string dirclean (const std::string& dir) {
  const bool needs_slash = not dir.empty() and dir.back() != '/';
  return needs_slash ? dir + "/" : dir;
}
42  }
43 
/// The kinds of processes a sample can be labelled as
enum class Type {
  Data = 0,        ///< Selected by a `DATA` line; also the initial type when reading a configuration
  Background = 1,  ///< Selected by an `MC` line
  Signal = 2       ///< Selected by a `SIGNAL` line
};
50 
51  /**
52  @class FileInfo
53  Hold the necessary information for running over a single file.
54  */
55  struct FileInfo {
56  /// Default constructor
57  FileInfo() {}
58  /// Set values for everything in the structure
59  FileInfo(const Type type, const std::string& dirname, const std::string& filename, const Types::strings& cuts = {"1.0"})
60  : type{type}, dirname{dirname}, name{filename}, size{FileSystem::get_size(name.data())}, cuts{cuts} {
61  Debug::Debug(__PRETTY_FUNCTION__, dirname, filename, size, cuts.size());
62  }
63  Type type {}; ///< Type of process this file is
64  std::string dirname {}; ///< Directory this file is in
65  std::string name {}; ///< The full path to the file
66  unsigned long size {}; ///< The size of the file. Used for priority
67  Types::strings cuts {}; ///< Cuts that split this file into different processes or legend entries
68  };
69 
70 
/// The processes that a directory can be divided between
struct Process {
  /**
     @param treename The name that this process will have in datacards
     @param entry The entry in plot legends for this process. `"_"` is replaced with `" "`.
     @param cut Is the cut applied to the file to create this process
     @param style Some style number that is used to make plots
  */
  Process(const std::string& treename,
          const std::string& entry,
          const std::string& cut,
          const short style)
    : treename{treename}, legendentry{entry}, cut{cut}, style{style} {
    std::replace(legendentry.begin(), legendentry.end(), '_', ' ');
  }

  // The strings are stored by value. Holding references to the constructor
  // arguments would dangle as soon as the caller's temporaries or locals
  // go out of scope, and Process objects are copied into long-lived vectors.
  const std::string treename;   ///< Name of the process in datacards
  std::string legendentry;      ///< Legend entry, with underscores already replaced by spaces
  const std::string cut;        ///< Cut that selects this process
  const short style;            ///< Style number used for plots
};
91 
92 
93  /**
94  @class DirectoryInfo
95  Holds information for the files to run over as well as relevant processes
96  */
97  class DirectoryInfo {
98  public:
99 
100  DirectoryInfo (const std::string& line, const Type type, const std::vector<Process>& processes)
101  : name{getname(line)}, xs{getxs(line)}, type{type}, processes{processes} {
102  Debug::Debug(__PRETTY_FUNCTION__, line, name, xs, processes.size());
103  fillfiles();
104  }
105 
106  const std::string name; ///< The name of the directory
107  const double xs; ///< The cross section of this sample
108  const Type type; ///< The kind of process this object points to
109 
110  std::vector<FileInfo> files {}; ///< The file infos
111  const std::vector<Process> processes;
112 
113  private:
114  /**
115  Helper function to extract directory name from config line.
116  If the config line is actually a file, then it returns the unchanged file name.
117  */
118  static std::string getname(const std::string& line);
119 
120  /// Helper function to extract cross section
121  static double getxs(const std::string& line) {
122  auto begin = line.find('{') + 1;
123  auto val = line.substr(begin, line.find('}') - begin);
124  return val.size() ? std::stod(val) : 0;
125  }
126 
127  /// Fill the file info for this object
128  void fillfiles();
129  };
130 
131  // These types are used for runfiles
132 
133  /**
134  The type that is used for the mapping function
135  */
136  template<typename M> using MapFunc = std::function<M(const FileInfo&)>;
137 
138  /**
139  The parameter passed to the FileConfig::runfiles reduce function
140  @param M is the type given by a SingleOut mapping function
141  */
142  template<typename M> using ToMerge = Types::map<std::list<M>>;
143 
144  /**
145  A functional type that is not necessary, but might be a useful shortcut
146  @param R The output of the reduction formula
147  @param M The output of the map formula
148  */
149  template<typename R, typename M> using MergeFunc = std::function<R(const ToMerge<M>&)>;
150 
151 
152  /**
153  @class FileConfig
154  A single object that holds all of the information about the files to run on.
155  This is the result of reading a single file configuration file.
156  */
157  class FileConfig {
158  public:
159  FileConfig(const std::string& inputdir, const bool onedir = true);
160 
161  /**
162  This will perform a map-reduce operation on all of the files in this configuration.
163  Takes two functions. One to map files to outputs and one to reduce these outputs.
164  @param M The type of output of the map function
165  @param R The type of reduce
166  @param map Is a function that takes a FileInfo as input, and outputs some object of type M
167  @param reduce is fed ToMerge<M> as input. Whatever reduce returns is returned by runfiles.
168  */
169  template <typename M, typename R>
170  auto runfiles (MapFunc<M> map, R reduce);
171 
172  /// Read the directory infos
173  const std::vector<DirectoryInfo>& get_dirs () const { return dirinfos; }
174 
175  /// Read a single directory info
176  const DirectoryInfo& get_dir (const std::string& dirname) const;
177 
178  /// Say if this object has MC directories stored in it
179  const bool has_mc () const { return _has_mc; }
180  /// Say if this object has data directories stored in it
181  const bool has_data () const { return _has_data; }
182 
183  private:
184  std::vector<DirectoryInfo> dirinfos; ///< Internal store of DirectoryInfo objects
185  const std::string inputdir; ///< The directory containing the files
186 
187  bool _has_mc {false}; ///< Tracks when MC files are stored
188  bool _has_data {false}; ///< Tracks when data files are stored
189 
190  friend std::istream& operator>>(std::istream& is, FileConfig& config);
191  };
192 
193  std::istream& operator>>(std::istream& is, FileConfig& config);
194 
195  /// Reads a configuration file for file info
196  FileConfig read(const std::string& indir, const std::string& config) {
197  FileConfig output {indir, false};
198  std::ifstream input {config};
199  input >> output;
200  return output;
201  }
202 
203 
204  // IMPLEMENTATIONS BELOW HERE //
205 
206  FileConfig::FileConfig(const std::string& inputdir, const bool onedir)
207  : inputdir{onedir ? "" : dirclean(inputdir)} {
208  if (onedir) {
209  std::stringstream input {inputdir + " {}"};
210  input >> *this;
211  }
212  }
213 
214 
215  const DirectoryInfo& FileConfig::get_dir(const std::string& dirname) const {
216  for (auto& dir : dirinfos)
217  if (dir.name == dirname)
218  return dir;
219 
220  throw std::logic_error{dirname + " does not seem to be a valid directory name"};
221  }
222 
223 
224  std::string DirectoryInfo::getname(const std::string& line) {
225  std::string dir {line.substr(0, line.find(' '))};
226  // Just return the file name if a .root file.
227  return (dir.substr(dir.size() - 5, 5) == ".root") ? dir : dirclean(dir);
228  }
229 
230 
232  if (FileSystem::exists(name)) {
234  for (auto& proc : processes)
235  cuts.push_back(proc.cut);
236 
237  if (name.back() != '/') // If the directory info is actually a file, just push back one file
238  files.push_back({type, name, name, cuts});
239  else { // Otherwise, push back all of the files
240  for (auto& file : FileSystem::list(name)) {
241  // Only loading .root files
242  if (file.find(".root") != std::string::npos)
243  files.push_back({type, name, name + file, cuts});
244  }
245  }
246 
247  }
248  else {
249  throw std::runtime_error{"Path " + name + " does not exist"};
250  }
251  }
252 
253 
254  std::istream& operator>>(std::istream& is, FileConfig& config) {
255  Type current_type = Type::Data;
256  std::vector<Process> current_procs {{"data_obs", "Data", "1.0", 1}};
257 
258  // This is the type we change to when we see process lines
259  Type default_type = Type::Background;
260 
261  std::string entry; // Hold these things as
262  std::string cut; // we go along in case there's
263  short style; // a less complete line
264 
265  bool in_dirs = true;
266 
267  for (auto& line : Parse::parse(is)) {
268  // Set the default type, if needed
269  const Types::map<Type> default_lines {
270  {"DATA", Type::Data},
271  {"SIGNAL", Type::Signal},
272  {"MC", Type::Background}
273  };
274  if (default_lines.find(line) != default_lines.end()) {
275  default_type = default_lines.at(line);
276  continue;
277  }
278 
279  if (line.find('{') == std::string::npos) { // If not a line about directory info
280  if (in_dirs) { // Time to reset processes if new
281  in_dirs = false;
282  current_procs.clear();
283  current_type = default_type;
284  }
285  // Read the line cuts
286  auto tokens = Misc::tokenize(line);
287  // Update these values if needed
288  if (tokens.size() > 1) {
289  entry = tokens[1];
290  cut = tokens.size() == 4 ? tokens[2] : "1.0";
291  style = std::stoi(tokens.back());
292  }
293  current_procs.push_back({tokens[0], entry, cut, style});
294  }
295  else { // Otherwise add a DirectoryInfo
296  if (current_type != Type::Data)
297  config._has_mc = true;
298  else
299  config._has_data = true;
300  in_dirs = true;
301  config.dirinfos.push_back({config.inputdir + line, current_type, current_procs});
302  }
303  }
304  return is;
305  }
306 
307 
308  /// Compares FileInfo objects based on the size of the files; used for queue priority
309  bool operator<(const FileInfo& a, const FileInfo& b) {
310  return a.size < b.size;
311  }
312 
313 
314  template <typename M, typename R>
316  unsigned nthreads = std::stoi(Misc::env("nthreads", "1"));
317  ToMerge<M> outputs; // This is fed into reduce, in addition to the directory infos
318  std::priority_queue<FileInfo> queue;
319  for (const auto& dirinfo : dirinfos) {
320  for (const auto& fileinfo : dirinfo.files)
321  queue.push(fileinfo);
322  }
323 
324  // Launch threads
325  std::mutex inlock;
326  std::mutex outlock;
327 
328  std::vector<std::thread> threads;
329  nthreads = std::max(nthreads, 1u);
330 
331  auto total = queue.size();
332  auto divisor = total/100 + 1;
333  auto progress = total/divisor;
334 
335  std::cout << "Using " << nthreads << " threads to run over " << total << " files" << std::endl;
336 
337  Misc::draw_progress(0, progress);
338 
339  std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
340 
341  for (unsigned i = 0; i < nthreads; ++i) {
342  threads.push_back(std::thread([&] () {
343  bool running = true;
344  while(true) {
345  Debug::Debug(__PRETTY_FUNCTION__, "Start thread loop");
346  FileInfo info;
347  inlock.lock();
348  running = !queue.empty();
349  if (running) {
350  info = queue.top();
351  auto done = total - queue.size();
352  if (done % divisor == 0) {
353  Misc::draw_progress(std::min(done/divisor, progress - 1), progress);
354  }
355  queue.pop();
356  }
357  inlock.unlock();
358  if (not running)
359  break;
360  try {
361  auto fileoutput = map(info);
362  outlock.lock();
363  outputs[info.dirname].push_back(std::move(fileoutput));
364  outlock.unlock();
365  }
366  catch (const std::exception& e) {
367  std::cerr << "Cannot run over file " << info.name << std::endl;
368  std::cerr << e.what() << std::endl;
369  }
370  }
371  })
372  );
373  }
374 
375  for (auto& thread : threads)
376  thread.join();
377 
378  Misc::draw_progress(progress, progress);
379  std::cout << std::endl; // New line after progress bar
380 
381  std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
382 
383  std::cout << "Ran over " << total << " files in "
384  << std::chrono::duration_cast<std::chrono::seconds>(end - start).count()
385  << " seconds" << std::endl;
386 
387  return reduce(outputs);
388  }
389 
390  }
391 }
392 
393 
394 #endif