33 #ifndef _IDENTT_HTTP_SERVER_BASE_HPP_ 34 #define _IDENTT_HTTP_SERVER_BASE_HPP_ 45 #include <unordered_set> 50 template <
class socket_type>
class HttpServer;
52 template <
class socket_type>
// Response: an output stream whose buffered content is written to the client socket of
// the owning session. It derives from enable_shared_from_this so that the asynchronous
// write handlers can keep the response alive through a shared_ptr.
// NOTE(review): this listing omits some lines of the original file (braces, access
// specifiers, a few members); only what is visible is described here.
60 class Response :
public std::enable_shared_from_this<Response>,
public std::ostream {
// Buffer the std::ostream base writes into; starts out as a fresh, empty asio::streambuf.
64 std::unique_ptr<asio::streambuf> streambuf = std::unique_ptr<asio::streambuf>(
new asio::streambuf());
// Session this response belongs to.
66 std::shared_ptr<Session> session;
// Guards send_queue (see the GUARDED_BY annotation below).
69 Mutex send_queue_mutex;
// Pending (buffer, completion callback) pairs: appended by send(), drained by send_from_queue().
70 std::list<std::pair<std::shared_ptr<asio::streambuf>, std::function<void(const error_code &)>>> send_queue GUARDED_BY(send_queue_mutex);
// Creates a response for `session_`.
// The std::ostream base is first constructed without a buffer (nullptr) and is then
// attached to this response's own streambuf through rdbuf().
// `timeout_content` is later passed to Connection::set_timeout() before writes are started.
72 Response(std::shared_ptr<Session> session_,
long timeout_content) noexcept : std::ostream(
nullptr), session(std::move(session_)), timeout_content(timeout_content)
74 rdbuf(streambuf.get());
// Writes the header fields of `header` to the response stream, one "name: value\r\n"
// line per field, and can append a "Content-Length: <size>" line that also terminates
// the header block.
// While iterating it records whether the caller already supplied a Content-Length field
// or a "Transfer-Encoding: chunked" field (both matched case-insensitively).
// NOTE(review): the condition that guards the Content-Length line, and the alternative
// branch, are on lines not shown in this listing; presumably the flags computed here
// decide whether it is emitted - confirm against the full file.
77 template <
typename size_type>
78 void write_header(
const CaseInsensitiveMultimap &header, size_type size)
80 bool content_length_written =
false;
81 bool chunked_transfer_encoding =
false;
82 for(
auto &field : header) {
// Each flag is set at most once; later fields with the same name are not re-checked.
83 if(!content_length_written && case_insensitive_equal(field.first,
"content-length"))
84 content_length_written =
true;
85 else if(!chunked_transfer_encoding && case_insensitive_equal(field.first,
"transfer-encoding") && case_insensitive_equal(field.second,
"chunked"))
86 chunked_transfer_encoding =
true;
// The field itself is always written, whatever its name.
88 *
this << field.first <<
": " << field.second <<
"\r\n";
// Content-Length line followed by the blank line that ends the header block.
91 *
this <<
"Content-Length: " << size <<
"\r\n\r\n";
// Starts an asynchronous write of the buffer at the front of send_queue.
// The REQUIRES annotation states that the caller must hold send_queue_mutex.
// NOTE(review): the branch structure inside the completion handler (error check, locking,
// invocation of the callbacks) is on lines not shown in this listing.
96 void send_from_queue() REQUIRES(send_queue_mutex)
// `self` is captured by the handler so the response outlives the pending write.
98 auto self = this->shared_from_this();
99 asio::async_write(*self->session->connection->socket, *send_queue.begin()->first, [
self](
const error_code &ec, std::size_t ) {
100 auto lock =
self->session->connection->handler_runner->continue_lock();
// Remove the entry that was just written, keeping its callback, and continue with the
// next queued buffer if there is one.
106 auto it =
self->send_queue.begin();
107 auto callback = std::move(it->second);
108 self->send_queue.erase(it);
109 if(self->send_queue.size() > 0)
110 self->send_from_queue();
// Collect the callbacks of every queued entry and empty the queue before iterating over
// them - presumably the failure path, where each callback is told about `ec`; confirm.
118 std::vector<std::function<void(const error_code &)>> callbacks;
119 for(
auto &pair : self->send_queue) {
121 callbacks.emplace_back(std::move(pair.second));
123 self->send_queue.clear();
126 for(
auto &callback : callbacks)
// Writes the current content of the response's streambuf to the client socket.
// `callback` (optional, defaults to nullptr) is captured by the completion handler.
// NOTE(review): the rest of the completion handler is not shown in this listing.
133 void send_on_delete(
const std::function<
void(
const error_code &)> &callback =
nullptr) noexcept
// Bound the duration of the write with the content timeout.
135 session->connection->set_timeout(timeout_content);
// Capturing `self` keeps this response alive until the handler has run.
136 auto self = this->shared_from_this();
137 asio::async_write(*session->connection->socket, *streambuf, [
self, callback](
const error_code &ec, std::size_t ) {
// The write has completed (successfully or not), so the timer is no longer needed.
138 self->session->connection->cancel_timeout();
139 auto lock = self->session->connection->handler_runner->continue_lock();
// Returns the number of bytes currently held in the response's stream buffer.
148 std::size_t size() noexcept
150 return streambuf->size();
// Send the content of the response stream to the client.
// The filled buffer is detached and queued together with `callback`, and the stream is
// given a fresh buffer so the caller can keep writing while the old one is in flight.
// NOTE(review): the locking of send_queue_mutex and the statement executed when the
// queue size is 1 are on lines not shown in this listing.
156 void send(
const std::function<
void(
const error_code &)> &callback =
nullptr) noexcept
158 session->connection->set_timeout(timeout_content);
// Take ownership of the filled buffer (the local shadows the member on purpose) ...
160 std::shared_ptr<asio::streambuf> streambuf = std::move(this->streambuf);
// ... and install a new empty one as the ostream's target.
161 this->streambuf = std::unique_ptr<asio::streambuf>(
new asio::streambuf());
162 rdbuf(this->streambuf.get());
165 send_queue.emplace_back(streambuf, callback);
// A queue of exactly one entry means nothing else was pending before this call.
166 if(send_queue.size() == 1)
// Write `n` characters starting at `ptr` directly to the stream buffer by forwarding to
// std::ostream::write.
171 void write(
const char_type *ptr, std::streamsize n)
173 std::ostream::write(ptr, n);
// Convenience function for writing a status line, optional header fields, and empty content.
// `status_code` defaults to StatusCode::success_ok and `header` to an empty multimap.
177 void write(StatusCode status_code = StatusCode::success_ok,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap())
// Status line, e.g. "HTTP/1.1 <status text>\r\n".
179 *
this <<
"HTTP/1.1 " << identt::http::status_code(status_code) <<
"\r\n";
// No body, so the size handed to write_header is 0.
180 write_header(header, 0);
// Convenience function for writing a status line, header fields, and content.
// The size handed to write_header is the byte size of `content`.
// NOTE(review): the statement that writes `content` itself is not shown in this listing.
184 void write(StatusCode status_code, string_view content,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap())
186 *
this <<
"HTTP/1.1 " << identt::http::status_code(status_code) <<
"\r\n";
187 write_header(header, content.size());
// Convenience function for writing a status line, header fields, and content taken from
// an input stream.
// The stream must be seekable: its size is measured by seeking to the end and back.
// NOTE(review): tellg() reports failure as -1 and that value would be passed on to
// write_header unchanged; no check is visible in this listing - confirm.
193 void write(StatusCode status_code, std::istream &content,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap())
195 *
this <<
"HTTP/1.1 " << identt::http::status_code(status_code) <<
"\r\n";
// Measure the stream, then rewind so the whole content can be copied below.
196 content.seekg(0, std::ios::end);
197 auto size = content.tellg();
198 content.seekg(0, std::ios::beg);
199 write_header(header, size);
// Copy the stream's buffer into the response.
201 *
this << content.rdbuf();
// Convenience function for writing a success status line, header fields, and content.
// Forwards to the status-code overload with StatusCode::success_ok.
205 void write(string_view content,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap())
207 write(StatusCode::success_ok, content, header);
// Convenience function for writing a success status line, header fields, and content
// read from `content`. Forwards to the status-code overload with StatusCode::success_ok.
211 void write(std::istream &content,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap())
213 write(StatusCode::success_ok, content, header);
// Convenience function for writing a success status line and header fields.
// An empty std::string is passed as the content.
217 void write(
const CaseInsensitiveMultimap &header)
219 write(StatusCode::success_ok, std::string(), header);
// Fragments of the Content class: an input stream over a request's stream buffer.
// NOTE(review): the class head and parts of string() are not shown in this listing.

// Returns the number of bytes currently available in the underlying stream buffer.
233 std::size_t size() noexcept
235 return streambuf.size();
// string(): convenience function to return the content as std::string; the stream buffer
// is consumed by the read() call.
242 auto size = streambuf.size();
244 read(&str[0], static_cast<std::streamsize>(size));
// Empty-string result - presumably the fallback when the read above fails; confirm.
248 return std::string();
// The buffer is referenced, not owned.
253 asio::streambuf &streambuf;
// Binds both the std::istream base and the reference member to the same buffer.
254 Content(asio::streambuf &streambuf) noexcept : std::istream(&streambuf), streambuf(streambuf) {}
// Fragments of the Request class.
// NOTE(review): the class head and some members (for example `content`, which the
// constructor initialises) are not shown in this listing.

// Buffer that receives the raw request bytes.
262 asio::streambuf streambuf;
// Weak reference, so a request does not keep its connection alive.
263 std::weak_ptr<Connection> connection;
// NOTE(review): the purpose of this member is not evident from the visible code.
264 std::string optimization = std::to_string(0);
// `max_request_streambuf_size` becomes the maximum size of `streambuf`; `content` reads
// from that same buffer.
266 Request(std::size_t max_request_streambuf_size,
const std::shared_ptr<Connection> &connection_) noexcept : streambuf(max_request_streambuf_size), connection(connection_), content(streambuf) {}
// Filled in by RequestMessage::parse() in HttpServerBase::read().
269 std::string method, path, query_string, http_version;
// Request header fields.
273 CaseInsensitiveMultimap header;
// Returns the remote endpoint of the request's connection, as reported by the lowest
// layer of the socket.
// NOTE(review): the lines between the two returns are not shown; the default-constructed
// endpoint is presumably the result when the connection has expired or the lookup
// fails - confirm.
281 asio::ip::tcp::endpoint remote_endpoint() const noexcept
// The connection is held weakly, so it must be locked before use.
284 if(
auto connection = this->connection.lock())
285 return connection->socket->lowest_layer().remote_endpoint();
289 return asio::ip::tcp::endpoint();
// Deprecated: returns the remote address of the request's connection as a string.
// NOTE(review): the lines between the two returns are not shown; the empty string is
// presumably the result when the connection has expired or the lookup fails - confirm.
293 DEPRECATED std::string remote_endpoint_address() const noexcept
296 if(
auto connection = this->connection.lock())
297 return connection->socket->lowest_layer().remote_endpoint().address().to_string();
301 return std::string();
// Deprecated: returns the remote port of the request's connection.
// NOTE(review): the value returned when the connection cannot be locked is on a line not
// shown in this listing.
305 DEPRECATED
unsigned short remote_endpoint_port() const noexcept
308 if(
auto connection = this->connection.lock())
309 return connection->socket->lowest_layer().remote_endpoint().port();
// Shorthand aliases for shared pointers to Response and Request.
324 using RespPtr = std::shared_ptr<Response>;
325 using ReqPtr = std::shared_ptr<Request>;
// Connection: owns the socket of one client together with an optional timeout timer.
// NOTE(review): this listing omits lines of the class (braces, declarations such as `ec`,
// and the bodies of the timer handler and cancel_timeout()).
328 class Connection :
public std::enable_shared_from_this<Connection> {
// Any extra arguments are forwarded to the socket_type constructor.
330 template <
typename... Args>
331 Connection(std::shared_ptr<ScopeRunner> handler_runner_, Args &&... args) noexcept : handler_runner(std::move(handler_runner_)), socket(
new socket_type(std::forward<Args>(args)...)) {}
// Asynchronous handlers call handler_runner->continue_lock() before doing their work.
333 std::shared_ptr<ScopeRunner> handler_runner;
335 std::unique_ptr<socket_type> socket;
// Created by set_timeout().
337 std::unique_ptr<asio::steady_timer> timer;
// Shuts down both directions of the socket and cancels its outstanding operations.
// Both calls use the error_code overloads, so failures are reported through `ec`
// rather than by throwing.
339 void close() noexcept
342 socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
343 socket->lowest_layer().cancel(ec);
// Arms a timer that expires after `seconds` seconds.
// The wait handler captures only a weak_ptr, so a pending timer does not keep the
// connection alive; it acts only if the connection can still be locked.
346 void set_timeout(
long seconds) noexcept
353 timer = std::unique_ptr<asio::steady_timer>(
new asio::steady_timer(get_socket_executor(*socket), std::chrono::seconds(seconds)));
354 std::weak_ptr<Connection> self_weak(this->shared_from_this());
355 timer->async_wait([self_weak](
const error_code &ec) {
357 if(
auto self = self_weak.lock())
// Counterpart of set_timeout(); body not shown in this listing.
363 void cancel_timeout() noexcept
// Session: pairs a connection with the request currently being read on it.
// The Request is built from the member `connection`, not from the parameter, because
// `connection_` has already been moved from. This is safe since `connection` is declared
// before `request` and is therefore initialised first.
377 Session(std::size_t max_request_streambuf_size, std::shared_ptr<Connection> connection_) noexcept : connection(std::move(connection_)), request(
new Request(max_request_streambuf_size, connection)) {}
379 std::shared_ptr<Connection> connection;
380 std::shared_ptr<Request> request;
// Server configuration. Set before calling start().
// NOTE(review): the class head and the `port` and `address` members are not shown in
// this listing.
387 Config(
unsigned short port) noexcept : port(port) {}
// Passed to Connection::set_timeout() (seconds) while a request header is being read.
394 long timeout_request = 5;
// Timeout on content handling, in seconds.
396 long timeout_content = 300;
// Maximum size of the request stream buffer; unlimited in practice by default.
399 std::size_t max_request_streambuf_size = std::numeric_limits<std::size_t>::max();
// Value given to the acceptor's reuse_address socket option in bind().
404 bool reuse_address =
true;
// When true, bind() sets the TCP_FASTOPEN option where that is available (Linux).
406 bool fast_open =
false;
// A std::regex that also remembers its pattern text, giving it an ordering so it can be
// used as a std::map key (see `resource`).
// NOTE(review): the declaration of the `str` member is not shown in this listing.
412 class regex_orderable :
public std::regex {
// Both constructors are implicit, so a pattern literal or std::string converts directly.
416 regex_orderable(
const char *regex_cstr) : std::regex(regex_cstr), str(regex_cstr) {}
417 regex_orderable(std::string regex_str_) : std::regex(regex_str_), str(std::move(regex_str_)) {}
// Orders by the pattern text (lexicographic std::string comparison).
418 bool operator<(
const regex_orderable &rhs)
const noexcept
420 return str < rhs.str;
// Use this container to add resources for specific request paths: the outer key is the
// path regex, the inner key is the request method, the value is the handler.
427 std::map<regex_orderable, std::map<std::string, std::function<void(std::shared_ptr<typename HttpServerBase<socket_type>::Response>, std::shared_ptr<typename HttpServerBase<socket_type>::Request>)>>>
resource;
// Handlers keyed by request method, used when the request path does not match any
// resource regex.
430 std::map<std::string, std::function<void(std::shared_ptr<typename HttpServerBase<socket_type>::Response>, std::shared_ptr<typename HttpServerBase<socket_type>::Request>)>>
default_resource;
// Called when an error occurs.
433 std::function<void(std::shared_ptr<typename HttpServerBase<socket_type>::Request>,
const error_code &)>
on_error;
// Called on upgrade requests; receives the connection's socket by reference.
436 std::function<void(std::unique_ptr<socket_type> &, std::shared_ptr<typename HttpServerBase<socket_type>::Request>)>
on_upgrade;
// Body of bind(): opens and binds the acceptor and returns the local port actually bound.
// If you know the server port in advance, use start() instead.
// NOTE(review): the signature and several lines (the `else`, the creation of
// `io_whatever`, `qlen`/`ec`, after_bind()) are not shown in this listing.

// start/stop/bind are serialised by start_stop_mutex.
446 std::lock_guard<std::mutex> lock(start_stop_mutex);
448 asio::ip::tcp::endpoint endpoint;
// Use the configured address when one is set ...
449 if(config.address.size() > 0)
450 endpoint = asio::ip::tcp::endpoint(make_address(config.address), config.port);
// ... otherwise an IPv6 endpoint on the configured port (presumably the else-branch).
452 endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v6(), config.port);
455 acceptor = std::unique_ptr<asio::ip::tcp::acceptor>(
new asio::ip::tcp::acceptor(*io_whatever));
456 acceptor->open(endpoint.protocol());
457 acceptor->set_option(asio::socket_base::reuse_address(config.reuse_address));
// TCP Fast Open is only attempted where the platform headers define it; the error_code
// overload is used, so a failure to set the option does not throw.
458 if(config.fast_open) {
459 #if defined(__linux__) && defined(TCP_FASTOPEN) 462 acceptor->set_option(asio::detail::socket_option::integer<IPPROTO_TCP, TCP_FASTOPEN>(qlen), ec);
465 acceptor->bind(endpoint);
// With config.port == 0 this is the port assigned by the system.
469 return acceptor->local_endpoint().port();
// Body of stop(): stop accepting new requests, and close current connections.
// NOTE(review): the signature, the acceptor shutdown, the locking of the connection set
// and the loop body are not shown in this listing.
490 std::lock_guard<std::mutex> lock(start_stop_mutex);
// Visit every tracked connection, then forget them all.
498 for(
auto &connection : connections->set)
500 connections->set.clear();
// Stop the handler runner that asynchronous handlers consult through continue_lock().
508 handler_runner->
stop();
// Serialises bind() and stop().
513 std::mutex start_stop_mutex;
515 std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
// Raw pointers of the live connections (member of a Connections struct whose head is not
// shown in this listing); access is annotated as guarded by `mutex`.
519 std::unordered_set<Connection *>
set GUARDED_BY(mutex);
// Shared so that connection deleters can outlive the server object's use of it.
521 std::shared_ptr<Connections> connections;
523 std::shared_ptr<ScopeRunner> handler_runner;
// Hook with an empty default implementation; derived servers may override it.
527 virtual void after_bind() {}
// Must be implemented by the derived server for its socket type.
528 virtual void accept() = 0;
// Creates a Connection (arguments are forwarded to its constructor after handler_runner)
// and registers its raw pointer in connections->set.
// The shared_ptr gets a custom deleter that removes the pointer from the set again.
// NOTE(review): the locking around the set accesses, the `delete` in the deleter and the
// return statement are not shown in this listing.
530 template <
typename... Args>
531 std::shared_ptr<Connection> create_connection(Args &&... args) noexcept
// Copy the shared_ptr so the deleter owns a reference to the set independently of `this`.
533 auto connections = this->connections;
534 auto connection = std::shared_ptr<Connection>(
new Connection(handler_runner, std::forward<Args>(args)...), [connections](
Connection *connection) {
// The entry may already be gone (stop() clears the set; find_resource() erases upgraded
// connections), hence the lookup before erasing.
537 auto it = connections->set.find(connection);
538 if(it != connections->set.end())
539 connections->set.erase(it);
545 connections->set.emplace(connection.get());
// Reads one HTTP request for `session`: first the header block (up to "\r\n\r\n"), then,
// depending on the header fields, a Content-Length body or a chunked body, and finally
// passes the request on to find_resource(). Failures are reported through on_error.
// NOTE(review): several lines of this function (braces, error/lock checks, `try`,
// `return`, `else`) are not shown in this listing; the comments describe only the
// visible statements.
550 void read(
const std::shared_ptr<Session> &session)
// The header has to arrive within config.timeout_request seconds.
552 session->connection->set_timeout(config.timeout_request);
553 asio::async_read_until(*session->connection->socket, session->request->streambuf,
"\r\n\r\n", [
this, session](
const error_code &ec, std::size_t bytes_transferred) {
554 session->connection->cancel_timeout();
555 auto lock = session->connection->handler_runner->continue_lock();
558 session->request->header_read_time = std::chrono::system_clock::now();
// A buffer filled up to its maximum size is treated as an oversized request: a
// "payload too large" response is written and message_size is reported.
559 if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
560 auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
561 response->write(StatusCode::client_error_payload_too_large);
563 this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
// Bytes already buffered beyond what the header read reported; they count towards the
// body below. Computed before parsing consumes the header from the buffer.
572 std::size_t num_additional_bytes = session->request->streambuf.size() - bytes_transferred;
// Parse request line and header fields; a parse failure is reported as protocol_error.
574 if(!RequestMessage::parse(session->request->content, session->request->method, session->request->path,
575 session->request->query_string, session->request->http_version, session->request->header)) {
577 this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error));
// Case 1: body with an explicit length.
582 auto header_it = session->request->header.find(
"Content-Length");
583 if(header_it != session->request->header.end()) {
584 unsigned long long content_length = 0;
586 content_length = stoull(header_it->second);
// A Content-Length value that cannot be converted is reported as protocol_error.
588 catch(
const std::exception &) {
590 this->
on_error(session->request, make_error_code::make_error_code(errc::protocol_error));
// Only the part of the body that has not been buffered yet is read, under the content timeout.
593 if(content_length > num_additional_bytes) {
594 session->connection->set_timeout(config.timeout_content);
595 asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [
this, session](
const error_code &ec, std::size_t ) {
596 session->connection->cancel_timeout();
597 auto lock = session->connection->handler_runner->continue_lock();
600 if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
601 auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
602 response->write(StatusCode::client_error_payload_too_large);
604 this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
609 this->find_resource(session);
611 this->
on_error(session->request, ec);
// The whole body was already buffered together with the header.
615 this->find_resource(session);
// Case 2: chunked body.
// NOTE(review): the value is compared with "chunked" case-sensitively here, whereas
// write_header() uses case_insensitive_equal for the same check - confirm intent.
617 else if((header_it = session->request->header.find(
"Transfer-Encoding")) != session->request->header.end() && header_it->second ==
"chunked") {
618 auto chunks_streambuf = std::make_shared<asio::streambuf>(this->config.max_request_streambuf_size);
// Move whatever is left in the request buffer into the separate chunk buffer, so the
// request buffer can receive the decoded body.
621 std::ostream ostream(chunks_streambuf.get());
622 auto size = session->request->streambuf.size();
623 std::unique_ptr<char[]> buffer(
new char[size]);
624 session->request->content.read(buffer.get(),
static_cast<std::streamsize
>(size));
625 ostream.write(buffer.get(),
static_cast<std::streamsize
>(size));
627 this->read_chunked_transfer_encoded(session, chunks_streambuf);
// Case 3: neither header field is present - dispatch the request as it is.
630 this->find_resource(session);
// Reports the error code of the header read.
633 this->
on_error(session->request, ec);
// Reads the size line of the next chunk into `chunks_streambuf`, makes sure the chunk's
// data is buffered as well, and then calls read_chunked_transfer_encoded_chunk().
// NOTE(review): several lines (set_timeout, braces, error/lock checks, `try`/`catch`,
// `return`, `else`) are not shown in this listing.
637 void read_chunked_transfer_encoded(
const std::shared_ptr<Session> &session,
const std::shared_ptr<asio::streambuf> &chunks_streambuf)
640 asio::async_read_until(*session->connection->socket, *chunks_streambuf,
"\r\n", [
this, session, chunks_streambuf](
const error_code &ec,
size_t bytes_transferred) {
641 session->connection->cancel_timeout();
642 auto lock = session->connection->handler_runner->continue_lock();
// A chunk buffer filled up to its maximum size is treated as an oversized request.
645 if(chunks_streambuf->size() == chunks_streambuf->max_size()) {
646 auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
647 response->write(StatusCode::client_error_payload_too_large);
649 this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
654 std::istream istream(chunks_streambuf.get());
// getline consumes the size line including its '\n' (hence the + 1); what remains of
// bytes_transferred is subtracted from the buffer size below.
656 getline(istream, line);
657 bytes_transferred -= line.size() + 1;
659 unsigned long length = 0;
// The chunk size is parsed as a hexadecimal number (base 16).
661 length = stoul(line, 0, 16);
665 this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error));
669 auto num_additional_bytes = chunks_streambuf->size() - bytes_transferred;
// The chunk needs `length` data bytes plus 2 more (presumably the CRLF that ends the
// chunk data - confirm). Read only what is still missing.
// NOTE(review): `length` comes from the request; `2 + length` wraps around for values
// near the maximum of unsigned long - confirm whether a bound is enforced elsewhere.
671 if((2 + length) > num_additional_bytes) {
673 asio::async_read(*session->connection->socket, *chunks_streambuf, asio::transfer_exactly(2 + length - num_additional_bytes), [
this, session, chunks_streambuf, length](
const error_code &ec,
size_t ) {
674 session->connection->cancel_timeout();
675 auto lock = session->connection->handler_runner->continue_lock();
678 if(chunks_streambuf->size() == chunks_streambuf->max_size()) {
679 auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
680 response->write(StatusCode::client_error_payload_too_large);
682 this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
687 this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length);
689 this->
on_error(session->request, ec);
// The chunk data was already completely buffered.
693 this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length);
// Reports the error code of the size-line read.
696 this->
on_error(session->request, ec);
// Copies `length` bytes of chunk data from `chunks_streambuf` into the request's stream
// buffer, then either continues with the next chunk or dispatches the request.
// NOTE(review): several lines (creation of `response`, handling of the bytes after the
// chunk data, the condition choosing between the last two calls) are not shown in this
// listing.
700 void read_chunked_transfer_encoded_chunk(
const std::shared_ptr<Session> &session,
const std::shared_ptr<asio::streambuf> &chunks_streambuf,
unsigned long length)
702 std::istream istream(chunks_streambuf.get());
704 std::ostream ostream(&session->request->streambuf);
// The copy goes through a temporary heap buffer.
// NOTE(review): the allocation size is the client-supplied chunk length - confirm that
// it is bounded before this point.
705 std::unique_ptr<char[]> buffer(
new char[length]);
706 istream.read(buffer.get(),
static_cast<std::streamsize
>(length));
707 ostream.write(buffer.get(),
static_cast<std::streamsize
>(length));
// A request buffer filled up to its maximum size is treated as an oversized request.
708 if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
710 response->write(StatusCode::client_error_payload_too_large);
712 this->
on_error(session->request, make_error_code::make_error_code(errc::message_size));
// Read the next chunk ...
722 read_chunked_transfer_encoded(session, chunks_streambuf);
// ... or dispatch the request - presumably once the terminating zero-length chunk has
// been seen; confirm.
724 this->find_resource(session);
// Dispatches a fully read request: upgrade requests go to on_upgrade, everything else to
// the first handler in `resource` whose method and path regex match the request.
// NOTE(review): several lines (the on_upgrade check, locking, `return`, the declaration
// of `sm_res`, and the default_resource lookup) are not shown in this listing.
727 void find_resource(
const std::shared_ptr<Session> &session)
// Upgrade request: stop tracking the connection and hand its socket to on_upgrade.
731 auto it = session->request->header.find(
"Upgrade");
732 if(it != session->request->header.end()) {
736 auto it = connections->set.find(session->connection.get());
737 if(it != connections->set.end())
738 connections->set.erase(it);
741 on_upgrade(session->connection->socket, session->request);
// Regular request: entries are visited in map order, i.e. ordered by regex pattern text.
746 for(
auto &regex_method :
resource) {
// The method is checked first, so the regex is only evaluated for candidate handlers.
747 auto it = regex_method.second.find(session->request->method);
748 if(it != regex_method.second.end()) {
// The whole path must match; the match result is stored on the request for the handler.
750 if(std::regex_match(session->request->path, sm_res, regex_method.first)) {
751 session->request->path_match = std::move(sm_res);
752 write(session, it->second);
// Fallback call - presumably with the handler found in default_resource for the request
// method; the lookup is not shown in this listing.
759 write(session, it->second);
// Runs a resource handler for `session` and, once its response has been sent, decides
// whether to read another request on the same connection.
// NOTE(review): several lines (the second parameter `resource_function`, creation of the
// response and the origin of `response_ptr`, creation of `new_session`, `return`s,
// `try`) are not shown in this listing.
762 void write(
const std::shared_ptr<Session> &session,
767 auto response = std::shared_ptr<Response>(response_ptr);
// Send what the handler wrote; the callback runs when the write has completed.
768 response->send_on_delete([
this, response](
const error_code &ec) {
// The handler may force the connection to be closed after the response.
770 if(response->close_connection_after_response)
// Otherwise honour the request's Connection header field(s), compared case-insensitively.
773 auto range = response->session->request->header.equal_range(
"Connection");
774 for(
auto it = range.first; it != range.second; it++) {
775 if(case_insensitive_equal(it->second,
"close"))
777 else if(case_insensitive_equal(it->second,
"keep-alive")) {
779 this->read(new_session);
// No decisive Connection field: keep reading when the request's HTTP version is at
// least "1.1".
// NOTE(review): this is a lexicographic string comparison, not a numeric one.
783 if(response->session->request->http_version >=
"1.1") {
785 this->read(new_session);
// Reports the error code of the response write.
790 this->
on_error(response->session->request, ec);
// Invoke the handler; an exception derived from std::exception is reported through
// on_error as operation_canceled.
795 resource_function(response, session->request);
797 catch(
const std::exception &) {
799 on_error(session->request, make_error_code::make_error_code(errc::operation_canceled));
808 #endif // _IDENTT_HTTP_SERVER_BASE_HPP_ long timeout_content
Timeout on content handling. Defaults to 300 seconds.
Definition: HttpServerBase.hpp:396
Definition: HttpServerBase.hpp:53
void stop() noexcept
Stop accepting new requests, and close current connections.
Definition: HttpServerBase.hpp:488
void write(StatusCode status_code, string_view content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing status line, header fields, and content.
Definition: HttpServerBase.hpp:184
std::shared_ptr<::identt::http::io_whatever > io_whatever
If you want to reuse an already created asio::io_service, store its pointer here before calling start...
Definition: HttpServerBase.hpp:439
Scoped mutex guard class that is annotated for Clang Thread Safety Analysis.
Definition: Mutex.hpp:121
void write(const CaseInsensitiveMultimap &header)
Convenience function for writing success status line, and header fields.
Definition: HttpServerBase.hpp:217
Definition: HttpServerBase.hpp:384
bool close_connection_after_response
If set to true, force server to close the connection after the response has been sent...
Definition: HttpServerBase.hpp:226
void write(const char_type *ptr, std::streamsize n)
Write directly to stream buffer using std::ostream::write.
Definition: HttpServerBase.hpp:171
Makes it possible to, for instance, cancel Asio handlers without stopping asio::io_service.
Definition: Utility.hpp:535
std::function< void(std::shared_ptr< typename HttpServerBase< socket_type >::Request >, const error_code &)> on_error
Called when an error occurs.
Definition: HttpServerBase.hpp:433
std::size_t max_request_streambuf_size
Maximum size of request stream buffer.
Definition: HttpServerBase.hpp:399
Definition: HttpServerBase.hpp:375
static CaseInsensitiveMultimap parse(const std::string &query_string) noexcept
Returns query keys with percent-decoded values.
Definition: Utility.hpp:168
std::string string() noexcept
Convenience function to return content as std::string. The stream buffer is consumed.
Definition: HttpServerBase.hpp:238
Response class where the content of the response is sent to client when the object is about to be des...
Definition: HttpServerBase.hpp:60
Definition: HttpServerBase.hpp:257
void accept_and_run()
If you know the server port in advance, use start() instead.
Definition: HttpServerBase.hpp:474
std::chrono::system_clock::time_point header_read_time
The time point when the request header was fully read.
Definition: HttpServerBase.hpp:279
unsigned short bind()
If you know the server port in advance, use start() instead.
Definition: HttpServerBase.hpp:444
Definition: HttpServerBase.hpp:328
Definition: CryptoBase.hpp:49
Config config
Set before calling start().
Definition: HttpServerBase.hpp:409
std::string address
IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation.
Definition: HttpServerBase.hpp:402
void write(StatusCode status_code, std::istream &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing status line, header fields, and content.
Definition: HttpServerBase.hpp:193
Definition: HttpServerBase.hpp:517
std::function< void(std::unique_ptr< socket_type > &, std::shared_ptr< typename HttpServerBase< socket_type >::Request >)> on_upgrade
Called on upgrade requests.
Definition: HttpServerBase.hpp:436
void send(const std::function< void(const error_code &)> &callback=nullptr) noexcept
Send the content of the response stream to client.
Definition: HttpServerBase.hpp:156
std::map< std::string, std::function< void(std::shared_ptr< typename HttpServerBase< socket_type >::Response >, std::shared_ptr< typename HttpServerBase< socket_type >::Request >)> > default_resource
If the request path does not match a resource regex, this function is called.
Definition: HttpServerBase.hpp:430
void write(std::istream &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing success status line, header fields, and content. ...
Definition: HttpServerBase.hpp:211
void start()
Start the server by calling bind() and accept_and_run()
Definition: HttpServerBase.hpp:481
Definition: HttpServerBase.hpp:229
std::smatch path_match
The result of the resource regular expression match of the request path.
Definition: HttpServerBase.hpp:276
Definition: HttpServer.hpp:42
unsigned short port
Port number to use. Defaults to 80 for HTTP and 443 for HTTPS. Set to 0 to get an assigned port...
Definition: HttpServerBase.hpp:391
std::map< regex_orderable, std::map< std::string, std::function< void(std::shared_ptr< typename HttpServerBase< socket_type >::Response >, std::shared_ptr< typename HttpServerBase< socket_type >::Request >)> > > resource
Use this container to add resources for specific request paths depending on the given regex and metho...
Definition: HttpServerBase.hpp:427
void write(string_view content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing success status line, header fields, and content. ...
Definition: HttpServerBase.hpp:205
void write(StatusCode status_code=StatusCode::success_ok, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing status line, potential header fields, and empty content.
Definition: HttpServerBase.hpp:177
CaseInsensitiveMultimap parse_query_string() const noexcept
Returns query keys with percent-decoded values.
Definition: HttpServerBase.hpp:317