Pakman
Manager.cc
#include <limits>
#include <memory>
#include <stdexcept>
#include <string>

#include <assert.h>

#include <mpi.h>

#include "spdlog/spdlog.h"

#include "core/common.h"
#include "mpi/mpi_common.h"
#include "mpi/mpi_utils.h"

#include "ForkedWorkerHandler.h"
#include "MPIWorkerHandler.h"

#include "Manager.h"
18 
19 // Construct from simulator, pointer to program terminated flag, and
20 // Worker type (forked vs MPI)
21 Manager::Manager(const Command &simulator, worker_t worker_type,
22  bool *p_program_terminated) :
23  m_simulator(simulator),
24  m_worker_type(worker_type),
25  m_p_program_terminated(p_program_terminated)
26 {
27 }
28 
29 // Destroy MPI_Request objects
31 {
32  // If MPI_Finalize has been called, nothing needs to be done
33  int finalized = 0;
34  MPI_Finalized(&finalized);
35 
36  if (finalized)
37  return;
38 
39  // Else free any non-null requests
40  if (m_message_request != MPI_REQUEST_NULL)
41  MPI_Request_free(&m_message_request);
42  if (m_signal_request != MPI_REQUEST_NULL)
43  MPI_Request_free(&m_signal_request);
44  if (m_error_code_request != MPI_REQUEST_NULL)
45  MPI_Request_free(&m_error_code_request);
46 }
47 
48 // Probe whether Manager is active
49 bool Manager::isActive() const
50 {
51  return m_state != terminated;
52 }
53 
54 // Iterate
56 {
57  // This function should never be called if the Manager has
58  // terminated
59  assert(m_state != terminated);
60 
61  // Switch based on state
62  switch (m_state)
63  {
64  case idle:
65  doIdleStuff();
66  break;
67 
68  case busy:
69  doBusyStuff();
70  break;
71 
72  default:
73  throw;
74  }
75 }
76 
77 // Do idle stuff
78 void Manager::doIdleStuff()
79 {
80  // Sanity check: m_p_worker_handler should be the null pointer
81  assert(!m_p_worker_handler);
82 
83  // Check for program termination interrupt
84  if (*m_p_program_terminated)
85  {
86  // Terminate Manager
87  m_state = terminated;
88  return;
89  }
90 
91  // Check for Manager termination signal
92  if (probeSignal())
93  {
94  switch (receiveSignal())
95  {
96  case TERMINATE_MANAGER_SIGNAL:
97 
98  spdlog::debug("Idle manager {}/{}: received "
99  "TERMINATE_MANAGER_SIGNAL!",
100  get_mpi_comm_world_rank(), get_mpi_comm_world_size());
101 
102  // Terminate Manager
103  m_state = terminated;
104  return;
105 
106  // An idle Manager can receive FLUSH_WORKER_SIGNAL when the Master
107  // has not yet checked for the Manager's message. In this case,
108  // nothing needs to be done
109  case FLUSH_WORKER_SIGNAL:
110 
111  spdlog::debug("Idle manager {}/{}: received "
112  "FLUSH_WORKER_SIGNAL!",
113  get_mpi_comm_world_rank(), get_mpi_comm_world_size());
114 
115  return;
116 
117  // TERMINATE_MANAGER_SIGNAL and FLUSH_WORKER_SIGNAL are the
118  // only valid Master signals
119  default:
120  throw;
121  }
122  }
123 
124  // Check for message
125  if (probeMessage())
126  {
127  spdlog::debug("Idle manager {}/{}: received message!",
128  get_mpi_comm_world_rank(), get_mpi_comm_world_size());
129 
130  // Receive input string and create new Worker
131  std::string input_string = receiveMessage();
132  createWorker(input_string);
133 
134  // Switch to busy state
135  m_state = busy;
136  return;
137  }
138 }
139 
140 // Do busy stuff
141 void Manager::doBusyStuff()
142 {
143  // Sanity check: m_p_worker_handler should not be the null pointer
144  assert(m_p_worker_handler);
145 
146  // Check for program termination interrupt
147  if (*m_p_program_terminated)
148  {
149  // Terminate Worker
150  terminateWorker();
151 
152  // Terminate Manager
153  m_state = terminated;
154  return;
155  }
156 
157  // Check for Manager or Worker termination signal
158  if (probeSignal())
159  {
160  switch (receiveSignal())
161  {
162  case TERMINATE_MANAGER_SIGNAL:
163 
164  spdlog::debug("Busy manager {}/{}: received "
165  "TERMINATE_MANAGER_SIGNAL!",
166  get_mpi_comm_world_rank(), get_mpi_comm_world_size());
167 
168  // Terminate Worker
169  terminateWorker();
170 
171  // Terminate Manager
172  m_state = terminated;
173  return;
174 
175  case FLUSH_WORKER_SIGNAL:
176 
177  spdlog::debug("Busy manager {}/{}: received "
178  "FLUSH_WORKER_SIGNAL!",
179  get_mpi_comm_world_rank(), get_mpi_comm_world_size());
180 
181  // Flush Worker
182  flushWorker();
183 
184  // Send WORKER_FLUSHED_SIGNAL result
185  sendSignalToMaster(WORKER_FLUSHED_SIGNAL);
186 
187  // Switch to idle state
188  m_state = idle;
189  return;
190 
191  // TERMINATE_MANAGER_SIGNAL and FLUSH_WORKER_SIGNAL are the
192  // only valid Master signals
193  default:
194  throw;
195  }
196  }
197 
198  // Check if Worker has finished
199  if (m_p_worker_handler->isDone())
200  {
201  spdlog::debug("Busy manager {}/{}: Worker is done!",
202  get_mpi_comm_world_rank(), get_mpi_comm_world_size());
203 
204  // Get output string
205  std::string output_string = m_p_worker_handler->getOutput();
206 
207  // Get error code
208  int error_code = m_p_worker_handler->getErrorCode();
209 
210  // Send output string to master
211  sendMessageToMaster(output_string);
212 
213  // Send error code to master
214  sendErrorCodeToMaster(error_code);
215 
216  // Flush Worker
217  flushWorker();
218 
219  // Switch to idle state
220  m_state = idle;
221  return;
222  }
223 
224  // Sanity check: No message should be received in the busy state
225  assert(probeMessage() == false);
226 }
227 
228 // Create Worker
229 void Manager::createWorker(const std::string& input_string)
230 {
231  // Sanity check: m_p_worker_handler should be the null pointer
232  assert(!m_p_worker_handler);
233 
234  // Switch on Worker type
235  switch (m_worker_type)
236  {
237  // Fork Worker
238  case forked_worker:
239  m_p_worker_handler =
240  std::unique_ptr<ForkedWorkerHandler>(
241  new ForkedWorkerHandler(m_simulator, input_string));
242  break;
243 
244  // Spawn MPI Worker
245  case mpi_worker:
246  m_p_worker_handler =
247  std::unique_ptr<MPIWorkerHandler>(
248  new MPIWorkerHandler(m_simulator, input_string));
249  break;
250 
251  default:
252  throw std::runtime_error("Worker type not recognised");
253  }
254 }
255 
256 // Flush Worker
257 void Manager::flushWorker()
258 {
259  // Sanity check: This function should not be called when
260  // m_p_worker_handler is the null pointer
261  assert(m_p_worker_handler);
262 
263  // Reset m_p_worker_handler to null pointer,
264  // this flushes the worker
265  m_p_worker_handler.reset();
266 }
267 
268 // Terminate Worker
269 void Manager::terminateWorker()
270 {
271  // Sanity check: This function should not be called when
272  // m_p_worker_handler is the null pointer
273  assert(m_p_worker_handler);
274 
275  // Reset m_p_worker_handler to null pointer
276  m_p_worker_handler.reset();
277 }
278 
279 // Probe for message
280 bool Manager::probeMessage() const
281 {
282  return iprobe_wrapper(MASTER_RANK, MASTER_MSG_TAG, MPI_COMM_WORLD);
283 }
284 
285 // Probe for signal
286 bool Manager::probeSignal() const
287 {
288  return iprobe_wrapper(MASTER_RANK, MASTER_SIGNAL_TAG, MPI_COMM_WORLD);
289 }
290 
291 // Receive message
292 std::string Manager::receiveMessage() const
293 {
294  // Sanity check: probeMessage must return true
295  assert(probeMessage());
296 
297  return receive_string(MPI_COMM_WORLD, MASTER_RANK, MASTER_MSG_TAG);
298 }
299 
300 // Receive signal
301 int Manager::receiveSignal() const
302 {
303  // Sanity check: probeSignal must return true
304  assert(probeSignal());
305 
306  return receive_integer(MPI_COMM_WORLD, MASTER_RANK, MASTER_SIGNAL_TAG);
307 }
308 
309 // Send message to Master
310 void Manager::sendMessageToMaster(const std::string& message_string)
311 {
312  // Ensure previous message has finished sending
313  MPI_Wait(&m_message_request, MPI_STATUS_IGNORE);
314 
315  // Store message string in buffer
316  m_message_buffer.assign(message_string);
317 
318  // Note: Isend is used here to avoid deadlock since the Master and the root
319  // Manager are executed by the same process
320  MPI_Isend(
321  m_message_buffer.c_str(),
322  m_message_buffer.size() + 1,
323  MPI_CHAR, MASTER_RANK, MANAGER_MSG_TAG,
324  MPI_COMM_WORLD,
325  &m_message_request);
326 }
327 
328 // Send signal to Master
329 void Manager::sendSignalToMaster(int signal)
330 {
331  // Ensure previous signal has finished sending
332  MPI_Wait(&m_signal_request, MPI_STATUS_IGNORE);
333 
334  // Store signal in buffer
335  m_signal_buffer = signal;
336 
337  // Note: Isend is used here to avoid deadlock since the Master and the root
338  // Manager are executed by the same process
339  MPI_Isend(&m_signal_buffer, 1, MPI_INT, MASTER_RANK,
340  MANAGER_SIGNAL_TAG, MPI_COMM_WORLD,
341  &m_signal_request);
342 }
343 
344 // Send error code to Master
345 void Manager::sendErrorCodeToMaster(int error_code)
346 {
347  // Ensure previous error code has finished sending
348  MPI_Wait(&m_error_code_request, MPI_STATUS_IGNORE);
349 
350  // Store error_code in buffer
351  m_error_code_buffer = error_code;
352 
353  // Note: Isend is used here to avoid deadlock since the Master and the root
354  // Manager are executed by the same process
355  MPI_Isend(&m_error_code_buffer, 1, MPI_INT, MASTER_RANK,
356  MANAGER_ERROR_CODE_TAG, MPI_COMM_WORLD,
357  &m_error_code_request);
358 }
worker_t
Definition: Manager.h:36
Manager(const Command &simulator, worker_t worker_type, bool *p_program_terminated)
Definition: Manager.cc:21
bool isActive() const
Definition: Manager.cc:49
void iterate()
Definition: Manager.cc:55
~Manager()
Definition: Manager.cc:30