DASH  0.3.0
main.cpp
1 
5 #include <libdash.h>
6 
7 #include <iostream>
8 #include <iomanip>
9 #include <vector>
10 #include <string>
11 #include <cstring>
12 #include <limits>
13 
14 #ifdef DASH_ENABLE_IPM
15 #include <mpi.h>
16 #endif
17 
18 using std::cout;
19 using std::endl;
20 using std::setw;
21 using std::setprecision;
22 
// Allocates `size` bytes whose address is a multiple of `alignment`, using
// posix_memalign.
//
// size:       number of bytes to allocate.
// alignment:  requested alignment in bytes; posix_memalign requires a power
//             of two that is a multiple of sizeof(void *).
// Returns the allocated storage (release it with free()), or nullptr if
// posix_memalign reported an error.
static void *_aligned_malloc(size_t size, size_t alignment) {
  void *buffer = nullptr;
  // The output pointer is not guaranteed to be set when posix_memalign
  // fails, so the status code decides what is returned:
  if (posix_memalign(&buffer, alignment, size) != 0) {
    return nullptr;
  }
  return buffer;
}
28 
29 typedef double ElementType;
30 typedef int64_t index_t;
31 typedef dash::Array<
32  ElementType,
33  index_t,
34  dash::CSRPattern<1, dash::ROW_MAJOR, index_t>
35  > Array_t;
36 typedef dash::util::Timer<
37  dash::util::TimeMeasure::Clock
38  > Timer;
39 
// Label of the asynchronous copy variant, chosen at compile time:
// "wait" if DASH__ALGORITHM__COPY__USE_WAIT is defined, "flush" otherwise.
#ifdef DASH__ALGORITHM__COPY__USE_WAIT
const std::string dash_async_copy_variant("wait");
#else
const std::string dash_async_copy_variant("flush");
#endif
45 
48 
// Runtime configuration of the benchmark as filled in by parse_args().
typedef struct benchmark_params_t {
  // -sb: factor by which the block size grows from one iteration to the next.
  size_t size_base;
  // -smin: block size used in the first iteration.
  size_t size_min;
  // -i: maximum number of iterations per scenario.
  size_t num_iterations;
  // -rmax: number of repeats in the first iteration.
  size_t num_repeats;
  // -rmin: lower bound for the number of repeats of an iteration.
  size_t min_repeats;
  // -rb: divisor applied to the number of repeats after every iteration.
  size_t rep_base;
  // -verify: compare every copied element against the source array.
  bool   verify;
  // -lo: "local only" switch; parsed and printed, not evaluated by the
  // benchmark code visible in this file.
  bool   local_only;
  // -fcache: overwrite the source block from the init unit before copying.
  bool   flush_cache;
} benchmark_params;
60 
// Copy implementations that copy_block_to_local() can be asked to measure.
enum local_copy_method_t {
  MEMCPY          = 0,  // std::memcpy on the local source pointer
  STD_COPY        = 1,  // std::copy on the local source pointer
  DASH_COPY       = 2,  // dash::copy on the global source range
  DASH_COPY_ASYNC = 3   // dash::copy_async on the global source range
};
typedef enum local_copy_method_t local_copy_method;
67 
// Timing results of one copy_block_to_local() run.
struct measurement_t {
  double time_copy_s;       // accumulated duration of all copy operations, in seconds
  double time_copy_min_us;  // shortest single copy operation, in microseconds
  double time_copy_max_us;  // longest single copy operation, in microseconds
  double time_copy_med_us;  // median duration of a copy operation, in microseconds
  double time_copy_sdv_us;  // standard deviation of the copy durations, in microseconds
  double time_init_s;       // accumulated duration of array initialization, in seconds
  double mb_per_s;          // copied megabytes divided by time_copy_s
};
typedef struct measurement_t measurement;
77 
78 measurement copy_block_to_local(
79  size_t size,
80  size_t repeat,
81  size_t num_repeats,
82  index_t source_unit_id,
83  index_t target_unit_id,
84  index_t init_unit_id,
85  const benchmark_params & params,
86  local_copy_method l_copy_method = DASH_COPY);
87 
88 void print_measurement_header();
89 void print_measurement_record(
90  const std::string & scenario,
91  const std::string & local_copy_method,
92  const bench_cfg_params & cfg_params,
93  int unit_src,
94  int unit_dest,
95  int unit_init,
96  size_t size,
97  int num_repeats,
98  double secs,
100  const benchmark_params & params);
101 
102 benchmark_params parse_args(int argc, char * argv[]);
103 
104 void print_params(
105  const dash::util::BenchmarkParams & bench_cfg,
106  const benchmark_params & params);
107 
108 int main(int argc, char** argv)
109 {
110  dash::init(&argc, &argv);
111 #ifdef DASH_ENABLE_IPM
112  MPI_Pcontrol(0, "off");
113  MPI_Pcontrol(0, "clear");
114 #endif
115 
116  // 0: real, 1: virt
117  Timer::Calibrate(0);
118 
120 
121  measurement res;
122  double time_s;
123  auto ts_start = Timer::Now();
124  size_t num_numa_nodes = uloc.num_numa();
125  size_t num_local_cores = uloc.node_domain().num_cores();
126  // Number of physical cores in a single NUMA domain (7 on SuperMUC):
127  size_t numa_node_cores = num_local_cores / num_numa_nodes;
128  // Number of physical cores on a single socket (14 on SuperMUC):
129  size_t socket_cores = numa_node_cores * 2;
130  // Number of processing nodes:
131  size_t num_nodes = dash::util::Locality::NumNodes();
132 
133  dash::util::BenchmarkParams bench_params("bench.07.local-copy");
134  bench_params.print_header();
135  bench_params.print_pinning();
136 
137  benchmark_params params = parse_args(argc, argv);
138  size_t num_iterations = params.num_iterations;
139  size_t num_repeats = params.num_repeats;
140  size_t size_inc = params.size_min;
141 
142  auto bench_cfg = bench_params.config();
143 
144  print_params(bench_params, params);
145 
146  print_measurement_header();
147 
148  // Unit that owns the elements to be copied:
149  dart_unit_t u_src;
150  // Unit that creates the local copy:
151  dart_unit_t u_dst;
152  // Unit that initializes the array range to be copied at the source unit:
153  dart_unit_t u_init;
154  // Unit used as default destination:
155  dart_unit_t u_loc = (numa_node_cores % dash::size());
156 
157 #if 1
158  num_repeats = params.num_repeats;
159  for (size_t i = 0; i < num_iterations && num_repeats > 0;
160  ++i, num_repeats /= params.rep_base)
161  {
162  auto block_size = std::pow(params.size_base,i) * size_inc;
163  auto size = block_size * dash::size();
164 
165  num_repeats = std::max<size_t>(num_repeats, params.min_repeats);
166 
167  u_src = u_loc;
168  u_dst = u_loc;
169  u_init = (u_dst + num_local_cores) % dash::size();
170  ts_start = Timer::Now();
171  res = copy_block_to_local(size, i, num_repeats, u_src, u_dst, u_init,
172  params, STD_COPY);
173  time_s = Timer::ElapsedSince(ts_start) * 1.0e-06;
174  print_measurement_record("local", "std::copy", bench_cfg,
175  u_src, u_dst, u_init, size, num_repeats,
176  time_s, res, params);
177  }
178 #endif
179 
180 #if 1
181  num_repeats = params.num_repeats;
182  for (size_t i = 0; i < num_iterations && num_repeats > 0;
183  ++i, num_repeats /= params.rep_base)
184  {
185  auto block_size = std::pow(params.size_base,i) * size_inc;
186  auto size = block_size * dash::size();
187 
188  num_repeats = std::max<size_t>(num_repeats, params.min_repeats);
189 
190  u_src = u_loc;
191  u_dst = u_loc;
192  u_init = (u_dst + num_local_cores) % dash::size();
193  ts_start = Timer::Now();
194  res = copy_block_to_local(size, i, num_repeats, u_src, u_dst, u_init,
195  params);
196  time_s = Timer::ElapsedSince(ts_start) * 1.0e-06;
197  print_measurement_record("local", "dash::copy", bench_cfg,
198  u_src, u_dst, u_init, size, num_repeats,
199  time_s, res, params);
200  }
201 #endif
202 
203 #if 1
204  num_repeats = params.num_repeats;
205  for (size_t i = 0; i < num_iterations && num_repeats > 0;
206  ++i, num_repeats /= params.rep_base)
207  {
208  auto block_size = std::pow(params.size_base,i) * size_inc;
209  auto size = block_size * dash::size();
210 
211  num_repeats = std::max<size_t>(num_repeats, params.min_repeats);
212 
213  u_src = u_loc;
214  u_dst = (u_src + numa_node_cores) % dash::size();
215  u_init = (u_dst + num_local_cores) % dash::size();
216  ts_start = Timer::Now();
217  res = copy_block_to_local(size, i, num_repeats, u_src, u_dst, u_init,
218  params, DASH_COPY);
219  time_s = Timer::ElapsedSince(ts_start) * 1.0e-06;
220  print_measurement_record("socket.b", "dash::copy", bench_cfg,
221  u_src, u_dst, u_init, size, num_repeats,
222  time_s, res, params);
223  }
224 #endif
225 
226 #if 1
227  num_repeats = params.num_repeats;
228  for (size_t i = 0; i < num_iterations && num_repeats > 0;
229  ++i, num_repeats /= params.rep_base)
230  {
231  auto block_size = std::pow(params.size_base,i) * size_inc;
232  auto size = block_size * dash::size();
233 
234  num_repeats = std::max<size_t>(num_repeats, params.min_repeats);
235 
236  u_src = u_loc;
237  u_dst = (u_src + num_local_cores) % dash::size();
238  u_init = (u_src + numa_node_cores) % dash::size();
239  ts_start = Timer::Now();
240  res = copy_block_to_local(size, i, num_repeats, u_src, u_dst, u_init,
241  params, DASH_COPY_ASYNC);
242  time_s = Timer::ElapsedSince(ts_start) * 1.0e-06;
243  print_measurement_record("rmt.async", "dash::copy", bench_cfg,
244  u_src, u_dst, u_init, size, num_repeats,
245  time_s, res, params);
246  }
247 #endif
248 
249  if( dash::myid()==0 ) {
250  cout << "Benchmark finished" << endl;
251  }
252 
253  dash::finalize();
254  return 0;
255 }
256 
257 measurement copy_block_to_local(
258  size_t size,
259  size_t iteration,
260  size_t num_repeats,
261  index_t source_unit_id,
262  index_t target_unit_id,
263  index_t init_unit_id,
264  const benchmark_params & params,
265  local_copy_method l_copy_method)
266 {
267  typedef typename Array_t::pattern_type pattern_t;
268  pattern_t pattern(size, dash::BLOCKED);
269 
270  measurement result;
271  result.time_init_s = 0;
272  result.time_copy_s = 0;
273  result.time_copy_min_us = 0;
274  result.time_copy_max_us = 0;
275  result.mb_per_s = 0;
276 
278 
279  auto myid = dash::myid();
280  // Index of block to copy. Use block of succeeding neighbor
281  // which is expected to be in same NUMA domain for unit 0:
282  index_t block_index = source_unit_id;
283  auto source_block = pattern.block(block_index);
284  size_t block_size = source_block.size();
285  size_t block_bytes = block_size * sizeof(ElementType);
286  index_t copy_start_idx = source_block.offset(0);
287  index_t copy_end_idx = copy_start_idx + block_size;
288  auto block_unit_id = pattern.unit_at(copy_start_idx);
289  // Size of shared cache on highest locality level, in bytes:
290  size_t cache_size = uloc.cache_line_size(2);
291  // Alignment:
292  size_t align_size = 128;
293  // Ensure cache_size is multiple of alignment:
294  cache_size = (cache_size % align_size == 0)
295  ? cache_size
296  : ((cache_size / align_size) + 1) * align_size;
297 
298  // Total time spent in copy operations:
299  dash::Shared<double> time_copy_us;
300  // Total time spent in initialization of array values:
301  dash::Shared<double> time_init_us;
302  // Minimum duration for a single copy operation:
303  dash::Shared<double> time_copy_min_us;
304  // Maximum duration for a single copy operation:
305  dash::Shared<double> time_copy_max_us;
306  // Median of duration of copy operations:
307  dash::Shared<double> time_copy_med_us;
308  // Standard deviation of duration of copy operations:
309  dash::Shared<double> time_copy_sdv_us;
310 
311  DASH_LOG_DEBUG("copy_block_to_local()",
312  "size:", size,
313  "block index:", block_index,
314  "block size:", block_size,
315  "copy index range:", copy_start_idx, "-", copy_end_idx);
316 
317  if (source_unit_id != block_unit_id) {
319  "copy_block_to_local: Invalid distribution of global array");
320  }
321 
322  // Prepare local buffer:
323  ElementType * local_array = nullptr;
324  if (myid == target_unit_id) {
325  local_array = static_cast<ElementType *>(
326  _aligned_malloc(block_bytes, align_size));
327  }
328 
329  Array_t global_array;
330  global_array.allocate(size, dash::BLOCKED);
331 
332  std::srand(time(NULL));
333 
334  double total_copy_us = 0;
335  double total_init_us = 0;
336  std::vector<double> history_copy_us;
337  // Perform measurement:
338  for (size_t r = 0; r < num_repeats; ++r) {
339  dash::barrier();
340  Timer::timestamp_t ts_init_start = Timer::Now();
341 
342  // Global pointer to copy input begin:
343  auto src_g_begin = global_array.begin() + copy_start_idx;
344  // Global pointer to copy input end:
345  auto src_g_end = global_array.begin() + copy_end_idx;
346  // Local pointer to copy input begin, or nullptr if not local:
347  ElementType * src_l_begin = src_g_begin.local();
348 
349  // ------------------------------------------------------------------------
350  // -- Initialize global array: --------------------------------------------
351  for (size_t l = 0; l < block_size; ++l) {
352  global_array.local[l] = (l+1) * (myid+1)
353  + (std::rand() / RAND_MAX);
354  }
355  dash::barrier();
356  // -- Prevent copying from cache: -----------------------------------------
357  if (params.flush_cache && myid == init_unit_id) {
358  // Prevent copying from L3 cache by initializing values to be copied on
359  // a remote node, i.e. on different node than target unit:
360  ElementType * block_values = new ElementType[block_size];
361  for (size_t p = 0; p < block_size; ++p) {
362  block_values[p] = ((myid+1) * 100000)
363  + (p * 1000);
364  }
365  // Copy block values to global array:
366  dash::copy(block_values,
367  block_values + block_size,
368  src_g_begin);
369  // Free local block values after they have been copied to source block:
370  delete[] block_values;
371  }
372  dash::barrier();
373  // -- Finished initialization of global array. ----------------------------
374  // ------------------------------------------------------------------------
375 
376 #ifdef DASH_ENABLE_IPM
377  MPI_Pcontrol(0, "on");
378 #endif
379  // -- Copy array block from source to destination rank: -------------------
380  if(myid == target_unit_id) {
381  total_init_us += Timer::ElapsedSince(ts_init_start);
382  ElementType * copy_lend = nullptr;
383 
384  auto ts_copy_start = Timer::Now();
385  if (l_copy_method == STD_COPY) {
386  copy_lend = std::copy(src_l_begin,
387  src_l_begin + block_size,
388  local_array);
389  } else if (l_copy_method == MEMCPY) {
390  copy_lend = local_array + block_size;
391  std::memcpy(local_array, src_l_begin, block_size * sizeof(ElementType));
392  } else if (l_copy_method == DASH_COPY_ASYNC) {
393  copy_lend = dash::copy_async(src_g_begin,
394  src_g_end,
395  local_array).get();
396  } else {
397  copy_lend = dash::copy(src_g_begin,
398  src_g_end,
399  local_array);
400  }
401  auto copy_us = Timer::ElapsedSince(ts_copy_start);
402  total_copy_us += copy_us;
403  history_copy_us.push_back(copy_us);
404 
405  // -- Finished copy from source to destination rank. --------------------
406 
407  // -- Validate values: --------------------------------------------------
408  if (copy_lend != local_array + block_size) {
410  "copy_block_to_local: " <<
411  "Unexpected end of copy output range " <<
412  "expected: " << local_array + block_size << " " <<
413  "actual: " << copy_lend);
414  }
415  if (params.verify) {
416  for (size_t l = 0; l < block_size; ++l) {
417  ElementType expected = global_array[copy_start_idx + l];
418  ElementType actual = local_array[l];
419  if (actual != expected) {
421  "copy_block_to_local: Validation failed " <<
422  "for copied element at offset " << l << " " <<
423  "in repetition " << r << ": " <<
424  "expected: " << expected << " " <<
425  "actual: " << actual);
426  }
427  }
428  }
429  // -- Finished validation. ----------------------------------------------
430  } // if target unit
431 #ifdef DASH_ENABLE_IPM
432  MPI_Pcontrol(0, "off");
433 #endif
434  // Wait for validation, otherwise values in global array could be
435  // overwritten when other units start with next repetition:
436  dash::barrier();
437  } // for repeats
438 
439  // Free buffers:
440  if (local_array != nullptr) {
441  free(local_array);
442  }
443 
444  if(myid == target_unit_id) {
445  time_copy_us.set(total_copy_us);
446  time_init_us.set(total_init_us);
447 
448  std::sort(history_copy_us.begin(), history_copy_us.end());
449  time_copy_med_us.set(history_copy_us[history_copy_us.size() / 2]);
450  time_copy_sdv_us.set(dash::math::sigma(history_copy_us.begin(),
451  history_copy_us.end()));
452  time_copy_min_us.set(history_copy_us.front());
453  time_copy_max_us.set(history_copy_us.back());
454 
455  }
456 
457  global_array.deallocate();
458 
459  DASH_LOG_DEBUG(
460  "copy_block_to_local",
461  "Waiting for completion of copy operation");
462  dash::barrier();
463 
464  double mb_copied = static_cast<double>(block_bytes * num_repeats)
465  / 1024.0 / 1024.0;
466 
467  result.time_init_s = time_init_us.get() * 1.0e-6;
468  result.time_copy_s = time_copy_us.get() * 1.0e-6;
469  result.time_copy_min_us = time_copy_min_us.get();
470  result.time_copy_max_us = time_copy_max_us.get();
471  result.time_copy_med_us = time_copy_med_us.get();
472  result.time_copy_sdv_us = time_copy_sdv_us.get();
473  result.mb_per_s = mb_copied / result.time_copy_s;
474 
475  return result;
476 }
477 
478 void print_measurement_header()
479 {
480  if (dash::myid() == 0) {
481  cout << std::right
482  << std::setw(5) << "units" << ","
483  << std::setw(9) << "mpi.impl" << ","
484  << std::setw(10) << "scenario" << ","
485  << std::setw(12) << "copy.type" << ","
486  << std::setw(7) << "src.u" << ","
487  << std::setw(7) << "dest.u" << ","
488  << std::setw(7) << "init.u" << ","
489  << std::setw(8) << "repeats" << ","
490  << std::setw(9) << "block.n" << ","
491  << std::setw(9) << "block.kb" << ","
492  << std::setw(7) << "init.s" << ","
493  << std::setw(8) << "copy.s" << ","
494  << std::setw(12) << "copy.min.us" << ","
495  << std::setw(12) << "copy.med.us" << ","
496  << std::setw(12) << "copy.max.us" << ","
497  << std::setw(12) << "copy.sdv.us" << ","
498  << std::setw(7) << "time.s" << ","
499  << std::setw(9) << "mb/s"
500  << endl;
501  }
502 }
503 
504 void print_measurement_record(
505  const std::string & scenario,
506  const std::string & local_copy_method,
507  const bench_cfg_params & cfg_params,
508  int unit_src,
509  int unit_dest,
510  int unit_init,
511  size_t size,
512  int num_repeats,
513  double secs,
514  measurement measurement,
515  const benchmark_params & params)
516 {
517  if (dash::myid() == 0) {
518  std::string mpi_impl = dash__toxstr(DASH_MPI_IMPL_ID);
519  size_t block_n = size / dash::size();
520  size_t g_size_kb = (size * sizeof(ElementType)) / 1024;
521  size_t block_kb = (block_n * sizeof(ElementType)) / 1024;
522  double mbps = measurement.mb_per_s;
523  double init_s = measurement.time_init_s;
524  double copy_s = measurement.time_copy_s;
525  double copy_min_us = measurement.time_copy_min_us;
526  double copy_max_us = measurement.time_copy_max_us;
527  double copy_med_us = measurement.time_copy_med_us;
528  double copy_sdv_us = measurement.time_copy_sdv_us;
529  cout << std::right
530  << std::setw(5) << dash::size() << ","
531  << std::setw(9) << mpi_impl << ","
532  << std::setw(10) << scenario << ","
533  << std::setw(12) << local_copy_method << ","
534  << std::setw(7) << unit_src << ","
535  << std::setw(7) << unit_dest << ","
536  << std::setw(7) << unit_init << ","
537  << std::setw(8) << num_repeats << ","
538  << std::setw(9) << block_n << ","
539  << std::setw(9) << block_kb << ","
540  << std::fixed << setprecision(2) << setw(7) << init_s << ","
541  << std::fixed << setprecision(5) << setw(8) << copy_s << ","
542  << std::fixed << setprecision(2) << setw(12) << copy_min_us << ","
543  << std::fixed << setprecision(2) << setw(12) << copy_med_us << ","
544  << std::fixed << setprecision(2) << setw(12) << copy_max_us << ","
545  << std::fixed << setprecision(2) << setw(12) << copy_sdv_us << ","
546  << std::fixed << setprecision(2) << setw(7) << secs << ","
547  << std::fixed << setprecision(2) << setw(9) << mbps
548  << endl;
549  }
550 }
551 
552 benchmark_params parse_args(int argc, char * argv[])
553 {
554  benchmark_params params;
555  params.size_base = 4;
556  params.num_iterations = 8;
557  params.rep_base = params.size_base;
558  params.num_repeats = 0;
559  params.min_repeats = 1;
560  params.verify = false;
561  params.local_only = false;
562  params.flush_cache = false;
563  params.size_min = 64;
564 
565  for (auto i = 1; i < argc; i += 2) {
566  std::string flag = argv[i];
567  if (flag == "-sb") {
568  params.size_base = atoi(argv[i+1]);
569  } else if (flag == "-smin") {
570  params.size_min = atoi(argv[i+1]);
571  } else if (flag == "-i") {
572  params.num_iterations = atoi(argv[i+1]);
573  } else if (flag == "-rmax") {
574  params.num_repeats = atoi(argv[i+1]);
575  } else if (flag == "-rmin") {
576  params.min_repeats = atoi(argv[i+1]);
577  } else if (flag == "-rb") {
578  params.rep_base = atoi(argv[i+1]);
579  } else if (flag == "-verify") {
580  params.verify = true;
581  --i;
582  } else if (flag == "-lo") {
583  params.local_only = true;
584  --i;
585  } else if (flag == "-fcache") {
586  params.flush_cache = true;
587  --i;
588  }
589  }
590  if (params.num_repeats == 0) {
591  params.num_repeats = 8 * std::pow(params.rep_base, params.num_iterations);
592  }
593  return params;
594 }
595 
596 void print_params(
597  const dash::util::BenchmarkParams & bench_cfg,
598  const benchmark_params & params)
599 {
600  if (dash::myid() != 0) {
601  return;
602  }
603 
604  bench_cfg.print_section_start("Runtime arguments");
605  bench_cfg.print_param("-smin", "initial block size", params.size_min);
606  bench_cfg.print_param("-sb", "block size base", params.size_base);
607  bench_cfg.print_param("-rmax", "initial repeats", params.num_repeats);
608  bench_cfg.print_param("-rmin", "min, repeats", params.min_repeats);
609  bench_cfg.print_param("-rb", "rep. base", params.rep_base);
610  bench_cfg.print_param("-i", "iterations", params.num_iterations);
611  bench_cfg.print_param("-verify", "verification", params.verify);
612  bench_cfg.print_param("-lo", "local only", params.local_only);
613  bench_cfg.print_param("-fcache", "no copying from cache", params.flush_cache);
614  bench_cfg.print_section_end();
615 }
616 
global_unit_t myid()
Shortcut to query the global unit ID of the calling unit.
size_t size()
Return the number of units in the global team.
dash::Future< ValueType * > copy_async(InputIt in_first, InputIt in_last, OutputIt out_first)
Asynchronous variant of dash::copy.
int32_t dart_unit_t
Data type for storing a unit ID.
Definition: dart_types.h:154
DASH_CONSTEXPR local_type local() const
Convert global iterator to native pointer.
Definition: GlobIter.h:279
void finalize()
Finalize the DASH library and the underlying runtime system.
reference get()
Get a reference on the shared value.
Definition: Shared.h:239
void set(const value_type &val)
Set the value of the shared element.
Definition: Shared.h:226
void sort(GlobRandomIt begin, GlobRandomIt end)
Sorts the elements in the range, defined by [begin, end) in ascending order.
A distributed array.
Definition: Array.h:89
OutputIt copy(InputIt in_first, InputIt in_last, OutputIt out_first)
Copies the elements in the range, defined by [in_first, in_last), to another range beginning at out_first.
Shared access to a value in global memory across a team.
Definition: Shared.h:23
void barrier()
A global barrier involving all units.
void init(int *argc, char ***argv)
Initialize the DASH library and the underlying runtime system.
iterator begin() noexcept
Global pointer to the beginning of the array.
Definition: Array.h:1040
bool allocate(size_type nelem, dash::DistributionSpec< 1 > distribution, dash::Team &team=dash::Team::All())
Delayed allocation of global memory using a one-dimensional distribution spec.
Definition: Array.h:1319
local_type local
Local proxy object, allows use in range-based for loops.
Definition: Array.h:732
Wrapper of a single dart_unit_locality_t object.
Definition: UnitLocality.h:30