10 #include "spdlog/spdlog.h" 12 #include "mpi/mpi_utils.h" 13 #include "mpi/mpi_common.h" 14 #include "controller/AbstractController.h" 16 #include "MPIMaster.h" 21 m_comm_size(get_mpi_comm_world_size()),
// NOTE(review): the constructor signature (presumably
// MPIMaster::MPIMaster(bool *p_program_terminated)) and any earlier
// initialisers are not visible in this extract; the initialiser list resumes
// on the line above with m_comm_size.
// The per-rank containers below are sized to the communicator size because
// they are indexed directly by manager rank.
22 m_map_manager_to_task(get_mpi_comm_world_size()),
23 m_message_buffers(get_mpi_comm_world_size())
// Constructor body: one request slot per rank for master->manager messages and
// one for signals, each starting as MPI_REQUEST_NULL (nothing in flight), and
// every rank starts out in the idle set.
27 for (
int i = 0; i < m_comm_size; i++)
29 m_message_requests.push_back(MPI_REQUEST_NULL);
30 m_signal_requests.push_back(MPI_REQUEST_NULL);
31 m_idle_managers.insert(i);
// NOTE(review): the function header is not visible in this extract; this is
// presumably the body of ~MPIMaster(). The declaration of `finalized` and the
// check that uses it are also missing. Presumably requests are only freed while
// MPI is not yet finalized, because MPI_Request_free may not be called after
// MPI_Finalize. Confirm against the full file.
40 MPI_Finalized(&finalized);
// Release every request that is still active (both message and signal sends)
// so that no MPI request handle outlives the master.
46 for (
int i = 0; i < m_comm_size; i++)
48 if (m_message_requests[i] != MPI_REQUEST_NULL)
49 MPI_Request_free(&m_message_requests[i]);
50 if (m_signal_requests[i] != MPI_REQUEST_NULL)
51 MPI_Request_free(&m_signal_requests[i]);
// NOTE(review): function header not visible; presumably isActive() const.
// Reports true until the master has entered the `terminated` state.
58 return m_state != terminated;
// NOTE(review): function header and the rest of the body are not visible;
// possibly iterate(). Precondition: must not be called once the master has
// reached the `terminated` state.
70 assert(m_state != terminated);
// NOTE(review): function header not visible; presumably needMorePendingTasks().
// Asks for more input while fewer tasks are queued than there are ranks.
// m_pending_tasks.size() is unsigned and m_comm_size is presumably an int, so
// m_comm_size is converted to unsigned here. That is harmless for a positive
// communicator size.
93 return m_pending_tasks.size() < m_comm_size;
// One step of the master while it is in its normal (non-flushing) state.
// NOTE(review): this extract omits several body lines, including:
// - the braces;
// - the condition guarding the first `m_state = terminated`;
// - how p_controller is obtained;
// - any state change after the flush broadcast.
// The comments below describe only the statements that are visible.
97 void MPIMaster::doNormalStuff()
101 assert(m_state != terminated);
// NOTE(review): the condition for this early termination is not visible here
// (possibly programTerminated()); confirm against the full file.
107 m_state = terminated;
// Call iterate() on the controller.
// NOTE(review): p_controller is presumably obtained from the weak_ptr
// m_p_controller on lines missing from this extract.
118 p_controller->iterate();
// Once m_master_manager_terminated is set, broadcast TERMINATE_MANAGER_SIGNAL
// to every rank and move the master to `terminated`.
121 if (m_master_manager_terminated)
124 sendSignalToAllManagers(TERMINATE_MANAGER_SIGNAL);
127 m_state = terminated;
// A flush was requested. Broadcast FLUSH_WORKER_SIGNAL once, then clear the
// flag so the signal is not re-sent on the next step.
132 if (m_worker_flushed)
134 spdlog::debug(
"MPIMaster::doNormalStuff: Flushing workers!");
137 sendSignalToAllManagers(FLUSH_WORKER_SIGNAL);
140 m_worker_flushed =
false;
// Hand queued tasks to whichever managers are currently idle.
148 delegateToManagers();
// One step of the master while a flush is in progress. Output arriving from
// managers is discarded rather than recorded. Once every rank is back in the
// idle set, the master may return to its normal state.
// NOTE(review): this extract omits several body lines, including:
// - the braces;
// - the condition guarding the first `m_state = terminated`;
// - the actual state assignment after the idle-manager dump.
97 void MPIMaster::doNormalStuff() is a separate block; this one starts below.
// NOTE(review): function header not visible; presumably
// pushPendingTask(const std::string &input_string).
// Appends a new task built from input_string to the back of the pending queue.
// Elements are read back via front().getInputString(), so this relies on the
// element type being constructible from a std::string.
203 m_pending_tasks.push(input_string);
// NOTE(review): function header not visible; presumably finishedTasksEmpty().
// True when no finished task is waiting to be consumed.
209 return m_finished_tasks.empty();
// NOTE(review): function header not visible; presumably frontFinishedTask().
// Returns the oldest finished task.
// Precondition: the finished queue must not be empty. If it is a std::queue,
// front() on an empty queue is undefined behaviour.
215 return m_finished_tasks.front();
// NOTE(review): function header not visible; presumably popFinishedTask().
// Removes the oldest finished task. Precondition: the queue must not be empty.
221 m_finished_tasks.pop();
// NOTE(review): function header not visible; presumably flush().
// Only records the request. doNormalStuff() later sees m_worker_flushed,
// broadcasts FLUSH_WORKER_SIGNAL, and clears the flag.
227 m_worker_flushed =
true;
// NOTE(review): function header not visible; presumably terminate().
// Only records the request. The next normal/flushing step broadcasts
// TERMINATE_MANAGER_SIGNAL and moves the master to `terminated`.
236 m_master_manager_terminated =
true;
240 void MPIMaster::listenToManagers()
243 while (probeMessage())
246 int manager_rank = probeMessageManager();
249 std::string&& output_string = receiveMessage(manager_rank);
252 int error_code = receiveErrorCode(manager_rank);
255 m_map_manager_to_task[manager_rank]->recordOutputAndErrorCode(
256 output_string, error_code);
259 m_idle_managers.insert(manager_rank);
// Move completed tasks from the front of the busy queue to the finished queue.
// It stops at the first task that is still pending, so tasks reach the
// finished queue in the order they entered the busy queue.
264 void MPIMaster::popBusyQueue()
// NOTE(review): the statement controlled by this check is not visible in this
// extract; it is presumably an early return.
267 if (m_busy_tasks.empty())
272 while (!m_busy_tasks.empty() && !m_busy_tasks.front().isPending())
274 spdlog::debug(
"MPIMaster::popBusyQueue: " 275 "Moving TaskHandler from busy to finished!");
276 spdlog::debug(
"finished, busy, pending: {}, {}, {}",
277 m_finished_tasks.size(), m_busy_tasks.size(),
278 m_pending_tasks.size());
// NOTE(review): the matching m_busy_tasks.pop() is not visible in this extract;
// it must follow this push for the loop to advance. After that pop, the
// m_map_manager_to_task entry for the task's manager still points at the
// removed element. It must not be dereferenced until the manager is given a
// new task.
281 m_finished_tasks.push(std::move(m_busy_tasks.front()));
286 spdlog::debug(
"MPIMaster::popBusyQueue: " 287 "Done moving TaskHandler from busy to finished!");
288 spdlog::debug(
"finished, busy, pending: {}, {}, {}",
289 m_finished_tasks.size(), m_busy_tasks.size(),
290 m_pending_tasks.size());
// Give one pending task to each idle manager, in idle-set order, until either
// the idle set or the pending queue runs out.
// Each delegated task is sent to its manager, moved into the busy queue, and
// recorded in m_map_manager_to_task. Managers that received work are removed
// from the idle set.
295 void MPIMaster::delegateToManagers()
299 spdlog::debug(
"MPIMaster::delegateToManagers: entered!");
300 spdlog::debug(
"Idle managers:");
301 for (
auto it = m_idle_managers.begin();
302 it != m_idle_managers.end(); it++)
303 spdlog::debug(
"{}", *it);
304 spdlog::debug(
"-- END --");
// After the loop, `it` is one past the last idle manager that received a task.
308 auto it = m_idle_managers.begin();
309 for (; (it != m_idle_managers.end()) && !m_pending_tasks.empty(); it++)
311 spdlog::debug(
"MPIMaster::delegateToManagers: " 312 "Moving TaskHandler from pending to busy!");
313 spdlog::debug(
"finished, busy, pending: {}, {}, {}",
314 m_finished_tasks.size(), m_busy_tasks.size(),
315 m_pending_tasks.size());
// Send the task's input before moving the task: once moved, the front of the
// pending queue is in a moved-from state.
318 sendMessageToManager(*it, m_pending_tasks.front().getInputString());
321 m_busy_tasks.push(std::move(m_pending_tasks.front()));
324 m_pending_tasks.pop();
// Remember which busy task this manager is working on; listenToManagers()
// records the result through this pointer. The pointer stays valid only if
// m_busy_tasks never relocates elements when pushing at the back or popping
// at the front. That holds for a deque-backed std::queue; confirm the
// container type.
327 m_map_manager_to_task[*it] = &m_busy_tasks.back();
329 spdlog::debug(
"MPIMaster::delegateToManagers: " 330 "Done moving TaskHandler from pending to busy!");
331 spdlog::debug(
"finished, busy, pending: {}, {}, {}",
332 m_finished_tasks.size(), m_busy_tasks.size(),
333 m_pending_tasks.size());
// Exactly the managers in [begin, it) received a task, so they are no longer idle.
337 m_idle_managers.erase(m_idle_managers.begin(), it);
341 spdlog::debug(
"MPIMaster::delegateToManagers: exiting");
342 spdlog::debug(
"Idle managers:");
343 for (
auto it = m_idle_managers.begin();
344 it != m_idle_managers.end(); it++)
345 spdlog::debug(
"{}", *it);
346 spdlog::debug(
"-- END --");
351 void MPIMaster::flushQueues()
353 while (!m_finished_tasks.empty()) m_finished_tasks.pop();
354 while (!m_busy_tasks.empty()) m_busy_tasks.pop();
355 while (!m_pending_tasks.empty()) m_pending_tasks.pop();
359 void MPIMaster::discardMessagesErrorCodesAndSignals()
362 while (probeMessage())
365 int manager_rank = probeMessageManager();
368 receiveMessage(manager_rank);
371 receiveErrorCode(manager_rank);
374 m_idle_managers.insert(manager_rank);
378 while (probeSignal())
381 int manager_rank = probeSignalManager();
384 if (receiveSignal(manager_rank) == WORKER_FLUSHED_SIGNAL)
385 m_idle_managers.insert(manager_rank);
390 bool MPIMaster::probeMessage()
const 392 return iprobe_wrapper(MPI_ANY_SOURCE, MANAGER_MSG_TAG, MPI_COMM_WORLD);
396 bool MPIMaster::probeSignal()
const 398 return iprobe_wrapper(MPI_ANY_SOURCE, MANAGER_SIGNAL_TAG, MPI_COMM_WORLD);
402 int MPIMaster::probeMessageManager()
const 405 MPI_Probe(MPI_ANY_SOURCE, MANAGER_MSG_TAG, MPI_COMM_WORLD, &status);
406 return static_cast<int>(status.MPI_SOURCE);
410 int MPIMaster::probeSignalManager()
const 413 MPI_Probe(MPI_ANY_SOURCE, MANAGER_SIGNAL_TAG, MPI_COMM_WORLD, &status);
414 return static_cast<int>(status.MPI_SOURCE);
418 std::string MPIMaster::receiveMessage(
int manager_rank)
const 421 assert(probeMessage());
423 return receive_string(MPI_COMM_WORLD, manager_rank, MANAGER_MSG_TAG);
427 int MPIMaster::receiveSignal(
int manager_rank)
const 430 assert(probeSignal());
432 return receive_integer(MPI_COMM_WORLD, manager_rank, MANAGER_SIGNAL_TAG);
436 int MPIMaster::receiveErrorCode(
int manager_rank)
const 438 return receive_integer(MPI_COMM_WORLD, manager_rank,
439 MANAGER_ERROR_CODE_TAG);
443 void MPIMaster::sendMessageToManager(
int manager_rank,
444 const std::string& message_string)
448 spdlog::debug(
"MPIMaster::sendMessageToManager: " 449 "sending to manager_rank {}",
450 " and message:\n{}", manager_rank, message_string);
451 spdlog::debug(
"Idle managers:");
452 for (
auto it = m_idle_managers.begin();
453 it != m_idle_managers.end(); it++)
454 spdlog::debug(
"{}", *it);
455 spdlog::debug(
"-- END --");
459 MPI_Wait(&m_message_requests[manager_rank], MPI_STATUS_IGNORE);
462 m_message_buffers[manager_rank].assign(message_string);
467 m_message_buffers[manager_rank].c_str(),
468 m_message_buffers[manager_rank].size() + 1,
469 MPI_CHAR, manager_rank, MASTER_MSG_TAG,
471 &m_message_requests[manager_rank]);
475 void MPIMaster::sendSignalToAllManagers(
int signal)
478 for (
int manager_rank = 0; manager_rank < m_comm_size; manager_rank++)
479 MPI_Wait(&m_signal_requests[manager_rank], MPI_STATUS_IGNORE);
482 m_signal_buffer = signal;
486 for (
int manager_rank = 0; manager_rank < m_comm_size; manager_rank++)
487 MPI_Isend(&m_signal_buffer, 1, MPI_INT, manager_rank,
488 MASTER_SIGNAL_TAG, MPI_COMM_WORLD,
489 &m_signal_requests[manager_rank]);
// Signatures of MPIMaster members and related symbols as listed by this
// extract. These are declarations only; no bodies are part of this listing.
virtual void terminate() override
virtual void iterate() override
virtual TaskHandler & frontFinishedTask() override
virtual void pushPendingTask(const std::string &input_string) override
const char * g_program_name
virtual ~MPIMaster() override
virtual bool isActive() const override
virtual void popFinishedTask() override
// Non-owning handle to the controller (a weak_ptr does not keep it alive).
std::weak_ptr< AbstractController > m_p_controller
bool programTerminated() const
virtual bool needMorePendingTasks() const override
virtual bool finishedTasksEmpty() const override
virtual void flush() override
MPIMaster(bool *p_program_terminated)