Pakman
MPIMasterStatic.cc
1 #include <thread>
2 #include <string>
3 #include <iostream>
4 #include <memory>
5 
6 #include <mpi.h>
7 
8 #include <getopt.h>
9 
10 #include "core/common.h"
11 #include "core/utils.h"
12 #include "core/LongOptions.h"
13 #include "core/Arguments.h"
14 #include "system/signal_handler.h"
15 #include "mpi/mpi_utils.h"
16 #include "mpi/mpi_common.h"
17 #include "main/help.h"
18 #include "controller/AbstractController.h"
19 
20 #include "Manager.h"
21 #include "MPIWorkerHandler.h"
22 
23 #include "MPIMaster.h"
24 
25 // Global MPI_Info
// Created with MPI_Info_create() in MPIMaster::run() only when the
// --mpi-simulator option is set, filled with the key-value pairs given through
// --mpi-info, and released with MPI_Info_free() before MPI_Finalize().
// It is left untouched when the MPI simulator option is not set.
26 MPI_Info g_info;
27 
28 // Static help function
// Returns the help message for the 'mpi' master as one string. The text is a
// single raw string literal, so everything between R"( and )" is emitted
// verbatim; it describes the master and lists the options -m, -f, -p, -t
// and -k.
29 std::string MPIMaster::help()
30 {
31  return
32 R"(* Help message for 'mpi' master *
33 
34 Description:
35  The MPI master parallelizes instances of the simulator, also called
36  "workers", across all launched MPI processes. This means that every MPI
37  process is responsible for spawning workers. The correspondence between
38  workers and MPI processes is one-to-one; launching N MPI processes results in
39  N workers running in parallel.
40 
41  If no optional arguments are given, the simulator is, by default, assumed to
42  be a standard simulator, which means that it communicates with pakman
43  through its stdin and stdout.
44 
45  If the optional argument --mpi-simulator is given, the simulator is assumed
46  to communicate with pakman through MPI. The MPI simulator must then be
47  written with the header pakman_mpi_worker.h or PakmanMPIWorker.hpp.
48 
49  In order to maximize the number of CPU cycles devoted to the workers, the MPI
50  master is implemented using an event loop. The time spent sleeping at each
51  iteration of the event loop can be adjusted using the optional argument
52  --main-timeout.
53 
54  When a worker needs to be shut down, for example when the algorithm has
55  finished, pakman first sends SIGTERM to the worker. If the worker has not
56  exited after a fixed amount of time, it is killed by sending the SIGKILL
57  signal. The amount of time between sending SIGTERM and SIGKILL can be
58  changed using the optional argument --kill-timeout. This is only meaningful
59  for standard simulators because the MPI standard does not support signals for
60  processes that are spawned using MPI functions.
61 
62  Some MPI implementations do not automatically spawn dynamic MPI processes on
63  the same host as the spawning MPI process. The flag --force-host-spawn tries
64  to enforce spawning dynamic MPI processes on the same host by setting the
65  "host" key in MPI_Info to the same host as the spawning MPI process.
66 
67 MPI master options:
68  -m, --mpi-simulator simulator is spawned using MPI
69  -f, --force-host-spawn force MPI simulator to spawn on same host
70  as manager (requires -m option)
71  -p, --mpi-info=KEY_VAL_STR specify key-value pairs for MPI_Info object
72  to MPI_Comm_spawn as
73  'KEY1=VALUE1; KEY2=VALUE2; ...; KEYN=VALUEN'
74  (requires -m option). The characters '=' and
75  ';' can be escaped using a backslash.
76  -t, --main-timeout=TIME sleep for TIME ms in event loop (default 1)
77  -k, --kill-timeout=TIME wait for TIME ms before sending SIGKILL
78  (default 100)
79 )";
80 }
81 
// Maps the --mpi-simulator flag to the worker type handed to the Manager.
//
// mpi_simulator: true when the simulator is to be spawned through MPI.
// Returns Manager::mpi_worker when mpi_simulator is true, otherwise
// Manager::forked_worker.
82 Manager::worker_t get_worker(bool mpi_simulator)
83 {
84  if (mpi_simulator)
85  {
86  return Manager::mpi_worker;
87  }
88  else
89  return Manager::forked_worker;
90 }
91 
92 // Static addLongOptions function
// Registers the long options of the MPI master with lopts. Each entry gives
// the long name, whether an argument is required, and the short option
// character: --main-timeout (-t), --kill-timeout (-k) and --mpi-info (-p)
// require an argument; --mpi-simulator (-m) and --force-host-spawn (-f) take
// none.
94 {
95  lopts.add({"main-timeout", required_argument, nullptr, 't'});
96  lopts.add({"kill-timeout", required_argument, nullptr, 'k'});
97  lopts.add({"mpi-simulator", no_argument, nullptr, 'm'});
98  lopts.add({"force-host-spawn", no_argument, nullptr, 'f'});
99  lopts.add({"mpi-info", required_argument, nullptr, 'p'});
100 }
101 
102 // Static main function
// Entry point of the MPI master; executed by every MPI process.
//
// Steps, in order:
//  1. Read the optional arguments (--main-timeout, --kill-timeout,
//     --mpi-simulator, --force-host-spawn, --mpi-info) and reject
//     --force-host-spawn / --mpi-info given without --mpi-simulator.
//  2. Initialize MPI and, for an MPI simulator, build the global g_info.
//  3. Create a controller and a Manager on every rank; rank 0 additionally
//     creates the MPIMaster and links it with the controller.
//  4. Run the event loop until the Manager (and on rank 0 the master) is no
//     longer active, sleeping g_main_timeout between iterations.
//  5. Destroy Manager and controller, free g_info, finalize MPI.
//
// controller: controller type, forwarded to AbstractController::makeController
//             and to ::help() on argument errors.
// args:       parsed command-line arguments queried for the options above.
103 void MPIMaster::run(controller_t controller, const Arguments& args)
104 {
105  // Flag: true when the simulator is spawned through MPI (--mpi-simulator)
106  bool mpi_simulator = false;
107 
108  // Process optional arguments
 // Timeouts are parsed with std::stoi and stored as milliseconds. std::stoi
 // throws std::invalid_argument / std::out_of_range on bad input, and
 // nothing in this function catches those exceptions.
109  if (args.isOptionalArgumentSet("main-timeout"))
110  {
111  std::string&& arg = args.optionalArgument("main-timeout");
112  g_main_timeout = std::chrono::milliseconds(std::stoi(arg));
113  }
114 
115  if (args.isOptionalArgumentSet("kill-timeout"))
116  {
117  std::string&& arg = args.optionalArgument("kill-timeout");
118  g_kill_timeout = std::chrono::milliseconds(std::stoi(arg));
119  }
120 
 // --force-host-spawn and --mpi-info are only valid together with
 // --mpi-simulator. Because this is an else-if chain, the --mpi-info error
 // is reported only when neither --mpi-simulator nor --force-host-spawn is
 // set.
 // NOTE(review): ::help(..., EXIT_FAILURE) presumably terminates the
 // program; if it returns, execution continues with MPI_Init below - confirm.
121  if (args.isOptionalArgumentSet("mpi-simulator"))
122  {
123  mpi_simulator = true;
124 
125  if (args.isOptionalArgumentSet("force-host-spawn"))
126  g_force_host_spawn = true;
127  }
128  else if (args.isOptionalArgumentSet("force-host-spawn"))
129  {
130  std::cout << "Error: option --mpi-simulator must be set "
131  "if --force-host-spawn is set\n";
132  ::help(mpi, controller, EXIT_FAILURE);
133  }
134  else if (args.isOptionalArgumentSet("mpi-info"))
135  {
136  std::cout << "Error: option --mpi-simulator must be set "
137  "if --mpi-info is set\n";
138  ::help(mpi, controller, EXIT_FAILURE);
139  }
140 
141  // Initialize the MPI environment
142  MPI_Init(nullptr, nullptr);
143 
144  // Create MPI_Info if using MPI simulator
145  if (mpi_simulator)
146  {
147  MPI_Info_create(&g_info);
148 
 // Copy every key-value pair parsed from the --mpi-info string into g_info
149  if (args.isOptionalArgumentSet("mpi-info"))
150  {
151  std::string&& arg = args.optionalArgument("mpi-info");
152  std::map<std::string, std::string> dict = parse_key_value_pairs(arg);
153 
154  for (auto it = dict.begin(); it != dict.end(); ++it)
155  MPI_Info_set(g_info, it->first.c_str(), it->second.c_str());
156  }
157  }
158 
159  // Get rank of this process in MPI_COMM_WORLD
160  int rank = 0;
161  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
162 
163  // Set signal handler
164  set_signal_handler();
165 
166  // Determine Worker type (MPI worker or forked worker)
167  Manager::worker_t worker_type =
168  get_worker(mpi_simulator);
169 
170  // Create controller (on every rank; its simulator is passed to the Manager)
171  std::shared_ptr<AbstractController>
172  p_controller(AbstractController::makeController(controller, args));
173 
174  // Create Manager object (on every rank)
175  auto p_manager = std::make_shared<Manager>(p_controller->getSimulator(),
176  worker_type, &g_program_terminated);
177 
 // Only rank 0 hosts the master; all other ranks run a Manager only
178  if (rank == 0)
179  {
180  // Create MPI master
181  auto p_master = std::make_shared<MPIMaster>(&g_program_terminated);
182 
183  // Associate with each other
184  p_master->assignController(p_controller);
185  p_controller->assignMaster(p_master);
186 
187  // Master & Manager event loop: runs while either one is still active
188  while (p_master->isActive() || p_manager->isActive())
189  {
190  if (p_master->isActive())
191  p_master->iterate();
192 
193  if (p_manager->isActive())
194  p_manager->iterate();
195 
196  std::this_thread::sleep_for(g_main_timeout);
197  }
198  }
199  else
200  {
201  // Manager event loop
202  while (p_manager->isActive())
203  {
204  p_manager->iterate();
205 
206  std::this_thread::sleep_for(g_main_timeout);
207  }
208  }
209 
210  // Destroy Manager and Controller
211  p_manager.reset();
212  p_controller.reset();
213 
214  // Destroy g_info if it was allocated
215  if (mpi_simulator)
216  MPI_Info_free(&g_info);
217 
218  // Terminate any remaining Workers
 // NOTE(review): no statement follows the comment above in this listing -
 // confirm against the original file.
220 
221  // Finalize
222  MPI_Finalize();
223 }
224 
225 // Static cleanup function
// Sends TERMINATE_MANAGER_SIGNAL to every other MPI process and then
// finalizes MPI unless it has already been finalized.
227 {
228  // Terminate all managers
 // One MPI_INT carrying the terminate signal is sent with MASTER_SIGNAL_TAG
 // to each rank from 1 to comm_size - 1; rank 0 is skipped by the loop.
229  int comm_size = get_mpi_comm_world_size();
230  int signal = TERMINATE_MANAGER_SIGNAL;
231 
232  for (int manager_rank = 1; manager_rank < comm_size; manager_rank++)
233  MPI_Send(&signal, 1, MPI_INT, manager_rank,
234  MASTER_SIGNAL_TAG, MPI_COMM_WORLD);
235 
236  // Terminate Worker associated with MPI process with rank 0
 // NOTE(review): no statement follows the comment above in this listing -
 // confirm against the original file.
238 
239  // Finalize MPI if not yet finalized
 // MPI_Finalized() sets is_finalized to nonzero once MPI_Finalize() has
 // completed; the check guards against finalizing twice.
240  int is_finalized = 0;
241  MPI_Finalized(&is_finalized);
242 
243  if (!is_finalized)
244  MPI_Finalize();
245 }
std::map< std::string, std::string > parse_key_value_pairs(const std::string &str)
Definition: utils.cc:209
static void cleanup()
controller_t
Definition: common.h:45
bool g_force_host_spawn
Definition: main.cc:29
bool g_program_terminated
Definition: main.cc:32
std::chrono::milliseconds g_kill_timeout
worker_t
Definition: Manager.h:36
std::chrono::milliseconds g_main_timeout
static void run(controller_t controller, const Arguments &args)
static void terminateStatic()
bool isOptionalArgumentSet(const std::string &option_name) const
Definition: Arguments.cc:38
static AbstractController * makeController(controller_t controller, const Arguments &args)
static std::string help()
void add(struct option long_opt)
Definition: LongOptions.cc:20
static void addLongOptions(LongOptions &lopts)
std::string optionalArgument(const std::string &option_name) const
Definition: Arguments.cc:45