crawlserv++  [under development]
Application for crawling and analyzing textual content of websites.
Thread.hpp
Go to the documentation of this file.
1 /*
2  *
3  * ---
4  *
5  * Copyright (C) 2022 Anselm Schmidt (ans[ät]ohai.su)
6  *
7  * This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation, either version 3 of the License, or
10  * (at your option) any later version in addition to the terms of any
11  * licences already herein identified.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program. If not, see <https://www.gnu.org/licenses/>.
20  *
21  * ---
22  *
23  * Thread.hpp
24  *
25  * Interface for a thread which implements all module-independent thread functionality
26  * like connecting to the database, managing the thread status (including pausing the thread),
27  * running the thread ticks and catching exceptions thrown by the thread.
28  *
29  * Created on: Oct 10, 2018
30  * Author: ans
31  */
32 
33 #ifndef MODULE_THREAD_HPP_
34 #define MODULE_THREAD_HPP_
35 
36 // do not catch thread exceptions: use only for debugging!
37 //#define MODULE_THREAD_DEBUG_NOCATCH
38 
39 #include "Database.hpp"
40 
41 #include "../Helper/DateTime.hpp"
42 #include "../Main/Database.hpp"
43 #include "../Main/Exception.hpp"
44 #include "../Struct/ModuleOptions.hpp"
45 #include "../Struct/ThreadOptions.hpp"
46 #include "../Struct/ThreadStatus.hpp"
47 #include "../Wrapper/DatabaseLock.hpp"
48 
49 #include <atomic> // std::atomic
50 #include <chrono> // std::chrono
51 #include <cmath> // std::lround
52 #include <condition_variable> // std::condition_variable
53 #include <cstdint> // std::uint8_t, std::int64_t, std::uint64_t
54 #include <exception> // std::exception
55 #include <iostream> // std::cout, std::flush
56 #include <mutex> // std::lock_guard, std::mutex, std::unique_lock
57 #include <queue> // std::queue
58 #include <string> // std::string
59 #include <string_view> // std::string_view, std::string_view_literals
60 #include <thread> // std::this_thread, std::thread
61 #include <utility> // std::swap
62 
63 namespace crawlservpp::Module {
64 
65  /*
66  * CONSTANTS
67  */
68 
69  using std::string_view_literals::operator""sv;
 // (brings the `sv` literal suffix into scope; it is used by the status prefix constants below)
70 
73 
 /// Number of seconds to sleep on connection errors.
75  inline constexpr auto sleepOnConnectionErrorS{10};
76 
 /// Number of milliseconds to sleep before checking whether the thread is still running.
78  inline constexpr auto sleepMs{800};
79 
 /// Status message prefix for interrupted threads.
81  inline constexpr auto statusPrefixInterrupted{"INTERRUPTED "sv};
82 
 /// Status message prefix for paused threads.
84  inline constexpr auto statusPrefixPaused{"PAUSED "sv};
85 
87 
88  /*
89  * DECLARATION
90  */
91 
/// Abstract class providing module-independent thread functionality.
///
/// Derived module classes have to implement the pure virtual hooks
/// onInit(), onTick(), onPause(), onUnpause(), onClear() and onReset().
/// The class can be neither copied nor moved (all four operations are deleted).
93  class Thread {
94  // for convenience
96 
100 
102 
103  public:
106 
 /// Constructor initializing a previously interrupted thread.
107  Thread(
108  Main::Database& dbBase,
109  const ThreadOptions& threadOptions,
110  const ThreadStatus& threadStatus
111  );
112 
 /// Constructor taking only the database and the thread options (no previous thread status).
 // NOTE(review): presumably used for newly created threads - confirm in Thread.cpp.
113  Thread(
114  Main::Database& dbBase,
115  const ThreadOptions& threadOptions
116  );
117 
 /// Default destructor.
119  virtual ~Thread() = default;
120 
124 
 /// Gets the ID of the thread.
125  std::uint64_t getId() const;
 /// Gets the ID of the website used by the thread.
126  std::uint64_t getWebsite() const;
 /// Gets the ID of the URL list used by the thread.
127  std::uint64_t getUrlList() const;
 /// Gets the ID of the configuration used by the thread.
128  std::uint64_t getConfig() const;
 /// Checks whether the thread is shutting down or has shut down.
129  bool isShutdown() const;
 /// Checks whether the thread is still supposed to run.
130  bool isRunning() const;
 /// Checks whether the shutdown of the thread has been finished.
131  bool isFinished() const;
 /// Checks whether the thread has been paused.
132  bool isPaused() const;
133 
137 
 /// Starts running the thread.
138  void start();
 /// Pauses the thread.
 // NOTE(review): the meaning of the returned bool is not visible in this header;
 //  presumably it reports whether the thread could be paused (cf. `pausable`) - confirm.
139  bool pause();
 /// Unpauses the thread.
140  void unpause();
 /// Shuts down the thread.
141  void stop();
 /// Interrupts the thread due to an exception.
 // NOTE(review): the `interrupted` flag below is commented as "interrupted by shutdown",
 //  while `terminated` is the one commented as "due to an exception" - confirm which
 //  description is correct for this function.
142  void interrupt();
 /// Waits for the thread until shutdown is completed.
143  void end();
 /// Will reset the thread before the next tick.
144  void reset();
145 
149 
 /// Jumps to the specified target ID ("time travel").
150  void warpTo(std::uint64_t target);
151 
153 
156 
160 
 /// Deleted copy constructor.
 // NOTE(review): takes a non-const reference; the conventional signature is `const Thread&`.
163  Thread(Thread&) = delete;
164 
 /// Deleted copy assignment operator.
 // NOTE(review): takes a non-const reference; the conventional signature is `const Thread&`.
166  Thread& operator=(Thread&) = delete;
167 
 /// Deleted move constructor.
169  Thread(Thread&&) = delete;
170 
 /// Deleted move assignment operator.
172  Thread& operator=(Thread&&) = delete;
173 
175 
176  protected:
179 
182 
186 
 /// Namespace of the website used by the thread.
188  std::string websiteNamespace;
189 
 /// Namespace of the URL list used by the thread.
191  std::string urlListNamespace;
192 
194 
 /// JSON string of the configuration used by the thread.
197  std::string configuration;
198 
202 
 /// Checks whether the thread has been interrupted.
203  bool isInterrupted() const;
 /// Gets the current status message.
204  std::string getStatusMessage() const;
 /// Gets the current progress, in percent.
205  float getProgress() const;
 /// Gets the value of the last ID processed by the thread.
206  std::uint64_t getLast() const;
 /// Gets the number of IDs that have been jumped over, and resets them.
207  std::int64_t getWarpedOverAndReset();
208 
212 
 /// Sets the status message of the thread.
213  void setStatusMessage(const std::string& statusMessage);
 /// Sets the progress of the thread.
214  void setProgress(float newProgress);
 /// Sets the last ID processed by the thread.
215  void setLast(std::uint64_t lastId);
 /// Increments the last ID processed by the thread.
216  void incrementLast();
 /// Increments the number of IDs processed by the thread.
217  void incrementProcessed();
218 
222 
 /// Lets the thread sleep for the specified number of milliseconds.
223  void sleep(std::uint64_t ms) const;
 /// Allows the thread to be paused.
224  void allowPausing();
 /// Disallows the thread to be paused.
225  void disallowPausing();
 /// Forces the thread to pause.
226  void pauseByThread();
227 
231 
 /// Checks whether a certain logging level is enabled.
232  bool isLogLevel(std::uint8_t level) const;
 /// Adds a thread-specific log entry to the database, if the current logging level is high enough.
233  void log(std::uint8_t level, const std::string& logEntry);
 /// Overload of log() taking a queue of log entries.
 // NOTE(review): the queue is passed by non-const reference, so it is presumably
 //  emptied while its entries are written - confirm in Thread.cpp.
234  void log(std::uint8_t level, std::queue<std::string>& logEntries);
235 
239 
241 
 /// Initializes the module. Pure virtual: has to be implemented by the derived class.
245  virtual void onInit() = 0;
246 
248 
 /// Performs a module tick. Pure virtual: has to be implemented by the derived class.
252  virtual void onTick() = 0;
253 
255 
 /// Pauses the module. Pure virtual: has to be implemented by the derived class.
259  virtual void onPause() = 0;
260 
262 
 /// Unpauses the module. Pure virtual: has to be implemented by the derived class.
266  virtual void onUnpause() = 0;
267 
269 
 /// Clears the module. Pure virtual: has to be implemented by the derived class.
273  virtual void onClear() = 0;
274 
276 
 /// Resets the module. Pure virtual: has to be implemented by the derived class.
280  virtual void onReset() = 0;
281 
283 
284  private:
285  Main::Database& databaseClass; // access to the database for the class
286 
 // state flags; all are atomic, so each one can be read/written without taking a lock
287  std::atomic<bool> pausable{true}; // thread is pausable
288  std::atomic<bool> running{true}; // thread is running (or paused)
289  std::atomic<bool> paused{false}; // thread is paused
290  std::atomic<bool> toReset{false}; // thread needs to be reset
291  std::atomic<bool> interrupted{false}; // thread has been interrupted by shutdown
292  std::atomic<bool> terminated{false}; // thread has been terminated due to an exception
293  std::atomic<bool> shutdown{false}; // shutdown in progress
294  std::atomic<bool> finished{false}; // shutdown is finished
295 
296  std::uint64_t id{}; // the ID of the thread in the database
297  std::string module; // the module of the thread (used for logging)
298  ThreadOptions options; // options for the thread
299 
300  std::uint64_t last{}; // last ID for the thread
301  std::atomic<std::uint64_t> overwriteLast{}; // ID to overwrite last ID with ("time travel")
302  std::int64_t warpedOver{}; // no. of IDs that have been warped over (might be negative, ONLY for threads!)
303  std::uint64_t processed{}; // no. of IDs that have been processed
304 
305  std::condition_variable pauseCondition; // condition variable to wait for unpause
306  mutable std::mutex pauseLock; // lock for accessing the condition variable
307 
308  std::string status; // status message of the thread (without pause state)
309  mutable std::mutex statusLock; // lock for accessing the status message
310 
311  float progress{}; // current progress of the thread, in percent
312  mutable std::mutex progressLock; // lock for accessing the current progress
313 
314  std::thread thread; // the thread object itself (held by value, not a pointer)
315 
316  // timing statistics (in seconds)
 // (std::chrono::duration<Rep> uses a period of one second by default)
317  std::chrono::steady_clock::time_point startTimePoint{std::chrono::steady_clock::time_point::min()};
318  std::chrono::steady_clock::time_point pauseTimePoint{std::chrono::steady_clock::time_point::min()};
319  std::chrono::duration<std::uint64_t> runTime{std::chrono::duration<std::uint64_t>::zero()};
320  std::chrono::duration<std::uint64_t> pauseTime{std::chrono::duration<std::uint64_t>::zero()};
321 
322  // internal timing functions
323  std::uint64_t getRunTime() const;
324  void updateRunTime();
325  void updatePauseTime();
326 
327  // internal thread functions
328  void init();
329  void tick();
330  void wait();
331  void clear();
332 
333  // pause checker
334  bool isUnpaused() const;
335 
336  // internal helper functions
337  void onEnd();
338  void clearException(const std::exception& e, const std::string& inFunction);
339  void clearException(const std::string& inFunction);
340 
341  // main function
342  void main();
343  };
344 
345 } /* namespace crawlservpp::Module */
346 
347 #endif /* MODULE_THREAD_HPP_ */
bool isInterrupted() const
Checks whether the thread has been interrupted.
Definition: Thread.cpp:458
bool pause()
Pauses the thread.
Definition: Thread.cpp:279
void stop()
Shuts down the thread.
Definition: Thread.cpp:326
virtual void onInit()=0
Initializes the module.
constexpr auto sleepMs
Number of milliseconds to sleep before checking whether the thread is still running.
Definition: Thread.hpp:78
std::string getStatusMessage() const
Gets the current status message.
Definition: Thread.cpp:470
virtual ~Thread()=default
Default destructor.
float getProgress() const
Gets the current progress, in percent.
Definition: Thread.cpp:485
std::uint64_t getUrlList() const
Gets the ID of the URL list used by the thread.
Definition: Thread.cpp:177
Thread status containing its ID, status message, pause state, and progress.
Definition: ThreadStatus.hpp:54
void allowPausing()
Allows the thread to be paused.
Definition: Thread.cpp:693
Class handling database access for threads.
Definition: Database.hpp:91
bool isFinished() const
Checks whether the shutdown of the thread has been finished.
Definition: Thread.cpp:233
std::uint64_t getLast() const
Gets the value of the last ID processed by the thread.
Definition: Thread.cpp:499
bool isRunning() const
Checks whether the thread is still supposed to run.
Definition: Thread.cpp:221
#define MAIN_EXCEPTION_CLASS()
Macro used to easily define classes for general exceptions.
Definition: Exception.hpp:50
virtual void onReset()=0
Resets the module.
Thread options containing the name of the module run, as well as the IDs of the website, URL list, and configuration used.
Definition: ThreadOptions.hpp:40
Abstract class providing module-independent thread functionality.
Definition: Thread.hpp:93
constexpr auto sleepOnConnectionErrorS
Number of seconds to sleep on connection errors.
Definition: Thread.hpp:75
bool isLogLevel(std::uint8_t level) const
Checks whether a certain logging level is enabled.
Definition: Thread.cpp:741
void unpause()
Unpauses the thread.
Definition: Thread.cpp:302
Class handling database access for the command-and-control and its threads.
Definition: Database.hpp:366
virtual void onClear()=0
Clears the module.
Module options containing the thread ID, as well as ID and namespace of website and URL list used by ...
Definition: ModuleOptions.hpp:40
std::string urlListNamespace
Namespace of the URL list used by the thread.
Definition: Thread.hpp:191
bool isPaused() const
Checks whether the thread has been paused.
Definition: Thread.cpp:245
void incrementProcessed()
Increments the number of IDs processed by the thread.
Definition: Thread.cpp:648
std::uint64_t getConfig() const
Gets the ID of the configuration used by the thread.
Definition: Thread.cpp:192
std::uint64_t getWebsite() const
Gets the ID of the website used by the thread.
Definition: Thread.cpp:163
void pauseByThread()
Forces the thread to pause.
Definition: Thread.cpp:712
void interrupt()
Interrupts the thread due to an exception.
Definition: Thread.cpp:359
virtual void onUnpause()=0
Unpauses the module.
void log(std::uint8_t level, const std::string &logEntry)
Adds a thread-specific log entry to the database, if the current logging level is high enough...
Definition: Thread.cpp:779
std::string websiteNamespace
Namespace of the website used by the thread.
Definition: Thread.hpp:188
void sleep(std::uint64_t ms) const
Lets the thread sleep for the specified number of milliseconds.
Definition: Thread.cpp:667
Thread & operator=(Thread &)=delete
Deleted copy assignment operator.
void setStatusMessage(const std::string &statusMessage)
Sets the status message of the thread.
Definition: Thread.cpp:544
std::int64_t getWarpedOverAndReset()
Gets the number of IDs that have been jumped over, and resets them.
Definition: Thread.cpp:517
void warpTo(std::uint64_t target)
Jumps to the specified target ID ("time travel").
Definition: Thread.cpp:433
void incrementLast()
Increments the last ID processed by the thread.
Definition: Thread.cpp:633
void reset()
Will reset the thread before the next tick.
Definition: Thread.cpp:409
void start()
Starts running the thread.
Definition: Thread.cpp:257
Thread(Main::Database &dbBase, const ThreadOptions &threadOptions, const ThreadStatus &threadStatus)
Constructor initializing a previously interrupted thread.
Definition: Thread.cpp:52
Class for database connection exceptions.
Definition: Database.hpp:749
void setLast(std::uint64_t lastId)
Sets the last ID processed by the thread.
Definition: Thread.cpp:611
bool isShutdown() const
Checks whether the thread is shutting down or has shut down.
Definition: Thread.cpp:207
constexpr auto statusPrefixInterrupted
Status message prefix for interrupted threads.
Definition: Thread.hpp:81
constexpr auto statusPrefixPaused
Status message prefix for paused threads.
Definition: Thread.hpp:84
std::uint64_t getId() const
Gets the ID of the thread.
Definition: Thread.cpp:149
virtual void onPause()=0
Pauses the module.
void disallowPausing()
Prevents the thread from being paused.
Definition: Thread.cpp:703
void setProgress(float newProgress)
Sets the progress of the thread.
Definition: Thread.cpp:580
virtual void onTick()=0
Performs a module tick.
Namespace for the different modules run by threads.
Template class for safe in-scope database locks.
Definition: DatabaseLock.hpp:54
void end()
Waits for the thread until shutdown is completed.
Definition: Thread.cpp:390
std::string configuration
JSON string of the configuration used by the thread.
Definition: Thread.hpp:197
Database database
Database connection for the thread.
Definition: Thread.hpp:181