From d77e897eaf5e8bd8de9ea11b64233d9a6d354f25 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 29 May 2026 13:36:32 +0100 Subject: [PATCH] Add `makePool` method on `ThreadMap` This patch introduces a pool of threads to the `Connection` class, and allows this pool to be populated with the thread map via `makePool`. When a client thread is not set in a request context, it is delegated to the pool. The pool is implemented as shortest-queue, where the thread with the shortest list of pending work handles the request. Tiebreaking is by lowest index. This was raised to me by Rust users, as they did not particularly care where work is executed on the server-side, but they have to set the thread regardless. ref: https://github.com/2140-dev/bitcoin-capnp-types/blob/master/tests/util/bitcoin_core.rs#L149 --- include/mp/proxy-io.h | 7 ++ include/mp/proxy.capnp | 4 + include/mp/type-context.h | 146 ++++++++++++++++++++++-------------- include/mp/type-threadmap.h | 1 + src/mp/proxy.cpp | 25 ++++++ test/mp/test/test.cpp | 73 ++++++++++++++++++ 6 files changed, 199 insertions(+), 57 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 092ea42e..27ad6a74 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -477,6 +477,13 @@ class Connection //! ThreadMap.makeThread) used to service requests to clients. ::capnp::CapabilityServerSet m_threads; + //! A thread created by makePool with associated pending work queue. Vector is filled once by makePool() and never resized. + struct PoolSlot { + Thread::Client client; + size_t depth{0}; + }; + std::vector m_thread_pool; + //! Canceler for canceling promises that we want to discard when the //! connection is destroyed. This is used to interrupt method calls that are //! still executing at time of disconnection. diff --git a/include/mp/proxy.capnp b/include/mp/proxy.capnp index 386f8f7a..e0a66fd9 100644 --- a/include/mp/proxy.capnp +++ b/include/mp/proxy.capnp @@ -45,6 +45,10 @@ interface ThreadMap $count(0) { # execute on. Clients create and name threads and pass the thread handle as # a call parameter. makeThread @0 (name :Text) -> (result :Thread); + # Pre-allocate a pool of server threads for implicit dispatch. When a + # request arrives with no context.thread set, the server dispatches it + # through this pool via a shared work queue. + makePool @1 (count :UInt32) -> (); } interface Thread { diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 3ab3d4b0..7e47ca8a 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -200,63 +200,95 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // asynchronously with getLocalServer(). const auto& params = server_context.call_context.getParams(); Context::Reader context_arg = Accessor::get(params); - auto thread_client = context_arg.getThread(); - auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) - .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { - // Assuming the thread object is found, pass it a pointer to the - // `invoke` lambda above which will invoke the function on that - // thread. - KJ_IF_MAYBE (thread_server, perhaps) { - auto& thread = static_cast&>(*thread_server); - MP_LOG(loop, Log::Debug) - << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; - return thread.template post(std::move(invoke)); - } else { - MP_LOG(loop, Log::Error) - << "IPC server error request #" << req << ", missing thread to execute request"; - throw std::runtime_error("invalid thread handle"); - } - }, [&loop, req](::kj::Exception&& e) -> kj::Promise { - // If you see the error "(remote):0: failed: remote exception: - // Called null capability" here, it probably means your Init class - // is missing a declaration like: - // - // construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap); - // - // which passes a ThreadMap reference from the client to the server, - // allowing the server to create threads to run IPC calls on the - // client, and also returns a ThreadMap reference from the server to - // the client, allowing the client to create threads on the server. - // (Typically the latter ThreadMap is used more often because there - // are more client-to-server calls.) - // - // If the other side of the connection did not previously get a - // ThreadMap reference from this side of the connection, when the - // other side calls `m_thread_map.makeThreadRequest()` in - // `BuildField` above, `m_thread_map` will be null, but that call - // will not fail immediately due to Cap'n Proto's request pipelining - // and delayed execution. Instead that call will return an invalid - // Thread reference, and when that reference is passed to this side - // of the connection as `thread_client` above, the - // `getLocalServer(thread_client)` call there will be the first - // thing to overtly fail, leading to an error here. - // - // Potentially there are also other things that could cause errors - // here, but this is the most likely cause. - // - // The log statement here is not strictly necessary since the same - // exception will also be logged in serverInvoke, but this logging - // may provide extra context that could be helpful for debugging. - MP_LOG(loop, Log::Info) - << "IPC server error request #" << req << " CapabilityServerSet::getLocalServer call failed, did you forget to provide a ThreadMap to the client prior to this IPC call?"; - return kj::mv(e); - }); - // Use connection m_canceler object to cancel the result promise if the - // connection is destroyed. (By default Cap'n Proto does not cancel requests - // on disconnect, since it's possible clients might want to make requests - // and immediately disconnect without waiting for results, but not want the - // requests to be canceled.) - return server.m_context.connection->m_canceler.wrap(kj::mv(result)); + if (!context_arg.hasThread()) { + // No client thread specified — dispatch through the server thread + // pool, picking the slot with the smallest in-flight depth. + auto* connection = server.m_context.connection; + auto& pool = connection->m_thread_pool; + if (pool.empty()) { + MP_LOG(loop, Log::Error) + << "IPC server error request #" << req << ", no thread specified and no pool configured"; + throw std::runtime_error("no thread specified and no pool configured"); + } + auto* slot = &pool[0]; + for (size_t i = 1; i < pool.size(); ++i) { + if (pool[i].depth < slot->depth) slot = &pool[i]; + } + ++slot->depth; + auto result = connection->m_threads.getLocalServer(slot->client) + .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable -> kj::Promise { + KJ_IF_MAYBE (thread_server, perhaps) { + auto& thread = static_cast&>(*thread_server); + MP_LOG(loop, Log::Debug) + << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; + return thread.template post(std::move(invoke)); + } else { + MP_LOG(loop, Log::Error) + << "IPC server error request #" << req << ", pool thread not found"; + throw std::runtime_error("pool thread not found"); + } + }) + .attach(kj::defer([slot] { --slot->depth; })); + return connection->m_canceler.wrap(kj::mv(result)); + } else { + auto thread_client = context_arg.getThread(); + auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) + .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { + // Assuming the thread object is found, pass it a pointer to the + // `invoke` lambda above which will invoke the function on that + // thread. + KJ_IF_MAYBE (thread_server, perhaps) { + auto& thread = static_cast&>(*thread_server); + MP_LOG(loop, Log::Debug) + << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; + return thread.template post(std::move(invoke)); + } else { + MP_LOG(loop, Log::Error) + << "IPC server error request #" << req << ", missing thread to execute request"; + throw std::runtime_error("invalid thread handle"); + } + }, [&loop, req](::kj::Exception&& e) -> kj::Promise { + // If you see the error "(remote):0: failed: remote exception: + // Called null capability" here, it probably means your Init class + // is missing a declaration like: + // + // construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap); + // + // which passes a ThreadMap reference from the client to the server, + // allowing the server to create threads to run IPC calls on the + // client, and also returns a ThreadMap reference from the server to + // the client, allowing the client to create threads on the server. + // (Typically the latter ThreadMap is used more often because there + // are more client-to-server calls.) + // + // If the other side of the connection did not previously get a + // ThreadMap reference from this side of the connection, when the + // other side calls `m_thread_map.makeThreadRequest()` in + // `BuildField` above, `m_thread_map` will be null, but that call + // will not fail immediately due to Cap'n Proto's request pipelining + // and delayed execution. Instead that call will return an invalid + // Thread reference, and when that reference is passed to this side + // of the connection as `thread_client` above, the + // `getLocalServer(thread_client)` call there will be the first + // thing to overtly fail, leading to an error here. + // + // Potentially there are also other things that could cause errors + // here, but this is the most likely cause. + // + // The log statement here is not strictly necessary since the same + // exception will also be logged in serverInvoke, but this logging + // may provide extra context that could be helpful for debugging. + MP_LOG(loop, Log::Info) + << "IPC server error request #" << req << " CapabilityServerSet::getLocalServer call failed, did you forget to provide a ThreadMap to the client prior to this IPC call?"; + return kj::mv(e); + }); + // Use connection m_canceler object to cancel the result promise if the + // connection is destroyed. (By default Cap'n Proto does not cancel requests + // on disconnect, since it's possible clients might want to make requests + // and immediately disconnect without waiting for results, but not want the + // requests to be canceled.) + return server.m_context.connection->m_canceler.wrap(kj::mv(result)); + } } } // namespace mp diff --git a/include/mp/type-threadmap.h b/include/mp/type-threadmap.h index 3005d9de..c38c2ac4 100644 --- a/include/mp/type-threadmap.h +++ b/include/mp/type-threadmap.h @@ -14,6 +14,7 @@ struct ProxyServer final : public virtual ThreadMap::Server public: ProxyServer(Connection& connection); kj::Promise makeThread(MakeThreadContext context) override; + kj::Promise makePool(MakePoolContext context) override; Connection& m_connection; }; diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 963050c3..c5f238a3 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include #include #include +#include namespace mp { @@ -415,6 +417,29 @@ kj::Promise ProxyServer::getName(GetNameContext context) ProxyServer::ProxyServer(Connection& connection) : m_connection(connection) {} +kj::Promise ProxyServer::makePool(MakePoolContext context) +{ + if (!m_connection.m_thread_pool.empty()) { + throw std::runtime_error("makePool called on connection with existing pool"); + } + EventLoop& loop{*m_connection.m_loop}; + const uint32_t count = context.getParams().getCount(); + for (uint32_t i = 0; i < count; ++i) { + const std::string thread_name = "pool/" + std::to_string(i); + std::promise thread_context; + std::thread thread([&loop, &thread_context, thread_name]() { + g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + thread_name + ")"; + g_thread_context.waiter = std::make_unique(); + Lock lock(g_thread_context.waiter->m_mutex); + thread_context.set_value(&g_thread_context); + g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); + }); + auto thread_server = kj::heap>(m_connection, *thread_context.get_future().get(), std::move(thread)); + m_connection.m_thread_pool.push_back({m_connection.m_threads.add(kj::mv(thread_server))}); + } + return kj::READY_NOW; +} + kj::Promise ProxyServer::makeThread(MakeThreadContext context) { EventLoop& loop{*m_connection.m_loop}; diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index eb4b5ec7..e7052c58 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -18,8 +18,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -524,5 +526,76 @@ KJ_TEST("Make simultaneous IPC calls on single remote thread") KJ_EXPECT(expected == 400); } +KJ_TEST("Call async IPC method dispatched to pool thread") +{ + TestSetup setup; + ProxyClient* foo = setup.client.get(); + + // Set up the thread map exchange so the client has the server's ThreadMap, + // then call makePool to pre-allocate two server threads. + foo->initThreadMap(); + setup.server->m_impl->m_int_fn = [](int n) { return n * 2; }; + + ThreadContext& tc{g_thread_context}; + std::atomic running{3}; + std::promise pool_ready; + foo->m_context.loop->sync([&] { + auto pool_req = foo->m_context.connection->m_thread_map.makePoolRequest(); + pool_req.setCount(2); + foo->m_context.loop->m_task_set->add( + pool_req.send().then([&](auto&&) { pool_ready.set_value(); })); + }); + pool_ready.get_future().get(); + + // Send three callIntFnAsync requests with no context.thread set. + // The server should dispatch each to a pool thread. + auto client{foo->m_client}; + foo->m_context.loop->sync([&] { + for (size_t i = 0; i < running; ++i) { + auto request{client.callIntFnAsyncRequest()}; + request.initContext(); // context present but thread unset + request.setArg(static_cast(i + 1)); + foo->m_context.loop->m_task_set->add(request.send().then( + [&running, &tc, i](auto&& results) { + assert(results.getResult() == static_cast((i + 1) * 2)); + running -= 1; + Lock lock(tc.waiter->m_mutex); + tc.waiter->m_cv.notify_all(); + })); + } + }); + { + Lock lock(tc.waiter->m_mutex); + tc.waiter->wait(lock, [&running] { return running == 0; }); + } +} + +KJ_TEST("Call async IPC method without thread or pool errors correctly") +{ + TestSetup setup; + ProxyClient* foo = setup.client.get(); + setup.server->m_impl->m_fn = [] {}; + + // Send a callFnAsync request with no context.thread and no pool configured. + // The server should throw the "no thread specified and no pool configured" error. + std::promise done; + bool error_thrown{false}; + foo->m_context.loop->sync([&] { + auto request{foo->m_client.callFnAsyncRequest()}; + request.initContext(); + foo->m_context.loop->m_task_set->add( + request.send().then( + [&](auto&&) { done.set_value(); }, + [&](kj::Exception&& e) { + error_thrown = true; + KJ_EXPECT(std::string_view{e.getDescription().cStr()}.find( + "no thread specified and no pool configured") != std::string_view::npos); + done.set_value(); + })); + }); + done.get_future().get(); + KJ_EXPECT(error_thrown); +} + } // namespace test } // namespace mp