Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions modules/platforms/cpp/cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ function(fetch_dependency NAME URL MD5)
endif()
endfunction()

function(add_asio_dependency)
message(STATUS "Download dependency: asio")

FetchContent_Declare(
asio
URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz
URL_HASH MD5=6699ac1dea111c20d024f25e06e573db
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The add_asio_dependency function verifies the downloaded asio tarball using an MD5 hash (URL_HASH MD5=...), which relies on a cryptographically broken algorithm. An attacker capable of influencing the download (e.g., via network or repository compromise) could craft a malicious archive with the same MD5, bypassing this integrity check and introducing backdoored code into your build. Switch this integrity check to a stronger hash supported by CMake (for example, using URL_HASH SHA256=...) and regenerate the checksum from a trusted source.

Suggested change
URL_HASH MD5=6699ac1dea111c20d024f25e06e573db
# NOTE: SHA256 checksum must be computed from a trusted copy of asio-1-36-0.tar.gz.
URL_HASH SHA256=0f1a5b0a7d8c9e3f2b4c6d8e0f1a2b3c4d5e6f708192a3b4c5d6e7f8091a2b3c

Copilot uses AI. Check for mistakes.
)

FetchContent_Populate(asio)

add_library(asio INTERFACE)

target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include)

target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
Comment on lines +55 to +61
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add_asio_dependency() uses FetchContent_Populate(asio) unconditionally and always creates the asio target. If this file is included more than once (or another target named asio exists), this can re-download/repopulate or fail with target redefinition. Consider following the existing fetch_dependency() pattern: FetchContent_GetProperties + if(NOT asio_POPULATED) and guard target creation with if(NOT TARGET asio).

Suggested change
FetchContent_Populate(asio)
add_library(asio INTERFACE)
target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include)
target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
FetchContent_GetProperties(asio)
if (NOT asio_POPULATED)
FetchContent_Populate(asio)
endif()
if (NOT TARGET asio)
add_library(asio INTERFACE)
target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include)
target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
endif()

Copilot uses AI. Check for mistakes.
endfunction()

if (${USE_LOCAL_DEPS})
find_package(msgpack REQUIRED)
if (${msgpack_FOUND})
Expand Down Expand Up @@ -76,6 +94,7 @@ else()
fetch_dependency(uni-algo https://github.com/uni-algo/uni-algo/archive/v1.2.0.tar.gz 6e0cce94a6b45ebee7b904316df9f87f)
if (${ENABLE_TESTS})
fetch_dependency(googletest https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz c8340a482851ef6a3fe618a082304cfc)
add_asio_dependency()
endif()
Comment on lines 95 to 98
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asio is only added in the non-USE_LOCAL_DEPS branch. When USE_LOCAL_DEPS=ON and ENABLE_TESTS=ON, modules/platforms/cpp/tests/fake_server now links against asio and the build will fail because the target isn’t defined. Consider defining the asio target in both branches (e.g., find_package(asio) for local deps or still fetching it for tests).

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems incorrect suggestion

endif()

Expand Down
2 changes: 1 addition & 1 deletion modules/platforms/cpp/tests/fake_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ set(SOURCES
connection_test.cpp
)

ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common ignite3-client msgpack-c ignite-protocol ignite-tuple)
ignite_test(${TARGET} SOURCES ${SOURCES} LIBS asio ignite-test-common ignite3-client msgpack-c ignite-protocol ignite-tuple)
28 changes: 26 additions & 2 deletions modules/platforms/cpp/tests/fake_server/connection_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
* limitations under the License.
*/

#include "tests/client-test/ignite_runner_suite.h"
#include "ignite/client/ignite_client.h"
#include "fake_server.h"
#include "ignite/client/ignite_client.h"
#include "proxy/asio_proxy.h"
#include "tests/client-test/ignite_runner_suite.h"

#include <gtest/gtest.h>
#include <thread>


using namespace ignite;
using namespace std::chrono_literals;

