"""
@file Parallelization.py
Defines the CrombieTools.Parallelization package.
@author Daniel Abercrombie <dabercro@mit.edu>

@package CrombieTools.Parallelization
Package for running processes in parallel.
Submodule of CrombieTools.
"""

import os
from time import time
from multiprocessing import Process, Queue

try:
    from Queue import Empty     # Python 2
except ImportError:
    from queue import Empty     # Python 3
"""Number of processors from environment (at least one)"""
DefaultNumProcs = max(1, int(os.environ.get('CrombieNLocalProcs') or 1))


def _runOnQueue(inQueue, objectToRun, functionName, doCopy, printing):
    """ Worker loop executed inside each child process started by RunParallel().

    Defined at module level (not as a closure) so that it can also be used as a
    Process target under the 'spawn' and 'forkserver' start methods.

    @param inQueue is the multiprocessing.Queue shared with the parent process.
                   Each item is a list of parameters; None tells the worker to stop.
    @param objectToRun is the object whose member function is called.
    @param functionName is the str name of the member function to call.
    @param doCopy calls objectToRun.Copy() once in this worker if True.
    @param printing prints progress and timing information if True.
    """

    objCopy = objectToRun.Copy() if doCopy else objectToRun
    functionToRun = getattr(objCopy, functionName)

    while True:
        parameters = inQueue.get()
        # RunParallel queues one None per worker after all of the real jobs
        if parameters is None:
            break

        if printing:
            print('About to process ' + str(parameters))
        startTime = time()

        functionToRun(*parameters)

        if printing:
            print('Finished ' + str(parameters) +
                  ' ... Elapsed time: ' + str(time() - startTime) +
                  ' seconds')

    if printing:
        print('Worker finished...')


def RunParallel(objectToRun, functionName, parametersLists, procs=DefaultNumProcs, printing=True, doCopy=True):
    """ Starts parallel processes.

    @param objectToRun is an object that can be copied and run independently when
                       its own Copy() function is called.
    @param functionName is the str name of the function that will be run in multiple instances.
    @param parametersLists is an iterable of lists. Each sublist contains the parameters for the functionName.
                           None must not be used as an entry; it is the internal stop signal.
    @param procs is the maximum number of processes that will be used.
                 At least one process is used when there is work, and never more processes than jobs.
    @param printing tells which files are about to be processed and how long they took if True.
    @param doCopy copies the object that the function should be run over
    @throws SystemExit (code 1) if the object is not copyable while doCopy is True,
                       or if it has no member named functionName.

    @note Under the 'spawn' or 'forkserver' start methods objectToRun must be picklable;
          with 'fork' it is simply inherited by the children.
    """

    if doCopy and not hasattr(objectToRun, 'Copy'):
        print('Object not copyable.')
        raise SystemExit(1)

    if not hasattr(objectToRun, functionName):
        print('You gave an invalid function name!')
        raise SystemExit(1)

    # Create the object's output directory up front, in the parent, if it is missing.
    # An empty string is skipped because os.makedirs('') raises.
    if hasattr(objectToRun, 'GetOutDirectory'):
        outDir = str(objectToRun.GetOutDirectory())
        if outDir and not os.path.exists(outDir):
            os.makedirs(outDir)

    # Materialize once so generators work and len() is always available
    jobs = list(parametersLists)
    # Clamp to >= 1 (0 would drop every job) and never start idle workers,
    # since each worker may call the potentially expensive Copy()
    numWorkers = min(max(procs, 1), len(jobs))

    theQueue = Queue()
    totStartTime = time()

    if printing:
        print('About to use {0} processers.'.format(numWorkers))

    for parameters in jobs:
        theQueue.put(parameters)
    # One stop signal per worker, queued behind all of the real jobs,
    # so workers finish deterministically instead of waiting on a timeout
    for _ in range(numWorkers):
        theQueue.put(None)

    theProcesses = []
    for _ in range(numWorkers):
        aProcess = Process(target=_runOnQueue,
                           args=(theQueue, objectToRun, functionName, doCopy, printing))
        aProcess.start()
        theProcesses.append(aProcess)

    for aProcess in theProcesses:
        aProcess.join()

    if printing:
        print('Total jobs: ' + str(len(jobs)))
        print('Total time: ' + str(time() - totStartTime) + ' seconds\n')
def RunOnDirectory(objectToRun, procs=DefaultNumProcs, printing=True):
    """ Runs an objectToRun over a directory.

    @param objectToRun has GetInDirectory() and RunOnFile() function members.
                       This function then runs the objectToRun's RunOnFile() over all the
                       .root files in that directory, largest files first.
                       Each job receives only the bare file name, not the full path.
                       If the object also has GetOutDirectory(), files that already
                       exist there are skipped.
    @param procs is the maximum number of processes to start.
    @param printing tells which files are about to be processed and how long they took if True.
    @throws SystemExit (code 1) if objectToRun has no GetInDirectory() member.
    """

    if not hasattr(objectToRun, 'GetInDirectory'):
        print('##########################################')
        print('# Missing GetInDirectory in this object. #')
        print('##########################################')
        raise SystemExit(1)

    inDir = str(objectToRun.GetInDirectory())

    outDir = ''
    if hasattr(objectToRun, 'GetOutDirectory'):
        outDir = str(objectToRun.GetOutDirectory())

    # Filter before sorting so only .root files are stat-ed.
    # os.path.join works whether or not the directory has a trailing slash.
    rootFiles = [name for name in os.listdir(inDir) if name.endswith('.root')]
    # Biggest files first -- presumably so the slowest jobs start earliest
    rootFiles.sort(key=lambda name: os.path.getsize(os.path.join(inDir, name)),
                   reverse=True)

    theFiles = []
    for inFileName in rootFiles:
        outFile = os.path.join(outDir, inFileName)
        if outDir and os.path.exists(outFile):
            print(outFile + ' already exists !!! ... Skipping.')
        else:
            theFiles.append([inFileName])

    RunParallel(objectToRun, 'RunOnFile', theFiles, procs, printing)