Pakman
MPIMaster.cc
#include <cstddef>
#include <iostream>
#include <memory>
#include <queue>
#include <stdexcept>
#include <string>

#include <assert.h>

#include <mpi.h>

#include "spdlog/spdlog.h"

#include "mpi/mpi_utils.h"
#include "mpi/mpi_common.h"
#include "controller/AbstractController.h"

#include "MPIMaster.h"
17 
18 // Construct from pointer to program terminated flag
19 MPIMaster::MPIMaster(bool *p_program_terminated) :
20  AbstractMaster(p_program_terminated),
21  m_comm_size(get_mpi_comm_world_size()),
22  m_map_manager_to_task(get_mpi_comm_world_size()),
23  m_message_buffers(get_mpi_comm_world_size())
24 {
25  // Initialize requests to MPI_REQUEST_NULL
26  // and initialize idle managers
27  for (int i = 0; i < m_comm_size; i++)
28  {
29  m_message_requests.push_back(MPI_REQUEST_NULL);
30  m_signal_requests.push_back(MPI_REQUEST_NULL);
31  m_idle_managers.insert(i);
32  }
33 }
34 
35 // Destroy MPI_Request objects
37 {
38  // If MPI_Finalize has been called, nothing needs to be done
39  int finalized = 0;
40  MPI_Finalized(&finalized);
41 
42  if (finalized)
43  return;
44 
45  // Else free any non-null requests
46  for (int i = 0; i < m_comm_size; i++)
47  {
48  if (m_message_requests[i] != MPI_REQUEST_NULL)
49  MPI_Request_free(&m_message_requests[i]);
50  if (m_signal_requests[i] != MPI_REQUEST_NULL)
51  MPI_Request_free(&m_signal_requests[i]);
52  }
53 }
54 
55 // Probe whether Master is active
56 bool MPIMaster::isActive() const
57 {
58  return m_state != terminated;
59 }
60 
61 // Iterate
63 {
64  // This function should never be called recursively
65  assert(!m_entered);
66  m_entered = true;
67 
68  // This function should never be called if the Master has
69  // terminated
70  assert(m_state != terminated);
71 
72  // Switch based on state
73  switch (m_state)
74  {
75  case normal:
76  doNormalStuff();
77  break;
78 
79  case flushing:
80  doFlushingStuff();
81  break;
82 
83  default:
84  throw;
85  }
86 
87  m_entered = false;
88 }
89 
90 // Returns true if more pending tasks are needed
92 {
93  return m_pending_tasks.size() < m_comm_size;
94 }
95 
96 // Do normal stuff
97 void MPIMaster::doNormalStuff()
98 {
99  // This function should never be called if the Master has
100  // terminated
101  assert(m_state != terminated);
102 
103  // Check for program termination interrupt
104  if (programTerminated())
105  {
106  // Terminate Master
107  m_state = terminated;
108  return;
109  }
110  // Listen to Managers
111  listenToManagers();
112 
113  // Pop finished tasks from busy queue and insert into finished queue
114  popBusyQueue();
115 
116  // Call controller
117  if (auto p_controller = m_p_controller.lock())
118  p_controller->iterate();
119 
120  // Check for termination of Master and Managers
121  if (m_master_manager_terminated)
122  {
123  // Send TERMINATE_MANAGER_SIGNAL to all Managers
124  sendSignalToAllManagers(TERMINATE_MANAGER_SIGNAL);
125 
126  // Terminate Master
127  m_state = terminated;
128  return;
129  }
130 
131  // Check for flushing of Workers
132  if (m_worker_flushed)
133  {
134  spdlog::debug("MPIMaster::doNormalStuff: Flushing workers!");
135 
136  // Send FLUSH_WORKER_SIGNAL to all Managers
137  sendSignalToAllManagers(FLUSH_WORKER_SIGNAL);
138 
139  // Reset flag
140  m_worker_flushed = false;
141 
142  // Switch to flushing state
143  m_state = flushing;
144  return;
145  }
146 
147  // Delegate tasks to Managers
148  delegateToManagers();
149 }
150 
151 // Do flushing stuff
152 void MPIMaster::doFlushingStuff()
153 {
154  // This function should never be called if the Master has
155  // terminated
156  assert(m_state != terminated);
157 
158  // Check for program termination interrupt
159  if (programTerminated())
160  {
161  // Terminate Master
162  m_state = terminated;
163  return;
164  }
165 
166  // Check for termination of Master and Managers
167  if (m_master_manager_terminated)
168  {
169  // Send TERMINATE_MANAGER_SIGNAL to all Managers
170  sendSignalToAllManagers(TERMINATE_MANAGER_SIGNAL);
171 
172  // Terminate Master
173  m_state = terminated;
174  return;
175  }
176 
177  // Discard messages, error codes and signals
178  discardMessagesErrorCodesAndSignals();
179 
180  // If all Managers are idle, transition to normal state
181  if (m_idle_managers.size() == m_comm_size)
182  {
183  // Debug info
184  if (spdlog::get(g_program_name)->level() <= spdlog::level::debug)
185  {
186  spdlog::debug("MPIMaster::doFlushingStuff: "
187  "transition to normal state!");
188  spdlog::debug("Idle managers:");
189  for (auto it = m_idle_managers.begin();
190  it != m_idle_managers.end(); it++)
191  spdlog::debug("{}", *it);
192  spdlog::debug("-- END --");
193  }
194 
195  m_state = normal;
196  return;
197  }
198 }
199 
200 // Push pending task
201 void MPIMaster::pushPendingTask(const std::string& input_string)
202 {
203  m_pending_tasks.push(input_string);
204 }
205 
206 // Returns whether finished tasks queue is empty
208 {
209  return m_finished_tasks.empty();
210 }
211 
212 // Returns reference to front finished task
214 {
215  return m_finished_tasks.front();
216 }
217 
218 // Pop finished task
220 {
221  m_finished_tasks.pop();
222 }
223 
224 // Flush finished, busy and pending tasks
226 {
227  m_worker_flushed = true;
228 
229  // Flush all TaskHandler queues
230  flushQueues();
231 }
232 
233 // Terminate Master
235 {
236  m_master_manager_terminated = true;
237 }
238 
239 // Listen to messages from Managers.
240 void MPIMaster::listenToManagers()
241 {
242  // While there are any incoming messages
243  while (probeMessage())
244  {
245  // Probe manager
246  int manager_rank = probeMessageManager();
247 
248  // Receive message
249  std::string&& output_string = receiveMessage(manager_rank);
250 
251  // Receive error code
252  int error_code = receiveErrorCode(manager_rank);
253 
254  // Record output string
255  m_map_manager_to_task[manager_rank]->recordOutputAndErrorCode(
256  output_string, error_code);
257 
258  // Mark manager as idle
259  m_idle_managers.insert(manager_rank);
260  }
261 }
262 
263 // Pop finished tasks from busy queue and insert into finished queue
264 void MPIMaster::popBusyQueue()
265 {
266  // If busy queue is empty, return immediately
267  if (m_busy_tasks.empty())
268  return;
269 
270  // While there are finished tasks (or tasks where errors occured) in the
271  // front of the queue
272  while (!m_busy_tasks.empty() && !m_busy_tasks.front().isPending())
273  {
274  spdlog::debug("MPIMaster::popBusyQueue: "
275  "Moving TaskHandler from busy to finished!");
276  spdlog::debug("finished, busy, pending: {}, {}, {}",
277  m_finished_tasks.size(), m_busy_tasks.size(),
278  m_pending_tasks.size());
279 
280  // Move TaskHandler to finished tasks
281  m_finished_tasks.push(std::move(m_busy_tasks.front()));
282 
283  // Pop front TaskHandler from busy queue
284  m_busy_tasks.pop();
285 
286  spdlog::debug("MPIMaster::popBusyQueue: "
287  "Done moving TaskHandler from busy to finished!");
288  spdlog::debug("finished, busy, pending: {}, {}, {}",
289  m_finished_tasks.size(), m_busy_tasks.size(),
290  m_pending_tasks.size());
291  }
292 }
293 
294 // Delegate to Managers
295 void MPIMaster::delegateToManagers()
296 {
297  if (spdlog::get(g_program_name)->level() <= spdlog::level::debug)
298  {
299  spdlog::debug("MPIMaster::delegateToManagers: entered!");
300  spdlog::debug("Idle managers:");
301  for (auto it = m_idle_managers.begin();
302  it != m_idle_managers.end(); it++)
303  spdlog::debug("{}", *it);
304  spdlog::debug("-- END --");
305  }
306 
307  // While there are idle managers
308  auto it = m_idle_managers.begin();
309  for (; (it != m_idle_managers.end()) && !m_pending_tasks.empty(); it++)
310  {
311  spdlog::debug("MPIMaster::delegateToManagers: "
312  "Moving TaskHandler from pending to busy!");
313  spdlog::debug("finished, busy, pending: {}, {}, {}",
314  m_finished_tasks.size(), m_busy_tasks.size(),
315  m_pending_tasks.size());
316 
317  // Send message to Manager
318  sendMessageToManager(*it, m_pending_tasks.front().getInputString());
319 
320  // Move pending TaskHandler to busy queue
321  m_busy_tasks.push(std::move(m_pending_tasks.front()));
322 
323  // Pop front TaskHandler from pending queue
324  m_pending_tasks.pop();
325 
326  // Set map from Manager to TaskHandler
327  m_map_manager_to_task[*it] = &m_busy_tasks.back();
328 
329  spdlog::debug("MPIMaster::delegateToManagers: "
330  "Done moving TaskHandler from pending to busy!");
331  spdlog::debug("finished, busy, pending: {}, {}, {}",
332  m_finished_tasks.size(), m_busy_tasks.size(),
333  m_pending_tasks.size());
334  }
335 
336  // Mark Managers as busy
337  m_idle_managers.erase(m_idle_managers.begin(), it);
338 
339  if (spdlog::get(g_program_name)->level() <= spdlog::level::debug)
340  {
341  spdlog::debug("MPIMaster::delegateToManagers: exiting");
342  spdlog::debug("Idle managers:");
343  for (auto it = m_idle_managers.begin();
344  it != m_idle_managers.end(); it++)
345  spdlog::debug("{}", *it);
346  spdlog::debug("-- END --");
347  }
348 }
349 
350 // Flush all task queues (finished, busy, pending)
351 void MPIMaster::flushQueues()
352 {
353  while (!m_finished_tasks.empty()) m_finished_tasks.pop();
354  while (!m_busy_tasks.empty()) m_busy_tasks.pop();
355  while (!m_pending_tasks.empty()) m_pending_tasks.pop();
356 }
357 
358 // Discard any messages and signals until all Managers are idle
359 void MPIMaster::discardMessagesErrorCodesAndSignals()
360 {
361  // While there are any incoming messages
362  while (probeMessage())
363  {
364  // Probe manager
365  int manager_rank = probeMessageManager();
366 
367  // Receive and discard message
368  receiveMessage(manager_rank);
369 
370  // Receive and discard error code
371  receiveErrorCode(manager_rank);
372 
373  // Mark manager as idle
374  m_idle_managers.insert(manager_rank);
375  }
376 
377  // While there are any incoming signals
378  while (probeSignal())
379  {
380  // Probe manager
381  int manager_rank = probeSignalManager();
382 
383  // If it a cancellation signal, mark manager as idle
384  if (receiveSignal(manager_rank) == WORKER_FLUSHED_SIGNAL)
385  m_idle_managers.insert(manager_rank);
386  }
387 }
388 
389 // Probe for message
390 bool MPIMaster::probeMessage() const
391 {
392  return iprobe_wrapper(MPI_ANY_SOURCE, MANAGER_MSG_TAG, MPI_COMM_WORLD);
393 }
394 
395 // Probe for signal
396 bool MPIMaster::probeSignal() const
397 {
398  return iprobe_wrapper(MPI_ANY_SOURCE, MANAGER_SIGNAL_TAG, MPI_COMM_WORLD);
399 }
400 
401 // Probe for Manager rank of incoming message
402 int MPIMaster::probeMessageManager() const
403 {
404  MPI_Status status;
405  MPI_Probe(MPI_ANY_SOURCE, MANAGER_MSG_TAG, MPI_COMM_WORLD, &status);
406  return static_cast<int>(status.MPI_SOURCE);
407 }
408 
409 // Probe for Manager rank of incoming signal
410 int MPIMaster::probeSignalManager() const
411 {
412  MPI_Status status;
413  MPI_Probe(MPI_ANY_SOURCE, MANAGER_SIGNAL_TAG, MPI_COMM_WORLD, &status);
414  return static_cast<int>(status.MPI_SOURCE);
415 }
416 
417 // Receive message from Manager
418 std::string MPIMaster::receiveMessage(int manager_rank) const
419 {
420  // Sanity check: probeMessage must return true
421  assert(probeMessage());
422 
423  return receive_string(MPI_COMM_WORLD, manager_rank, MANAGER_MSG_TAG);
424 }
425 
426 // Receive signal from Manager
427 int MPIMaster::receiveSignal(int manager_rank) const
428 {
429  // Sanity check: probeSignal must return true
430  assert(probeSignal());
431 
432  return receive_integer(MPI_COMM_WORLD, manager_rank, MANAGER_SIGNAL_TAG);
433 }
434 
435 // Receive error code from Manager
436 int MPIMaster::receiveErrorCode(int manager_rank) const
437 {
438  return receive_integer(MPI_COMM_WORLD, manager_rank,
439  MANAGER_ERROR_CODE_TAG);
440 }
441 
442 // Send message to a Manager
443 void MPIMaster::sendMessageToManager(int manager_rank,
444  const std::string& message_string)
445 {
446  if (spdlog::get(g_program_name)->level() <= spdlog::level::debug)
447  {
448  spdlog::debug("MPIMaster::sendMessageToManager: "
449  "sending to manager_rank {}",
450  " and message:\n{}", manager_rank, message_string);
451  spdlog::debug("Idle managers:");
452  for (auto it = m_idle_managers.begin();
453  it != m_idle_managers.end(); it++)
454  spdlog::debug("{}", *it);
455  spdlog::debug("-- END --");
456  }
457 
458  // Ensure previous message has finished sending
459  MPI_Wait(&m_message_requests[manager_rank], MPI_STATUS_IGNORE);
460 
461  // Store message string in buffer
462  m_message_buffers[manager_rank].assign(message_string);
463 
464  // Note: Isend is used here to avoid deadlock since the Master and the root
465  // Manager are executed by the same process
466  MPI_Isend(
467  m_message_buffers[manager_rank].c_str(),
468  m_message_buffers[manager_rank].size() + 1,
469  MPI_CHAR, manager_rank, MASTER_MSG_TAG,
470  MPI_COMM_WORLD,
471  &m_message_requests[manager_rank]);
472 }
473 
474 // Send signal to all Managers
475 void MPIMaster::sendSignalToAllManagers(int signal)
476 {
477  // Ensure previous signals have finished sending
478  for (int manager_rank = 0; manager_rank < m_comm_size; manager_rank++)
479  MPI_Wait(&m_signal_requests[manager_rank], MPI_STATUS_IGNORE);
480 
481  // Store signal in buffer
482  m_signal_buffer = signal;
483 
484  // Note: Isend is used here to avoid deadlock since the Master and the root
485  // Manager are executed by the same process
486  for (int manager_rank = 0; manager_rank < m_comm_size; manager_rank++)
487  MPI_Isend(&m_signal_buffer, 1, MPI_INT, manager_rank,
488  MASTER_SIGNAL_TAG, MPI_COMM_WORLD,
489  &m_signal_requests[manager_rank]);
490 }
virtual void terminate() override
Definition: MPIMaster.cc:234
virtual void iterate() override
Definition: MPIMaster.cc:62
virtual TaskHandler & frontFinishedTask() override
Definition: MPIMaster.cc:213
virtual void pushPendingTask(const std::string &input_string) override
Definition: MPIMaster.cc:201
const char * g_program_name
Definition: main.cc:22
virtual ~MPIMaster() override
Definition: MPIMaster.cc:36
virtual bool isActive() const override
Definition: MPIMaster.cc:56
virtual void popFinishedTask() override
Definition: MPIMaster.cc:219
std::weak_ptr< AbstractController > m_p_controller
bool programTerminated() const
virtual bool needMorePendingTasks() const override
Definition: MPIMaster.cc:91
virtual bool finishedTasksEmpty() const override
Definition: MPIMaster.cc:207
virtual void flush() override
Definition: MPIMaster.cc:225
MPIMaster(bool *p_program_terminated)
Definition: MPIMaster.cc:19