Expand Down Expand Up @@ -76,3 +78,25 @@ TEST_F(connection_test, request_timeout) {
EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
}
}

TEST_F(connection_test, using_asio) {
fake_server fs{50900, get_logger()};
fs.start();

proxy::message_listener listener;

proxy::asio_proxy proxy{
{proxy::configuration(50800, "127.0.0.1:50900", &listener)}
};


ignite_client_configuration cfg;
cfg.set_logger(get_logger());
cfg.set_endpoints(get_endpoints());

auto cl = ignite_client::start(cfg, 5s);

auto cluster_nodes = cl.get_cluster_nodes();

ASSERT_EQ(1, cluster_nodes.size());
}
271 changes: 271 additions & 0 deletions modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#pragma once

#include <atomic>
#include <iostream>
#include <map>
#include <memory>
#include <queue>
#include <thread>
#include <vector>

#include <asio.hpp>
#include <asio/ts/internet.hpp>

#include "message.h"
#include "message_listener.h"

namespace ignite::proxy {

using asio::ip::tcp;

struct configuration {
asio::ip::port_type m_in_port;
std::string m_out_host_and_port;
message_listener* m_listener;

configuration(asio::ip::port_type m_in_port, const std::string &m_out_host_and_port, message_listener *m_listener)
: m_in_port(m_in_port)
, m_out_host_and_port(m_out_host_and_port)
, m_listener(m_listener) { }
};

class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& stopped, message_listener* listener)
: m_in_sock(std::move(in_sock))
, m_out_sock(std::move(out_sock))
, m_stopped(stopped)
, m_listener(listener)
{ }

~session() {
std::cout << "Session destructed " << this << std::endl;
}
Comment on lines +57 to +59
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unconditional std::cout output in test utilities tends to pollute test logs and makes failures harder to read. Consider removing these prints or using the existing test logger / a compile-time debug flag instead.

Copilot uses AI. Check for mistakes.

void start() { do_serve(); }

tcp::socket &get_out_sock() { return m_out_sock; }

void set_writable(bool writable) {
m_in_to_out_writable = writable;
m_out_to_in_writable = writable;
}

enum direction { forward, reverse };

private:
void do_serve() {
do_read(forward);
do_read(reverse);
}
Comment on lines +73 to +76
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

session::do_serve() starts two concurrent async_read_some operations (forward + reverse) but both use the same member buffer (buf). This can lead to overlapping reads corrupting each other’s data. Use separate buffers per direction (e.g., m_forward_buf/m_reverse_buf) or allocate a buffer per read operation.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one thread only used. No sync needed


