Crombie Tools
Parallelization.py
Go to the documentation of this file.
1 """ @file Parallelization.py
2 Defines the CrombieTools.Parallelization package.
3 @author Daniel Abercrombie <dabercro@mit.edu>
4 
5 @package CrombieTools.Parallelization
6 Package for running processes in parallel.
7 Submodule of CrombieTools.
8 """
9 
10 import os
11 from multiprocessing import Process, Queue
12 from Queue import Empty
13 from time import time
14 
15 """Number of processors from environment"""
# Read from the CrombieNLocalProcs environment variable.
# An unset or empty variable falls back to a single process.
DefaultNumProcs = int(os.getenv('CrombieNLocalProcs') or 1)
17 
def RunParallel(objectToRun, functionName, parametersLists, procs=DefaultNumProcs, printing=True, doCopy=True):
    """ Starts parallel processes.

    Every entry of parametersLists is placed on a shared queue. Worker processes
    then drain the queue, calling the named member function once per entry.
    A worker stops once the queue has given it nothing for one second.

    @param objectToRun is an object that can be copied and run independently when
                       its own Copy() function is called.
    @param functionName is the str name of the function that will be run in multiple instances.
    @param parametersLists is a list of lists. Each sublist contains the parameters for the functionName.
    @param procs is the maximum number of processes that will be started.
                 Never more processes than entries in parametersLists are started.
    @param printing tells which parameters are about to be processed and how long they took if True.
    @param doCopy makes each worker call objectToRun.Copy() and run the function
                  on that copy if True. Otherwise objectToRun itself is used.
    """
    # Imported locally so this function does not depend on the file's import block.
    import sys

    totStartTime = time()

    # dir() is only evaluated once; all three checks below use the same listing.
    members = dir(objectToRun)

    if doCopy and 'Copy' not in members:
        print('Object not copyable.')
        print('Exiting...')
        sys.exit(1)

    if functionName not in members:
        print('You gave an invalid function name!')
        sys.exit(1)

    if 'GetOutDirectory' in members:
        outDir = str(objectToRun.GetOutDirectory())
        if not os.path.exists(outDir):
            os.makedirs(outDir)

    def runOnQueue(inQueue):
        """ Worker body: processes queue entries until the queue stays empty for one second. """
        objCopy = objectToRun.Copy() if doCopy else objectToRun
        functionToRun = getattr(objCopy, functionName)

        while True:
            # Only the queue read is guarded, so an Empty exception raised inside
            # the user's function is not mistaken for "no more work".
            try:
                parameters = inQueue.get(True, 1)
            except Empty:
                if printing:
                    print('Worker finished...')
                break

            if printing:
                print('About to process ' + str(parameters))

            startTime = time()
            functionToRun(*parameters)
            if printing:
                print('Finished ' + str(parameters) + ' ... Elapsed time: ' + str(time() - startTime) + ' seconds')

        if doCopy:
            del objCopy

    # Starting more workers than jobs would only cost extra Copy() calls and idle waits.
    numWorkers = min(procs, len(parametersLists))

    if printing:
        print('About to use {0} processers.'.format(numWorkers))

    theQueue = Queue()
    for parameters in parametersLists:
        theQueue.put(parameters)

    theProcesses = []
    for _ in range(numWorkers):
        aProcess = Process(target=runOnQueue, args=(theQueue,))
        aProcess.start()
        theProcesses.append(aProcess)

    for aProcess in theProcesses:
        aProcess.join()

    print('All done!\n')
    print('Total jobs: ' + str(len(parametersLists)))
    print('Total time: ' + str(time() - totStartTime) + ' seconds\n')
92 
93 
def RunOnDirectory(objectToRun, procs=DefaultNumProcs, printing=True):
    """ Runs an objectToRun over a directory.

    Collects the '.root' files found in the object's input directory, largest
    file first, and hands them to RunParallel() to be processed with RunOnFile().
    If the object also has GetOutDirectory(), files that already exist in the
    output directory under the same name are skipped.

    @param objectToRun has GetInDirectory() and RunOnFile() function members.
                       This function then runs the objectToRun's RunOnFile() over
                       all the files in that directory.
    @param procs is the maximum number of processes to start.
    @param printing tells which files are about to be processed and how long they took if True.
    """
    # Imported locally so this function does not depend on the file's import block.
    import sys

    members = dir(objectToRun)

    if 'GetInDirectory' not in members:
        print('##########################################')
        print('# Missing GetInDirectory in this object. #')
        print('##########################################')
        sys.exit(1)

    inDir = str(objectToRun.GetInDirectory())

    outDir = None
    if 'GetOutDirectory' in members:
        outDir = str(objectToRun.GetOutDirectory())

    def GetSize(name):
        # os.path.join works whether or not inDir ends with a path separator.
        return os.path.getsize(os.path.join(inDir, name))

    # Filter before sorting so sizes are only looked up for the files that matter.
    rootFiles = [name for name in os.listdir(inDir) if name.endswith('.root')]

    theFiles = []
    # Largest files are queued first (presumably so long jobs start early).
    for inFileName in sorted(rootFiles, key=GetSize, reverse=True):
        if outDir:
            outFile = os.path.join(outDir, inFileName)
            if os.path.exists(outFile):
                print(outFile + ' already exists !!! ... Skipping.')
                continue
        # RunParallel expects one parameter list per job.
        theFiles.append([inFileName])

    RunParallel(objectToRun, 'RunOnFile', theFiles, procs, printing)