Pakman
PakmanMPIWorker.hpp
1 #ifndef PAKMANMPIWORKER_HPP
2 #define PAKMANMPIWORKER_HPP
3 
#include <functional>
#include <iostream>
#include <string>
#include <utility>

#include <mpi.h>
8 
32 {
33  public:
34 
47  PakmanMPIWorker(std::function<int(int argc, char** argv, const
48  std::string& input_string, std::string& output_string)>
49  simulator);
50 
52  ~PakmanMPIWorker() = default;
53 
61  int run(int argc, char*argv[]);
62 
64  static constexpr int PAKMAN_EXIT_SUCCESS = 0;
65 
67  static constexpr int PAKMAN_EXIT_FAILURE = 1;
68 
69  private:
70 
71  // Get parent communicator
72  static MPI_Comm getParentComm();
73 
74  // Receive message from Pakman Manager
75  std::string receiveMessage();
76 
77  // Receive signal from Pakman Manager
78  int receiveSignal();
79 
80  // Send message to Pakman Manager
81  void sendMessage(const std::string& message_string);
82 
83  // Send error code to Pakman Manager
84  void sendErrorCode(int error_code);
85 
86  // Parent communicator
87  MPI_Comm m_parent_comm = MPI_COMM_NULL;
88 
89  // Simulator function
90  std::function<int(int argc, char** argv, const std::string&
91  input_string, std::string& output_string)> m_simulator;
92 
93  // Parent communication constants
94  static constexpr int PAKMAN_ROOT = 0;
95  static constexpr int PAKMAN_MANAGER_MSG_TAG = 2;
96  static constexpr int PAKMAN_MANAGER_SIGNAL_TAG = 3;
97  static constexpr int PAKMAN_WORKER_MSG_TAG = 5;
98  static constexpr int PAKMAN_WORKER_ERROR_CODE_TAG = 6;
99 
100  static constexpr int PAKMAN_TERMINATE_WORKER_SIGNAL = 0;
101 };
102 
103 // Constructor
105  std::function<int(int argc, char** argv, const std::string& input_string,
106  std::string& output_string)> simulator)
107 : m_simulator(simulator)
108 {
109 }
110 
111 int PakmanMPIWorker::run(int argc, char*argv[])
112 {
113  // Get parent communicator
114  m_parent_comm = getParentComm();
115 
116  // Check if I was spawned
117  if (m_parent_comm == MPI_COMM_NULL)
118  {
119  std::cerr << "Pakman Worker error: "
120  "MPI Worker was not spawned, exiting...\n";
121  return PAKMAN_EXIT_FAILURE;
122  }
123 
124  // Start loop
125  bool continue_loop = true;
126  while (continue_loop)
127  {
128  // Probe for message
129  MPI_Status status;
130  MPI_Probe(PAKMAN_ROOT, MPI_ANY_TAG, m_parent_comm, &status);
131 
132  // Check tag
133  switch (status.MPI_TAG)
134  {
135  case PAKMAN_MANAGER_MSG_TAG:
136  {
137  // Receive message
138  std::string input_string = receiveMessage();
139 
140  // Run simulation
141  std::string output_string;
142  int error_code = m_simulator(argc, argv, input_string,
143  output_string);
144 
145  // Send output
146  sendMessage(output_string);
147 
148  // Send error code
149  sendErrorCode(error_code);
150 
151  break;
152  }
153  case PAKMAN_MANAGER_SIGNAL_TAG:
154  {
155  // Receive signal
156  int signal = receiveSignal();
157 
158  // Check signal
159  switch (signal)
160  {
161  case PAKMAN_TERMINATE_WORKER_SIGNAL:
162  {
163  // Set loop condition to false
164  continue_loop = false;
165  break;
166  }
167  default:
168  {
169  std::cerr << "Pakman Worker error: signal not recognised, "
170  "exiting...\n";
171  return PAKMAN_EXIT_FAILURE;
172  }
173  }
174  break;
175  }
176  default:
177  {
178  std::cerr << "Pakman Worker error: "
179  "tag not recognised, exiting...\n";
180  return PAKMAN_EXIT_FAILURE;
181  }
182  }
183  }
184 
185  // Disconnect parent communicator
186  MPI_Comm_disconnect(&m_parent_comm);
187 
188  // Return successful error code
189  return PAKMAN_EXIT_SUCCESS;
190 }
191 
192 // Get parent communicator
193 MPI_Comm PakmanMPIWorker::getParentComm()
194 {
195  MPI_Comm parent_comm;
196  MPI_Comm_get_parent(&parent_comm);
197 
198  return parent_comm;
199 }
200 
201 // Receive message from Pakman Manager
202 std::string PakmanMPIWorker::receiveMessage()
203 {
204  // Probe to get status
205  MPI_Status status;
206  MPI_Probe(PAKMAN_ROOT, PAKMAN_MANAGER_MSG_TAG, m_parent_comm, &status);
207 
208  // Receive string
209  int count = 0;
210  MPI_Get_count(&status, MPI_CHAR, &count);
211  char *buffer = new char[count];
212  MPI_Recv(buffer, count, MPI_CHAR, PAKMAN_ROOT, PAKMAN_MANAGER_MSG_TAG,
213  m_parent_comm, MPI_STATUS_IGNORE);
214 
215  // Return string
216  std::string message(buffer);
217  delete[] buffer;
218  return message;
219 }
220 
221 // Receive signal from Pakman Manager
222 int PakmanMPIWorker::receiveSignal()
223 {
224  // Receive signal
225  int signal;
226  MPI_Recv(&signal, 1, MPI_INT, PAKMAN_ROOT, PAKMAN_MANAGER_SIGNAL_TAG,
227  m_parent_comm, MPI_STATUS_IGNORE);
228 
229  // Return signal
230  return signal;
231 }
232 
233 // Send message to Pakman Manager
234 void PakmanMPIWorker::sendMessage(const std::string& message_string)
235 {
236  // Send message
237  MPI_Send(message_string.c_str(), message_string.size() + 1, MPI_CHAR,
238  PAKMAN_ROOT, PAKMAN_WORKER_MSG_TAG, m_parent_comm);
239 }
240 
241 // Send error code to Pakman Manager
242 void PakmanMPIWorker::sendErrorCode(int error_code)
243 {
244  // Send error code
245  MPI_Send(&error_code, 1, MPI_INT, PAKMAN_ROOT,
246  PAKMAN_WORKER_ERROR_CODE_TAG, m_parent_comm);
247 }
248 
249 #endif // PAKMANMPIWORKER_HPP
PakmanMPIWorker(std::function< int(int argc, char **argv, const std::string &input_string, std::string &output_string)> simulator)
static constexpr int PAKMAN_EXIT_FAILURE
int run(int argc, char *argv[])
static constexpr int PAKMAN_EXIT_SUCCESS
~PakmanMPIWorker()=default