void do_read(direction direction) {
if (m_stopped.load())
return;

tcp::socket &src = direction == forward ? m_in_sock : m_out_sock;

auto self(shared_from_this());

src.async_read_some(asio::buffer(buf, BUFF_SIZE),
[direction, self](const asio::error_code& ec, size_t len) {
if (ec) {
if (ec == asio::error::eof) {
return;
}
throw std::runtime_error("Error while reading from socket " + ec.message());
}
Comment on lines +88 to +93
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exceptions are thrown directly from Asio completion handlers. Because io_context::run() is executed on a background thread without a try/catch, an error (including operation_aborted during shutdown) will call std::terminate and can crash tests. Handle errors in the handler (treat operation_aborted/EOF as normal shutdown) and/or catch exceptions around m_io_context.run().

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intended at this point


std::queue<message> &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in;
bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable;

// we have one-threaded executor no synchronization is needed
message& msg = queue.emplace(self->buf, len);

if (self->m_listener) {
if (direction == forward) {
self->m_listener->register_out_message(msg);
} else {
self->m_listener->register_in_message(msg);
}
}

if (writable) { // there are pending write operation on this socket
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment contradicts the current logic: writable is set to false when a write is in-flight and reset to true when the queue becomes empty, so if (writable) means “no write in progress; safe to start one”, not “there are pending write operation”. Please update the comment to reflect the actual meaning to avoid confusion during maintenance.

Suggested change
if (writable) { // there are pending write operation on this socket
if (writable) { // no write in progress on this socket; safe to start one

Copilot uses AI. Check for mistakes.
self->do_write(direction);
}

self->do_read(direction);
});
}

void do_write(direction direction) {
tcp::socket &dst = direction == forward ? m_out_sock : m_in_sock;
std::queue<message> &queue = direction == forward ? m_in_to_out : m_out_to_in;
bool &writable = direction == forward ? m_in_to_out_writable : m_out_to_in_writable;

writable = false; // protects from writing same buffer twice (from head of queue).

auto self(shared_from_this());
if (!queue.empty()) {
message &msg = queue.front();

asio::async_write(
dst, asio::buffer(msg.m_arr, msg.m_size),
[direction, self](asio::error_code ec, size_t) {
if (ec) {
if (ec == asio::error::eof) {
return;
}
throw std::runtime_error("Error while writing to socket " + ec.message());
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue here: throwing from async_write completion handlers is unsafe and can terminate the process. Prefer handling the error in-place (close/cancel, stop the session) or route the error to the test harness via a callback/flag.

Suggested change
throw std::runtime_error("Error while writing to socket " + ec.message());
std::cerr << "Error while writing to socket: " << ec.message() << std::endl;
return;

Copilot uses AI. Check for mistakes.
}

std::queue<message> &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in;
bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable;

queue.pop();

if (!queue.empty()) {
// makes writes on the same socket strictly ordered
self->do_write(direction);
} else {
writable = true; // now read operation can initiate writes
}
});
}
}

tcp::socket m_in_sock;
tcp::socket m_out_sock;

bool m_in_to_out_writable{false};
bool m_out_to_in_writable{false};

std::queue<message> m_in_to_out;
std::queue<message> m_out_to_in;

static constexpr size_t BUFF_SIZE = 4096;

char buf[BUFF_SIZE]{};

std::atomic_bool& m_stopped;

message_listener* m_listener{nullptr};
};

class asio_proxy {
public:
asio_proxy(std::vector<configuration> configurations)
: m_resolver(m_io_context)
, m_in_sock(m_io_context)
{
for (auto &cfg : configurations) {
m_conn_map.emplace(
cfg.m_in_port,
proxy_entry{m_io_context, cfg.m_in_port, cfg.m_out_host_and_port, cfg.m_listener}
);
}

do_serve();

m_executor = std::make_unique<std::thread>([this]() {
m_io_context.run();
});
}

~asio_proxy() {
m_stopped.store(true);
m_io_context.stop();

m_executor->join();
}

private:
struct proxy_entry {
tcp::acceptor m_in_acceptor;
std::string m_out_host;
std::string m_out_port;
message_listener* m_listener;

proxy_entry(asio::io_context& io_context, asio::ip::port_type in_port, const std::string& out_host_and_port, message_listener* listener)
: m_in_acceptor(io_context, tcp::endpoint(tcp::v4(), in_port))
, m_listener(listener)
{
auto colon_pos = out_host_and_port.find(':');

if (colon_pos == std::string::npos) {
throw std::runtime_error("Incorrect host and part format. Expected 'hostname:port' but got " + out_host_and_port);
}

m_out_host = out_host_and_port.substr(0, colon_pos);
m_out_port = out_host_and_port.substr(colon_pos + 1);
}
};


void do_serve() {
for (auto& [_, entry]: m_conn_map) {
do_accept(entry);
}
}

void do_accept(proxy_entry& entry) {
if (m_stopped.load()) {
return;
}

entry.m_in_acceptor.async_accept(m_in_sock, [this, &entry](asio::error_code ec) {
if (ec) {
throw std::runtime_error("Error accepting incoming connection " + ec.message());
}

auto ses = std::make_shared<session>(std::move(m_in_sock), tcp::socket{m_io_context}, m_stopped, entry.m_listener);

Comment on lines +232 to +238
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asio_proxy uses a single member m_in_sock for all async_accept calls. If multiple configurations/acceptors are used, there will be concurrent accepts writing into the same socket object, which is not safe. Create a fresh tcp::socket per pending accept (e.g., local socket moved into the handler) instead of sharing one member socket.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are using one thread

m_resolver.async_resolve(entry.m_out_host, entry.m_out_port,
[ses](asio::error_code ec, tcp::resolver::results_type endpoints) { // NOLINT(*-unnecessary-value-param)
if (ec) {
throw std::runtime_error("Error resolving server's address " + ec.message());
}

asio::async_connect(
ses->get_out_sock(), endpoints, [ses](const asio::error_code &ec, const tcp::endpoint &e) {
if (ec) {
std::cout << e.port();
throw std::runtime_error("Error connecting to server " + ec.message());
Comment on lines +248 to +249
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This std::cout << e.port(); looks like leftover debug output and will add noise on failures (and it prints without newline). Consider removing it or integrating it into the error message/log output in a controlled way.

Suggested change
std::cout << e.port();
throw std::runtime_error("Error connecting to server " + ec.message());
throw std::runtime_error(
"Error connecting to server on port " + std::to_string(e.port()) + ": " + ec.message());

Copilot uses AI. Check for mistakes.
}

ses->set_writable(true);
ses->start();
});
});

do_accept(entry);
});
Comment on lines +232 to +258
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asio_proxy starts an async_accept for each proxy_entry but all accepts share the same member socket m_in_sock. Multiple pending accepts must not use the same tcp::socket instance; this can lead to undefined behavior and incorrect/missed connections when more than one acceptor is active. Create a fresh tcp::socket per accept operation (e.g., local socket moved into the handler) or store a dedicated socket per entry.

Suggested change
entry.m_in_acceptor.async_accept(m_in_sock, [this, &entry](asio::error_code ec) {
if (ec) {
throw std::runtime_error("Error accepting incoming connection " + ec.message());
}
auto ses = std::make_shared<session>(std::move(m_in_sock), tcp::socket{m_io_context}, m_stopped, entry.m_listener);
m_resolver.async_resolve(entry.m_out_host, entry.m_out_port,
[ses](asio::error_code ec, tcp::resolver::results_type endpoints) { // NOLINT(*-unnecessary-value-param)
if (ec) {
throw std::runtime_error("Error resolving server's address " + ec.message());
}
asio::async_connect(
ses->get_out_sock(), endpoints, [ses](const asio::error_code &ec, const tcp::endpoint &e) {
if (ec) {
std::cout << e.port();
throw std::runtime_error("Error connecting to server " + ec.message());
}
ses->set_writable(true);
ses->start();
});
});
do_accept(entry);
});
tcp::socket in_sock{m_io_context};
entry.m_in_acceptor.async_accept(in_sock,
[this, &entry, in_sock = std::move(in_sock)](asio::error_code ec) mutable {
if (ec) {
throw std::runtime_error("Error accepting incoming connection " + ec.message());
}
auto ses = std::make_shared<session>(std::move(in_sock), tcp::socket{m_io_context}, m_stopped, entry.m_listener);
m_resolver.async_resolve(entry.m_out_host, entry.m_out_port,
[ses](asio::error_code ec, tcp::resolver::results_type endpoints) { // NOLINT(*-unnecessary-value-param)
if (ec) {
throw std::runtime_error("Error resolving server's address " + ec.message());
}
asio::async_connect(
ses->get_out_sock(), endpoints, [ses](const asio::error_code &ec, const tcp::endpoint &e) {
if (ec) {
std::cout << e.port();
throw std::runtime_error("Error connecting to server " + ec.message());
}
ses->set_writable(true);
ses->start();
});
});
do_accept(entry);
});

Copilot uses AI. Check for mistakes.
}

std::map<asio::ip::port_type, proxy_entry> m_conn_map{};

asio::io_context m_io_context{};
std::unique_ptr<std::thread> m_executor{};

tcp::resolver m_resolver;
tcp::socket m_in_sock;

std::atomic_bool m_stopped{false};
};
} // namespace ignite::proxy
Loading
Loading