crawlserv++  [under development]
Application for crawling and analyzing textual content of websites.
Thread.hpp
Go to the documentation of this file.
1 /*
2  *
3  * ---
4  *
5  * Copyright (C) 2022 Anselm Schmidt (ans[ät]ohai.su)
6  *
7  * This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation, either version 3 of the License, or
10  * (at your option) any later version in addition to the terms of any
11  * licences already herein identified.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program. If not, see <https://www.gnu.org/licenses/>.
20  *
21  * ---
22  *
23  * Thread.hpp
24  *
25  * Abstract implementation of the Thread interface
26  * for analyzer threads to be inherited by the algorithm class.
27  *
28  * Created on: Oct 11, 2018
29  * Author: ans
30  */
31 
32 #ifndef MODULE_ANALYZER_THREAD_HPP_
33 #define MODULE_ANALYZER_THREAD_HPP_
34 
35 #include "Config.hpp"
36 #include "Database.hpp"
37 
38 #include "../Config.hpp"
39 #include "../Thread.hpp"
40 
41 #include "../../Data/Corpus.hpp"
42 #include "../../Data/Data.hpp"
43 #include "../../Helper/CommaLocale.hpp"
44 #include "../../Helper/DateTime.hpp"
45 #include "../../Helper/Json.hpp"
46 #include "../../Helper/Memory.hpp"
47 #include "../../Main/Exception.hpp"
48 #include "../../Network/FTPUpload.hpp"
49 #include "../../Query/Container.hpp"
50 #include "../../Struct/CorpusProperties.hpp"
51 #include "../../Struct/QueryProperties.hpp"
52 #include "../../Struct/QueryStruct.hpp"
53 #include "../../Struct/StatusSetter.hpp"
54 #include "../../Struct/ThreadOptions.hpp"
55 #include "../../Struct/ThreadStatus.hpp"
56 #include "../../Timer/Simple.hpp"
57 
58 #include "../../_extern/rapidjson/include/rapidjson/document.h"
59 
60 #include <algorithm> // std::all_of, std::any_of, std::remove_if
61 #include <chrono> // std::chrono
62 #include <cstddef> // std::size_t
63 #include <cstdint> // std::uint64_t
64 #include <map> // std::map
65 #include <queue> // std::queue
66 #include <sstream> // std::ostringstream
67 #include <stdexcept> // std::logic_error, std::runtime_error
68 #include <string> // std::string
69 #include <string_view> // std::string_view
70 #include <utility> // std::swap
71 #include <vector> // std::vector
72 
74 
77 
// The number of tokens after which the status will be updated when combining corpora.
// Declared `inline` so that every translation unit including this header shares one definition.
inline constexpr auto combineUpdateStatusEvery{100000};
80 
82 
84  class Thread : public Module::Thread, public Query::Container, protected Config {
85  // for convenience
86  using Corpus = Data::Corpus;
87 
94 
95  public:
98 
99  Thread(
100  Main::Database& dbBase,
101  const ThreadOptions& threadOptions,
102  const ThreadStatus& threadStatus
103  );
104 
105  Thread(Main::Database& dbBase, const ThreadOptions& threadOptions);
106 
108 
109  protected:
112 
115 
119 
121  std::vector<Corpus> corpora;
122 
126 
128 
135  virtual std::string_view getName() const = 0;
136 
140 
141  void onInit() override;
142  void onTick() override;
143  void onPause() override;
144  void onUnpause() override;
145  void onClear() override;
146  void onReset() override;
147 
151 
152  void initQueries() override;
153  void deleteQueries() override;
154  void addOptionalQuery(std::uint64_t queryId, QueryStruct& propertiesTo);
155  void addQueries(
156  const std::vector<std::uint64_t>& queryIds,
157  std::vector<QueryStruct>& propertiesTo
158  );
159 
163 
165 
174  virtual void onAlgoInitTarget() = 0;
175 
177 
182  virtual void onAlgoInit() = 0;
183 
185 
190  virtual void onAlgoTick() = 0;
191 
193 
198  virtual void onAlgoPause() = 0;
199 
201 
206  virtual void onAlgoUnpause() = 0;
207 
209 
214  virtual void onAlgoClear() = 0;
215 
219 
220  void finished();
221  void pause();
222 
226 
227  [[nodiscard]] std::string getTargetTableName() const;
228  bool addCorpora(bool isCombine, StatusSetter& statusSetter);
229  void checkCorpusSources(StatusSetter& statusSetter);
230 
234 
235  void uploadResult();
236  void cleanUpCorpora();
237  void cleanUpQueries();
238 
240 
243 
244  private:
245  // queries
246  /*
247  * make sure to initialize AND delete them!
248  * -> setUpQueries(), cleanUpQueries()
249  */
250  std::vector<QueryStruct> queryFilterQueries;
251 
252  // restart timer
253  std::chrono::time_point<std::chrono::steady_clock> idleStart{};
254 
255  // initialization functions
256  void setUpConfig(std::queue<std::string>& warningsTo);
257  void setUpLogging();
258  void setUpDatabase();
259  void setUpTarget();
260  void setUpSqlStatements();
261  void setUpQueries();
262  void setUpAlgorithm();
263  void logWarnings(std::queue<std::string>& warnings);
264 
265  // internal helper functions
266  void addCorpus(std::size_t index, StatusSetter& statusSetter);
267  void combineCorpora(StatusSetter& statusSetter);
268  void filterCorpusByQuery(std::size_t index, StatusSetter& statusSetter);
269 
270  // internal static helper template
271  template<typename Allocator>
272  [[nodiscard]] static rapidjson::Value createJSONValue(
273  Data::Type type,
274  Data::Value value,
275  const std::string& originalType,
276  Allocator& allocator
277  ) {
278  rapidjson::Value result;
279 
280  switch(type) {
281  case Data::Type::_bool:
282  result.SetBool(value._b);
283 
284  break;
285 
286  case Data::Type::_int32:
287  result.SetInt(value._i32);
288 
289  break;
290 
291  case Data::Type::_uint32:
292  result.SetUint(value._ui32);
293 
294  break;
295 
296  case Data::Type::_int64:
297  result.SetUint(value._i64);
298 
299  break;
300 
301  case Data::Type::_uint64:
302  result.SetUint(value._ui64);
303 
304  break;
305 
306  case Data::Type::_double:
307  result.SetDouble(value._d);
308 
309  break;
310 
311  case Data::Type::_string:
312  result.SetString(value._s, allocator);
313 
314  break;
315 
316  default:
317  throw Thread::Exception("Cannot write unknown data type '" + originalType + "' to JSON");
318  }
319 
320  return result;
321  }
322 
323  // hide other functions not to be used by the thread
324  void start();
325  void unpause();
326  void stop();
327  void interrupt();
328  };
329 
330 } /* namespace crawlservpp::Module::Analyzer */
331 
332 #endif /* MODULE_ANALYZER_THREAD_HPP_ */
32-bit integer.
Definition: Data.hpp:74
bool addCorpora(bool isCombine, StatusSetter &statusSetter)
Gets the contents of all corpora, filters and combines them if necessary.
Definition: Thread.cpp:318
Query properties containing its name, text, type, and result type(s).
Definition: QueryProperties.hpp:39
std::uint32_t _ui32
Unsigned 32-bit integer value.
Definition: Data.hpp:106
Namespace for analyzer classes.
virtual void onAlgoInit()=0
Initializes the algorithm.
virtual void onAlgoTick()=0
Performs an algorithm tick.
virtual void onAlgoUnpause()=0
Unpauses the algorithm.
virtual void onAlgoPause()=0
Pauses the algorithm.
virtual void onAlgoClear()=0
Clears the algorithm.
String.
Definition: Data.hpp:89
Boolean value.
Definition: Data.hpp:71
std::vector< Corpus > corpora
Vector of corpora for the analyzer thread.
Definition: Thread.hpp:121
Query container.
Definition: Container.hpp:76
std::string getTargetTableName() const
Gets the full name of the target table.
Definition: Thread.cpp:294
Abstract class providing thread functionality to algorithm (child) classes.
Definition: Thread.hpp:84
void onTick() override
Performs an algorithm tick.
Definition: Thread.cpp:100
void cleanUpQueries()
Cleans up all queries and frees their memory.
Definition: Thread.cpp:486
Thread status containing its ID, status message, pause state, and progress.
Definition: ThreadStatus.hpp:54
std::int64_t _i64
64-bit integer value.
Definition: Data.hpp:109
void onReset() override
Resets the algorithm.
Definition: Thread.cpp:163
void onInit() override
Initializes the analyzer, the target table, and the algorithm.
Definition: Thread.cpp:76
#define MAIN_EXCEPTION_CLASS()
Macro used to easily define classes for general exceptions.
Definition: Exception.hpp:50
constexpr auto combineUpdateStatusEvery
The number of tokens after which the status will be updated when combining corpora.
Definition: Thread.hpp:79
virtual std::string_view getName() const =0
Returns the name of the algorithm.
Corpus properties containing the type, table, and column name of its source.
Definition: CorpusProperties.hpp:41
Unsigned 32-bit integer.
Definition: Data.hpp:77
void deleteQueries() override
Does nothing.
Definition: Thread.cpp:190
Thread options containing the name of the module run, as well as the IDs of the website, URL list, and configuration used.
Definition: ThreadOptions.hpp:40
Abstract class providing module-independent thread functionality.
Definition: Thread.hpp:93
Class representing a text corpus.
Definition: Corpus.hpp:165
Class handling database access for the command-and-control and its threads.
Definition: Database.hpp:366
Type
Data types.
Definition: Data.hpp:66
std::uint64_t _ui64
Unsigned 64-bit integer value.
Definition: Data.hpp:112
Class for analyzer exceptions to be used by algorithms.
Definition: Thread.hpp:242
void onPause() override
Pauses the analyzer.
Definition: Thread.cpp:132
Structure containing all the data needed to keep the status of a thread updated.
Definition: StatusSetter.hpp:57
Unsigned 64-bit integer.
Definition: Data.hpp:83
double _d
Floating point value (with double precision).
Definition: Data.hpp:115
Floating point value (with double precision).
Definition: Data.hpp:86
Class providing database functionality for analyzer threads by implementing Wrapper::Database.
Definition: Database.hpp:188
std::string _s
String value.
Definition: Data.hpp:119
Database database
Database connection for the analyzer thread.
Definition: Thread.hpp:114
virtual void onAlgoInitTarget()=0
Initializes the target table for the algorithm.
void checkCorpusSources(StatusSetter &statusSetter)
Checks the specified sources for creating the corpus.
Definition: Thread.cpp:359
std::int32_t _i32
32-bit integer value.
Definition: Data.hpp:103
void onUnpause() override
Unpauses the analyzer.
Definition: Thread.cpp:141
void addOptionalQuery(std::uint64_t queryId, QueryStruct &propertiesTo)
Adds an optional query.
Definition: Thread.cpp:205
void uploadResult()
Uploads the specified result via FTP.
Definition: Thread.cpp:374
Structure to identify a query including its type and result type(s).
Definition: QueryStruct.hpp:40
bool _b
Boolean value.
Definition: Data.hpp:100
Abstract configuration for analyzers, to be implemented by algorithm classes.
Definition: Config.hpp:103
64-bit integer.
Definition: Data.hpp:80
Thread(Main::Database &dbBase, const ThreadOptions &threadOptions, const ThreadStatus &threadStatus)
Constructor initializing a previously interrupted analyzer thread.
Definition: Thread.cpp:44
void initQueries() override
Does nothing.
Definition: Thread.cpp:183
A generic value.
Definition: Data.hpp:96
void onClear() override
Clears the algorithm.
Definition: Thread.cpp:150
void addQueries(const std::vector< std::uint64_t > &queryIds, std::vector< QueryStruct > &propertiesTo)
Adds multiple queries at once, ignoring empty ones.
Definition: Thread.cpp:227
void cleanUpCorpora()
Cleans up all corpora and frees their memory.
Definition: Thread.cpp:481
void pause()
Pauses the thread.
Definition: Thread.cpp:281
void finished()
Sets the status of the analyzer to finished.
Definition: Thread.cpp:257