DASH  0.3.0
InputStream.h
1 #ifndef DASH__IO__HDF5__INPUT_STREAM_H__
2 #define DASH__IO__HDF5__INPUT_STREAM_H__
3 
4 #ifdef DASH_ENABLE_HDF5
5 
6 #include <string>
7 #include <future>
8 
9 #include <dash/Matrix.h>
10 #include <dash/Array.h>
11 
12 namespace dash {
13 namespace io {
14 namespace hdf5 {
15 
24 
25 class InputStream : public ::dash::io::IOSBase<IOStreamMode> {
26  private:
27 
28  using self_t = InputStream;
29 
30  std::string _filename;
31  std::string _dataset;
32  type_converter _converter;
33  hdf5_options _foptions = hdf5_options();
34  bool _use_cust_conv = false;
35  dash::launch _launch_policy;
36 
37  std::vector<std::shared_future<void> > _async_ops;
38 
39  public:
50  InputStream(dash::launch lpolicy, std::string filename)
51  : _filename(filename), _dataset("data"), _launch_policy(lpolicy) {
52  if (_launch_policy == dash::launch::async && !dash::is_multithreaded()) {
53  _launch_policy = dash::launch::sync;
54  DASH_LOG_WARN(
55  "Requested ASIO but DART does not support "
56  "multi-threaded access. Blocking IO is used"
57  "as fallback");
58  }
59  }
60 
79  explicit InputStream(std::string filename)
80  : InputStream(dash::launch::sync, filename) {}
81 
82  ~InputStream() {
83  if (!_async_ops.empty()) {
84  _async_ops.back().wait();
85  }
86  }
87 
88  InputStream() = delete;
89  InputStream(const self_t & other) = delete;
90  InputStream(self_t && other) = default;
91 
92  self_t & operator= (const self_t & other) = delete;
93  self_t & operator= (self_t && other) = default;
94 
100  DASH_LOG_DEBUG("flush input stream", _async_ops.size());
101  if (!_async_ops.empty()) {
102  _async_ops.back().wait();
103  }
104  DASH_LOG_DEBUG("input stream flushed");
105  return *this;
106  }
107 
108  // IO Manipulators
109 
111  friend InputStream& operator>>(InputStream& is, const dataset tbl) {
112  is._dataset = tbl._dataset;
113  return is;
114  }
115 
118  is._foptions.pattern_metadata_key = pk._key;
119  return is;
120  }
121 
124  is._foptions.restore_pattern = rs._restore;
125  return is;
126  }
127 
130  is._converter = conv;
131  is._use_cust_conv = true;
132  return is;
133  }
134 
136  template <typename Container_t>
137  friend InputStream& operator>>(InputStream& is, Container_t& matrix);
138 
139  private:
140  template <typename Container_t>
141  void _load_object_impl(Container_t& container) {
142  if (_use_cust_conv) {
143  StoreHDF::read(container, _filename, _dataset, _foptions, _converter);
144  } else {
145  StoreHDF::read(container, _filename, _dataset, _foptions);
146  }
147  }
148 
149  template <typename Container_t>
150  void _load_object_impl_async(Container_t& container) {
151  auto pos = _async_ops.size();
152 
153  // copy state of stream
154  auto s_filename = _filename;
155  auto s_dataset = _dataset;
156  auto s_foptions = _foptions;
157  auto s_use_cust_conv = _use_cust_conv;
158  type_converter_fun_type s_converter = _converter;
159 
160  // pass pos by value as it might be out of scope when function is called
161  std::shared_future<void> fut = std::async(
162  std::launch::async, [&, pos, s_filename, s_dataset, s_foptions,
163  s_converter, s_use_cust_conv]() {
164  if (pos != 0) {
165  // wait for previous tasks
166  auto last_task = _async_ops[pos - 1];
167  DASH_LOG_DEBUG("waiting for future", pos);
168  last_task.wait();
169  }
170  DASH_LOG_DEBUG("execute async io task");
171 
172  if (s_use_cust_conv) {
173  StoreHDF::read(container, s_filename, s_dataset, s_foptions,
174  s_converter);
175  } else {
176  StoreHDF::read(container, s_filename, s_dataset, s_foptions);
177  }
178  DASH_LOG_DEBUG("execute async io task done");
179  });
180  _async_ops.push_back(fut);
181  }
182 };
183 
184 } // namespace hfd5
185 } // namespace io
186 } // namespace dash
187 
188 #include <dash/io/hdf5/internal/InputStream-inl.h>
189 
190 #endif // DASH_ENABLE_HDF5
191 
192 #endif // DASH__IO__HDF5__INPUT_STREAM_H__
bool is_multithreaded()
Check whether DASH has been initialized with support for multi-threaded access.
This class is a simple memory pool which holds allocates elements of size ValueType.
Definition: AllOf.h:8
Stream manipulator class to set the dash pattern key of the dataset.
Definition: IOManip.h:32
InputStream & flush()
Synchronizes with the data source.
Definition: InputStream.h:99
std::string pattern_metadata_key
Metadata attribute key in HDF5 file.
Definition: StorageDriver.h:69
async launch policy
friend InputStream & operator>>(InputStream &is, setpattern_key pk)
set metadata key at which the pattern will be stored
Definition: InputStream.h:117
friend InputStream & operator>>(InputStream &is, const type_converter conv)
custom type converter function to convert native type to HDF5 type
Definition: InputStream.h:129
bool restore_pattern
Restore pattern from metadata if HDF5 file contains any.
Definition: StorageDriver.h:67
friend InputStream & operator>>(InputStream &is, const dataset tbl)
set name of dataset
Definition: InputStream.h:111
InputStream(std::string filename)
Creates an HDF5 input stream using blocking IO.
Definition: InputStream.h:79
Stream manipulator class to specify the hdf5 dataset.
Definition: IOManip.h:20
InputStream(dash::launch lpolicy, std::string filename)
Creates an HDF5 input stream using a launch policy.
Definition: InputStream.h:50
static std::enable_if< _compatible_pattern< typename Container_t::pattern_type >) &&_is_origin_view< Container_t >), void >::type read(Container_t &matrix, std::string filename, std::string datapath, hdf5_options foptions=hdf5_options(), type_converter_fun_type to_h5_dt_converter=get_h5_datatype< typename Container_t::value_type >)
Read an HDF5 dataset into a dash container using parallel IO if the matrix is already allocated...
synchronous launch policy
friend InputStream & operator>>(InputStream &is, restore_pattern rs)
set whether pattern layout should be restored from metadata
Definition: InputStream.h:123
Converter function to convert non-POT types and especially structs to HDF5 types. ...
Definition: IOManip.h:113
Stream manipulator class to set whether the pattern should be restored from the hdf5 dataset metadata...
Definition: IOManip.h:45
Base type for device-specific IO streams.
Definition: IOStream.h:176
Options which can be passed to dash::io::StoreHDF::write to specify how existing structures are treat...
Definition: StorageDriver.h:56
Type facade wrapping dash::io::IOSBaseMode and its device-dependent specializations.
Definition: IOStream.h:46