8 #include "spdlog/spdlog.h" 11 #include "mpi/mpi_common.h" 12 #include "mpi/mpi_utils.h" 14 #include "ForkedWorkerHandler.h" 15 #include "MPIWorkerHandler.h" 22 bool *p_program_terminated) :
23 m_simulator(simulator),
24 m_worker_type(worker_type),
25 m_p_program_terminated(p_program_terminated)
34 MPI_Finalized(&finalized);
40 if (m_message_request != MPI_REQUEST_NULL)
41 MPI_Request_free(&m_message_request);
42 if (m_signal_request != MPI_REQUEST_NULL)
43 MPI_Request_free(&m_signal_request);
44 if (m_error_code_request != MPI_REQUEST_NULL)
45 MPI_Request_free(&m_error_code_request);
51 return m_state != terminated;
59 assert(m_state != terminated);
78 void Manager::doIdleStuff()
81 assert(!m_p_worker_handler);
84 if (*m_p_program_terminated)
94 switch (receiveSignal())
96 case TERMINATE_MANAGER_SIGNAL:
98 spdlog::debug(
"Idle manager {}/{}: received " 99 "TERMINATE_MANAGER_SIGNAL!",
100 get_mpi_comm_world_rank(), get_mpi_comm_world_size());
103 m_state = terminated;
109 case FLUSH_WORKER_SIGNAL:
111 spdlog::debug(
"Idle manager {}/{}: received " 112 "FLUSH_WORKER_SIGNAL!",
113 get_mpi_comm_world_rank(), get_mpi_comm_world_size());
127 spdlog::debug(
"Idle manager {}/{}: received message!",
128 get_mpi_comm_world_rank(), get_mpi_comm_world_size());
131 std::string input_string = receiveMessage();
132 createWorker(input_string);
141 void Manager::doBusyStuff()
144 assert(m_p_worker_handler);
147 if (*m_p_program_terminated)
153 m_state = terminated;
160 switch (receiveSignal())
162 case TERMINATE_MANAGER_SIGNAL:
164 spdlog::debug(
"Busy manager {}/{}: received " 165 "TERMINATE_MANAGER_SIGNAL!",
166 get_mpi_comm_world_rank(), get_mpi_comm_world_size());
172 m_state = terminated;
175 case FLUSH_WORKER_SIGNAL:
177 spdlog::debug(
"Busy manager {}/{}: received " 178 "FLUSH_WORKER_SIGNAL!",
179 get_mpi_comm_world_rank(), get_mpi_comm_world_size());
185 sendSignalToMaster(WORKER_FLUSHED_SIGNAL);
199 if (m_p_worker_handler->isDone())
201 spdlog::debug(
"Busy manager {}/{}: Worker is done!",
202 get_mpi_comm_world_rank(), get_mpi_comm_world_size());
205 std::string output_string = m_p_worker_handler->getOutput();
208 int error_code = m_p_worker_handler->getErrorCode();
211 sendMessageToMaster(output_string);
214 sendErrorCodeToMaster(error_code);
225 assert(probeMessage() ==
false);
229 void Manager::createWorker(
const std::string& input_string)
232 assert(!m_p_worker_handler);
235 switch (m_worker_type)
240 std::unique_ptr<ForkedWorkerHandler>(
247 std::unique_ptr<MPIWorkerHandler>(
252 throw std::runtime_error(
"Worker type not recognised");
257 void Manager::flushWorker()
261 assert(m_p_worker_handler);
265 m_p_worker_handler.reset();
269 void Manager::terminateWorker()
273 assert(m_p_worker_handler);
276 m_p_worker_handler.reset();
280 bool Manager::probeMessage()
const 282 return iprobe_wrapper(MASTER_RANK, MASTER_MSG_TAG, MPI_COMM_WORLD);
286 bool Manager::probeSignal()
const 288 return iprobe_wrapper(MASTER_RANK, MASTER_SIGNAL_TAG, MPI_COMM_WORLD);
292 std::string Manager::receiveMessage()
const 295 assert(probeMessage());
297 return receive_string(MPI_COMM_WORLD, MASTER_RANK, MASTER_MSG_TAG);
301 int Manager::receiveSignal()
const 304 assert(probeSignal());
306 return receive_integer(MPI_COMM_WORLD, MASTER_RANK, MASTER_SIGNAL_TAG);
310 void Manager::sendMessageToMaster(
const std::string& message_string)
313 MPI_Wait(&m_message_request, MPI_STATUS_IGNORE);
316 m_message_buffer.assign(message_string);
321 m_message_buffer.c_str(),
322 m_message_buffer.size() + 1,
323 MPI_CHAR, MASTER_RANK, MANAGER_MSG_TAG,
329 void Manager::sendSignalToMaster(
int signal)
332 MPI_Wait(&m_signal_request, MPI_STATUS_IGNORE);
335 m_signal_buffer = signal;
339 MPI_Isend(&m_signal_buffer, 1, MPI_INT, MASTER_RANK,
340 MANAGER_SIGNAL_TAG, MPI_COMM_WORLD,
345 void Manager::sendErrorCodeToMaster(
int error_code)
348 MPI_Wait(&m_error_code_request, MPI_STATUS_IGNORE);
351 m_error_code_buffer = error_code;
355 MPI_Isend(&m_error_code_buffer, 1, MPI_INT, MASTER_RANK,
356 MANAGER_ERROR_CODE_TAG, MPI_COMM_WORLD,
357 &m_error_code_request);
Manager(const Command &simulator, worker_t worker_type, bool *p_program_terminated)