From 03586e2232b239d8444ab2ccc40f0b197b087367 Mon Sep 17 00:00:00 2001 From: everoddandeven Date: Tue, 7 Apr 2026 22:05:16 +0200 Subject: [PATCH] Connection manager consolidation --- .github/workflows/test.yml | 4 +- bin/setup_test_environment.sh | 2 +- src/cpp/common/py_monero_common.cpp | 438 ++++++++------ src/cpp/common/py_monero_common.h | 355 +++++++++++- src/cpp/daemon/py_monero_daemon_model.cpp | 6 - src/cpp/daemon/py_monero_daemon_model.h | 9 - src/cpp/py_monero.cpp | 59 +- src/cpp/wallet/py_monero_wallet_rpc.cpp | 5 - src/cpp/wallet/py_monero_wallet_rpc.h | 2 +- src/python/monero_connection_manager.pyi | 36 +- src/python/monero_rpc_connection.pyi | 12 +- tests/config/config.ini | 6 +- tests/docker-compose.yml | 32 +- tests/test_monero_connection_manager.py | 642 +++++++++++---------- tests/test_monero_rpc_connection.py | 11 +- tests/test_monero_wallet_common.py | 1 + tests/utils/__init__.py | 6 +- tests/utils/block_utils.py | 10 +- tests/utils/connection_change_collector.py | 61 +- tests/utils/daemon_utils.py | 2 +- tests/utils/docker_wallet_rpc_manager.py | 301 ++++++++++ tests/utils/mining_utils.py | 5 +- tests/utils/rpc_connection_utils.py | 50 ++ tests/utils/test_utils.py | 181 +++--- tests/utils/to_multiple_tx_sender.py | 4 +- 25 files changed, 1523 insertions(+), 717 deletions(-) create mode 100644 tests/utils/docker_wallet_rpc_manager.py create mode 100644 tests/utils/rpc_connection_utils.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ad41341..8af8b35 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,7 +22,7 @@ jobs: run: | sudo apt update sudo apt install -y build-essential cmake pkg-config libssl-dev libzmq3-dev libunbound-dev libsodium-dev libunwind8-dev liblzma-dev libreadline6-dev libexpat1-dev libpgm-dev qttools5-dev-tools libhidapi-dev libusb-1.0-0-dev libprotobuf-dev protobuf-compiler libudev-dev libboost-chrono-dev libboost-date-time-dev libboost-filesystem-dev libboost-locale-dev libboost-program-options-dev libboost-regex-dev libboost-serialization-dev libboost-system-dev libboost-thread-dev python3 ccache doxygen graphviz git curl autoconf libtool gperf nettle-dev libevent-dev debhelper python3-all python3-pip python3-pybind11 python3-pytest python3-pytest-rerunfailures python3-pytest-cov lcov python3-scikit-build-core - pip3 install pybind11-stubgen pytest pyproject-metadata --break-system-packages + pip3 install pybind11-stubgen pytest pytest-timeout pyproject-metadata --break-system-packages - name: Install expat run: | @@ -86,7 +86,7 @@ jobs: - name: Setup test environment run: | - docker compose -f tests/docker-compose.yml up -d node_1 node_2 xmr_wallet_1 xmr_wallet_2 + docker compose -f tests/docker-compose.yml up -d node_1 node_2 xmr_wallet_1 xmr_wallet_2 xmr_wallet_3 - name: Reset coverage counters run: | diff --git a/bin/setup_test_environment.sh b/bin/setup_test_environment.sh index 99c5770..3b23543 100755 --- a/bin/setup_test_environment.sh +++ b/bin/setup_test_environment.sh @@ -1,4 +1,4 @@ #!/usr/bin/env bash # start docker containers -sudo docker compose -f tests/docker-compose.yml up -d node_1 node_2 xmr_wallet_1 xmr_wallet_2 +sudo docker compose -f tests/docker-compose.yml up -d node_1 node_2 xmr_wallet_1 xmr_wallet_2 xmr_wallet_3 diff --git a/src/cpp/common/py_monero_common.cpp b/src/cpp/common/py_monero_common.cpp index 2e89814..4039952 100644 --- a/src/cpp/common/py_monero_common.cpp +++ b/src/cpp/common/py_monero_common.cpp @@ -1,3 +1,8 @@ +#include +#include +#include +#include +#include #include "py_monero_common.h" #include "utils/monero_utils.h" @@ -29,7 +34,8 @@ void PyThreadPoller::set_is_polling(bool is_polling) { } else { if (m_poll_loop_running) { m_poll_cv.notify_one(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); // TODO: in emscripten, m_sync_cv.notify_one() returns without waiting, so sleep; bug in emscripten upstream llvm? + // TODO: in emscripten, m_sync_cv.notify_one() returns without waiting, so sleep; bug in emscripten upstream llvm? + std::this_thread::sleep_for(std::chrono::milliseconds(1)); m_thread.join(); } } @@ -224,6 +230,17 @@ PyMoneroJsonRequest::PyMoneroJsonRequest(const std::string& method, const std::s if (params == nullptr) m_params = boost::none; } +PyMoneroGetBlocksByHeightRequest::PyMoneroGetBlocksByHeightRequest(uint64_t num_blocks) { + m_method = "get_blocks_by_height.bin"; + m_heights.reserve(num_blocks); + for (uint64_t i = 0; i < num_blocks; i++) m_heights.push_back(i); +} + +rapidjson::Value PyMoneroGetBlocksByHeightRequest::to_rapidjson_val(rapidjson::Document::AllocatorType& allocator) const { + rapidjson::Value root(rapidjson::kObjectType); + if (!m_heights.empty()) root.AddMember("heights", monero_utils::to_rapidjson_val(allocator, m_heights), allocator); + return root; +} std::string PyMoneroBinaryRequest::to_binary_val() const { auto json_val = serialize(); @@ -312,7 +329,10 @@ rapidjson::Value PyMoneroPathRequest::to_rapidjson_val(rapidjson::Document::Allo } rapidjson::Value PyMoneroJsonRequest::to_rapidjson_val(rapidjson::Document::AllocatorType& allocator) const { + // create root rapidjson::Value root(rapidjson::kObjectType); + + // set string values rapidjson::Value value_str(rapidjson::kStringType); if (m_version != boost::none) monero_utils::add_json_member("version", m_version.get(), allocator, root, value_str); @@ -373,6 +393,7 @@ rapidjson::Value PyMoneroRpcConnection::to_rapidjson_val(rapidjson::Document::Al bool PyMoneroConnectionPriorityComparator::compare(int p1, int p2) { if (p1 == p2) return false; + // 0 alway first if (p1 == 0) return true; if (p2 == 0) return false; return p1 > p2; @@ -386,6 +407,7 @@ bool PyMoneroRpcConnection::before(const std::shared_ptr& // order by availability then priority then by name if (c1->m_is_online == c2->m_is_online) { if (c1->m_priority == c2->m_priority) { + // order by priority in descending order return c1->m_uri.value_or("") < c2->m_uri.value_or(""); } // order by priority in descending order @@ -394,27 +416,23 @@ bool PyMoneroRpcConnection::before(const std::shared_ptr& if (c1->m_is_online != boost::none && c1->m_is_online.get()) return true; else if (c2->m_is_online != boost::none && c2->m_is_online.get()) return false; else if (c1->m_is_online == boost::none) return true; + // c1 is offline return false; } } bool PyMoneroRpcConnection::is_onion() const { - if (m_uri == boost::none) return false; - if (m_uri && m_uri->size() >= 6 && m_uri->compare(m_uri->size() - 6, 6, ".onion") == 0) { - return true; - } - return false; + // check onion uri + return m_uri != boost::none && m_uri->size() >= 6 && m_uri->compare(m_uri->size() - 6, 6, ".onion") == 0; } bool PyMoneroRpcConnection::is_i2p() const { - if (m_uri == boost::none) return false; - if (m_uri && m_uri->size() >= 8 && m_uri->compare(m_uri->size() - 8, 8, ".b32.i2p") == 0) { - return true; - } - return false; + // check i2p uri + return m_uri != boost::none && m_uri->size() >= 8 && m_uri->compare(m_uri->size() - 8, 8, ".b32.i2p") == 0; } void PyMoneroRpcConnection::set_credentials(const std::string& username, const std::string& password) { + // reset http client if (m_http_client != nullptr) { if (m_http_client->is_connected()) { m_http_client->disconnect(); @@ -427,6 +445,7 @@ void PyMoneroRpcConnection::set_credentials(const std::string& username, const s bool username_empty = username.empty(); bool password_empty = password.empty(); + // check username and password consistency if (!username_empty || !password_empty) { if (password_empty) { throw PyMoneroError("password cannot be empty because username is not empty"); @@ -437,14 +456,17 @@ void PyMoneroRpcConnection::set_credentials(const std::string& username, const s } } + // check username and password changes bool username_equals = (m_username == boost::none && username_empty) || (m_username != boost::none && *m_username == username); bool password_equals = (m_password == boost::none && password_empty) || (m_password != boost::none && *m_password == password); + // connection reset values if (!username_equals || !password_equals) { m_is_online = boost::none; m_is_authenticated = boost::none; } + // setup username and password if (!username_empty && !password_empty) { m_username = username; m_password = password; @@ -460,25 +482,36 @@ void PyMoneroRpcConnection::set_attribute(const std::string& key, const std::str std::string PyMoneroRpcConnection::get_attribute(const std::string& key) const { std::unordered_map::const_iterator i = m_attributes.find(key); - if (i == m_attributes.end()) + if (i == m_attributes.end()) { + // attribute not found return std::string(""); + } return i->second; } -bool PyMoneroRpcConnection::is_online() const { - return m_is_online.value_or(false); +boost::optional PyMoneroRpcConnection::is_connected() const { + if (m_is_online == boost::none) return boost::none; + return m_is_online.get() && (m_is_authenticated == boost::none || m_is_authenticated.get()); } -bool PyMoneroRpcConnection::is_authenticated() const { - return m_is_authenticated.value_or(false); -} +void PyMoneroRpcConnection::reset() { + boost::lock_guard lock(m_mutex); + if (!m_http_client) throw std::runtime_error("http client not set"); + + // disconnect http client + if (m_http_client->is_connected()) { + m_http_client->disconnect(); + } -bool PyMoneroRpcConnection::is_connected() const { - if (!is_online()) return false; - if (m_is_authenticated != boost::none) { - return is_authenticated(); + // set empty proxy + if(!m_http_client->set_proxy(m_proxy_uri.value_or(""))) { + throw std::runtime_error("Could not set proxy"); } - return true; + + // reset instance variables + m_is_online = boost::none; + m_is_authenticated = boost::none; + m_response_time = boost::none; } bool PyMoneroRpcConnection::check_connection(const boost::optional& timeout_ms) { @@ -487,35 +520,26 @@ bool PyMoneroRpcConnection::check_connection(const boost::optional& timeout boost::lock_guard lock(m_mutex); auto start = std::chrono::high_resolution_clock::now(); try { - if (!m_http_client) throw std::runtime_error("http client not set"); - if (m_http_client->is_connected()) { - m_http_client->disconnect(); - } - - if (m_proxy_uri != boost::none && !m_proxy_uri.get().empty()) { - if(!m_http_client->set_proxy(m_proxy_uri.get())) { - throw std::runtime_error("Could not set proxy"); - } - } + reset(); + // setup connection credentials if(m_username != boost::none && !m_username->empty() && m_password != boost::none && !m_password->empty()) { auto credentials = std::make_shared(); credentials->username = *m_username; credentials->password = *m_password; m_credentials = *credentials; } + else m_credentials = boost::none; - if (!m_http_client->set_server(m_uri.get(), m_credentials)) { + if (!m_http_client->set_server(m_uri.value_or(""), m_credentials)) { throw std::runtime_error("Could not set rpc connection: " + m_uri.get()); } m_http_client->connect(std::chrono::milliseconds(timeout_ms == boost::none ? m_timeout : *timeout_ms)); - std::vector heights; - heights.reserve(100); - for(long i = 0; i < 100; i++) heights.push_back(i); - py::dict params; - params["heights"] = heights; - send_binary_request("get_blocks_by_height.bin", params); + + // assume daemon connection + PyMoneroGetBlocksByHeightRequest request(100); + send_binary_request(request); m_is_online = true; m_is_authenticated = true; } @@ -525,6 +549,7 @@ bool PyMoneroRpcConnection::check_connection(const boost::optional& timeout m_response_time = boost::none; if (ex.code == 401) { + // TODO monero-project epee http client doesn't propagate 401 error code m_is_online = true; m_is_authenticated = false; } @@ -535,12 +560,19 @@ bool PyMoneroRpcConnection::check_connection(const boost::optional& timeout } } catch (const std::exception& ex) { - m_is_online = false; - m_is_authenticated = boost::none; - m_response_time = boost::none; + if(ex.what() == std::string("Network error") && m_http_client->is_connected()) { + // TODO implement custom epee http client with 401 error handler? + m_is_online = true; + m_is_authenticated = false; + } else { + m_is_online = false; + m_is_authenticated = boost::none; + m_response_time = boost::none; + } } if (*m_is_online) { + // set response time auto end = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast(end - start); m_response_time = duration.count(); @@ -554,14 +586,6 @@ void PyMoneroConnectionManager::add_listener(const std::shared_ptr &listener) { boost::lock_guard lock(m_listeners_mutex); + // find and remove listener m_listeners.erase(std::remove_if(m_listeners.begin(), m_listeners.end(), [&listener](std::shared_ptr iter){ return iter == listener; }), m_listeners.end()); } void PyMoneroConnectionManager::remove_listeners() { + // remove all listeners boost::lock_guard lock(m_listeners_mutex); m_listeners.clear(); } @@ -583,7 +609,12 @@ std::vector> PyMoneroConnect std::vector> PyMoneroConnectionManager::get_connections() const { boost::lock_guard lock(m_connections_mutex); - return m_connections; + // sort connections by priority + std::vector> sorted_connections(m_connections); + std::sort(sorted_connections.begin(), sorted_connections.end(), [this](const std::shared_ptr& c1, const std::shared_ptr& c2) { + return PyMoneroRpcConnection::before(c1, c2, m_current_connection); + }); + return sorted_connections; } std::shared_ptr PyMoneroConnectionManager::get_connection_by_uri(const std::string &uri) { @@ -592,20 +623,24 @@ std::shared_ptr PyMoneroConnectionManager::get_connection if (m_connection->m_uri == uri) return m_connection; } + // connection not found return nullptr; } void PyMoneroConnectionManager::add_connection(const std::shared_ptr& connection) { if (connection->m_uri == boost::none) throw std::runtime_error("Invalid connection uri"); boost::lock_guard lock(m_connections_mutex); + // check for duplicates for(const auto &m_connection : m_connections) { if (m_connection->m_uri == connection->m_uri) throw std::runtime_error("Connection URI already exists with connection manager: " + connection->m_uri.get()); } + // add connection m_connections.push_back(connection); } void PyMoneroConnectionManager::add_connection(const std::string &uri) { + // add new connection by uri std::shared_ptr connection = std::make_shared(); connection->m_uri = uri; add_connection(connection); @@ -615,12 +650,13 @@ void PyMoneroConnectionManager::remove_connection(const std::string &uri) { boost::lock_guard lock(m_connections_mutex); std::shared_ptr connection = get_connection_by_uri(uri); - if (connection == nullptr) throw std::runtime_error("Connection not found"); + // remove connection m_connections.erase(std::remove_if(m_connections.begin(), m_connections.end(), [&connection](std::shared_ptr iter){ return iter == connection; }), m_connections.end()); if (connection == m_current_connection) { + // remove also from current connection m_current_connection = nullptr; on_connection_changed(m_current_connection); } @@ -629,18 +665,24 @@ void PyMoneroConnectionManager::remove_connection(const std::string &uri) { void PyMoneroConnectionManager::set_connection(const std::shared_ptr& connection) { if (connection == m_current_connection) return; + // check if setting null connection if (connection == nullptr) { m_current_connection = nullptr; on_connection_changed(nullptr); return; } + // must provide uri if (connection->m_uri == boost::none || connection->m_uri->empty()) throw std::runtime_error("Connection is missing URI"); + // add or replace connection boost::lock_guard lock(m_connections_mutex); + // remove previuous connection auto prev_connection = get_connection_by_uri(connection->m_uri.get()); if (prev_connection != nullptr) m_connections.erase(std::remove_if(m_connections.begin(), m_connections.end(), [&prev_connection](std::shared_ptr iter){ return iter == prev_connection; }), m_connections.end()); + + // set connection and notify changes add_connection(connection); m_current_connection = connection; on_connection_changed(connection); @@ -648,16 +690,20 @@ void PyMoneroConnectionManager::set_connection(const std::shared_ptr(nullptr)); return; } + // check if connection already exists auto found = get_connection_by_uri(uri); if (found != nullptr) { + // set already found connection set_connection(found); } else { + // create new connection auto connection = std::make_shared(); connection->m_uri = uri; set_connection(connection); @@ -666,12 +712,10 @@ void PyMoneroConnectionManager::set_connection(const std::string& uri) { bool PyMoneroConnectionManager::has_connection(const std::string& uri) { auto connection = get_connection_by_uri(uri); - - if (connection != nullptr) return true; - return false; + return connection != nullptr; } -bool PyMoneroConnectionManager::is_connected() const { +boost::optional PyMoneroConnectionManager::is_connected() const { if (m_current_connection == nullptr) return false; return m_current_connection->is_connected(); } @@ -680,18 +724,23 @@ void PyMoneroConnectionManager::check_connection() { bool connection_changed = false; std::shared_ptr connection = get_connection(); if (connection != nullptr) { + // check current connection if (connection->check_connection(m_timeout)) connection_changed = true; std::vector> cons; cons.push_back(connection); process_responses(cons); } - if (m_auto_switch && !is_connected()) { + + if (m_auto_switch && !is_connected().value_or(false)) { + // switch to best available connection std::shared_ptr best_connection = get_best_available_connection(connection); if (best_connection != nullptr) { set_connection(best_connection); return; } } + + // notify changes if (connection_changed) on_connection_changed(connection); } @@ -700,81 +749,92 @@ void PyMoneroConnectionManager::set_auto_switch(bool auto_switch) { } void PyMoneroConnectionManager::stop_polling() { - if (m_is_polling) { - m_is_polling = false; - if (m_thread.joinable()) m_thread.join(); - } + set_is_polling(false); } void PyMoneroConnectionManager::start_polling(const boost::optional& period_ms, const boost::optional& auto_switch, const boost::optional& timeout_ms, const boost::optional& poll_type, const boost::optional>> &excluded_connections) { + // stop polling + stop_polling(); + // apply defaults - uint64_t period = period_ms == boost::none ? DEFAULT_POLL_PERIOD : period_ms.get(); - PyMoneroConnectionPollType _pool_type = poll_type == boost::none ? PyMoneroConnectionPollType::PRIORITIZED : poll_type.get(); + uint64_t poll_period_ms = period_ms == boost::none ? DEFAULT_POLL_PERIOD : period_ms.get(); + set_period_in_ms(poll_period_ms); if (auto_switch != boost::none) set_auto_switch(auto_switch.get()); if (timeout_ms != boost::none) set_timeout(timeout_ms.get()); + m_excluded_connections.clear(); - // stop polling - stop_polling(); + // set poll type + m_poll_type = poll_type == boost::none ? PyMoneroConnectionPollType::PRIORITIZED : poll_type.get(); + if (excluded_connections != boost::none) { + m_excluded_connections.insert(excluded_connections.get().begin(), excluded_connections.get().end()); + } // start polling - switch (_pool_type) { - case PyMoneroConnectionPollType::CURRENT: - start_polling_connection(period); - break; - case PyMoneroConnectionPollType::ALL: - start_polling_connections(period); - break; - case PyMoneroConnectionPollType::UNDEFINED: - case PyMoneroConnectionPollType::PRIORITIZED: - start_polling_prioritized_connections(period, excluded_connections); - break; - } + set_is_polling(true); } std::shared_ptr PyMoneroConnectionManager::get_best_available_connection(const std::set>& excluded_connections) { + // try connections within each ascending priority auto cons = get_connections_in_ascending_priority(); - for (const auto& prioritizedConnections : cons) { + + for (const auto& prioritized_connections : cons) { try { - std::vector futures; - for (const auto& connection : prioritizedConnections) { + // check connections in parallel + boost::asio::thread_pool pool(4); + + boost::mutex mtx; + boost::condition_variable cv; + std::queue> completed; + std::atomic remaining{0}; + + for (const auto& connection : prioritized_connections) { if (!connection) throw std::runtime_error("connection is nullptr"); if (excluded_connections.count(connection)) continue; - std::thread thread = std::thread([this, connection]() { + + remaining++; + + boost::asio::post(pool, [&, connection]() { connection->check_connection(m_timeout); + + { + boost::lock_guard lock(mtx); + completed.push(connection); + } + cv.notify_one(); }); - thread.detach(); - futures.push_back(&thread); } - for (auto& fut : futures) { - if (fut->joinable()) fut->join(); - } + // get connection by completion order + while (remaining > 0) { + boost::unique_lock lock(mtx); - std::shared_ptr best_connection = nullptr; + cv.wait(lock, [&]() { return !completed.empty(); }); - for (const auto& conn : prioritizedConnections) { - try { - if (!conn) throw std::runtime_error("connection is nullptr"); - if (conn->is_connected()) { - if (best_connection == nullptr || best_connection->m_response_time == boost::none || best_connection->m_response_time < conn->m_response_time) best_connection = conn; - } - } catch (...) { - std::cout << "MoneroRpcConnection::get_best_available_connection(): error" << std::endl; + auto connection = completed.front(); + completed.pop(); + remaining--; + + lock.unlock(); + + // use first available connection + if (connection->is_connected().value_or(false)) { + pool.join(); + return connection; } } - if (best_connection != nullptr) return best_connection; + pool.join(); + } catch (const std::exception& e) { - throw std::runtime_error(std::string("Connection check error: ") + e.what()); + throw PyMoneroError(std::string("Connection check error: ") + e.what()); } } - return std::shared_ptr(nullptr); + return nullptr; } std::shared_ptr PyMoneroConnectionManager::get_best_available_connection(const std::shared_ptr& excluded_connection) { const std::set>& excluded_connections = { excluded_connection }; - return get_best_available_connection(excluded_connections); } @@ -787,9 +847,12 @@ void PyMoneroConnectionManager::disconnect() { } void PyMoneroConnectionManager::clear() { + // clear connections m_connections.clear(); + m_excluded_connections.clear(); if (m_current_connection != nullptr) { + // clear current connection m_current_connection = nullptr; on_connection_changed(m_current_connection); } @@ -805,7 +868,7 @@ void PyMoneroConnectionManager::reset() { void PyMoneroConnectionManager::on_connection_changed(const std::shared_ptr& connection) { boost::lock_guard lock(m_listeners_mutex); - + // notify connection change to listeners for (const auto &listener : m_listeners) { listener->on_connection_changed(connection); } @@ -814,26 +877,26 @@ void PyMoneroConnectionManager::on_connection_changed(const std::shared_ptr>> PyMoneroConnectionManager::get_connections_in_ascending_priority() { boost::lock_guard lock(m_connections_mutex); + // build connection priorities map std::map>> connection_priorities; - for (const auto& connection : m_connections) { int priority = connection->m_priority; connection_priorities[priority].push_back(connection); } + // build prioritized connections vector std::vector>> prioritized_connections; - for (auto& [priority, group] : connection_priorities) { prioritized_connections.push_back(group); } if (!prioritized_connections.empty() && connection_priorities.count(0)) { - auto it = std::find_if(prioritized_connections.begin(), prioritized_connections.end(), - [](const auto& group) { - return !group.empty() && group[0]->m_priority == 0; - }); + auto it = std::find_if(prioritized_connections.begin(), prioritized_connections.end(), [](const auto& group) { + return !group.empty() && group[0]->m_priority == 0; + }); if (it != prioritized_connections.end()) { + // move priority 0 to end auto zero_priority_group = *it; prioritized_connections.erase(it); prioritized_connections.push_back(zero_priority_group); @@ -843,103 +906,99 @@ std::vector>> PyMoneroConnect return prioritized_connections; } -void PyMoneroConnectionManager::start_polling_connection(uint64_t period_ms) { - m_is_polling = true; - - m_thread = std::thread([this, period_ms]() { - while (m_is_polling) { - try { - check_connection(); - } catch (const std::exception& e) { - std::cout << "ERROR " << e.what() << std::endl; - } - - std::this_thread::sleep_for(std::chrono::milliseconds(period_ms)); - } - }); - m_thread.detach(); +void PyMoneroConnectionManager::poll() { + // do polling + switch (m_poll_type) { + case PyMoneroConnectionPollType::CURRENT: + check_connection(); + break; + case PyMoneroConnectionPollType::ALL: + check_connections(); + break; + case PyMoneroConnectionPollType::UNDEFINED: + case PyMoneroConnectionPollType::PRIORITIZED: + check_prioritized_connections(); + break; + } } -void PyMoneroConnectionManager::start_polling_connections(uint64_t period_ms) { - m_is_polling = true; +bool PyMoneroConnectionManager::check_connections(const std::vector>& connections, const std::set>& excluded_connections) { + boost::lock_guard connections_lock(m_connections_mutex); + try { + // start checking connections in parallel + boost::asio::thread_pool pool(4); + boost::mutex result_mutex; + boost::condition_variable result_cv; + std::vector> completed; + + // submit tasks + int num_tasks = 0; + for (const auto& connection : connections) { + if (excluded_connections.count(connection)) continue; - m_thread = std::thread([this, period_ms]() { - while (m_is_polling) { - try { - check_connections(); - } catch (const std::exception& e) { - std::cout << "ERROR " << e.what() << std::endl; - } + num_tasks++; - std::this_thread::sleep_for(std::chrono::milliseconds(period_ms)); - } - }); - m_thread.detach(); -} + boost::asio::post(pool, [this, connection, &result_mutex, &result_cv, &completed]() { + bool change = connection->check_connection(m_timeout); -void PyMoneroConnectionManager::start_polling_prioritized_connections(uint64_t period_ms, const boost::optional>>& excluded_connections) { - m_is_polling = true; - m_thread = std::thread([this, period_ms, &excluded_connections]() { - while (m_is_polling) { - try { - check_prioritized_connections(excluded_connections); - } catch (const std::exception& e) { - std::cout << "ERROR " << e.what() << std::endl; - } + if (change && connection == get_connection()) { + on_connection_changed(connection); + } - std::this_thread::sleep_for(std::chrono::milliseconds(period_ms)); - } - }); - m_thread.detach(); -} + { + boost::lock_guard lock(result_mutex); + completed.push_back(connection); + } -bool PyMoneroConnectionManager::check_connections(const std::vector>& connections, const std::set>& excluded_connections) { - boost::lock_guard lock(m_connections_mutex); - try { - auto timeout_ms = m_timeout; + result_cv.notify_one(); + }); + } bool has_connection = false; + size_t received = 0; - for (const auto& connection : connections) { - if (excluded_connections.count(connection)) continue; + // wait for responses + while (received < num_tasks) { + boost::unique_lock lock(result_mutex); + result_cv.wait(lock, [&]() { return completed.size() > received; }); - bool changed = connection->check_connection(timeout_ms); - if (changed && connection == get_connection()) { - on_connection_changed(connection); - } - if (connection->is_connected() && !has_connection) { + auto connection = completed[received++]; + lock.unlock(); + + if (connection->is_connected().value_or(false) && !has_connection) { has_connection = true; - if (!is_connected() && m_auto_switch) { - set_connection(connection); - } + if (!is_connected().value_or(false) && m_auto_switch) set_connection(connection); } } + pool.join(); + + // process responses process_responses(connections); + return has_connection; } catch (const std::exception& e) { - throw std::runtime_error(std::string("check_connections failed: ") + e.what()); + throw PyMoneroError(std::string("check_connections failed: ") + e.what()); } } -void PyMoneroConnectionManager::check_prioritized_connections(const boost::optional>>& excluded_connections) { +void PyMoneroConnectionManager::check_prioritized_connections() { + // check connections in ascending priority for (const auto &prioritized_connections : get_connections_in_ascending_priority()) { - if (excluded_connections != boost::none) { - std::set> ex(excluded_connections.get().begin(), excluded_connections.get().end()); - check_connections(prioritized_connections, ex); - } - else { check_connections(prioritized_connections, {}); } + check_connections(prioritized_connections, m_excluded_connections); } } std::shared_ptr PyMoneroConnectionManager::process_responses(const std::vector>& responses) { + // add new connections for (const auto& conn : responses) { if (m_response_times.find(conn) == m_response_times.end()) { m_response_times[conn] = {}; } } + // insert response times or null for (auto& [conn, times] : m_response_times) { if (std::find(responses.begin(), responses.end(), conn) != responses.end()) { times.insert(times.begin(), conn->m_response_time); @@ -948,69 +1007,75 @@ std::shared_ptr PyMoneroConnectionManager::process_respon } if (times.size() > MIN_BETTER_RESPONSES) { + // remove old response times times.pop_back(); } } + // update best connection based on responses and priority return update_best_connection_in_priority(); } std::shared_ptr PyMoneroConnectionManager::get_best_connection_from_prioritized_responses(const std::vector>& responses) { - std::shared_ptr best_response = std::shared_ptr(nullptr); - + // get best response + std::shared_ptr best_response = nullptr; for (const auto& conn : responses) { - if (conn->is_connected()) { + if (conn->is_connected().value_or(false)) { if (!best_response || conn->m_response_time < best_response->m_response_time) { best_response = conn; } } } - if (!best_response) return std::shared_ptr(nullptr); + // no update if no responses + if (!best_response) return nullptr; + // use best response if disconnected auto best_connection = get_connection(); - if (!best_connection || !best_connection->is_connected()) { + if (!best_connection || !best_connection->is_connected().value_or(false)) { return best_response; } - if (PyMoneroConnectionPriorityComparator::compare(best_response->m_priority, best_connection->m_priority) != 0) { + // use best response if different priority (assumes being called in descending priority) + if (best_response->m_priority != best_connection->m_priority) { return best_response; } + // keep best connection if not enough data if (m_response_times.find(best_connection) == m_response_times.end()) { return best_connection; } + // check if a connection is consistently better for (const auto& conn : responses) { if (conn == best_connection) continue; auto it_best = m_response_times.find(best_connection); auto it_curr = m_response_times.find(conn); - if (it_curr == m_response_times.end()) continue; - if (it_curr->second.size() < MIN_BETTER_RESPONSES || it_best->second.size() < MIN_BETTER_RESPONSES) continue; - bool consistently_better = true; - for (int i = 0; i < MIN_BETTER_RESPONSES; ++i) { + if (it_curr == m_response_times.end() || it_curr->second.size() < MIN_BETTER_RESPONSES) continue; + + bool better = true; + for (int i = 0; i < MIN_BETTER_RESPONSES; i++) { auto curr_time = it_curr->second[i]; auto best_time = it_best->second[i]; if (curr_time == boost::none || best_time == boost::none || curr_time.get() > best_time.get()) { - consistently_better = false; + better = false; break; } } - if (consistently_better) { - best_connection = conn; - } + if (better) best_connection = conn; } return best_connection; } std::shared_ptr PyMoneroConnectionManager::update_best_connection_in_priority() { - if (!m_auto_switch) return std::shared_ptr(nullptr); + if (!m_auto_switch) return nullptr; for (const auto& prioritized_connections : get_connections_in_ascending_priority()) { + // get best connection and update current auto best_conn = get_best_connection_from_prioritized_responses(prioritized_connections); if (best_conn != nullptr) { set_connection(best_conn); @@ -1018,5 +1083,6 @@ std::shared_ptr PyMoneroConnectionManager::update_best_co } } - return std::shared_ptr(nullptr); + // no connection updated + return nullptr; } diff --git a/src/cpp/common/py_monero_common.h b/src/cpp/common/py_monero_common.h index cd428c2..e8b3d29 100644 --- a/src/cpp/common/py_monero_common.h +++ b/src/cpp/common/py_monero_common.h @@ -119,6 +119,9 @@ enum PyMoneroConnectionType : uint8_t { I2P }; +/* + * Specify behavior when polling. +*/ enum PyMoneroConnectionPollType : uint8_t { PRIORITIZED = 0, CURRENT, @@ -257,30 +260,137 @@ class PyMoneroBinaryResponse { boost::optional get_response() const; }; -class PyMoneroRpcConnection : public monero::monero_rpc_connection { +// TODO refactory +class PyMoneroGetBlocksByHeightRequest : public PyMoneroBinaryRequest { public: - boost::optional m_zmq_uri; - int m_priority; - uint64_t m_timeout; - boost::optional m_response_time; + std::vector m_heights; + + PyMoneroGetBlocksByHeightRequest(uint64_t num_blocks); + PyMoneroGetBlocksByHeightRequest(const std::vector& heights): m_heights(heights) { m_method = "get_blocks_by_height.bin"; } + rapidjson::Value to_rapidjson_val(rapidjson::Document::AllocatorType& allocator) const override; +}; + +/** + * Maintains a connection and sends requests to a Monero RPC API. + * + * TODO: refactor monero_rpc_connection extends monero_connection? + */ +class PyMoneroRpcConnection : public monero::monero_rpc_connection { +public: + boost::optional m_zmq_uri; // TODO implement zmq listener + int m_priority; // priority relative to other connections. 1 is highest, then priority 2, etc. Default prorioty is 0, lowest priority. + uint64_t m_timeout; // RPC request timeout in milliseconds. + boost::optional m_response_time; // automatically set by calling check_connection() + + /** + * Checks rpc connection order. + * + * @param c1 first RPC connection to compare. + * @param c2 second RPC connection to compare. + * @param current_connection connection with highest priority. + */ static bool before(const std::shared_ptr& c1, const std::shared_ptr& c2, const std::shared_ptr& current_connection); + /** + * Initialize a new RPC connection. + * + * @param uri RPC connection uri. + * @param username RPC connection authentication username. + * @param password RPC connection authentication password. + * @param proxy_uri RPC connection proxy uri. + * @param zmq_uri RPC connection zmq uri. + * @param priority RPC connection priority. + * @param timeout RPC connection timeout in milliseconds. + */ PyMoneroRpcConnection(const std::string& uri = "", const std::string& username = "", const std::string& password = "", const std::string& proxy_uri = "", const std::string& zmq_uri = "", int priority = 0, uint64_t timeout = 20000); - PyMoneroRpcConnection(const monero::monero_rpc_connection& rpc); - rapidjson::Value to_rapidjson_val(rapidjson::Document::AllocatorType& allocator) const override; + /** + * Copy a RPC connection. + * + * @param rpc RPC connection to copy. + */ + PyMoneroRpcConnection(const monero::monero_rpc_connection& rpc); + /** + * Indicates if the connection uri is a TOR server. + * + * @return true or false to indicate if connection uri is a TOR server. + */ bool is_onion() const; + + /** + * Indicates if the connection uri is a I2P server. + * + * @return true or false to indicate if connection uri is a I2P server. + */ bool is_i2p() const; + + /** + * Set connection credentials. + * + * @param username username to use in RPC authentication. + * @param password password to use in RPC authentication. + */ void set_credentials(const std::string& username, const std::string& password); + + /** + * Set connection attribute. + * + * @param key is the attribute key + * @param val is the attribute value + */ void set_attribute(const std::string& key, const std::string& val); + + /** + * Get connection attribute. + * + * @param key is the attribute to get the value of + * @return key's value if set + */ std::string get_attribute(const std::string& key) const; - bool is_online() const; - bool is_authenticated() const; - bool is_connected() const; + + /** + * Indicates if the connection is online, which is set automatically by calling check_connection(). + * + * @return true or false to indicate if online, or null if check_connection() has not been called + */ + boost::optional is_online() const { return m_is_online; } + + /** + * Indicates if the connection is authenticated, which is set automatically by calling check_connection(). + * + * @return true if authenticated or no authentication, false if not authenticated, or null if not set + */ + boost::optional is_authenticated() const { return m_is_authenticated; } + + /** + * Indicates if the connection is connected, which is set automatically by calling check_connection(). + * + * @return true or false to indicate if connected, or null if check_connection() has not been called + */ + boost::optional is_connected() const; + + /** + * Check the connection and update online, authentication, and response time status. + * + * @param timeout_ms the maximum response time before considered offline + * @return + */ bool check_connection(const boost::optional& timeout_ms = boost::none); + /** + * Resets the current connection. + */ + void reset(); + + /** + * Send a request to the RPC API. + * + * @param path specifies the method to request + * @param params are the request's input parameters + * @return the RPC API response as a map + */ inline const boost::property_tree::ptree send_json_request(const std::string& path, const std::shared_ptr& params = nullptr) { PyMoneroJsonRequest request(path, params); auto response = send_json_request(request); @@ -289,6 +399,13 @@ class PyMoneroRpcConnection : public monero::monero_rpc_connection { return response->m_result.get(); } + /** + * Send a request to the RPC API. + * + * @param request specifies the method to request with parameters + * @param timeout request timeout in milliseconds + * @return the RPC API response as a map + */ inline const std::shared_ptr send_json_request(const PyMoneroJsonRequest &request, std::chrono::milliseconds timeout = std::chrono::seconds(15)) { PyMoneroJsonResponse response; @@ -298,6 +415,15 @@ class PyMoneroRpcConnection : public monero::monero_rpc_connection { return std::make_shared(response); } + /** + * Send a RPC request to the given path and with the given paramters. + * + * E.g. "/get_transactions" with params + * + * @param path is the url path of the request to invoke + * @param params are request parameters sent in the body + * @return the RPC API response as a map + */ inline const boost::property_tree::ptree send_path_request(const std::string& path, const std::shared_ptr& params = nullptr) { PyMoneroPathRequest request(path, params); auto response = send_path_request(request); @@ -306,6 +432,13 @@ class PyMoneroRpcConnection : public monero::monero_rpc_connection { return response->m_response.get(); } + /** + * Send a RPC request to the given path and with the given paramters. + * + * @param request specifies the method to request with parameters + * @param timeout request timeout in milliseconds + * @return the request's deserialized response + */ inline const std::shared_ptr send_path_request(const PyMoneroPathRequest &request, std::chrono::milliseconds timeout = std::chrono::seconds(15)) { PyMoneroPathResponse response; @@ -316,6 +449,13 @@ class PyMoneroRpcConnection : public monero::monero_rpc_connection { return std::make_shared(response); } + /** + * Send a binary RPC request. + * + * @param request specifies the method to request with paramesters + * @param timeout request timeout in milliseconds + * @return the request's deserialized response + */ inline const std::shared_ptr send_binary_request(const PyMoneroBinaryRequest &request, std::chrono::milliseconds timeout = std::chrono::seconds(15)) { if (request.m_method == boost::none || request.m_method->empty()) throw std::runtime_error("No RPC method set in binary request"); if (!m_http_client) throw std::runtime_error("http client not set"); @@ -335,25 +475,51 @@ class PyMoneroRpcConnection : public monero::monero_rpc_connection { // exposed python methods + /** + * Send a request to the RPC API. + * + * @param method specifies the method to request + * @param parameters are the request's input parameters + * @return the RPC API response as a map + */ inline boost::optional send_json_request(const std::string& method, const boost::optional& parameters) { PyMoneroJsonRequest request(method, parameters); auto response = send_json_request(request); return response->get_result(); } + /** + * Send a RPC request to the given path and with the given paramters. + * + * E.g. "/get_transactions" with params + * + * @param method is the url path of the request to invoke + * @param parameters are request parameters sent in the body + * @return the RPC API response as a map + */ inline boost::optional send_path_request(const std::string& method, const boost::optional& parameters) { PyMoneroPathRequest request(method, parameters); auto response = send_path_request(request); return response->get_response(); } + /** + * Send a binary RPC request. + * + * @param method specifies the method to request + * @param parameters are request parameters sent in the body + * @return the request's deserialized response + */ inline boost::optional send_binary_request(const std::string& method, const boost::optional& parameters) { PyMoneroBinaryRequest request(method, parameters); auto response = send_binary_request(request); return response->m_binary; } + rapidjson::Value to_rapidjson_val(rapidjson::Document::AllocatorType& allocator) const override; + protected: + // istance variables mutable boost::recursive_mutex m_mutex; std::string m_server; boost::optional m_credentials; @@ -412,41 +578,186 @@ class PyMoneroConnectionManagerListener : public monero_connection_manager_liste } }; -class PyMoneroConnectionManager { +class PyMoneroConnectionManager : public PyThreadPoller { public: ~PyMoneroConnectionManager(); PyMoneroConnectionManager() { } - PyMoneroConnectionManager(const PyMoneroConnectionManager &connection_manager); + /** + * Add a listener to receive notifications when the connection changes. + * + * @param listener the listener to add + */ void add_listener(const std::shared_ptr &listener); + + /** + * Remove a listener. + * + * @param listener the listener to remove + */ void remove_listener(const std::shared_ptr &listener); + + /** + * Remove all listeners. + */ void remove_listeners(); + + /** + * Get all listeners. + * + * @return all listeners + */ std::vector> get_listeners() const; - std::shared_ptr get_connection_by_uri(const std::string &uri); + + /** + * Add a connection. The connection may have an elevated priority for this manager to use. + * + * @param connection the connection to add + */ void add_connection(const std::shared_ptr& connection); + + /** + * Add connection URI. + * + * @param uri uri of the connection to add + */ void add_connection(const std::string &uri); + + /** + * Remove a connection. + * + * @param uri uri of the connection to remove + */ void remove_connection(const std::string &uri); + + /** + * Set the current connection without changing the credentials. + * Replace connection if its URI was previously added. Otherwise add new connection. + * Notify if current connection changes. + * Does not check the connection. + * + * @param connection is the connection to make current + */ void set_connection(const std::shared_ptr& connection); + + /** + * Set the current connection without changing the credentials. + * Add new connection if URI not previously added. + * Notify if current connection changes. + * Does not check the connection. + * + * @param uri identifies the connection to make current + */ void set_connection(const std::string& uri); + + /** + * Indicates if this manager has a connection with the given URI. + * + * @param uri URI of the connection to check + * @return true if this manager has a connection with the given URI, false otherwise + */ bool has_connection(const std::string& uri); + + /** + * Get the current connection. + * + * @return the current connection or null if no connection set + */ std::shared_ptr get_connection() const { return m_current_connection; } + + /** + * Get a connection by URI. + * + * @param uri URI of the connection to get + * @return the connection with the URI or null if no connection with the URI exists + */ + std::shared_ptr get_connection_by_uri(const std::string &uri); + + /** + * Get all connections in order of current connection (if applicable), online status, priority, and name. + * + * @return the list of sorted connections + */ std::vector> get_connections() const; + + /** + * Get if auto switch is enabled or disabled. + * + * @return true if auto switch enabled, false otherwise + */ bool get_auto_switch() const { return m_auto_switch; } void set_timeout(uint64_t timeout_ms) { m_timeout = timeout_ms; } uint64_t get_timeout() const { return m_timeout; } - bool is_connected() const; + + /** + * Indicates if the connection manager is connected to a node. + * + * @return true if the current connection is set, online, and not unauthenticated, none if unknown, false otherwise + */ + boost::optional is_connected() const; + + /** + * Check the current connection. If disconnected and auto switch enabled, switches to best available connection. + */ void check_connection(); + + /** + * Automatically switch to the best available connection as connections are polled, based on priority, response time, and consistency. + * + * @param auto_switch specifies if the connection should auto switch to a better connection + */ void set_auto_switch(bool auto_switch); + + /** + * Stop polling connections. + */ void stop_polling(); void start_polling(const boost::optional& period_ms, const boost::optional& auto_switch, const boost::optional& timeout_ms, const boost::optional& poll_type, const boost::optional>> &excluded_connections); + + /** + * Collect connectable peers of the managed connections. + * + * @return connectable peers + */ std::vector> get_peer_connections() const { throw std::runtime_error("PyMoneroConnectionManager::get_peer_connections(): not implemented"); } + + /** + * Get the best available connection in order of priority then response time. + * + * @param excluded_connections to be excluded from consideration (optional) + * @return the best available connection in order of priority then response time, null if no connections available + */ std::shared_ptr get_best_available_connection(const std::set>& excluded_connections = {}); + + /** + * Get the best available connection in order of priority then response time. + * + * @param excluded_connection to be excluded from consideration + * @return the best available connection in order of priority then response time, null if no connections available + */ std::shared_ptr get_best_available_connection(const std::shared_ptr& excluded_connection); + + /** + * Check all managed connections. + */ void check_connections(); + + /** + * Disconnect from the current connection. + */ void disconnect(); + + /** + * Remove all connections. + */ void clear(); + + /** + * Reset to default state. + */ void reset(); + void poll() override; private: // static variables @@ -454,24 +765,28 @@ class PyMoneroConnectionManager { inline static const uint64_t DEFAULT_POLL_PERIOD = 20000; inline static const bool DEFAULT_AUTO_SWITCH = true; inline static const int MIN_BETTER_RESPONSES = 3; + + // instance variables mutable boost::recursive_mutex m_listeners_mutex; mutable boost::recursive_mutex m_connections_mutex; + std::vector> m_listeners; std::vector> m_connections; std::shared_ptr m_current_connection; + std::set> m_excluded_connections; + + bool m_is_polling = false; bool m_auto_switch = true; uint64_t m_timeout = 5000; + std::map, std::vector>> m_response_times; - bool m_is_polling = false; - std::thread m_thread; + + PyMoneroConnectionPollType m_poll_type = PyMoneroConnectionPollType::UNDEFINED; void on_connection_changed(const std::shared_ptr& connection); std::vector>> get_connections_in_ascending_priority(); - void start_polling_connection(uint64_t period_ms); - void start_polling_connections(uint64_t period_ms); - void start_polling_prioritized_connections(uint64_t period_ms, const boost::optional>>& excluded_connections); bool check_connections(const std::vector>& connections, const std::set>& excluded_connections = {}); - void check_prioritized_connections(const boost::optional>>& excluded_connections); + void check_prioritized_connections(); std::shared_ptr process_responses(const std::vector>& responses); std::shared_ptr get_best_connection_from_prioritized_responses(const std::vector>& responses); std::shared_ptr update_best_connection_in_priority(); diff --git a/src/cpp/daemon/py_monero_daemon_model.cpp b/src/cpp/daemon/py_monero_daemon_model.cpp index 9237a5e..c2e21c8 100644 --- a/src/cpp/daemon/py_monero_daemon_model.cpp +++ b/src/cpp/daemon/py_monero_daemon_model.cpp @@ -895,12 +895,6 @@ rapidjson::Value PyMoneroGetBlockTemplateParams::to_rapidjson_val(rapidjson::Doc return root; } -rapidjson::Value PyMoneroGetBlocksByHeightRequest::to_rapidjson_val(rapidjson::Document::AllocatorType& allocator) const { - rapidjson::Value root(rapidjson::kObjectType); - if (!m_heights.empty()) root.AddMember("heights", monero_utils::to_rapidjson_val(allocator, m_heights), allocator); - return root; -} - rapidjson::Value PyMoneroBan::to_rapidjson_val(rapidjson::Document::AllocatorType& allocator) const { rapidjson::Value root(rapidjson::kObjectType); rapidjson::Value value_str(rapidjson::kStringType); diff --git a/src/cpp/daemon/py_monero_daemon_model.h b/src/cpp/daemon/py_monero_daemon_model.h index f4da2ac..5c13b12 100644 --- a/src/cpp/daemon/py_monero_daemon_model.h +++ b/src/cpp/daemon/py_monero_daemon_model.h @@ -122,15 +122,6 @@ class PyMoneroGetBlockTemplateParams : public PyMoneroJsonRequestParams { rapidjson::Value to_rapidjson_val(rapidjson::Document::AllocatorType& allocator) const override; }; -class PyMoneroGetBlocksByHeightRequest : public PyMoneroBinaryRequest { -public: - std::vector m_heights; - - PyMoneroGetBlocksByHeightRequest(const std::vector& heights): m_heights(heights) { m_method = "get_blocks_by_height.bin"; } - - rapidjson::Value to_rapidjson_val(rapidjson::Document::AllocatorType& allocator) const override; -}; - class PyMoneroVersion : public monero::monero_version { public: diff --git a/src/cpp/py_monero.cpp b/src/cpp/py_monero.cpp index dfaea82..09212d0 100644 --- a/src/cpp/py_monero.cpp +++ b/src/cpp/py_monero.cpp @@ -323,6 +323,7 @@ PYBIND11_MODULE(monero, m) { if (val != boost::none && !val->empty()) { self.m_proxy_uri = val; } else self.m_proxy_uri = boost::none; + self.reset(); }) .def_property("zmq_uri", [](const PyMoneroRpcConnection& self) { return self.m_zmq_uri; }, @@ -387,91 +388,91 @@ PYBIND11_MODULE(monero, m) { .def(py::init<>()) .def("add_listener", [](PyMoneroConnectionManager& self, const std::shared_ptr &listener) { MONERO_CATCH_AND_RETHROW(self.add_listener(listener)); - }, py::arg("listener")) + }, py::arg("listener"), py::call_guard()) .def("remove_listener", [](PyMoneroConnectionManager& self, const std::shared_ptr &listener) { MONERO_CATCH_AND_RETHROW(self.remove_listener(listener)); - }, py::arg("listener")) + }, py::arg("listener"), py::call_guard()) .def("remove_listeners", [](PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.remove_listeners()); - }) + }, py::call_guard()) .def("get_listeners", [](const PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.get_listeners()); - }) + }, py::call_guard()) .def("get_connection_by_uri", [](PyMoneroConnectionManager& self, const std::string& uri) { MONERO_CATCH_AND_RETHROW(self.get_connection_by_uri(uri)); - }, py::arg("uri")) + }, py::arg("uri"), py::call_guard()) .def("add_connection", [](PyMoneroConnectionManager& self, const std::shared_ptr &connection) { MONERO_CATCH_AND_RETHROW(self.add_connection(connection)); - }, py::arg("connection")) + }, py::arg("connection"), py::call_guard()) .def("add_connection", [](PyMoneroConnectionManager& self, const std::string &uri) { MONERO_CATCH_AND_RETHROW(self.add_connection(uri)); - }, py::arg("uri")) + }, py::arg("uri"), py::call_guard()) .def("remove_connection", [](PyMoneroConnectionManager& self, const std::string &uri) { MONERO_CATCH_AND_RETHROW(self.remove_connection(uri)); - }, py::arg("uri")) + }, py::arg("uri"), py::call_guard()) .def("set_connection", [](PyMoneroConnectionManager& self, std::shared_ptr &connection) { MONERO_CATCH_AND_RETHROW(self.set_connection(connection)); - }, py::arg("connection")) + }, py::arg("connection"), py::call_guard()) .def("set_connection", [](PyMoneroConnectionManager& self, const std::string &uri) { MONERO_CATCH_AND_RETHROW(self.set_connection(uri)); - }, py::arg("uri")) + }, py::arg("uri"), py::call_guard()) .def("get_connection", [](const PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.get_connection()); - }) + }, py::call_guard()) .def("has_connection", [](PyMoneroConnectionManager& self, const std::string &uri) { MONERO_CATCH_AND_RETHROW(self.has_connection(uri)); - }, py::arg("uri")) + }, py::arg("uri"), py::call_guard()) .def("get_connections", [](const PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.get_connections()); - }) + }, py::call_guard()) .def("is_connected", [](const PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.is_connected()); - }) + }, py::call_guard()) .def("check_connection", [](PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.check_connection()); - }) + }, py::call_guard()) .def("start_polling", [](PyMoneroConnectionManager& self, const boost::optional& period_ms, const boost::optional& auto_switch, const boost::optional& timeout_ms, const boost::optional& poll_type, const boost::optional>>& excluded_connections) { MONERO_CATCH_AND_RETHROW(self.start_polling(period_ms, auto_switch, timeout_ms, poll_type, excluded_connections)); - }, py::arg("period_ms") = py::none(), py::arg("auto_switch") = py::none(), py::arg("timeout_ms") = py::none(), py::arg("poll_type") = py::none(), py::arg("excluded_connections") = py::none()) + }, py::arg("period_ms") = py::none(), py::arg("auto_switch") = py::none(), py::arg("timeout_ms") = py::none(), py::arg("poll_type") = py::none(), py::arg("excluded_connections") = py::none(), py::call_guard()) .def("stop_polling", [](PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.stop_polling()); - }) + }, py::call_guard()) .def("set_auto_switch", [](PyMoneroConnectionManager& self, bool auto_switch) { MONERO_CATCH_AND_RETHROW(self.set_auto_switch(auto_switch)); - }, py::arg("auto_switch")) + }, py::arg("auto_switch"), py::call_guard()) .def("get_auto_switch", [](const PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.get_auto_switch()); - }) + }, py::call_guard()) .def("set_timeout", [](PyMoneroConnectionManager& self, uint64_t timeout_ms) { MONERO_CATCH_AND_RETHROW(self.set_timeout(timeout_ms)); - }, py::arg("timeout_ms")) + }, py::arg("timeout_ms"), py::call_guard()) .def("get_timeout", [](const PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.get_timeout()); - }) + }, py::call_guard()) .def("get_peer_connections", [](const PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.get_peer_connections()); - }) + }, py::call_guard()) .def("disconnect", [](PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.disconnect()); - }) + }, py::call_guard()) .def("clear", [](PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.clear()); - }) + }, py::call_guard()) .def("reset", [](PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.reset()); - }) + }, py::call_guard()) .def("get_best_available_connection", [](PyMoneroConnectionManager& self, const std::set>& excluded_connections) { MONERO_CATCH_AND_RETHROW(self.get_best_available_connection(excluded_connections)); - }, py::arg("excluded_connections")) + }, py::arg("excluded_connections"), py::call_guard()) .def("get_best_available_connection", [](PyMoneroConnectionManager& self, std::shared_ptr& excluded_connection) { MONERO_CATCH_AND_RETHROW(self.get_best_available_connection(excluded_connection)); - }, py::arg("excluded_connection")) + }, py::arg("excluded_connection"), py::call_guard()) .def("get_best_available_connection", [](PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.get_best_available_connection()); - }) + }, py::call_guard()) .def("check_connections", [](PyMoneroConnectionManager& self) { MONERO_CATCH_AND_RETHROW(self.check_connections()); - }); + }, py::call_guard()); // monero_block_header py_monero_block_header diff --git a/src/cpp/wallet/py_monero_wallet_rpc.cpp b/src/cpp/wallet/py_monero_wallet_rpc.cpp index a764266..833983e 100644 --- a/src/cpp/wallet/py_monero_wallet_rpc.cpp +++ b/src/cpp/wallet/py_monero_wallet_rpc.cpp @@ -209,11 +209,6 @@ void PyMoneroWalletRpc::remove_listener(monero_wallet_listener& listener) { refresh_listening(); } -boost::optional PyMoneroWalletRpc::get_rpc_connection() const { - if (m_rpc == nullptr) return boost::none; - return boost::optional(*m_rpc); -} - PyMoneroWalletRpc* PyMoneroWalletRpc::open_wallet(const std::shared_ptr &config) { if (config == nullptr) throw std::runtime_error("Must provide configuration of wallet to open"); if (config->m_path == boost::none || config->m_path->empty()) throw std::runtime_error("Filename is not initialized"); diff --git a/src/cpp/wallet/py_monero_wallet_rpc.h b/src/cpp/wallet/py_monero_wallet_rpc.h index 70e7bf2..63489fa 100644 --- a/src/cpp/wallet/py_monero_wallet_rpc.h +++ b/src/cpp/wallet/py_monero_wallet_rpc.h @@ -35,7 +35,7 @@ class PyMoneroWalletRpc : public PyMoneroWallet { PyMoneroWalletRpc* open_wallet(const std::shared_ptr &config); PyMoneroWalletRpc* open_wallet(const std::string& name, const std::string& password); PyMoneroWalletRpc* create_wallet(const std::shared_ptr &config); - boost::optional get_rpc_connection() const; + std::shared_ptr get_rpc_connection() const { return m_rpc; } std::vector get_seed_languages() const; void stop(); void add_listener(monero_wallet_listener& listener) override; diff --git a/src/python/monero_connection_manager.pyi b/src/python/monero_connection_manager.pyi index cdcc1b1..146e72f 100644 --- a/src/python/monero_connection_manager.pyi +++ b/src/python/monero_connection_manager.pyi @@ -16,7 +16,7 @@ class MoneroConnectionManager: def add_connection(self, connection: MoneroRpcConnection) -> None: """ Add a connection. The connection may have an elevated priority for this manager to use. - + :param MoneroRpcConnection connection: the connection to add """ ... @@ -24,14 +24,14 @@ class MoneroConnectionManager: def add_connection(self, uri: str) -> None: """ Add a connection URI. - + :param str uri: uri of the connection to add """ ... def add_listener(self, listener: MoneroConnectionManagerListener) -> None: """ Add a listener to receive notifications when the connection changes. - + :param MoneroConnectionManagerListener listener: the listener to add """ ... @@ -58,7 +58,7 @@ class MoneroConnectionManager: def get_auto_switch(self) -> bool: """ Get if auto switch is enabled or disabled. - + :return bool: true if auto switch enabled, false otherwise """ ... @@ -96,7 +96,7 @@ class MoneroConnectionManager: def get_connection_by_uri(self, uri: str) -> MoneroRpcConnection: """ Get a connection by URI. - + :param str uri: URI of the connection to get :return MoneroRpcConnection: the connection with the URI or null if no connection with the URI exists """ @@ -118,43 +118,43 @@ class MoneroConnectionManager: def get_peer_connections(self) -> list[MoneroRpcConnection]: """ Collect connectable peers of the managed connections. - + :return list[MoneroRpcConnection]: connectable peers """ ... def get_timeout(self) -> int: """ Get the request timeout. - + :return int: the request timeout before a connection is considered offline """ ... def has_connection(self, uri: str) -> bool: """ Indicates if this manager has a connection with the given URI. - + :param str uri: URI of the connection to check :return bool: true if this manager has a connection with the given URI, false otherwise """ ... - def is_connected(self) -> bool: + def is_connected(self) -> bool | None: """ Indicates if the connection manager is connected to a node. - - :return bool: true if the current connection is set, online, and not unauthenticated, null if unknown, false otherwise + + :return bool: `True` if the current connection is set, online, and not unauthenticated, `None` if unknown, `False` otherwise. """ ... def remove_connection(self, uri: str) -> None: """ Remove a connection. - + :param str uri: uri of the connection to remove """ ... def remove_listener(self, listener: MoneroConnectionManagerListener) -> None: """ Remove a listener. - + :param MoneroConnectionManagerListener listener: the listener to remove """ ... @@ -171,7 +171,7 @@ class MoneroConnectionManager: def set_auto_switch(self, auto_switch: bool) -> None: """ Automatically switch to the best available connection as connections are polled, based on priority, response time, and consistency. - + :param bool auto_switch: specifies if the connection should auto switch to a better connection """ ... @@ -182,7 +182,7 @@ class MoneroConnectionManager: Replace connection if its URI was previously added. Otherwise add new connection. Notify if current connection changes. Does not check the connection. - + :param Optional[MoneroRpcConnection] connection: is the connection to make current """ ... @@ -193,21 +193,21 @@ class MoneroConnectionManager: Add new connection if URI not previously added. Notify if current connection changes. Does not check the connection. - + :param str uri: identifies the connection to make current """ ... def set_timeout(self, timeout_ms: int) -> None: """ Set the maximum request time before a connection is considered offline. - + :param int timeout_ms: is the timeout before a connection is considered offline """ ... def start_polling(self, period_ms: int | None = None, auto_switch: bool | None = None, timeout_ms: int | None = None, poll_type: MoneroConnectionPollType | None = None, excluded_connections: list[MoneroRpcConnection] | None = None) -> None: """ Start polling connections. - + :param Optional[int] period_ms: poll period in milliseconds (default 20s) :param Optional[bool] auto_switch: specifies to automatically switch to the best connection (default true unless changed) :param Optional[int] timeout_ms: specifies the timeout to poll a single connection (default 5s unless changed) diff --git a/src/python/monero_rpc_connection.pyi b/src/python/monero_rpc_connection.pyi index 796563c..a0a081a 100644 --- a/src/python/monero_rpc_connection.pyi +++ b/src/python/monero_rpc_connection.pyi @@ -82,22 +82,22 @@ class MoneroRpcConnection(SerializableStruct): :return value: attribute value """ ... - def is_authenticated(self) -> bool: + def is_authenticated(self) -> bool | None: """ Indicates if the connection is authenticated according to the last call to check_connection(). Note: must call check_connection() manually unless using MoneroConnectionManager. - :return bool: true if authenticated or no authentication, false if not authenticated, or null if check_connection() has not been called + :return bool | None: `True` if authenticated or no authentication required, `False` if not authenticated, or `None` if check_connection() has not been called. """ ... - def is_connected(self) -> bool: + def is_connected(self) -> bool | None: """ Indicates if the connection is connected according to the last call to check_connection(). Note: must call check_connection() manually unless using MoneroConnectionManager. - :return bool: true or false to indicate if connected, or null if check_connection() has not been called + :return bool | None: `True` or `False` to indicate if connected, or `None` if check_connection() has not been called. """ ... def is_i2p(self) -> bool: @@ -110,13 +110,13 @@ class MoneroRpcConnection(SerializableStruct): Indicates if the connection is a TOR connection. """ ... - def is_online(self) -> bool: + def is_online(self) -> bool | None: """ Indicates if the connection is online according to the last call to check_connection(). Note: must call check_connection() manually unless using MoneroConnectionManager. - :return bool: true or false to indicate if online, or null if check_connection() has not been called + :return bool | None: `True` or `False` to indicate if online, or `None` if check_connection() has not been called. """ ... def send_json_request(self, method: str, parameters: object | None = None) -> object | None: diff --git a/tests/config/config.ini b/tests/config/config.ini index ae88d26..aa870f4 100644 --- a/tests/config/config.ini +++ b/tests/config/config.ini @@ -5,7 +5,7 @@ lite_mode=False test_notifications=True test_resets=True network_type=regtest -auto_connect_timeout_ms=5000 +auto_connect_timeout_ms=3000 [daemon] rpc_uri=http://127.0.0.1:18081 @@ -28,12 +28,12 @@ rpc_port_start=18082 rpc_username=rpc_user rpc_password=abc123 rpc_access_control_origins="http:#localhost:8080" -rpc_domain=127.0.0.1 +rpc_domain=http://127.0.0.1 rpc_zmq_enabled=False rpc_zmq_port_start=48083 rpc_zmq_bind_port_start=48083 rpc_zmq_domain=127.0.0.1 -sync_period_in_ms=2500 +sync_period_in_ms=5000 [mining_wallet] name=mining_wallet diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 14d0ac3..879aaee 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -9,6 +9,7 @@ services: - node_2 - xmr_wallet_1 - xmr_wallet_2 + - xmr_wallet_3 node_1: image: lalanza808/monero:v0.18.4.4 @@ -30,7 +31,6 @@ services: "--no-zmq", "--max-connections-per-ip=100", "--rpc-max-connections-per-private-ip=100", - "--start-mining=49cvU1JnXFAH7r1RbDLJw78aaDnhW4sCKRbLaTYa9eHvcz9PK1YXwod5npWZvMyQ8L4waVjUhuCp6btFyELkRpA4SWNKeQH", "--mining-threads=1", "--rpc-login=rpc_daemon_user:abc123", "--non-interactive" @@ -126,8 +126,36 @@ services: - node_1 - node_2 + xmr_wallet_3: + image: lalanza808/monero:v0.18.4.4 + container_name: xmr_wallet_3 + command: [ + "monero-wallet-rpc", + "--log-level=3", + "--allow-mismatched-daemon-version", + "--rpc-bind-ip=0.0.0.0", + "--confirm-external-bind", + "--rpc-bind-port=18084", + "--trusted-daemon", + "--daemon-address=node_2:18081", + "--daemon-login=rpc_daemon_user:abc123", + "--rpc-login=rpc_user:abc123", + "--rpc-max-connections-per-private-ip=100", + "--wallet-dir=/wallet", + "--rpc-access-control-origins=*", + "--non-interactive" + ] + ports: + - "18084:18084" + volumes: + - xmr_wallet_3_data:/wallet + depends_on: + - node_1 + - node_2 + volumes: xmr_node_1_data: xmr_node_2_data: xmr_wallet_1_data: - xmr_wallet_2_data: \ No newline at end of file + xmr_wallet_2_data: + xmr_wallet_3_data: \ No newline at end of file diff --git a/tests/test_monero_connection_manager.py b/tests/test_monero_connection_manager.py index 328ce5a..529e20b 100644 --- a/tests/test_monero_connection_manager.py +++ b/tests/test_monero_connection_manager.py @@ -3,315 +3,361 @@ from typing import Optional from monero import ( - MoneroWallet, MoneroConnectionManager, MoneroRpcConnection, MoneroConnectionPollType + MoneroConnectionManager, MoneroRpcConnection, MoneroConnectionPollType +) +from utils import ( + ConnectionChangeCollector, TestUtils as Utils, + AssertUtils, RpcConnectionUtils ) -from utils import ConnectionChangeCollector, TestUtils as Utils, AssertUtils, GenUtils logger: logging.Logger = logging.getLogger("TestMoneroConnectionManager") -# TODO enable connection manager tests -@pytest.mark.skip(reason="TODO") + @pytest.mark.integration class TestMoneroConnectionManager: """Connection manager integration tests""" + OFFLINE_PROXY_URI: str = "127.0.0.1:9050" + """Proxy used to simulate offline servers""" + + _cm: MoneroConnectionManager | None = None + + #region Fixtures + + # Setup and teardown of test class + @pytest.fixture(scope="class", autouse=True) + def global_setup_and_teardown(self): + """Executed once before all tests""" + self.before_all() + yield + self.after_all() + + # Before all tests + def before_all(self) -> None: + """Executed once before all tests""" + logger.info(f"Setup test class {type(self).__name__}") + self._cm = MoneroConnectionManager() + + # After all tests + def after_all(self) -> None: + """Executed once after all tests""" + logger.info(f"Teardown test class {type(self).__name__}") + if self._cm: + self._cm.reset() + logger.debug("Resetted connection manager") + else: + logger.warning("Test connection manager is not set!") + + Utils.RPC_WALLET_MANAGER.clear() + + # setup and teardown of each test @pytest.fixture(autouse=True) def setup_and_teardown(self, request: pytest.FixtureRequest): logger.info(f"Before {request.node.name}") # type: ignore yield logger.info(f"After {request.node.name}") # type: ignore - def test_connection_manager(self): - wallet_rpcs: list[MoneroWallet] = Utils.get_wallets("rpc") - connection_manager: Optional[MoneroConnectionManager] = None - try: - i: int = 0 - - # create connection manager - connection_manager = MoneroConnectionManager() - - # listen for changes - listener = ConnectionChangeCollector() - connection_manager.add_listener(listener) - - # add prioritized connections - connection: Optional[MoneroRpcConnection] = wallet_rpcs[4].get_daemon_connection() - assert connection is not None - connection.priority = 1 - connection_manager.add_connection(connection) - connection = wallet_rpcs[2].get_daemon_connection() - assert connection is not None - connection.priority = 2 - connection_manager.add_connection(connection) - connection = wallet_rpcs[3].get_daemon_connection() - assert connection is not None - connection.priority = 2 - connection_manager.add_connection(connection) - connection = wallet_rpcs[0].get_daemon_connection() - assert connection is not None - connection_manager.add_connection(connection) # default priority is lowest - connection = wallet_rpcs[1].get_daemon_connection() - assert connection is not None - assert connection.uri is not None - connection_manager.add_connection(MoneroRpcConnection(connection.uri)) # test unauthenticated - - # test connections and order - ordered_connections: list[MoneroRpcConnection] = connection_manager.get_connections() - assert ordered_connections[0] == wallet_rpcs[4].get_daemon_connection() - assert ordered_connections[1] == wallet_rpcs[2].get_daemon_connection() - assert ordered_connections[2] == wallet_rpcs[3].get_daemon_connection() - assert ordered_connections[3] == wallet_rpcs[0].get_daemon_connection() - connection = wallet_rpcs[1].get_daemon_connection() - assert connection is not None - assert ordered_connections[4].uri == connection.uri - - for connection in ordered_connections: - assert connection.is_online() is None - - # test getting connection by uri - connection = wallet_rpcs[0].get_daemon_connection() - assert connection is not None - assert connection.uri is not None - assert connection_manager.has_connection(connection.uri) - assert connection_manager.get_connection_by_uri(connection.uri) == wallet_rpcs[0].get_daemon_connection() - - # test unknown connection - num_expected_changes: int = 0 - connection_manager.set_connection(ordered_connections[0]) - assert connection_manager.is_connected() is None - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - - # auto connect to the best available connection - connection_manager.start_polling(Utils.SYNC_PERIOD_IN_MS) - GenUtils.wait_for(Utils.AUTO_CONNECT_TIMEOUT_MS) - assert connection_manager.is_connected() - connection = connection_manager.get_connection() - assert connection is not None - assert connection.is_online() - assert connection == wallet_rpcs[4].get_daemon_connection() - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - assert listener.changed_connections.get(listener.changed_connections.size() - 1) == connection - connection_manager.set_auto_switch(False) - connection_manager.stop_polling() - connection_manager.disconnect() - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - assert listener.changed_connections.get(listener.changed_connections.size() - 1) is None - - # start periodically checking connection without auto switch - connection_manager.start_polling(Utils.SYNC_PERIOD_IN_MS, False) - - # connect to the best available connection in order of priority and response time - connection = connection_manager.get_best_available_connection() - connection_manager.set_connection(connection) - assert connection == wallet_rpcs[4].get_daemon_connection() - assert connection.is_online() - assert connection.is_authenticated() - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - assert listener.changed_connections.get(listener.changed_connections.size() - 1) == connection - - # test connections and order - ordered_connections = connection_manager.get_connections() - assert ordered_connections[0] == wallet_rpcs[4].get_daemon_connection() - assert ordered_connections[1] == wallet_rpcs[2].get_daemon_connection() - assert ordered_connections[2] == wallet_rpcs[3].get_daemon_connection() - assert ordered_connections[3] == wallet_rpcs[0].get_daemon_connection() - connection = wallet_rpcs[1].get_daemon_connection() - assert connection is not None - assert ordered_connections[4].uri == connection.uri - for orderedConnection in ordered_connections: - assert orderedConnection.is_online() is None - - # check all connections - connection_manager.check_connections() - - # test connection order - ordered_connections = connection_manager.get_connections() - assert ordered_connections[0] == wallet_rpcs[4].get_daemon_connection() - assert ordered_connections[1] == wallet_rpcs[0].get_daemon_connection() - connection = wallet_rpcs[1].get_daemon_connection() - assert connection is not None - assert ordered_connections[2].uri == connection.uri - assert ordered_connections[3] == wallet_rpcs[2].get_daemon_connection() - assert ordered_connections[4] == wallet_rpcs[3].get_daemon_connection() - - # test online and authentication status - for orderedConnection in ordered_connections: - is_online = orderedConnection.is_online() - is_authenticated = orderedConnection.is_authenticated() - if i == 1 or i == 2: - assert is_online - else: - assert is_online is False - if i == 1: - assert is_authenticated - elif i == 2: - assert is_authenticated is False - else: - assert is_authenticated is None - i += 1 - - # test auto switch when disconnected - connection_manager.set_auto_switch(True) - GenUtils.wait_for(Utils.SYNC_PERIOD_IN_MS + 100) - assert connection_manager.is_connected() - connection = connection_manager.get_connection() - assert connection is not None - assert connection.is_online() - assert connection == wallet_rpcs[0].get_daemon_connection() - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - assert listener.changed_connections.get(listener.changed_connections.size() - 1) == connection - - # test connection order - ordered_connections = connection_manager.get_connections() - assert ordered_connections[0] == connection - assert ordered_connections[0] == wallet_rpcs[0].get_daemon_connection() - connection = wallet_rpcs[1].get_daemon_connection() - assert connection is not None - assert ordered_connections[1].uri == connection.uri - assert ordered_connections[2] == wallet_rpcs[4].get_daemon_connection() - assert ordered_connections[3] == wallet_rpcs[2].get_daemon_connection() - assert ordered_connections[4] == wallet_rpcs[3].get_daemon_connection() - - # connect to specific endpoint without authentication - connection = ordered_connections[1] - assert connection.is_authenticated() is False - connection_manager.set_connection(connection) - assert connection_manager.is_connected() is False - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - - # connect to specific endpoint with authentication - ordered_connections[1].set_credentials("rpc_user", "abc123") - connection_manager.check_connection() - connection = connection_manager.get_connection() - assert connection is not None - connection = wallet_rpcs[1].get_daemon_connection() - assert connection is not None - assert connection.uri == connection.uri - assert connection.is_online() - assert connection.is_authenticated() - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - assert listener.changed_connections.get(listener.changed_connections.size() - 1) == connection - - # test connection order - ordered_connections = connection_manager.get_connections() - assert ordered_connections[0] == connection_manager.get_connection() - connection = wallet_rpcs[1].get_daemon_connection() - assert connection is not None - assert ordered_connections[0].uri == connection.uri - assert ordered_connections[1] == wallet_rpcs[0].get_daemon_connection() - assert ordered_connections[2] == wallet_rpcs[4].get_daemon_connection() - assert ordered_connections[3] == wallet_rpcs[2].get_daemon_connection() - assert ordered_connections[4] == wallet_rpcs[3].get_daemon_connection() - - first: bool = True - for orderedConnection in ordered_connections: - if i <= 1: - assert orderedConnection.is_online() if first else not orderedConnection.is_online() - - assert ordered_connections[4].is_online() is False - - # set connection to existing uri - connection = wallet_rpcs[0].get_daemon_connection() - assert connection is not None - connection_manager.set_connection(connection.uri) - assert connection_manager.is_connected() is True - assert wallet_rpcs[0].get_daemon_connection() == connection_manager.get_connection() - connection = connection_manager.get_connection() - assert connection is not None - assert Utils.WALLET_RPC_USERNAME == connection.username - assert Utils.WALLET_RPC_PASSWORD == connection.password - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - AssertUtils.assert_equals( - listener.changed_connections.get(listener.changed_connections.size() - 1), - wallet_rpcs[0].get_daemon_connection() - ) - - # set connection to new uri - connection_manager.stop_polling() - uri: str = "http:#localhost:49999" - connection_manager.set_connection(uri) - connection = connection_manager.get_connection() - assert connection is not None - assert uri == connection.uri - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - connection = listener.changed_connections.get(listener.changed_connections.size() -1) - assert connection is not None - assert uri == connection.uri - - # set connection to empty string - connection_manager.set_connection("") - assert connection_manager.get_connection() is None - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - - # check all connections and test auto switch - connection_manager.check_connections() - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - assert connection_manager.is_connected() - - # remove current connection and test auto switch - connection = connection_manager.get_connection() - assert connection is not None - assert connection.uri is not None - connection_manager.remove_connection(connection.uri) - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - assert connection_manager.is_connected() is False - connection_manager.check_connections() - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - assert connection_manager.is_connected() - - # test polling current connection - connection_manager.set_connection(None) - assert connection_manager.is_connected() is False - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - connection_manager.start_polling( - period_ms=Utils.SYNC_PERIOD_IN_MS, - poll_type=MoneroConnectionPollType.CURRENT - ) - GenUtils.wait_for(Utils.AUTO_CONNECT_TIMEOUT_MS) - assert connection_manager.is_connected() is True - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - - # test polling all connections - connection_manager.set_connection(None) - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - connection_manager.start_polling(period_ms=Utils.SYNC_PERIOD_IN_MS, poll_type=MoneroConnectionPollType.ALL) - GenUtils.wait_for(Utils.AUTO_CONNECT_TIMEOUT_MS) - assert connection_manager.is_connected() is True - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - - # shut down all connections - connection = connection_manager.get_connection() - assert connection is not None - - GenUtils.wait_for(Utils.SYNC_PERIOD_IN_MS + 100) - assert connection.is_online() is False - num_expected_changes += 1 - assert num_expected_changes == listener.changed_connections.size() - assert listener.changed_connections.get(listener.changed_connections.size() - 1) == connection - - # reset - connection_manager.reset() - assert len(connection_manager.get_connections()) == 0 - assert connection_manager.get_connection() is None - - finally: - # stop connection manager - if connection_manager is not None: - connection_manager.reset() + # test connnections fixture + @pytest.fixture(scope="class") + def connections(self) -> list[MoneroRpcConnection]: + """Rpc connections used in connection manager tests.""" + return Utils.get_all_rpc_connections() + + # connection manager + @pytest.fixture(scope="class") + def connection_manager(self) -> MoneroConnectionManager: + """Connection manager test instance.""" + if self._cm is None: + self._cm = MoneroConnectionManager() + return self._cm + + #endregion + + @pytest.mark.timeout(60 * 5) + def test_connection_manager(self, connection_manager: MoneroConnectionManager, connections: list[MoneroRpcConnection]) -> None: + # listen for changes + listener = ConnectionChangeCollector() + connection_manager.add_listener(listener) + + # add prioritized connections + connection: Optional[MoneroRpcConnection] = connections[4] + assert connection is not None + connection.priority = 1 + connection_manager.add_connection(connection) + connection = connections[2] + assert connection is not None + connection.priority = 2 + connection_manager.add_connection(connection) + connection = connections[3] + assert connection is not None + connection.priority = 2 + connection_manager.add_connection(connection) + connection = connections[0] + assert connection is not None + # default priority is lowest + connection_manager.add_connection(connection) + connection = connections[1] + assert connection is not None + assert connection.uri is not None + # test unauthenticated + connection_manager.add_connection(MoneroRpcConnection(connection.uri, timeout=connection.timeout)) + + # test connections and order + ordered_connections: list[MoneroRpcConnection] = connection_manager.get_connections() + RpcConnectionUtils.test_connections_and_order(ordered_connections, connections, True) + + # test getting connection by uri + connection = connections[0] + assert connection is not None + assert connection.uri is not None + assert connection_manager.has_connection(connection.uri) + assert connection_manager.get_connection_by_uri(connection.uri) == connections[0] + + # test unknown connection + num_expected_changes: int = 0 + connection_manager.set_connection(ordered_connections[0]) + assert connection_manager.is_connected() is None + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + + # auto connect to the best available connection + connection_manager.start_polling(Utils.SYNC_PERIOD_IN_MS) + listener.wait_for_change(Utils.SYNC_PERIOD_IN_MS, "Waiting for auto connect to best available connection") + assert connection_manager.is_connected() + connection = connection_manager.get_connection() + assert connection is not None + assert connection.is_online() + assert connection == connections[4] + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert listener.changed_connections[-1] == connection + connection_manager.set_auto_switch(False) + connection_manager.stop_polling() + connection_manager.disconnect() + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert listener.changed_connections[-1] is None + + # start periodically checking connection without auto switch + connection_manager.start_polling(Utils.SYNC_PERIOD_IN_MS, False) + + # connect to the best available connection in order of priority and response time + connection = connection_manager.get_best_available_connection() + connection_manager.set_connection(connection) + assert connection == connections[4] + assert connection.is_online() + assert connection.is_authenticated() + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert listener.changed_connections[-1] == connection + + # test connections and order + ordered_connections = connection_manager.get_connections() + RpcConnectionUtils.test_connections_and_order(ordered_connections, connections, False) + # TODO others should not ever connected + #for i, connection in enumerate(ordered_connections): + # if i < 1: + # continue + # assert connection.is_online() is None + + # set proxies to simulate prioritized servers shutdown + for i, conn in enumerate(connections): + if i < 2: + continue + conn.proxy_uri = self.OFFLINE_PROXY_URI + + listener.wait_for_change(Utils.SYNC_PERIOD_IN_MS, "Simulating priotizized servers shut down") + assert connection_manager.is_connected() is False, f"{connection_manager.get_connection().serialize()}" + connection = connection_manager.get_connection() + + assert connection.is_online() is False + assert connection.is_connected() is False + assert connection.is_authenticated() is None + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert listener.changed_connections[-1] == connection_manager.get_connection() + + # test connection order + ordered_connections = connection_manager.get_connections() + RpcConnectionUtils.test_connections_order(ordered_connections, connections) + + # check all connections + connection_manager.check_connections() + + # test connection order + ordered_connections = connection_manager.get_connections() + RpcConnectionUtils.test_connections_order(ordered_connections, connections) + + # test online and authentication status + for i, ordered_connection in enumerate(ordered_connections): + is_online = ordered_connection.is_online() + is_authenticated = ordered_connection.is_authenticated() + if i == 1 or i == 2: + assert is_online + else: + assert is_online is False + if i == 1: + assert is_authenticated + elif i == 2: + assert is_authenticated is False + else: + assert is_authenticated is None + + # test auto switch when disconnected + connection_manager.set_auto_switch(True) + listener.wait_for_autoswitch(connection_manager, Utils.SYNC_PERIOD_IN_MS) + connection = connection_manager.get_connection() + conn_str = connection.serialize() if connection is not None else 'None' # type: ignore + assert connection_manager.is_connected(), f"conn= {conn_str}" + assert connection is not None + assert connection.is_online() + assert connection == connections[0] + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert listener.changed_connections[-1] == connection + + # test connection order + ordered_connections = connection_manager.get_connections() + assert ordered_connections[0] == connection + assert ordered_connections[0] == connections[0] + connection = connections[1] + assert connection is not None + assert ordered_connections[1].uri == connection.uri + assert ordered_connections[2] == connections[4] + assert ordered_connections[3] == connections[2] + assert ordered_connections[4] == connections[3] + + # connect to specific endpoint without authentication + connection = ordered_connections[1] + assert connection.is_authenticated() is False + connection_manager.set_connection(connection) + assert connection_manager.is_connected() is False + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + + # connect to specific endpoint with authentication + ordered_connections[1].set_credentials("rpc_user", "abc123") + connection_manager.check_connection() + cm_connection: MoneroRpcConnection = connection_manager.get_connection() + assert cm_connection is not None + assert cm_connection.uri == connections[1].uri + assert connection.is_online() + assert connection.is_authenticated() + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert listener.changed_connections[-1] == connection + + # test connection order + ordered_connections = connection_manager.get_connections() + assert ordered_connections[0] == connection_manager.get_connection() + connection = connections[1] + assert connection is not None + assert ordered_connections[0].uri == connection.uri + assert ordered_connections[1] == connections[0] + assert ordered_connections[2] == connections[4] + assert ordered_connections[3] == connections[2] + assert ordered_connections[4] == connections[3] + + first: bool = True + for i, ordered_connection in enumerate(ordered_connections): + if i == len(ordered_connections) - 1: + break + if i <= 1: + assert ordered_connection.is_online() if first else not ordered_connection.is_online() + + assert ordered_connections[4].is_online() is False + + # set connection to existing uri + connection = connections[0] + assert connection is not None + connection_manager.set_connection(connection.uri) + assert connection_manager.is_connected() is True + assert connections[0] == connection_manager.get_connection() + connection = connection_manager.get_connection() + assert connection is not None + assert Utils.DAEMON_RPC_USERNAME == connection.username + assert Utils.DAEMON_RPC_PASSWORD == connection.password + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + AssertUtils.assert_equals(listener.changed_connections[-1], connections[0]) + + # set connection to new uri + connection_manager.stop_polling() + uri: str = "http:#localhost:49999" + connection_manager.set_connection(uri) + connection = connection_manager.get_connection() + assert connection is not None + assert uri == connection.uri + connection.timeout = Utils.AUTO_CONNECT_TIMEOUT_MS + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + connection = listener.changed_connections[-1] + assert connection is not None + assert uri == connection.uri + + # set connection to empty string + connection_manager.set_connection("") + assert connection_manager.get_connection() is None + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + + # check all connections and test auto switch + connection_manager.check_connections() + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert connection_manager.is_connected() + + # remove current connection and test auto switch + connection = connection_manager.get_connection() + assert connection is not None + assert connection.uri is not None + connection_manager.remove_connection(connection.uri) + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert connection_manager.is_connected() is False + connection_manager.check_connections() + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert connection_manager.is_connected() + + # test polling current connection + connection_manager.set_connection(None) + assert connection_manager.is_connected() is False + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + connection_manager.start_polling( + period_ms=Utils.SYNC_PERIOD_IN_MS, + poll_type=MoneroConnectionPollType.CURRENT + ) + + listener.wait_for_change(Utils.SYNC_PERIOD_IN_MS, "Polling current connection") + assert connection_manager.is_connected() is True + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + + # test polling all connections + connection_manager.set_connection(None) + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + connection_manager.start_polling(period_ms=Utils.SYNC_PERIOD_IN_MS, poll_type=MoneroConnectionPollType.ALL) + listener.wait_for_change(Utils.SYNC_PERIOD_IN_MS, "Polling all connections") + assert connection_manager.is_connected() is True + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + + connection = connection_manager.get_connection() + assert connection is not None + # set proxies simulating shut down all connections + for con in ordered_connections: + con.proxy_uri = self.OFFLINE_PROXY_URI + + listener.wait_for_change(Utils.SYNC_PERIOD_IN_MS, "Simulating total shut down") + assert connection.is_online() is False, f"Expected offline connection: {connection.serialize()}" + num_expected_changes += 1 + assert num_expected_changes == listener.num_changed_connections + assert listener.changed_connections[-1] == connection + + # reset + connection_manager.reset() + assert len(connection_manager.get_connections()) == 0 + assert connection_manager.get_connection() is None diff --git a/tests/test_monero_rpc_connection.py b/tests/test_monero_rpc_connection.py index a214861..3bbbd25 100644 --- a/tests/test_monero_rpc_connection.py +++ b/tests/test_monero_rpc_connection.py @@ -61,11 +61,11 @@ def wallet_connection(self) -> MoneroRpcConnection: def test_rpc_connection_serialization(self, node_connection: MoneroRpcConnection, wallet_connection: MoneroRpcConnection) -> None: # test node connection serialization connection_str: str = node_connection.serialize() - assert '{"uri":"http://127.0.0.1:18081","username":"rpc_daemon_user","password":"abc123","priority":0,"timeout":25000}' == connection_str + assert '{"uri":"http://127.0.0.1:18081","username":"rpc_daemon_user","password":"abc123","priority":0,"timeout":15000}' == connection_str # node wallet connection serialization connection_str = wallet_connection.serialize() - assert '{"uri":"127.0.0.1:18082","username":"rpc_user","password":"abc123","priority":0,"timeout":25000}' == connection_str + assert '{"uri":"http://127.0.0.1:18082","username":"rpc_user","password":"abc123","priority":0,"timeout":15000}' == connection_str # test empty connection connection: MoneroRpcConnection = MoneroRpcConnection() @@ -185,11 +185,8 @@ def test_set_invalid_credentials(self) -> None: assert connection.password == "abc123" assert connection.check_connection() - assert not connection.is_authenticated() - # TODO internal http client throwing "Network error" instaead of 401 http error - #assert connection.is_online() - #assert connection.is_connected() - assert not connection.is_online() + assert connection.is_authenticated() is False + assert connection.is_online() assert not connection.is_connected() # Can get and set arbitrary key/value attributes diff --git a/tests/test_monero_wallet_common.py b/tests/test_monero_wallet_common.py index b862bba..f4a7bd4 100644 --- a/tests/test_monero_wallet_common.py +++ b/tests/test_monero_wallet_common.py @@ -787,6 +787,7 @@ def test_create_wallet_random(self) -> None: config = MoneroWalletConfig() config.path = path self._create_wallet(config) + raise Exception("Should have thrown error") except Exception as e: e_msg: str = str(e) assert "Wallet already exists: " + path == e_msg, e_msg diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py index bc60f03..6d51968 100644 --- a/tests/utils/__init__.py +++ b/tests/utils/__init__.py @@ -33,6 +33,8 @@ from .sync_seed_tester import SyncSeedTester from .send_and_update_txs_tester import SendAndUpdateTxsTester from .sync_with_pool_submit_tester import SyncWithPoolSubmitTester +from .docker_wallet_rpc_manager import DockerWalletRpcManager +from .rpc_connection_utils import RpcConnectionUtils __all__ = [ 'WalletUtils', @@ -69,5 +71,7 @@ 'SyncProgressTester', 'SyncSeedTester', 'SendAndUpdateTxsTester', - 'SyncWithPoolSubmitTester' + 'SyncWithPoolSubmitTester', + 'DockerWalletRpcManager', + 'RpcConnectionUtils' ] diff --git a/tests/utils/block_utils.py b/tests/utils/block_utils.py index 6602fcf..0908a6a 100644 --- a/tests/utils/block_utils.py +++ b/tests/utils/block_utils.py @@ -131,10 +131,12 @@ def test_get_blocks_range( :param BinaryBlockContext: binary block test context """ # fetch blocks by range - real_start_height = 0 if start_height is None else start_height - real_end_height = chain_height - 1 if end_height is None else end_height - blocks = daemon.get_blocks_by_range_chunked(start_height, end_height) if chunked else daemon.get_blocks_by_range(start_height, end_height) - assert real_end_height - real_start_height + 1 == len(blocks) + real_start_height: int = 0 if start_height is None else start_height + real_end_height: int = chain_height - 1 if end_height is None else end_height + blocks: list[MoneroBlock] = daemon.get_blocks_by_range_chunked(start_height, end_height) if chunked else daemon.get_blocks_by_range(start_height, end_height) + num_blocks: int = len(blocks) + expected_num_blocks: int = real_end_height - real_start_height + 1 + assert expected_num_blocks == num_blocks, f"Expected {expected_num_blocks} block(s), got {num_blocks}" # test each block for i, block in enumerate(blocks): diff --git a/tests/utils/connection_change_collector.py b/tests/utils/connection_change_collector.py index 4351e59..adf410c 100644 --- a/tests/utils/connection_change_collector.py +++ b/tests/utils/connection_change_collector.py @@ -1,25 +1,66 @@ +import logging + from typing import Optional from typing_extensions import override -from monero import MoneroConnectionManagerListener, MoneroRpcConnection - - -class ConnectionList(list[Optional[MoneroRpcConnection]]): +from monero import ( + MoneroConnectionManager, + MoneroConnectionManagerListener, MoneroRpcConnection +) - def size(self) -> int: - return len(self) +from .gen_utils import GenUtils - def get(self, index: int) -> Optional[MoneroRpcConnection]: - return self[index] +logger: logging.Logger = logging.getLogger("ConnectionChangeCollector") class ConnectionChangeCollector(MoneroConnectionManagerListener): + """Collects connection changes.""" - changed_connections: ConnectionList + changed_connections: list[Optional[MoneroRpcConnection]] + """Collected changed connections.""" + + @property + def num_changed_connections(self) -> int: + """Number of changed connections collected.""" + return len(self.changed_connections) def __init__(self) -> None: + """Initialize a new connection change collector.""" super().__init__() - self.changed_connections = ConnectionList() + self.changed_connections = [] @override def on_connection_changed(self, connection: Optional[MoneroRpcConnection]) -> None: + conn_str: str = connection.serialize() if connection is not None else 'None' + logger.debug(f"Collecting connection change: {conn_str}") self.changed_connections.append(connection) + + def wait_for_change(self, interval_ms: int = 5000, custom_message: str = "Waiting for connection change") -> Optional[MoneroRpcConnection]: + """ + Wait until a connection change occurs. + + :param int interval_ms: custom check interval in milliseconds (default 5000). + :param str custom_message: custom message to show in debug during wait. + :returns MoneroRpcConnection | None: changed connection. + """ + + last_num_changes: int = self.num_changed_connections + while self.num_changed_connections <= last_num_changes: + logger.debug(f"{custom_message} (connections {last_num_changes})...") + GenUtils.wait_for(interval_ms) + + logger.debug(f"Connection changed (connections {self.num_changed_connections}).") + + def wait_for_autoswitch(self, manager: MoneroConnectionManager, interval_ms: int) -> None: + """ + Wait for connection auto switch. + + :param MoneroConnectionManager manager: connection manager to wait for auto switch. + :param int interval_ms: custom check interval in milliseconds. + """ + connected: bool = False + # wait unitl manager has autoswitched connection + while not connected: + logger.debug("Waiting for autoswitch...") + GenUtils.wait_for(interval_ms) + connected = manager.is_connected() is not None + diff --git a/tests/utils/daemon_utils.py b/tests/utils/daemon_utils.py index f5d58c8..1492df7 100644 --- a/tests/utils/daemon_utils.py +++ b/tests/utils/daemon_utils.py @@ -486,6 +486,6 @@ def get_confirmed_txs(cls, daemon: MoneroDaemonRpc, num_txs: int) -> list[Monero start_idx -= num_blocks_per_req - raise Exception(f"Could not get {num_txs} confirmed txs") + raise Exception(f"Could not get {num_txs} confirmed txs (found: {len(txs)})") #endregion diff --git a/tests/utils/docker_wallet_rpc_manager.py b/tests/utils/docker_wallet_rpc_manager.py new file mode 100644 index 0000000..423afbb --- /dev/null +++ b/tests/utils/docker_wallet_rpc_manager.py @@ -0,0 +1,301 @@ +import logging + +from typing import Optional + +from monero import ( + MoneroWallet, + MoneroDaemonRpc, MoneroRpcConnection, + MoneroWalletRpc, MoneroWalletConfig +) + +from .string_utils import StringUtils + +logger: logging.Logger = logging.getLogger("DockerWalletRpcManager") + + +# TODO use some docker python package for managing instances dynamically +class DockerWalletRpcManager: + """Manager for wallet rpc clients connected to monero-wallet-rpc docker instances""" + + MAX_SLOTS: int = 2 + """Maximum docker wallet rpc slots.""" + + #region Private Attributes + + _domain: str + """Rpc wallet domain.""" + _rpc_port_start: int + """Rpc wallet port start.""" + _daemon: MoneroDaemonRpc + """Daemon rpc.""" + _wallet_password: str + """Rpc wallet password.""" + _wallets: dict[int, MoneroWalletRpc] + """Rpc wallets connected to docker instances.""" + _rpc_user: str + """Rpc user used to authenticate with monero-wallet-rpc.""" + _rpc_password: str + """Rpc password used to authenticate with monero-wallet-rpc.""" + _sync_period_ms: int + """Sync period in milliseconds.""" + _timeout_ms: int + """Connection timeout in milliseconds.""" + + #endregion + + #region Public Properties + + @property + def used_slots(self) -> int: + """Number of docker slots used.""" + return len(self._wallets) + + @property + def free_slots(self) -> int: + """Number of docker slots not used.""" + return self.MAX_SLOTS - self.used_slots + + @property + def no_slot_left(self) -> bool: + """Indicates if no docker slot is left.""" + return self.free_slots == 0 + + @property + def first_free_slot(self) -> int: + """The first free docker slot index (-1 for none).""" + slot_idxs: list[int] = list(self._wallets.keys()) + slot_range: list[int] = list(range(self.MAX_SLOTS)) + for slot_idx in slot_range: + if slot_idx not in slot_idxs: + return slot_idx + return -1 + + #endregion + + def __init__( + self, + domain: str, + rpc_port_start: int, + daemon: MoneroDaemonRpc, + wallet_password: str, + sync_period_ms: int, + timeout_ms: int + ) -> None: + """ + Initialize a new docker wallet rpc manager. + + :param str domain: RPC domain. + :param int rpc_port_start: RPC port start. + :param MoneroDaemonRpc daemon: daemon to use as wallet connection. + :param str wallet_password: password used by wallets. + :param int sync_period_ms: wallet sync period in milliseconds. + :param int timeout_ms: wallet connection timeout in milliseconds. + """ + self._domain = domain + self._rpc_port_start = rpc_port_start + self._daemon = daemon + self._wallet_password = wallet_password + self._wallets = {} + self._sync_period_ms = sync_period_ms + self._timeout_ms = timeout_ms + self._rpc_user = '' + self._rpc_password = '' + + def set_connection_credentials(self, username: str, password: str) -> None: + """ + Set wallet rpc global connection credentials. + """ + self._rpc_user = username + self._rpc_password = password + + def get_rpc_uri(self, slot: int) -> str: + """ + Get wallet rpc uri associated to slot. + + :param int slot: docker slot index. + :returns str: docker rpc uri. + """ + assert slot >= 0 + assert slot < self.MAX_SLOTS + # first docker instance is reserved to test wallet + return f"{self._domain}:{self._rpc_port_start + slot + 1}" + + def setup_create_wallet_config(self, config: MoneroWalletConfig) -> MoneroWalletConfig: + """ + Setup a `create` wallet configuration. + + :param MoneroWalletConfig config: configuration to setup for wallet creation. + :returns MoneroWalletConfig: setup config. + """ + random = config.seed is None and config.primary_address is None + + if config.path is None: + # set random wallet path + config.path = StringUtils.get_random_string() + + if config.restore_height is None and not random: + # set restore height + config.restore_height = 0 + + return config + + def setup_wallet_config(self, c: Optional[MoneroWalletConfig], create: bool, in_container: bool) -> MoneroWalletConfig: + """ + Setup a wallet configuration. + + :param MoneroWalletConfig | None c: wallet configuration to setup (optional). + :param bool create: setup wallet creation configuration. + :returns MoneroWalletConfig: setup configuration. + """ + config = c if c is not None else MoneroWalletConfig() + + # assign defaults + if config.password is None: + config.password = self._wallet_password + + if config.server is None: + config.server = self._daemon.get_rpc_connection() + if in_container: + config.server.uri = "http://node_2:18081" + + if create: + return self.setup_create_wallet_config(config) + + return config + + def get_rpc_connection(self, slot: int) -> MoneroRpcConnection: + """ + Get specific docker wallet rpc connection. + + :param int slot: docker slot to use. + :returns MoneroRpcConnection: wallet rpc docker connection. + """ + rpc_uri: str = self.get_rpc_uri(slot) + return MoneroRpcConnection(rpc_uri, self._rpc_user, self._rpc_password, timeout=self._timeout_ms) + + def get_rpc_connections(self) -> list[MoneroRpcConnection]: + """ + Get all docker wallet rpc connections. + + :returns list[MoneroRpcConnection]: all wallet rpc docker connections. + """ + connections: list[MoneroRpcConnection] = [] + for i in range(self.MAX_SLOTS): + connections.append(self.get_rpc_connection(i)) + return connections + + def setup_wallet(self, c: Optional[MoneroWalletConfig], create: bool, in_container: bool) -> MoneroWalletRpc: + """ + Setup a rpc wallet. + + :param MoneroWalletConfig | None c: wallet configuration. + :param bool create: create the wallet. + :returns MoneroWalletRpc: wallet rpc client. + """ + if self.no_slot_left: + raise Exception("Cannot open wallet: no rpc resources left") + + # setup open wallet configuration + config: MoneroWalletConfig = self.setup_wallet_config(c, create, in_container) + + # get first free slot and build wallet rpc uri + slot: int = self.first_free_slot + # create client connected to monero-wallet-rpc process + + wallet: MoneroWalletRpc = MoneroWalletRpc(self.get_rpc_connection(slot)) + + # open wallet + wallet.stop_syncing() + if create: + wallet.create_wallet(config) + else: + wallet.open_wallet(config) + wallet.set_daemon_connection(wallet.get_daemon_connection(), True, None) + if wallet.is_connected_to_daemon(): + wallet.start_syncing(self._sync_period_ms) + + # cache wallet + self._wallets[slot] = wallet + + return wallet + + def create_wallet(self, c: Optional[MoneroWalletConfig], in_container: bool) -> MoneroWalletRpc: + """ + Create a rpc wallet. + + :param MoneroWalletConfig | None c: wallet configuration. + :returns MoneroWalletRpc: wallet rpc client. + """ + return self.setup_wallet(c, True, in_container) + + def open_wallet(self, c: Optional[MoneroWalletConfig], in_container: bool) -> MoneroWalletRpc: + """ + Open a rpc wallet. + + :param MoneroWalletConfig | None: wallet configuration. + :returns MoneroWalletRpc: wallet rpc client. + """ + return self.setup_wallet(c, False, in_container) + + def get_wallets(self) -> list[MoneroWalletRpc]: + """ + Get all active wallet rpc instances. + + :returns list[MoneroWalletRpc]: rpc wallet instances. + """ + return list(self._wallets.values()) + + def is_docker_instance(self, wallet: MoneroWallet) -> bool: + """ + Check if wallet is a managed docker instance. + + :returns bool: `True` if `wallet` is a managed docker instance, `False` otherwise. + """ + for w in self._wallets.values(): + if w == wallet: + return True + + return False + + def free_slot(self, wallet: MoneroWallet, save: bool = False) -> None: + """ + Free wallet rpc docker slot. + + :param MoneroWallet wallet: wallet to free. + :param bool save: save the wallet before closing. + """ + found: bool = False + for w_idx in self._wallets.keys(): + w: MoneroWallet = self._wallets[w_idx] + if w == wallet: + found = True + try: + wallet.close(save) + except Exception as e: + e_msg: str = str(e) + if e_msg != "No wallet file": + logger.warning(e_msg) + + del self._wallets[w_idx] + break + + assert found, "wallet is not rpc docker instance" + + def clear(self, save: bool = False) -> None: + """ + Free all docker wallet rpc resources + + :param bool save: save wallets (default `False`). + """ + for wallet in self._wallets.values(): + if not wallet.is_closed(): + try: + wallet.close(save) + except Exception as e: + e_str: str = str(e) + if "No wallet file" != e_str: + raise e + + logger.debug("Free wallet rpc instance") + + self._wallets.clear() diff --git a/tests/utils/mining_utils.py b/tests/utils/mining_utils.py index 65df958..0ca0e19 100644 --- a/tests/utils/mining_utils.py +++ b/tests/utils/mining_utils.py @@ -20,10 +20,7 @@ def get_daemon(cls) -> MoneroDaemonRpc: :returns MoneroDaemonRpc: daemon rpc used for internal mining. """ - if cls._DAEMON is None: - cls._DAEMON = MoneroDaemonRpc("127.0.0.1:18089", Utils.DAEMON_RPC_USERNAME, Utils.DAEMON_RPC_PASSWORD) - - return cls._DAEMON + return Utils.get_mining_daemon() @classmethod def is_mining(cls, d: Optional[MoneroDaemonRpc] = None) -> bool: diff --git a/tests/utils/rpc_connection_utils.py b/tests/utils/rpc_connection_utils.py new file mode 100644 index 0000000..6bb99f7 --- /dev/null +++ b/tests/utils/rpc_connection_utils.py @@ -0,0 +1,50 @@ +from abc import ABC + +from monero import MoneroRpcConnection + + +class RpcConnectionUtils(ABC): + """Test utils for rpc connections.""" + + @classmethod + def test_connections_and_order( + cls, + ordered_connections: list[MoneroRpcConnection], + connections: list[MoneroRpcConnection], + check_never_connected: bool + ) -> None: + """ + Test rpc connections and order. + + :param list[MoneroRpcConnection] ordered_connections: list of ordered connections to test. + :param list[MoneroRpcConnection] connections: connections to test with ordered. + """ + assert ordered_connections[0] == connections[4] + assert ordered_connections[1] == connections[2] + assert ordered_connections[2] == connections[3] + assert ordered_connections[3] == connections[0] + connection = connections[1] + assert connection is not None + assert ordered_connections[4].uri == connection.uri + + if not check_never_connected: + return + + for connection in ordered_connections: + assert connection.is_online() is None + + @classmethod + def test_connections_order(cls, ordered_connections: list[MoneroRpcConnection], connections: list[MoneroRpcConnection]) -> None: + """ + Test rpc connections order. + + :param list[MoneroRpcConnection] ordered_connections: list of ordered connections to test. + :param list[MoneroRpcConnection] connections: connections to test with ordered. + """ + assert ordered_connections[0] == connections[4] + assert ordered_connections[1] == connections[0] + connection = connections[1] + assert connection is not None + assert ordered_connections[2].uri == connection.uri + assert ordered_connections[3] == connections[2] + assert ordered_connections[4] == connections[3] diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index 95a382b..faf3b14 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -15,8 +15,8 @@ from .wallet_sync_printer import WalletSyncPrinter from .wallet_tx_tracker import WalletTxTracker from .gen_utils import GenUtils -from .string_utils import StringUtils from .daemon_utils import DaemonUtils +from .docker_wallet_rpc_manager import DockerWalletRpcManager logger: logging.Logger = logging.getLogger("TestUtils") @@ -45,6 +45,8 @@ class TestUtils(ABC): """Mining wallet used for funding test wallets""" _DAEMON_RPC: Optional[MoneroDaemonRpc] = None """Default daemon rpc used for tests""" + _MINING_DAEMON: Optional[MoneroDaemonRpc] = None + """Internal daemon used for mining""" _WALLET_RPC_2: Optional[MoneroWalletRpc] = None """Additional wallet rpc instance""" @@ -87,7 +89,6 @@ class TestUtils(ABC): """Test wallet rpc domain""" WALLET_RPC_URI: str = "" """Test wallet rpc uri""" - WALLET_RPC_URI_2: str = "" WALLET_RPC_ZMQ_URI: str = "" """Test wallet rpc zmq uri""" WALLET_RPC_ACCESS_CONTROL_ORIGINS: str = "" @@ -152,6 +153,8 @@ class TestUtils(ABC): MINING_WALLET_FULL_PATH: str = "" """Mining wallet full path""" + RPC_WALLET_MANAGER: DockerWalletRpcManager + @classmethod def load_config(cls) -> None: """ @@ -211,7 +214,6 @@ def load_config(cls) -> None: cls.WALLET_RPC_ZMQ_BIND_PORT_START = parser.getint('wallet', 'rpc_zmq_bind_port_start') cls.WALLET_RPC_ZMQ_DOMAIN = parser.get('wallet', 'rpc_zmq_domain') cls.WALLET_RPC_URI = cls.WALLET_RPC_DOMAIN + ":" + str(cls.WALLET_RPC_PORT_START) - cls.WALLET_RPC_URI_2 = cls.WALLET_RPC_DOMAIN + ":" + str(cls.WALLET_RPC_PORT_START + 1) cls.WALLET_RPC_ZMQ_URI = "tcp:#" + cls.WALLET_RPC_ZMQ_DOMAIN + ":" + str(cls.WALLET_RPC_ZMQ_PORT_START) cls.SYNC_PERIOD_IN_MS = parser.getint('wallet', 'sync_period_in_ms') in_container = getenv("IN_CONTAINER", "true") @@ -234,6 +236,22 @@ def load_config(cls) -> None: cls._LOADED = True + @classmethod + def load(cls) -> None: + if cls._LOADED: + return + + cls.load_config() + cls.RPC_WALLET_MANAGER = DockerWalletRpcManager( + cls.WALLET_RPC_DOMAIN, + cls.WALLET_RPC_PORT_START, + cls.get_daemon_rpc(), + cls.WALLET_PASSWORD, + cls.SYNC_PERIOD_IN_MS, + cls.AUTO_CONNECT_TIMEOUT_MS + ) + cls.RPC_WALLET_MANAGER.set_connection_credentials(cls.WALLET_RPC_USERNAME, cls.WALLET_RPC_PASSWORD) + @classmethod def get_network_type(cls) -> str: """Get test network type""" @@ -267,10 +285,32 @@ def get_daemon_rpc(cls) -> MoneroDaemonRpc: return cls._DAEMON_RPC + @classmethod + def get_mining_daemon_rpc_connection(cls) -> MoneroRpcConnection: + """ + Get the rpc connection of the daemon used for internal mining. + + :returns MoneroRpcConnection: rpc connection to internal mining daemon. + """ + return MoneroRpcConnection("http://127.0.0.1:18089", cls.DAEMON_RPC_USERNAME, cls.DAEMON_RPC_PASSWORD) + + @classmethod + def get_mining_daemon(cls) -> MoneroDaemonRpc: + """ + Get daemon used for mining. + + :returns MoneroDaemonRpc: internal mining daemon. + """ + + if cls._MINING_DAEMON is None: + cls._MINING_DAEMON = MoneroDaemonRpc(cls.get_mining_daemon_rpc_connection()) + + return cls._MINING_DAEMON + @classmethod def get_daemon_rpc_connection(cls) -> MoneroRpcConnection: """Get test daemon rpc connection""" - return cls.get_daemon_rpc().get_rpc_connection() + return MoneroRpcConnection(cls.DAEMON_RPC_URI, cls.DAEMON_RPC_USERNAME, cls.DAEMON_RPC_PASSWORD) @classmethod def get_wallet_keys_config(cls) -> MoneroWalletConfig: @@ -389,6 +429,18 @@ def get_mining_wallet(cls) -> MoneroWalletFull: cls._WALLET_MINING = wallet return wallet + @classmethod + def get_wallet_rpc_connection(cls) -> MoneroRpcConnection: + """ + Get test wallet rpc connection. + + :returns MoneroRpcConnection: test wallet rpc connection. + """ + return MoneroRpcConnection( + cls.WALLET_RPC_URI, cls.WALLET_RPC_USERNAME, cls.WALLET_RPC_PASSWORD, + cls.WALLET_RPC_ZMQ_URI if cls.WALLET_RPC_ZMQ_ENABLED else '' + ) + @classmethod def get_wallet_rpc(cls) -> MoneroWalletRpc: """Get test wallet rpc""" @@ -396,10 +448,7 @@ def get_wallet_rpc(cls) -> MoneroWalletRpc: if cls._WALLET_RPC is None: # construct wallet rpc instance with daemon connection - rpc = MoneroRpcConnection( - cls.WALLET_RPC_URI, cls.WALLET_RPC_USERNAME, cls.WALLET_RPC_PASSWORD, - cls.WALLET_RPC_ZMQ_URI if cls.WALLET_RPC_ZMQ_ENABLED else '' - ) + rpc = cls.get_wallet_rpc_connection() cls._WALLET_RPC = MoneroWalletRpc(rpc) # attempt to open test wallet @@ -433,117 +482,45 @@ def get_wallet_rpc(cls) -> MoneroWalletRpc: @classmethod def open_wallet_rpc(cls, c: Optional[MoneroWalletConfig]) -> MoneroWalletRpc: """Open a rpc wallet""" - config = c if c is not None else MoneroWalletConfig() - - # assign defaults - if config.password is None: - config.password = cls.WALLET_PASSWORD - - if config.server is None: - config.server = cls.get_daemon_rpc().get_rpc_connection() - if cls.IN_CONTAINER: - config.server.uri = "http://node_2:18081" - - if cls._WALLET_RPC_2 is not None: - raise Exception("Cannot open wallet: no rpc resources left") - - if cls._WALLET_RPC_2 is None: - rpc = MoneroRpcConnection( - cls.WALLET_RPC_URI_2, cls.WALLET_RPC_USERNAME, cls.WALLET_RPC_PASSWORD, - cls.WALLET_RPC_ZMQ_URI if cls.WALLET_RPC_ZMQ_ENABLED else '' - ) - cls._WALLET_RPC_2 = MoneroWalletRpc(rpc) - - # open wallet - cls._WALLET_RPC_2.stop_syncing() - cls._WALLET_RPC_2.open_wallet(config) - cls._WALLET_RPC_2.set_daemon_connection(cls._WALLET_RPC_2.get_daemon_connection(), True, None) - if cls._WALLET_RPC_2.is_connected_to_daemon(): - cls._WALLET_RPC_2.start_syncing(TestUtils.SYNC_PERIOD_IN_MS) - - return cls._WALLET_RPC_2 + return cls.RPC_WALLET_MANAGER.open_wallet(c, cls.IN_CONTAINER) @classmethod def create_wallet_rpc(cls, c: Optional[MoneroWalletConfig]) -> MoneroWalletRpc: """Create rpc wallet""" - # assign defaults - config = c if c is not None else MoneroWalletConfig() - random = config.seed is None and config.primary_address is None - - if config.path is None: - # set random wallet path - config.path = StringUtils.get_random_string() - - if config.password is None: - # set wallet password - config.password = TestUtils.WALLET_PASSWORD - - if config.restore_height is None and not random: - # set restore height - config.restore_height = 0 - - if config.server is None: - config.server = TestUtils.get_daemon_rpc().get_rpc_connection() - if cls.IN_CONTAINER: - # TODO make this configurable - config.server.uri = "http://node_2:18081" - - # create client connected to monero-wallet-rpc process - wallet_rpc = cls._WALLET_RPC_2 - if wallet_rpc is not None: - raise Exception("Cannot open wallet rpc: no resources left") - - rpc = MoneroRpcConnection( - cls.WALLET_RPC_URI_2, cls.WALLET_RPC_USERNAME, cls.WALLET_RPC_PASSWORD, - '', - cls.WALLET_RPC_ZMQ_URI if cls.WALLET_RPC_ZMQ_ENABLED else '', 0, - cls.AUTO_CONNECT_TIMEOUT_MS - ) - wallet_rpc = MoneroWalletRpc(rpc) - - # create wallet - wallet_rpc.stop_syncing() - wallet_rpc.create_wallet(config) - wallet_rpc.set_daemon_connection(wallet_rpc.get_daemon_connection(), True, None) - if wallet_rpc.is_connected_to_daemon(): - wallet_rpc.start_syncing(TestUtils.SYNC_PERIOD_IN_MS) - - cls._WALLET_RPC_2 = wallet_rpc - return cls._WALLET_RPC_2 + return cls.RPC_WALLET_MANAGER.create_wallet(c, cls.IN_CONTAINER) @classmethod - def get_wallets(cls, wallet_type: str) -> list[MoneroWallet]: - """Get all test wallets""" - raise NotImplementedError() + def get_all_rpc_connections(cls) -> list[MoneroRpcConnection]: + """ + Get all daemon and wallets rpc connections used in tests (ordered by connection uri). + + :returns list[MoneroDaemonRpc | MoneroWalletRpc]: rpc connections to a daemon or wallet rpc. + """ + result: list[MoneroRpcConnection] = [] + # ordered by connection uri + result.append(cls.get_daemon_rpc_connection()) + result.append(cls.get_wallet_rpc_connection()) + result.extend(cls.RPC_WALLET_MANAGER.get_rpc_connections()) + result.append(cls.get_mining_daemon_rpc_connection()) + for connection in result: + connection.timeout = cls.AUTO_CONNECT_TIMEOUT_MS + return result @classmethod def free_wallet_rpc_resources(cls, save: bool = False) -> None: """Free all docker wallet rpc resources""" - if cls._WALLET_RPC_2 is None: - return - - if not cls._WALLET_RPC_2.is_closed(): - try: - cls._WALLET_RPC_2.close(save) - except Exception as e: - e_str: str = str(e) - if "No wallet file" != e_str: - logger.warning(str(e)) - - logger.debug(f"FREE WALLET RPC RESOURCE") - cls._WALLET_RPC_2 = None + cls.RPC_WALLET_MANAGER.clear(save) @classmethod def is_wallet_rpc_resource(cls, wallet: MoneroWallet) -> bool: """Indicates if wallet is using a docker rpc instance""" - return wallet is cls._WALLET_RPC_2 + return cls.RPC_WALLET_MANAGER.is_docker_instance(wallet) @classmethod def free_wallet_rpc_resource(cls, wallet: MoneroWallet, save: bool = False) -> None: """Free docker resource used by wallet""" - if cls.is_wallet_rpc_resource(wallet): - # TODO free specific wallet rpc resource - cls.free_wallet_rpc_resources(save) + if cls.RPC_WALLET_MANAGER.is_docker_instance(wallet): + cls.RPC_WALLET_MANAGER.free_slot(wallet, save) @classmethod def create_wallet_ground_truth( @@ -607,4 +584,4 @@ def dispose(cls) -> None: cls._WALLET_RPC_2.close(True) -TestUtils.load_config() +TestUtils.load() diff --git a/tests/utils/to_multiple_tx_sender.py b/tests/utils/to_multiple_tx_sender.py index a87f5c0..f1307f2 100644 --- a/tests/utils/to_multiple_tx_sender.py +++ b/tests/utils/to_multiple_tx_sender.py @@ -222,8 +222,8 @@ def send(self) -> None: account: MoneroAccount = self._wallet.get_account(src_account.index) assert account.balance is not None assert account.unlocked_balance is not None - assert account.balance < balance - assert account.unlocked_balance < unlocked_balance + assert account.balance < balance, "Balance did not decrease" + assert account.unlocked_balance < unlocked_balance, "Unlocked balance did not decrease" # build test context config.can_split = self._can_split