diff --git a/modules/platforms/cpp/cmake/dependencies.cmake b/modules/platforms/cpp/cmake/dependencies.cmake index 1966568c55ae..8aa0702c024a 100644 --- a/modules/platforms/cpp/cmake/dependencies.cmake +++ b/modules/platforms/cpp/cmake/dependencies.cmake @@ -43,6 +43,24 @@ function(fetch_dependency NAME URL MD5) endif() endfunction() +function(add_asio_dependency) + message(STATUS "Download dependency: asio") + + FetchContent_Declare( + asio + URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz + URL_HASH MD5=6699ac1dea111c20d024f25e06e573db + ) + + FetchContent_Populate(asio) + + add_library(asio INTERFACE) + + target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include) + + target_compile_definitions(asio INTERFACE ASIO_STANDALONE) +endfunction() + if (${USE_LOCAL_DEPS}) find_package(msgpack REQUIRED) if (${msgpack_FOUND}) @@ -76,6 +94,7 @@ else() fetch_dependency(uni-algo https://github.com/uni-algo/uni-algo/archive/v1.2.0.tar.gz 6e0cce94a6b45ebee7b904316df9f87f) if (${ENABLE_TESTS}) fetch_dependency(googletest https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz c8340a482851ef6a3fe618a082304cfc) + add_asio_dependency() endif() endif() diff --git a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt index ea4abb5be512..402291e4304d 100644 --- a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt +++ b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt @@ -24,4 +24,4 @@ set(SOURCES connection_test.cpp ) -ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common ignite3-client msgpack-c ignite-protocol ignite-tuple) \ No newline at end of file +ignite_test(${TARGET} SOURCES ${SOURCES} LIBS asio ignite-test-common ignite3-client msgpack-c ignite-protocol ignite-tuple) \ No newline at end of file diff --git a/modules/platforms/cpp/tests/fake_server/connection_test.cpp b/modules/platforms/cpp/tests/fake_server/connection_test.cpp index d1146b1ae3ec..be2abdde2be7 100644 --- a/modules/platforms/cpp/tests/fake_server/connection_test.cpp +++ b/modules/platforms/cpp/tests/fake_server/connection_test.cpp @@ -15,13 +15,15 @@ * limitations under the License. */ -#include "tests/client-test/ignite_runner_suite.h" -#include "ignite/client/ignite_client.h" #include "fake_server.h" +#include "ignite/client/ignite_client.h" +#include "proxy/asio_proxy.h" +#include "tests/client-test/ignite_runner_suite.h" #include #include + using namespace ignite; using namespace std::chrono_literals; @@ -76,3 +78,25 @@ TEST_F(connection_test, request_timeout) { EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code()); } } + +TEST_F(connection_test, using_asio) { + fake_server fs{50900, get_logger()}; + fs.start(); + + proxy::message_listener listener; + + proxy::asio_proxy proxy{ + {proxy::configuration(50800, "127.0.0.1:50900", &listener)} + }; + + + ignite_client_configuration cfg; + cfg.set_logger(get_logger()); + cfg.set_endpoints(get_endpoints()); + + auto cl = ignite_client::start(cfg, 5s); + + auto cluster_nodes = cl.get_cluster_nodes(); + + ASSERT_EQ(1, cluster_nodes.size()); +} diff --git a/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h b/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h new file mode 100644 index 000000000000..82cdd53a0852 --- /dev/null +++ b/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "message.h" +#include "message_listener.h" + +namespace ignite::proxy { + +using asio::ip::tcp; + +struct configuration { + asio::ip::port_type m_in_port; + std::string m_out_host_and_port; + message_listener* m_listener; + + configuration(asio::ip::port_type m_in_port, const std::string &m_out_host_and_port, message_listener *m_listener) + : m_in_port(m_in_port) + , m_out_host_and_port(m_out_host_and_port) + , m_listener(m_listener) { } +}; + +class session : public std::enable_shared_from_this { +public: + session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& stopped, message_listener* listener) + : m_in_sock(std::move(in_sock)) + , m_out_sock(std::move(out_sock)) + , m_stopped(stopped) + , m_listener(listener) + { } + + ~session() { + std::cout << "Session destructed " << this << std::endl; + } + + void start() { do_serve(); } + + tcp::socket &get_out_sock() { return m_out_sock; } + + void set_writable(bool writable) { + m_in_to_out_writable = writable; + m_out_to_in_writable = writable; + } + + enum direction { forward, reverse }; + +private: + void do_serve() { + do_read(forward); + do_read(reverse); + } + + void do_read(direction direction) { + if (m_stopped.load()) + return; + + tcp::socket &src = direction == forward ? m_in_sock : m_out_sock; + + auto self(shared_from_this()); + + src.async_read_some(asio::buffer(buf, BUFF_SIZE), + [direction, self](const asio::error_code& ec, size_t len) { + if (ec) { + if (ec == asio::error::eof) { + return; + } + throw std::runtime_error("Error while reading from socket " + ec.message()); + } + + std::queue &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in; + bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable; + + // we have one-threaded executor no synchronization is needed + message& msg = queue.emplace(self->buf, len); + + if (self->m_listener) { + if (direction == forward) { + self->m_listener->register_out_message(msg); + } else { + self->m_listener->register_in_message(msg); + } + } + + if (writable) { // there are pending write operation on this socket + self->do_write(direction); + } + + self->do_read(direction); + }); + } + + void do_write(direction direction) { + tcp::socket &dst = direction == forward ? m_out_sock : m_in_sock; + std::queue &queue = direction == forward ? m_in_to_out : m_out_to_in; + bool &writable = direction == forward ? m_in_to_out_writable : m_out_to_in_writable; + + writable = false; // protects from writing same buffer twice (from head of queue). + + auto self(shared_from_this()); + if (!queue.empty()) { + message &msg = queue.front(); + + asio::async_write( + dst, asio::buffer(msg.m_arr, msg.m_size), + [direction, self](asio::error_code ec, size_t) { + if (ec) { + if (ec == asio::error::eof) { + return; + } + throw std::runtime_error("Error while writing to socket " + ec.message()); + } + + std::queue &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in; + bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable; + + queue.pop(); + + if (!queue.empty()) { + // makes writes on the same socket strictly ordered + self->do_write(direction); + } else { + writable = true; // now read operation can initiate writes + } + }); + } + } + + tcp::socket m_in_sock; + tcp::socket m_out_sock; + + bool m_in_to_out_writable{false}; + bool m_out_to_in_writable{false}; + + std::queue m_in_to_out; + std::queue m_out_to_in; + + static constexpr size_t BUFF_SIZE = 4096; + + char buf[BUFF_SIZE]{}; + + std::atomic_bool& m_stopped; + + message_listener* m_listener{nullptr}; +}; + +class asio_proxy { +public: + asio_proxy(std::vector configurations) + : m_resolver(m_io_context) + , m_in_sock(m_io_context) + { + for (auto &cfg : configurations) { + m_conn_map.emplace( + cfg.m_in_port, + proxy_entry{m_io_context, cfg.m_in_port, cfg.m_out_host_and_port, cfg.m_listener} + ); + } + + do_serve(); + + m_executor = std::make_unique([this]() { + m_io_context.run(); + }); + } + + ~asio_proxy() { + m_stopped.store(true); + m_io_context.stop(); + + m_executor->join(); + } + +private: + struct proxy_entry { + tcp::acceptor m_in_acceptor; + std::string m_out_host; + std::string m_out_port; + message_listener* m_listener; + + proxy_entry(asio::io_context& io_context, asio::ip::port_type in_port, const std::string& out_host_and_port, message_listener* listener) + : m_in_acceptor(io_context, tcp::endpoint(tcp::v4(), in_port)) + , m_listener(listener) + { + auto colon_pos = out_host_and_port.find(':'); + + if (colon_pos == std::string::npos) { + throw std::runtime_error("Incorrect host and part format. Expected 'hostname:port' but got " + out_host_and_port); + } + + m_out_host = out_host_and_port.substr(0, colon_pos); + m_out_port = out_host_and_port.substr(colon_pos + 1); + } + }; + + + void do_serve() { + for (auto& [_, entry]: m_conn_map) { + do_accept(entry); + } + } + + void do_accept(proxy_entry& entry) { + if (m_stopped.load()) { + return; + } + + entry.m_in_acceptor.async_accept(m_in_sock, [this, &entry](asio::error_code ec) { + if (ec) { + throw std::runtime_error("Error accepting incoming connection " + ec.message()); + } + + auto ses = std::make_shared(std::move(m_in_sock), tcp::socket{m_io_context}, m_stopped, entry.m_listener); + + m_resolver.async_resolve(entry.m_out_host, entry.m_out_port, + [ses](asio::error_code ec, tcp::resolver::results_type endpoints) { // NOLINT(*-unnecessary-value-param) + if (ec) { + throw std::runtime_error("Error resolving server's address " + ec.message()); + } + + asio::async_connect( + ses->get_out_sock(), endpoints, [ses](const asio::error_code &ec, const tcp::endpoint &e) { + if (ec) { + std::cout << e.port(); + throw std::runtime_error("Error connecting to server " + ec.message()); + } + + ses->set_writable(true); + ses->start(); + }); + }); + + do_accept(entry); + }); + } + + std::map m_conn_map{}; + + asio::io_context m_io_context{}; + std::unique_ptr m_executor{}; + + tcp::resolver m_resolver; + tcp::socket m_in_sock; + + std::atomic_bool m_stopped{false}; +}; +} // namespace ignite::proxy diff --git a/modules/platforms/cpp/tests/fake_server/proxy/message.h b/modules/platforms/cpp/tests/fake_server/proxy/message.h new file mode 100644 index 000000000000..f4f4964a1fe5 --- /dev/null +++ b/modules/platforms/cpp/tests/fake_server/proxy/message.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// + +#pragma once + +#include +#include + +namespace ignite::proxy { + +struct message { + char *m_arr{nullptr}; + size_t m_size = 0; + + friend void swap(message& lhs, message& rhs) noexcept { + using std::swap; + swap(lhs.m_arr, rhs.m_arr); + swap(lhs.m_size, rhs.m_size); + } + + message(char *arr, size_t size) + : m_size(size) + { + m_arr = new char[m_size]; + std::memcpy(m_arr, arr, size); + } + + message(const message &other) + : m_size(other.m_size) + { + m_arr = new char[m_size]; + std::memcpy(m_arr, other.m_arr, m_size); + } + + message(message &&other) noexcept + { + swap(*this, other); + } + + message& operator=(message rhs) noexcept { + swap(*this, rhs); + return *this; + } + + ~message() { delete[] m_arr; } +}; +} // namespace ignite::proxy \ No newline at end of file diff --git a/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h b/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h new file mode 100644 index 000000000000..025dd6bb0766 --- /dev/null +++ b/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once +#include +#include + +#include "message.h" + +namespace ignite::proxy { + +class message_listener { +public: + void register_out_message(message msg) { + m_out_queue.push(std::move(msg)); + } + + void register_in_message(message msg) { + m_in_queue.push(std::move(msg)); + } +private: + std::queue m_out_queue{}; + std::queue m_in_queue{}; +}; +} // namespace ignite::proxy