1 #ifndef PAKMANMPIWORKER_HPP 2 #define PAKMANMPIWORKER_HPP 48 std::string& input_string, std::string& output_string)>
61 int run(
int argc,
char*argv[]);
72 static MPI_Comm getParentComm();
75 std::string receiveMessage();
81 void sendMessage(
const std::string& message_string);
84 void sendErrorCode(
int error_code);
87 MPI_Comm m_parent_comm = MPI_COMM_NULL;
90 std::function<int(
int argc,
char** argv,
const std::string&
91 input_string, std::string& output_string)> m_simulator;
94 static constexpr
int PAKMAN_ROOT = 0;
95 static constexpr
int PAKMAN_MANAGER_MSG_TAG = 2;
96 static constexpr
int PAKMAN_MANAGER_SIGNAL_TAG = 3;
97 static constexpr
int PAKMAN_WORKER_MSG_TAG = 5;
98 static constexpr
int PAKMAN_WORKER_ERROR_CODE_TAG = 6;
100 static constexpr
int PAKMAN_TERMINATE_WORKER_SIGNAL = 0;
105 std::function<
int(
int argc,
char** argv,
const std::string& input_string,
106 std::string& output_string)> simulator)
107 : m_simulator(simulator)
114 m_parent_comm = getParentComm();
117 if (m_parent_comm == MPI_COMM_NULL)
119 std::cerr <<
"Pakman Worker error: " 120 "MPI Worker was not spawned, exiting...\n";
125 bool continue_loop =
true;
126 while (continue_loop)
130 MPI_Probe(PAKMAN_ROOT, MPI_ANY_TAG, m_parent_comm, &status);
133 switch (status.MPI_TAG)
135 case PAKMAN_MANAGER_MSG_TAG:
138 std::string input_string = receiveMessage();
141 std::string output_string;
142 int error_code = m_simulator(argc, argv, input_string,
146 sendMessage(output_string);
149 sendErrorCode(error_code);
153 case PAKMAN_MANAGER_SIGNAL_TAG:
156 int signal = receiveSignal();
161 case PAKMAN_TERMINATE_WORKER_SIGNAL:
164 continue_loop =
false;
169 std::cerr <<
"Pakman Worker error: signal not recognised, " 178 std::cerr <<
"Pakman Worker error: " 179 "tag not recognised, exiting...\n";
186 MPI_Comm_disconnect(&m_parent_comm);
193 MPI_Comm PakmanMPIWorker::getParentComm()
195 MPI_Comm parent_comm;
196 MPI_Comm_get_parent(&parent_comm);
202 std::string PakmanMPIWorker::receiveMessage()
206 MPI_Probe(PAKMAN_ROOT, PAKMAN_MANAGER_MSG_TAG, m_parent_comm, &status);
210 MPI_Get_count(&status, MPI_CHAR, &count);
211 char *buffer =
new char[count];
212 MPI_Recv(buffer, count, MPI_CHAR, PAKMAN_ROOT, PAKMAN_MANAGER_MSG_TAG,
213 m_parent_comm, MPI_STATUS_IGNORE);
216 std::string message(buffer);
222 int PakmanMPIWorker::receiveSignal()
226 MPI_Recv(&signal, 1, MPI_INT, PAKMAN_ROOT, PAKMAN_MANAGER_SIGNAL_TAG,
227 m_parent_comm, MPI_STATUS_IGNORE);
234 void PakmanMPIWorker::sendMessage(
const std::string& message_string)
237 MPI_Send(message_string.c_str(), message_string.size() + 1, MPI_CHAR,
238 PAKMAN_ROOT, PAKMAN_WORKER_MSG_TAG, m_parent_comm);
242 void PakmanMPIWorker::sendErrorCode(
int error_code)
245 MPI_Send(&error_code, 1, MPI_INT, PAKMAN_ROOT,
246 PAKMAN_WORKER_ERROR_CODE_TAG, m_parent_comm);
249 #endif // PAKMANMPIWORKER_HPP PakmanMPIWorker(std::function< int(int argc, char **argv, const std::string &input_string, std::string &output_string)> simulator)
static constexpr int PAKMAN_EXIT_FAILURE
int run(int argc, char *argv[])
static constexpr int PAKMAN_EXIT_SUCCESS
~PakmanMPIWorker()=default