DASH  0.3.0
GlobHeapMem.h
1 #ifndef DASH__GLOB_HEAP_H__INCLUDED
2 #define DASH__GLOB_HEAP_H__INCLUDED
3 
#include <dash/dart/if/dart.h>

#include <dash/Types.h>
#include <dash/GlobPtr.h>
#include <dash/GlobSharedRef.h>
#include <dash/Allocator.h>
#include <dash/Team.h>
#include <dash/Array.h>
#include <dash/Onesided.h>

#include <dash/algorithm/MinMax.h>
#include <dash/algorithm/Copy.h>

#include <dash/memory/GlobHeapPtr.h>
#include <dash/memory/GlobHeapLocalPtr.h>

#include <dash/internal/Logging.h>

#include <algorithm>
#include <iostream>
#include <iterator>
#include <list>
#include <numeric>
#include <sstream>
#include <vector>

27 
28 
29 namespace dash {
30 
31 // Forward declarations
32 template<typename T, class MemSpaceT> class GlobHeapPtr;
33 
197 template <
199  typename ElementType,
200  class LocalMemorySpace = dash::HostSpace,
201  global_allocation_policy AllocationPolicy =
203  template <class> class LocalAlloc = allocator::DefaultAllocator>
204 class GlobHeapMem {
205 private:
206  typedef GlobHeapMem self_t;
207 
209 
211  typename std::remove_const<ElementType>::type,
212  LocalMemorySpace,
213  AllocationPolicy,
214  LocalAlloc>>;
215 
216  using local_allocator_traits =
217  std::allocator_traits<typename allocator_traits::local_allocator>;
218 
219 public:
220  using value_type = typename allocator_traits::value_type;
221 
222  using allocator_type = typename allocator_traits::allocator_type;
223 
224  using local_memory_space = typename memory_traits::memory_space_type;
225 
226  typedef typename allocator_type::size_type size_type;
227  typedef typename allocator_type::difference_type difference_type;
228  typedef typename allocator_type::difference_type index_type;
229 
230  typedef typename allocator_type::pointer raw_pointer;
231 
234 
237 
238  typedef value_type & local_reference;
239  typedef const value_type & const_local_reference;
240 
241  typedef typename local_pointer::bucket_type bucket_type;
242 
243  template <typename U>
244  using rebind =
246 
247  using local_void_pointer = void *;
248 
249 private:
250  typedef typename std::list<bucket_type> bucket_list;
251  typedef typename bucket_list::iterator bucket_iterator;
252 
253  typedef dash::Array<
254  size_type, int, dash::CSRPattern<1, dash::ROW_MAJOR, int> >
256 
257  typedef std::vector<std::vector<size_type> > bucket_cumul_sizes_map;
258 
259  template<typename T_, class GMem_>
260  friend class dash::GlobHeapPtr;
261 
262 private:
263  allocator_type _allocator;
264  dash::Team * _team;
265  dart_team_t _teamid;
266  size_type _nunits = 0;
267  local_pointer _lbegin = nullptr;
268  local_pointer _lend = nullptr;
273  bucket_list _buckets;
275  bucket_list _detach_buckets;
277  bucket_iterator _attach_buckets_first;
280  local_sizes_map _local_sizes;
286  bucket_cumul_sizes_map _bucket_cumul_sizes;
289  local_sizes_map _num_attach_buckets;
292  local_sizes_map _num_detach_buckets;
294  size_type _remote_size = 0;
296  index_type _begin_idx;
298  index_type _end_idx;
299 
300 public:
310  size_type n_local_elem = 0)
311  : GlobHeapMem(n_local_elem, nullptr)
312  {
313  }
314 
324  size_type n_local_elem,
326  dash::Team & team)
327  : GlobHeapMem(n_local_elem, nullptr, team)
328  {
329  }
330 
341  size_type n_local_elem,
343  LocalMemorySpace* r)
344  : GlobHeapMem(n_local_elem, r, dash::Team::All())
345  {
346  }
347 
348  GlobHeapMem(size_type n_local_elem, LocalMemorySpace* r, Team& team)
349  : _allocator(team, r),
350  _team(&team),
351  _teamid(team.dart_id()),
352  _nunits(team.size()),
353  _myid(team.myid()),
354  _attach_buckets_first(_buckets.end()),
355  _local_sizes(team.size(), team),
356  _bucket_cumul_sizes(team.size()),
357  _num_attach_buckets(team.size(), team),
358  _num_detach_buckets(team.size(), team),
359  _remote_size(0)
360  {
361  DASH_LOG_TRACE("GlobHeapMem.(ninit,nunits)",
362  n_local_elem, team.size());
363 
364  _local_sizes.local[0] = 0;
365  _num_attach_buckets.local[0] = 0;
366  _num_detach_buckets.local[0] = 0;
367 
368  DASH_LOG_TRACE("GlobHeapMem.GlobHeapMem",
369  "allocating initial memory space");
370  grow(n_local_elem);
371  commit();
372 
373  DASH_LOG_TRACE("GlobHeapMem.GlobHeapMem >");
374 
375  }
376 
381  {
382  DASH_LOG_TRACE("GlobHeapMem.~GlobHeapMem()");
383  }
384 
385  GlobHeapMem() = delete;
386 
390  GlobHeapMem(const self_t & other) = default;
391 
395  self_t & operator=(const self_t & rhs) = default;
396 
400  constexpr bool operator==(const self_t & rhs) const noexcept
401  {
402  return (_teamid == rhs._teamid &&
403  _nunits == rhs._nunits &&
404  _lbegin == rhs._lbegin &&
405  _lend == rhs._lend &&
406  _buckets == rhs._buckets &&
407  _detach_buckets == rhs._detach_buckets);
408  }
409 
413  constexpr bool operator!=(const self_t & rhs) const noexcept
414  {
415  return !(*this == rhs);
416  }
417 
422  constexpr size_type size() const noexcept
423  {
424  return _remote_size + local_size();
425  }
426 
430  constexpr size_type local_size() const noexcept
431  {
432  return _local_sizes.local[0];
433  }
434 
441  inline size_type local_size(team_unit_t unit) const
442  {
443  DASH_LOG_TRACE("GlobHeapMem.local_size(u)", "unit:", unit);
444  DASH_ASSERT_RANGE(0, unit, _nunits-1, "unit id out of range");
445  DASH_LOG_TRACE_VAR("GlobHeapMem.local_size",
446  _bucket_cumul_sizes[unit]);
447  size_type unit_local_size;
448  if (unit == _myid) {
449  // Value of _local_sizes[u] is the local size as visible by the unit,
450  // i.e. including size of unattached buckets.
451  unit_local_size = _local_sizes.local[0];
452  } else {
453  unit_local_size = _bucket_cumul_sizes[unit].back();
454  }
455  DASH_LOG_TRACE("GlobHeapMem.local_size >", unit_local_size);
456  return unit_local_size;
457  }
458 
465  constexpr dash::Team & team() const noexcept
466  {
467  return (_team != nullptr) ? *_team : dash::Team::Null();
468  }
469 
485  local_pointer grow(size_type num_elements)
486  {
487  DASH_LOG_DEBUG_VAR("GlobHeapMem.grow()", num_elements);
488  size_type local_size_old = _local_sizes.local[0];
489  DASH_LOG_TRACE("GlobHeapMem.grow",
490  "current local size:", local_size_old);
491  if (num_elements == 0) {
492  DASH_LOG_DEBUG("GlobHeapMem.grow >", "no grow");
493  return _lend;
494  }
495  // Update size of local memory space:
496  _local_sizes.local[0] += num_elements;
497  // Update number of local buckets marked for attach:
498  _num_attach_buckets.local[0] += 1;
499 
500  // Create new unattached bucket:
501  DASH_LOG_TRACE("GlobHeapMem.grow", "creating new unattached bucket:",
502  "size:", num_elements);
503  bucket_type bucket;
504  bucket.size = num_elements;
505  bucket.allocated_size = num_elements;
506  bucket.lptr = _allocator.allocate_local(bucket.size);
507  bucket.gptr = DART_GPTR_NULL;
508  bucket.attached = false;
509  // Add bucket to local memory space:
510  _buckets.push_back(bucket);
511  if (_attach_buckets_first == _buckets.end()) {
512  // Move iterator to first unattached bucket to position of new bucket:
513  _attach_buckets_first = _buckets.begin();
514  std::advance(_attach_buckets_first, _buckets.size() - 1);
515  }
516  _bucket_cumul_sizes[_myid].push_back(_local_sizes.local[0]);
517  DASH_LOG_TRACE("GlobHeapMem.grow", "added unattached bucket:",
518  "allocated size:", bucket.size,
519  "lptr:", bucket.lptr);
520  // Update local iteration space:
521  update_lbegin();
522  update_lend();
523  DASH_ASSERT_EQ(_local_sizes.local[0], _lend - _lbegin,
524  "local size differs from local iteration space size");
525  DASH_LOG_TRACE("GlobHeapMem.grow",
526  "new local size:", _local_sizes.local[0]);
527  DASH_LOG_TRACE("GlobHeapMem.grow",
528  "local buckets:", _buckets.size(),
529  "unattached buckets:", _num_attach_buckets.local[0]);
530  DASH_LOG_TRACE("GlobHeapMem.grow >");
531  // Return local iterator to start of allocated memory:
532  return _lbegin + local_size_old;
533  }
534 
553  void shrink(size_type num_elements)
554  {
555  // This function updates the size of the local memory space of the
556  // calling unit u.
557  // The following members are updated:
558  //
559  // _local_sizes[u]:
560  // Size of local memory space as visible to unit u.
561  //
562  // _bucket_cumul_sizes:
563  // An array mapping units to a list of their cumulative bucket sizes
564  // (i.e. postfix sum) which is required to iterate over the
565  // non-contigous global dynamic memory space.
566  // For example, if unit 2 allocated buckets with sizes 1, 3 and 5,
567  // _bucket_cumul_sizes[2] is a list { 1, 4, 9 }.
568  //
569  // _buckets:
570  // List of local buckets that provide the underlying storage of the
571  // active unit's local memory space.
572  //
573  // Notes:
574  //
575  // It must be ensured that the updated cumulative bucket sizes of a
576  // remote unit can be resolved in \c update_local_size() after any
577  // possible combination of grow- and shrink-operations at the remote unit
578  // from the following information:
579  //
580  // - the cumulative bucket sizes of the remote unit at the time of the
581  // last commit
582  // - the remote unit's local size at the time of the last commit
583  // - the remote unit's current local size (including unattached buckets)
584  // - the number of the remote unit's unattached buckets and their size
585 
586  DASH_LOG_DEBUG_VAR("GlobHeapMem.shrink()", num_elements);
587  DASH_ASSERT_LT(num_elements, local_size() + 1,
588  "cannot shrink size " << local_size() << " "
589  "by " << num_elements << " elements");
590  if (num_elements == 0) {
591  DASH_LOG_DEBUG("GlobHeapMem.shrink >", "no shrink");
592  return;
593  }
594  DASH_LOG_TRACE("GlobHeapMem.shrink",
595  "current local size:", _local_sizes.local[0]);
596  DASH_LOG_TRACE("GlobHeapMem.shrink",
597  "current local buckets:", _buckets.size());
598  // Position of iterator to first unattached bucket:
599  auto attach_buckets_first_pos = std::distance(_buckets.begin(),
600  _attach_buckets_first);
601  DASH_LOG_TRACE_VAR("GlobHeapMem.shrink", attach_buckets_first_pos);
602  // Number of elements left to deallocate:
603  auto num_dealloc = num_elements;
604  // Try to reduce local capacity by deallocating un-attached local buckets
605  // as they do not have to be detached collectively.
606  // Unattached buckets can be removed from memory space immediately as
607  // remote units cannot have a pending reference on them.
608  while (!_buckets.back().attached && num_dealloc > 0) {
609  bucket_type & bucket_last = _buckets.back();
610  // Shrink / remove unattached buckets starting at newest bucket:
611  if (bucket_last.size <= num_dealloc) {
612  DASH_LOG_TRACE("GlobHeapMem.shrink", "remove unattached bucket:",
613  "size:", bucket_last.size);
614  // Mark entire bucket for deallocation below:
615  num_dealloc -= bucket_last.size;
616  _local_sizes.local[0] -= bucket_last.size;
617  _bucket_cumul_sizes[_myid].pop_back();
618  // End iterator of _buckets about to change, update iterator to first
619  // unattached bucket if it references the removed bucket:
620  auto it_last_bucket = _buckets.end();
621  std::advance(it_last_bucket, -1);
622  if (_attach_buckets_first == it_last_bucket) {
623  // Iterator to first unattached bucket references last bucket:
624  DASH_LOG_TRACE("GlobHeapMem.shrink",
625  "updating iterator to first unattached bucket");
626  std::advance(_attach_buckets_first, -1);
627  }
628  _allocator.deallocate_local(bucket_last.lptr, bucket_last.allocated_size);
629  _buckets.pop_back();
630  if (_attach_buckets_first->attached) {
631  // Updated iterator to first unattached bucket references attached
632  // bucket:
633  _attach_buckets_first = _buckets.end();
634  }
635  // Update number of local buckets marked for attach:
636  DASH_ASSERT_GT(_num_attach_buckets.local[0], 0,
637  "Last bucket unattached but number of buckets marked "
638  "for attach is 0");
639  _num_attach_buckets.local[0] -= 1;
640  } else if (bucket_last.size > num_dealloc) {
641  auto const size_new = bucket_last.size - num_dealloc;
642 
643  DASH_LOG_TRACE("GlobHeapMem.shrink", "shrink unattached bucket:",
644  "old size:", bucket_last.size,
645  "new size:", size_new);
646  _local_sizes.local[0] -= num_dealloc;
647  _bucket_cumul_sizes[_myid].back() -= num_dealloc;
648  num_dealloc = 0;
649 
650  //Deallocate old local bucket
651  _allocator.deallocate_local(bucket_last.lptr, bucket_last.allocated_size);
652  //Allocate new bucket with specified size
653  bucket_last.lptr = _allocator.allocate_local(size_new);
654  if (bucket_last.lptr == nullptr) {
656  "GlobHeapMem.shrink: Allocating bucket of size failed");
657  }
658  bucket_last.size = size_new;
659  bucket_last.allocated_size = size_new;
660  }
661  }
662  // Number of elements to deallocate exceeds capacity of un-attached
663  // buckets, deallocate attached buckets:
664  auto num_dealloc_gbuckets = 0;
665  // Shrink attached buckets starting at newest bucket:
666  for (auto bucket_it = _buckets.rbegin();
667  bucket_it != _buckets.rend();
668  ++bucket_it) {
669  if (!bucket_it->attached) {
670  continue;
671  }
672  if (num_dealloc == 0) {
673  break;
674  }
675  if (bucket_it->size <= num_dealloc) {
676  // mark entire bucket for deallocation:
677  num_dealloc_gbuckets++;
678  _num_detach_buckets.local[0] += 1;
679  _local_sizes.local[0] -= bucket_it->size;
680  _bucket_cumul_sizes[_myid].back() -= bucket_it->size;
681  num_dealloc -= bucket_it->size;
682  } else if (bucket_it->size > num_dealloc) {
683  DASH_LOG_TRACE("GlobHeapMem.shrink", "shrink attached bucket:",
684  "old size:", bucket_it->size,
685  "new size:", bucket_it->size - num_dealloc);
686  bucket_it->size -= num_dealloc;
687  _local_sizes.local[0] -= num_dealloc;
688  _bucket_cumul_sizes[_myid].back() -= num_dealloc;
689  num_dealloc = 0;
690  }
691  }
692  // Mark attached buckets for deallocation.
693  // Requires separate loop as iterators on _buckets could be invalidated.
694  DASH_LOG_DEBUG_VAR("GlobHeapMem.shrink", num_dealloc_gbuckets);
695  while (num_dealloc_gbuckets-- > 0) {
696  auto dealloc_bucket = _buckets.back();
697  DASH_LOG_TRACE("GlobHeapMem.shrink", "deallocate attached bucket:"
698  "size:", dealloc_bucket.size,
699  "lptr:", dealloc_bucket.lptr);
700  // Mark bucket to be detached in next call of commit():
701  _detach_buckets.push_back(dealloc_bucket);
702  // Unregister bucket:
703  _buckets.pop_back();
704  }
705  // Update local iterators as bucket iterators might have changed:
706  update_lbegin();
707  update_lend();
708 
709  DASH_LOG_TRACE("GlobHeapMem.shrink",
710  "cumulative bucket sizes:", _bucket_cumul_sizes[_myid]);
711  DASH_LOG_TRACE("GlobHeapMem.shrink",
712  "new local size:", _local_sizes.local[0],
713  "new iteration space size:", std::distance(
714  _lbegin, _lend));
715  DASH_LOG_TRACE("GlobHeapMem.shrink",
716  "total number of buckets:", _buckets.size(),
717  "unattached buckets:", std::distance(
718  _attach_buckets_first,
719  _buckets.end()));
720  DASH_LOG_DEBUG("GlobHeapMem.shrink >");
721  }
722 
739  void commit()
740  {
741  DASH_LOG_DEBUG("GlobHeapMem.commit()");
742  DASH_LOG_TRACE_VAR("GlobHeapMem.commit", _buckets.size());
743 
744  // First detach, then attach to minimize number of elements allocated
745  // at the same time:
746  size_type num_detached_elem = commit_detach();
747  size_type num_attached_elem = commit_attach();
748 
749  if (num_detached_elem > 0 || num_attached_elem > 0) {
750  // Update _begin iterator:
751  DASH_LOG_TRACE("GlobHeapMem.commit", "updating _begin");
752  _begin_idx = 0;
753  DASH_LOG_TRACE("GlobHeapMem.commit", "updating _end");
754  _end_idx = size();
755  }
756  // Update local iterators as bucket iterators might have changed:
757  DASH_LOG_TRACE("GlobHeapMem.commit", "updating _lbegin");
758  update_lbegin();
759  DASH_LOG_TRACE("GlobHeapMem.commit", "updating _lend");
760  update_lend();
761  DASH_LOG_DEBUG("GlobHeapMem.commit >", "finished");
762  }
763 
783  void resize(size_type num_elements)
784  {
785  DASH_LOG_DEBUG("GlobHeapMem.resize()", "new size:", num_elements);
786  index_type diff_capacity = num_elements - size();
787  if (diff_capacity > 0) {
788  grow(diff_capacity);
789  } else if (diff_capacity < 0) {
790  shrink(-diff_capacity);
791  }
792  DASH_LOG_DEBUG("GlobHeapMem.resize >");
793  }
794 
798  pointer begin() noexcept
799  {
800  return pointer(this, _begin_idx);
801  }
802 
806  constexpr const_pointer begin() const noexcept
807  {
808  return const_pointer(this, _begin_idx);
809  }
810 
814  pointer end() noexcept
815  {
816  return pointer(this, _end_idx);
817  }
818 
822  const_pointer end() const noexcept
823  {
824  return const_pointer(this, _end_idx);
825  }
826 
831  inline local_pointer & lbegin() noexcept
832  {
833  return _lbegin;
834  }
835 
840  inline const_local_pointer lbegin() const noexcept
841  {
842  return _lbegin;
843  }
844 
849  inline local_pointer & lend() noexcept
850  {
851  return _lend;
852  }
853 
858  inline const_local_pointer & lend() const noexcept
859  {
860  return _lend;
861  }
862 
868  template<typename ValueType = value_type>
869  void put_value(
870  const ValueType & newval,
871  index_type global_index)
872  {
873  DASH_LOG_TRACE("GlobHeapMem.put_value(newval, gidx = %d)",
874  global_index);
875  auto git = pointer(this, global_index);
876  dash::put_value(newval, git);
877  }
878 
884  template<typename ValueType = value_type>
885  void get_value(
886  ValueType * ptr,
887  index_type global_index) const
888  {
889  DASH_LOG_TRACE("GlobHeapMem.get_value(newval, gidx = %d)",
890  global_index);
891  auto git = pointer(this, global_index);
892  dash::get_value(ptr, git);
893  }
894 
901  void barrier() const
902  {
903  DASH_ASSERT_RETURNS(
904  dart_barrier(_teamid),
905  DART_OK);
906  }
907 
912  template<typename IndexT>
913  pointer at(
915  team_unit_t unit,
917  IndexT local_index)
918  {
919  DASH_LOG_DEBUG("GlobHeapMem.at()",
920  "unit:", unit, "lidx:", local_index);
921  if (_nunits == 0) {
922  DASH_THROW(dash::exception::RuntimeError, "No units in team");
923  }
924  pointer git(this, unit, local_index);
925  DASH_LOG_DEBUG_VAR("GlobHeapMem.at >", git);
926  return git;
927  }
928 
933  template<typename IndexT>
934  const_pointer at(
936  team_unit_t unit,
938  IndexT local_index) const
939  {
940  DASH_LOG_DEBUG("GlobHeapMem.at() const",
941  "unit:", unit, "lidx:", local_index);
942  if (_nunits == 0) {
943  DASH_THROW(dash::exception::RuntimeError, "No units in team");
944  }
945  const_pointer git(this, unit, local_index);
946  DASH_LOG_DEBUG_VAR("GlobHeapMem.at const >", git);
947  return git;
948  }
949 
950  inline const bucket_list & local_buckets() const
951  {
952  return _buckets;
953  }
954 
955 private:
956 
962  void update_lbegin() noexcept
963  {
964  DASH_LOG_TRACE("GlobHeapMem.update_lbegin()");
965  local_pointer unit_lbegin(
966  // iteration space
967  _buckets.begin(), _buckets.end(),
968  // position in iteration space
969  0,
970  // bucket at position in iteration space,
971  // offset in bucket
972  _buckets.begin(), 0);
973  DASH_LOG_TRACE("GlobHeapMem.update_lbegin >", unit_lbegin);
974  _lbegin = unit_lbegin;
975  }
976 
981  void update_lend() noexcept
982  {
983  DASH_LOG_TRACE("GlobHeapMem.update_lend()");
984  local_pointer unit_lend(
985  // iteration space
986  _buckets.begin(), _buckets.end(),
987  // position in iteration space
988  local_size(),
989  // bucket at position in iteration space,
990  // offset in bucket
991  _buckets.end(), 0);
992  DASH_LOG_TRACE("GlobHeapMem.update_lend >", unit_lend);
993  _lend = unit_lend;
994  }
995 
996 
1000  size_type commit_detach()
1001  {
1002  DASH_LOG_TRACE("GlobHeapMem.commit_detach()");
1003  DASH_LOG_TRACE("GlobHeapMem.commit_detach",
1004  "local buckets to detach:", _num_detach_buckets.local[0]);
1005  // Number of elements successfully deallocated from global memory in
1006  // this commit:
1007  size_type num_detached_elem = 0;
1008  for (auto bucket_it = _detach_buckets.begin();
1009  bucket_it != _detach_buckets.cend(); ++bucket_it) {
1010  DASH_LOG_TRACE("GlobHeapMem.commit_detach", "detaching bucket:",
1011  "size:", bucket_it->size,
1012  "lptr:", bucket_it->lptr,
1013  "gptr:", bucket_it->gptr);
1014  // Detach bucket from global memory region and deallocate its local
1015  // memory segment:
1016  if (bucket_it->attached) {
1017  _allocator.deallocate(bucket_it->gptr, bucket_it->allocated_size);
1018  num_detached_elem += bucket_it->size;
1019  bucket_it->attached = false;
1020  }
1021  }
1022  _detach_buckets.clear();
1023  DASH_LOG_TRACE("GlobHeapMem.commit_detach >",
1024  "globally deallocated elements:", num_detached_elem);
1025  return num_detached_elem;
1026  }
1027 
1031  size_type commit_attach()
1032  {
1033  DASH_LOG_TRACE("GlobHeapMem.commit_attach()");
1034  DASH_LOG_TRACE("GlobHeapMem.commit_attach",
1035  "local buckets to attach:", _num_attach_buckets.local[0]);
1036  // Unregister buckets marked for detach in global memory:
1037  _num_attach_buckets.barrier();
1038  // Minumum and maximum number of buckets to be attached by any unit:
1039  auto min_max_attach = gather_min_max(_num_attach_buckets.begin(),
1040  _num_attach_buckets.end());
1041  auto min_attach_buckets = min_max_attach.first;
1042  auto max_attach_buckets = min_max_attach.second;
1043  DASH_LOG_TRACE("GlobHeapMem.commit_attach",
1044  "min. attach buckets:", min_attach_buckets);
1045  DASH_LOG_TRACE("GlobHeapMem.commit_attach",
1046  "max. attach buckets:", max_attach_buckets);
1047  // Number of buckets successfully attached in this commit:
1048  size_type num_attached_buckets = 0;
1049  // Number of elements allocated in global memory in this commit:
1050  size_type num_attached_elem = 0;
1051  // Number of elements at remote units before the commit:
1052  size_type old_remote_size = _remote_size;
1053  _remote_size = update_remote_size();
1054  // Whether at least one remote unit needs to attach additional global
1055  // memory:
1056  bool has_remote_attach = _remote_size > old_remote_size;
1057  DASH_LOG_TRACE_VAR("GlobHeapMem.commit_attach", old_remote_size);
1058  DASH_LOG_TRACE_VAR("GlobHeapMem.commit_attach", _remote_size);
1059  DASH_LOG_TRACE_VAR("GlobHeapMem.commit_attach", size());
1060  DASH_LOG_TRACE_VAR("GlobHeapMem.commit_attach", has_remote_attach);
1061  // Plausibility check:
1062  DASH_ASSERT(!has_remote_attach || max_attach_buckets > 0);
1063 
1064  // Attach local unattached buckets in global memory space.
1065  // As bucket sizes differ between units, units must collect gptr's
1066  // (dart_gptr_t) and size of buckets attached by other units and store
1067  // them locally so a remote unit's local index can be mapped to the
1068  // remote unit's bucket.
1069  if (min_attach_buckets == 0 && max_attach_buckets == 0) {
1070  DASH_LOG_TRACE("GlobHeapMem.commit_attach", "no attach");
1071  DASH_ASSERT(_attach_buckets_first == _buckets.end());
1072  DASH_ASSERT(_buckets.empty() || _buckets.back().attached);
1073  }
1074  DASH_LOG_TRACE("GlobHeapMem.commit_attach", "attaching",
1075  std::distance(_attach_buckets_first, _buckets.end()),
1076  "buckets");
1077  for (; _attach_buckets_first != _buckets.end(); ++_attach_buckets_first) {
1078  bucket_type & bucket = *_attach_buckets_first;
1079  DASH_ASSERT(!bucket.attached);
1080  DASH_LOG_TRACE("GlobHeapMem.commit_attach", "attaching bucket");
1081  DASH_LOG_TRACE_VAR("GlobHeapMem.commit_attach", bucket.size);
1082  DASH_LOG_TRACE_VAR("GlobHeapMem.commit_attach", bucket.lptr);
1083  // Attach bucket's local memory segment in global memory:
1084  bucket.gptr = _allocator.attach(bucket.lptr, bucket.size);
1085  bucket.attached = true;
1086  DASH_LOG_TRACE("GlobHeapMem.commit_attach", "attached bucket:",
1087  "gptr:", bucket.gptr);
1088  num_attached_elem += bucket.size;
1089  _num_attach_buckets.local[0] -= 1;
1090  num_attached_buckets++;
1091  }
1092  DASH_ASSERT(_attach_buckets_first == _buckets.end());
1093  // All units must attach the same number of buckets collectively.
1094  // Attach empty buckets if this unit attached less than the maximum
1095  // number of buckets attached by any other unit in this commit:
1096  DASH_LOG_TRACE("GlobHeapMem.commit_attach",
1097  "local buckets attached:", num_attached_buckets);
1098  DASH_LOG_TRACE("GlobHeapMem.commit_attach",
1099  "buckets required to attach:", max_attach_buckets);
1100  while (num_attached_buckets < max_attach_buckets) {
1101  DASH_LOG_TRACE("GlobHeapMem.commit_attach", "attaching null bucket");
1102  bucket_type bucket;
1103  bucket.size = 0;
1104  bucket.allocated_size = 0;
1105  bucket.lptr = _allocator.allocate_local(0);
1106  bucket.gptr = _allocator.attach(bucket.lptr, bucket.size);
1107  bucket.attached = true;
1108  DASH_ASSERT(!DART_GPTR_ISNULL(bucket.gptr));
1109  _buckets.push_back(bucket);
1110  num_attached_buckets++;
1111  DASH_LOG_TRACE("GlobHeapMem.commit_attach", "attached null bucket:",
1112  "gptr:", bucket.gptr,
1113  "left:", max_attach_buckets - num_attached_buckets);
1114  }
1115  DASH_LOG_TRACE("GlobHeapMem.commit_attach >",
1116  "globally allocated elements:", num_attached_elem);
1117  return num_attached_elem;
1118  }
1119 
1120  template<
1121  typename GlobalIt,
1122  typename ValueType = typename GlobalIt::value_type >
1123  std::pair<ValueType, ValueType> gather_min_max(
1124  const GlobalIt & first,
1125  const GlobalIt & last)
1126  {
1127  // TODO: Unoptimized, use dash::min_max_element once it is available
1128  //
1129  DASH_LOG_TRACE("GlobHeapMem.gather_min_max()");
1130  std::vector<ValueType> lcopy(dash::distance(first, last));
1131  auto lcopy_end = dash::copy(first, last, lcopy.data());
1132  auto min_lptr = std::min_element(lcopy.data(), lcopy_end);
1133  auto max_lptr = std::max_element(lcopy.data(), lcopy_end);
1134  std::pair<ValueType, ValueType> min_max;
1135  min_max.first = *min_lptr;
1136  min_max.second = *max_lptr;
1137  DASH_LOG_TRACE("GlobHeapMem.gather_min_max >",
1138  "min:", min_max.first,
1139  "max:", min_max.second);
1140  return min_max;
1141  }
1142 
1147  size_type update_remote_size()
1148  {
1149  // This function updates local snapshots of the remote unit's local
1150  // sizes.
1151  // The following members are updated:
1152  //
1153  // _remote_size:
1154  // The sum of all remote units' local size, including the sizes of
1155  // unattached buckets.
1156  //
1157  // _bucket_cumul_sizes:
1158  // An array mapping units to a list of their cumulative bucket sizes
1159  // (i.e. postfix sum) which is required to iterate over the
1160  // non-contigous global dynamic memory space.
1161  // For example, if unit 2 allocated buckets with sizes 1, 3 and 5,
1162  // _bucket_cumul_sizes[2] is a list { 1, 4, 9 }.
1163  //
1164  // Outline:
1165  //
1166  // 1. Create local copy of the distributed array _num_attach_buckets that
1167  // contains the number of unattached buckets of every unit.
1168  // 2. Temporarily attach an array attach_bucket_sizes in global memory
1169  // that contains the sizes of this unit's unattached buckets.
1170  // 3. At this point, every unit published the number of buckets it will
1171  // attach in the next commit, and their sizes.
1172  // The current local size Lu of every unit, including its unattached
1173  // buckets, is stored in _local_sizes.
1174  // 4. For every remote unit u:
1175  // - If unit u has one unattached bucket, append the unit's current
1176  // local size Lu to the unit's list of cumulative bucket sizes.
1177  // - If unit u has more than one unattached bucket, the sizes of the
1178  // single buckets must be retrieved from the vector
1179  // attach_bucket_sizes temporarily attached by u in step 1.
1180  // 5. Detach vector attach_bucket_sizes.
1181 
1182  DASH_LOG_TRACE("GlobHeapMem.update_remote_size()");
1183  size_type new_remote_size = 0;
1184  // Number of unattached buckets of every unit:
1185  std::vector<size_type> num_unattached_buckets(_nunits, 0);
1186  _num_attach_buckets.barrier();
1187  dash::copy(_num_attach_buckets.begin(), _num_attach_buckets.end(),
1188  num_unattached_buckets.data());
1189 
1190 #ifdef DASH_ENABLE_TRACE_LOGGING
1191  std::for_each(std::begin(_num_attach_buckets),
1192  std::end(_num_attach_buckets),
1193  [](size_type const& bsz) {
1194  DASH_LOG_TRACE("GlobMemHeap.update_remote_size()",
1195  "num_buckets at unit: ", bsz);
1196  });
1197 #endif
1198 
1199  std::vector<size_type> attach_buckets_sizes;
1200  std::vector<size_type> displs;
1201  std::vector<size_type> team_unattached_bucket_sizes;
1202 
1203  auto atLeast2 = std::find_if(num_unattached_buckets.begin(),
1204  num_unattached_buckets.end(),
1205  [](size_type const& sz) { return sz > 1; });
1206 
1207  //At least one remote unit has more than two buckets to attach
1208  //so we make all to all communication
1209  if (atLeast2 != num_unattached_buckets.end()) {
1210  // Attach array of local unattached bucket sizes to allow remote units
1211  // to
1212  // query the sizes of this unit's unattached buckets.
1213  for (auto bit = _attach_buckets_first; bit != _buckets.end(); ++bit) {
1214  attach_buckets_sizes.push_back(bit->size);
1215  }
1216 
1217  DASH_ASSERT(attach_buckets_sizes.size() ==
1218  _num_attach_buckets.local[0]);
1219  DASH_LOG_TRACE_VAR("GlobHeapMem.update_remote_size",
1220  attach_buckets_sizes);
1221 
1222  dash::dart_storage<size_type> ds(attach_buckets_sizes.size());
1223 
1224  // Accumulate number of unattached buckets of each unit
1225  auto const n_team_unattached_buckets =
1226  std::accumulate(std::begin(num_unattached_buckets),
1227  std::end(num_unattached_buckets), 0);
1228 
1229  team_unattached_bucket_sizes.reserve(n_team_unattached_buckets);
1230 
1231  displs.reserve(_team->size());
1232  displs[0] = 0;
1233 
1234  //calculate the displs of each unit
1235  std::partial_sum(std::begin(num_unattached_buckets),
1236  // We stop at last element
1237  std::end(num_unattached_buckets) - 1,
1238  // We start at offset 1, since disp[0] = 0
1239  std::begin(displs) + 1);
1240 
1241  DASH_ASSERT_RETURNS(dart_allgatherv(
1242  // array of locally unattached bucket sizes
1243  attach_buckets_sizes.data(),
1244  // number of locally unattached buckets
1245  ds.nelem,
1246  // DART Datatype
1247  ds.dtype,
1248  // receive buffer
1249  team_unattached_bucket_sizes.data(),
1250  // receive counts
1251  num_unattached_buckets.data(),
1252  // receive displs
1253  displs.data(),
1254  // DART Team
1255  _team->dart_id()),
1256  DART_OK);
1257  }
1258 
1259  for (size_type u = 0; u < _nunits; ++u) {
1260  if (u == _myid) {
1261  continue;
1262  }
1263  DASH_LOG_TRACE("GlobHeapMem.update_remote_size",
1264  "collecting local bucket sizes of unit", u);
1265  // Last known local attached capacity of remote unit:
1266  auto& u_bucket_cumul_sizes = _bucket_cumul_sizes[u];
1267  // Request current locally allocated capacity of remote unit:
1268  size_type u_local_size_old = u_bucket_cumul_sizes.size() == 0
1269  ? 0
1270  : u_bucket_cumul_sizes.back();
1271  size_type u_local_size_new = _local_sizes[u];
1272  DASH_LOG_TRACE_VAR("GlobHeapMem.update_remote_size",
1273  u_local_size_old);
1274  DASH_LOG_TRACE_VAR("GlobHeapMem.update_remote_size",
1275  u_local_size_old);
1276  difference_type u_local_size_diff = u_local_size_new - u_local_size_old;
1277  new_remote_size += u_local_size_new;
1278  // Number of unattached buckets of unit u:
1279  size_type u_num_attach_buckets = num_unattached_buckets[u];
1280  DASH_LOG_TRACE_VAR("GlobHeapMem.update_remote_size",
1281  u_num_attach_buckets);
1282  if (u_num_attach_buckets == 1) {
1283  // One unattached bucket at unit u, no need to request single bucket
1284  // sizes:
1285  u_bucket_cumul_sizes.push_back(u_local_size_new);
1286  }
1287  else if (u_num_attach_buckets > 1) {
1288  auto const u_end = displs[u] + u_num_attach_buckets;
1289  for (int bi = displs[u]; bi < u_end; ++bi) {
1290  size_type single_bkt_size = team_unattached_bucket_sizes[bi];
1291  size_type cumul_bkt_size = single_bkt_size;
1292  DASH_LOG_TRACE_VAR("GlobHeapMem.update_remote_size",
1293  single_bkt_size);
1294  if (u_bucket_cumul_sizes.size() > 0) {
1295  cumul_bkt_size += u_bucket_cumul_sizes.back();
1296  }
1297  u_bucket_cumul_sizes.push_back(cumul_bkt_size);
1298  }
1299  }
1300  // Local memory space of unit shrunk:
1301  if (u_local_size_diff < 0 && u_bucket_cumul_sizes.size() > 0) {
1302  u_bucket_cumul_sizes.back() += u_local_size_diff;
1303  }
1304  }
1305 
1306  _team->barrier();
1307 #if DASH_ENABLE_TRACE_LOGGING
1308  for (size_type u = 0; u < _nunits; ++u) {
1309  DASH_LOG_TRACE("GlobHeapMem.update_remote_size",
1310  "unit", u,
1311  "cumulative bucket sizes:", _bucket_cumul_sizes[u]);
1312  }
1313 #endif
1314  DASH_LOG_TRACE("GlobHeapMem.update_remote_size >", new_remote_size);
1315  _remote_size = new_remote_size;
1316  return _remote_size;
1317  }
1318 
1322  dart_gptr_t dart_gptr_at(
1324  team_unit_t unit,
1326  index_type bucket_index,
1328  index_type bucket_phase) const
1329  {
1330  DASH_LOG_DEBUG("GlobHeapMem.dart_gptr_at(u,bi,bp)",
1331  unit, bucket_index, bucket_phase);
1332  if (_nunits == 0) {
1333  DASH_THROW(dash::exception::RuntimeError, "No units in team");
1334  }
1335  // Get the referenced bucket's dart_gptr:
1336  auto bucket_it = _buckets.begin();
1337  std::advance(bucket_it, bucket_index);
1338  auto dart_gptr = bucket_it->gptr;
1339  DASH_LOG_TRACE_VAR("GlobHeapMem.dart_gptr_at", bucket_it->attached);
1340  DASH_LOG_TRACE_VAR("GlobHeapMem.dart_gptr_at", bucket_it->gptr);
1341  if (unit == _myid) {
1342  DASH_LOG_TRACE_VAR("GlobHeapMem.dart_gptr_at", bucket_it->lptr);
1343  DASH_LOG_TRACE_VAR("GlobHeapMem.dart_gptr_at", bucket_it->size);
1344  DASH_ASSERT_LT(bucket_phase, bucket_it->size,
1345  "bucket phase out of bounds");
1346  }
1347  if (DART_GPTR_ISNULL(dart_gptr)) {
1348  DASH_LOG_TRACE("GlobHeapMem.dart_gptr_at",
1349  "bucket.gptr is DART_GPTR_NULL");
1351  } else {
1352  // Move dart_gptr to unit and local offset:
1353  DASH_ASSERT_RETURNS(
1354  dart_gptr_setunit(&dart_gptr, unit),
1355  DART_OK);
1356  DASH_ASSERT_RETURNS(
1357  dart_gptr_incaddr(
1358  &dart_gptr,
1359  bucket_phase * sizeof(value_type)),
1360  DART_OK);
1361  }
1362  DASH_LOG_DEBUG("GlobHeapMem.dart_gptr_at >", dart_gptr);
1363  return dart_gptr;
1364  }
1365 
1366 }; // class GlobHeapMem
1367 
1368 } // namespace dash
1369 
1370 #endif // DASH__GLOB_HEAP_H__INCLUDED
constexpr size_type size() const noexcept
Number of array elements in local memory.
Definition: Array.h:209
const_pointer end() const noexcept
Global pointer of the initial address of the global memory.
Definition: GlobHeapMem.h:822
constexpr auto end(RangeType &&range) -> decltype(std::forward< RangeType >(range).end())
Definition: Range.h:98
This class is a simple memory pool which allocates and holds elements of type ValueType.
Definition: AllOf.h:8
Signals success.
Definition: dart_types.h:33
RandomAccessIt::difference_type distance(const RandomAccessIt &first, const RandomAccessIt &last)
Resolve the number of elements between two iterators.
Definition: Iterator.h:90
constexpr size_type size() const noexcept
Total number of elements in attached memory space, including size of local unattached memory segments...
Definition: GlobHeapMem.h:422
const_pointer at(team_unit_t unit, IndexT local_index) const
Resolve the global iterator referencing an element position in a unit's local memory.
Definition: GlobHeapMem.h:934
const_local_pointer & lend() const noexcept
Native pointer of the initial address of the local memory of the unit that initialized this GlobHeapM...
Definition: GlobHeapMem.h:858
void put_value(const ValueType &newval, index_type global_index)
Write value to global memory at given offset.
Definition: GlobHeapMem.h:869
constexpr auto begin(RangeType &&range) -> decltype(std::forward< RangeType >(range).begin())
Definition: Range.h:89
iterator end() noexcept
Global pointer to the end of the array.
Definition: Array.h:1057
constexpr bool operator!=(const self_t &rhs) const noexcept
Inequality comparison operator.
Definition: GlobHeapMem.h:413
GlobHeapMem(size_type n_local_elem, dash::Team &team)
Constructor, collectively allocates the given number of elements in local memory of every unit in a t...
Definition: GlobHeapMem.h:322
Encapsulates a memory allocation and deallocation strategy of global memory regions distributed acros...
GlobIter find_if(GlobIter first, GlobIter last, UnaryPredicate predicate)
Returns an iterator to the first element in the range [first,last) that satisfies the predicate p...
Definition: Find.h:104
void resize(size_type num_elements)
Resize capacity of local segment of global memory region to the given number of elements.
Definition: GlobHeapMem.h:783
Returns second operand.
Definition: Operation.h:201
void commit()
Commit changes of local memory region to global memory space.
Definition: GlobHeapMem.h:739
GlobHeapMem(size_type n_local_elem, LocalMemorySpace *r)
Constructor, collectively allocates the given number of elements in local memory of every unit in das...
Definition: GlobHeapMem.h:339
local_pointer & lend() noexcept
Native pointer of the initial address of the local memory of the unit that initialized this GlobHeapM...
Definition: GlobHeapMem.h:849
pointer begin() noexcept
Global pointer of the initial address of the global memory.
Definition: GlobHeapMem.h:798
size_t size() const
The number of units in this team.
Definition: Team.h:498
constexpr bool operator==(const self_t &rhs) const noexcept
Equality comparison operator.
Definition: GlobHeapMem.h:400
global_allocation_policy
const ElementType * min_element(const ElementType *l_range_begin, const ElementType *l_range_end, Compare compare=std::less< const ElementType &>())
Finds an iterator pointing to the element with the smallest value in the range [first,last).
Definition: MinMax.h:47
MemSpace memory_space_type
The Memory Space Type.
dart_gptr_t dart_gptr() const
Explicit conversion to dart_gptr_t.
Definition: GlobHeapPtr.h:236
Global memory region with dynamic size.
Definition: GlobHeapMem.h:204
#define DART_GPTR_NULL
A NULL global pointer.
Definition: dart_globmem.h:105
A Team instance specifies a subset of all available units.
Definition: Team.h:41
all units allocate individually in local memory and synchronize in epochs
local_pointer grow(size_type num_elements)
Increase capacity of local segment of global memory region by the given number of elements...
Definition: GlobHeapMem.h:485
size_type local_size(team_unit_t unit) const
Number of elements in local memory space of given unit.
Definition: GlobHeapMem.h:441
void put_value(const T &newval, const GlobPtrType &gptr)
Write a value to a global pointer.
Definition: Onesided.h:214
pointer at(team_unit_t unit, IndexT local_index)
Resolve the global iterator referencing an element position in a unit's local memory.
Definition: GlobHeapMem.h:913
void shrink(size_type num_elements)
Decrease capacity of local segment of global memory region by the given number of elements...
Definition: GlobHeapMem.h:553
Iterator on global buckets.
Definition: GlobHeapMem.h:32
DART Global pointer type.
Definition: dart_globmem.h:77
dart_ret_t dart_barrier(dart_team_t team)
DART Equivalent to MPI_Barrier.
#define DART_GPTR_ISNULL(gptr_)
Test for NULL global pointer.
Definition: dart_globmem.h:118
A distributed array.
Definition: Array.h:89
constexpr size_type local_size() const noexcept
Number of elements in local memory space.
Definition: GlobHeapMem.h:430
const_local_pointer lbegin() const noexcept
Native pointer of the initial address of the local memory of the unit that initialized this GlobHeapM...
Definition: GlobHeapMem.h:840
struct dash::unit_id< dash::local_unit, dart_team_unit_t > team_unit_t
Unit ID to use for team-local IDs.
Definition: Types.h:319
void barrier() const
Synchronize all units associated with this global memory instance.
Definition: GlobHeapMem.h:901
int16_t dart_team_t
Data type for storing a team ID.
Definition: dart_types.h:252
void barrier() const
Establish a barrier for all units operating on the array, publishing all changes to all units...
Definition: Array.h:1254
OutputIt copy(InputIt in_first, InputIt in_last, OutputIt out_first)
Copies the elements in the range, defined by [in_first, in_last), to another range beginning at out_f...
GlobIter max_element(const GlobIter &first, const GlobIter &last, Compare compare=Compare())
Finds an iterator pointing to the element with the greatest value in the range [first,last).
Definition: MinMax.h:332
~GlobHeapMem()
Destructor, collectively frees underlying global memory.
Definition: GlobHeapMem.h:380
static Team & Null()
The invariant Team instance representing an undefined team.
Definition: Team.h:229
GlobHeapMem(size_type n_local_elem=0)
Constructor, collectively allocates the given number of elements in local memory of every unit in das...
Definition: GlobHeapMem.h:308
#define DART_UNDEFINED_UNIT_ID
Undefined unit ID.
Definition: dart_types.h:160
struct dash::dart_operation ValueType
Reduce operands to their minimum value.
dart_ret_t dart_allgatherv(const void *sendbuf, size_t nsendelem, dart_datatype_t dtype, void *recvbuf, const size_t *nrecvelem, const size_t *recvdispls, dart_team_t teamid)
DART Equivalent to MPI allgatherv.
pointer end() noexcept
Global pointer of the initial address of the global memory.
Definition: GlobHeapMem.h:814
constexpr const_pointer begin() const noexcept
Global pointer of the initial address of the global memory.
Definition: GlobHeapMem.h:806
Convenience wrapper to determine the DART type and number of elements required for the given templat...
Definition: Types.h:295
constexpr dash::Team & team() const noexcept
The team containing all units accessing the global memory space.
Definition: GlobHeapMem.h:465
iterator begin() noexcept
Global pointer to the beginning of the array.
Definition: Array.h:1040
void for_each(const GlobInputIt &first, const GlobInputIt &last, UnaryFunction func)
Invoke a function on every element in a range distributed by a pattern.
Definition: ForEach.h:32
self_t & operator=(const self_t &rhs)=default
Assignment operator.
dash::gptrdiff_t distance(GlobPtr< T, MemSpaceT > gbegin, GlobPtr< T, MemSpaceT > gend)
Returns the number of hops from gbegin to gend.
Definition: GlobPtr.h:547
local_pointer & lbegin() noexcept
Native pointer of the initial address of the local memory of the unit that initialized this GlobHeapM...
Definition: GlobHeapMem.h:831
void get_value(T *ptr, const GlobPtrType &gptr)
Read a value from a global pointer.
Definition: Onesided.h:230
dart_team_t dart_id() const
Index of this team relative to global team dash::Team::All().
Definition: Team.h:522
void get_value(ValueType *ptr, index_type global_index) const
Read value from global memory at given offset.
Definition: GlobHeapMem.h:885
local_type local
Local proxy object, allows use in range-based for loops.
Definition: Array.h:732