Pakman
MPIWorkerHandler.cc
1 #include <string>
2 #include <thread>
3 
4 #include "core/common.h"
5 #include "mpi/mpi_utils.h"
6 #include "mpi/mpi_common.h"
7 #include "mpi/spawn.h"
8 
9 #include "MPIWorkerHandler.h"
10 
11 // Initialize static child communicator of MPIWorkerHandler to the
12 // null communicators (MPI_COMM_NULL)
13 MPI_Comm MPIWorkerHandler::s_child_comm = MPI_COMM_NULL;
14 
16  const std::string& input_string) :
17  AbstractWorkerHandler(simulator, input_string)
18 {
// Constructor: makes sure the shared MPI worker process exists, then sends
// it one simulation task.
//
// simulator:    forwarded to AbstractWorkerHandler. spawn_worker() is given
//               m_simulator, presumably the base class's stored copy of this
//               argument (NOTE(review): confirm in AbstractWorkerHandler).
// input_string: simulator input, sent to the worker unchanged.
19  // Spawn MPI child process if it has not yet been spawned.
// s_child_comm is static, so the worker is spawned at most once and then
// reused by later MPIWorkerHandler instances until terminateStatic()
// disconnects it.
20  if (s_child_comm == MPI_COMM_NULL)
21  s_child_comm = spawn_worker(m_simulator);
22
23  // Write input string to spawned MPI process.
// size() + 1 makes the sent buffer include the terminating '\0' of c_str().
// NOTE(review): MPI_Send takes an int count, so an input of INT_MAX or more
// characters would overflow the count argument here.
24  MPI_Send(input_string.c_str(), input_string.size() + 1, MPI_CHAR,
25  WORKER_RANK, MANAGER_MSG_TAG, s_child_comm);
26 }
27 
29 {
// Destructor. MPI cannot cancel a running simulation, so if the result has
// not been collected yet this waits for the worker to finish (see
// discardResults()). The shared worker process is kept alive for reuse.
30  // Just discard results, do not terminate MPI child process
31  discardResults();
32 }
33 
35 {
// Reports whether the worker's result for this task has arrived.
//
// While no result has been received, this probes for a message from
// WORKER_RANK with tag WORKER_MSG_TAG. iprobe_wrapper is presumably a
// non-blocking MPI_Iprobe wrapper (NOTE(review): confirm).
// On the first successful probe, it stores the output string in
// m_output_buffer and the error code in m_error_code, and sets
// m_result_received. Once the flag is set, the && short-circuits, so later
// calls return true without touching MPI.
36  // Probe for result if result has not yet been received
37  if ( !m_result_received &&
38  iprobe_wrapper(WORKER_RANK, WORKER_MSG_TAG, s_child_comm))
39  {
40  // Receive message
41  m_output_buffer.assign(receiveMessage());
42
43  // Receive error code (sent on its own tag, WORKER_ERROR_CODE_TAG)
44  m_error_code = receiveErrorCode();
45
46  // Set flag
47  m_result_received = true;
48  }
49
50  return m_result_received;
51 }
52 
53 std::string MPIWorkerHandler::receiveMessage() const
54 {
55  return receive_string(s_child_comm, WORKER_RANK, WORKER_MSG_TAG);
56 }
57 
58 int MPIWorkerHandler::receiveErrorCode() const
59 {
60  return receive_integer(s_child_comm, WORKER_RANK, WORKER_ERROR_CODE_TAG);
61 }
62 
63 void MPIWorkerHandler::discardResults()
64 {
65  // MPI does not provide process control, so
66  // we can only wait for the simulation to finish
67  // if it has not finished yet
68  if (!m_result_received)
69  {
70  // Timeout if message is not ready yet
71  while (!iprobe_wrapper(WORKER_RANK, WORKER_MSG_TAG, s_child_comm))
72  std::this_thread::sleep_for(g_main_timeout);
73 
74  // Receive message
75  receiveMessage();
76 
77  // Receive error code
78  receiveErrorCode();
79 
80  // Set flag
81  m_result_received = true;
82  }
83 }
84 
86 {
// Shuts down the shared MPI worker process, if one is running.
// It sends TERMINATE_WORKER_SIGNAL on MANAGER_SIGNAL_TAG, then disconnects
// s_child_comm. MPI_Comm_disconnect sets the handle to MPI_COMM_NULL.
// As a result, a second call is a no-op, and a later MPIWorkerHandler
// constructor would spawn a fresh worker.
87  // If this function is called, the Worker must be in an idle state, so it
88  // is not necessary to discard results from Worker.
89
90  // If s_child_comm is the null communicator, no Worker is running (it was
91  // never spawned or has already been terminated), so nothing needs to be done.
92  if (s_child_comm == MPI_COMM_NULL)
93  return;
94
95  // Else, send termination signal to Worker
96  int signal = TERMINATE_WORKER_SIGNAL;
97  MPI_Send(&signal, 1, MPI_INT, WORKER_RANK, MANAGER_SIGNAL_TAG,
98  s_child_comm);
99
100  // Free communicator (this also resets s_child_comm to MPI_COMM_NULL)
101  MPI_Comm_disconnect(&s_child_comm);
102 }
std::chrono::milliseconds g_main_timeout
static void terminateStatic()
virtual ~MPIWorkerHandler() override
virtual bool isDone() override
MPIWorkerHandler(const Command &simulator, const std::string &input_string)