diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 092ea42e..27ad6a74 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -477,6 +477,13 @@ class Connection //! ThreadMap.makeThread) used to service requests to clients. ::capnp::CapabilityServerSet m_threads; + //! A thread created by makePool with associated pending work queue. Vector is filled once by makePool() and never resized. + struct PoolSlot { + Thread::Client client; + size_t depth{0}; + }; + std::vector m_thread_pool; + //! Canceler for canceling promises that we want to discard when the //! connection is destroyed. This is used to interrupt method calls that are //! still executing at time of disconnection. diff --git a/include/mp/proxy.capnp b/include/mp/proxy.capnp index 386f8f7a..e0a66fd9 100644 --- a/include/mp/proxy.capnp +++ b/include/mp/proxy.capnp @@ -45,6 +45,10 @@ interface ThreadMap $count(0) { # execute on. Clients create and name threads and pass the thread handle as # a call parameter. makeThread @0 (name :Text) -> (result :Thread); + # Pre-allocate a pool of server threads for implicit dispatch. When a + # request arrives with no context.thread set, the server dispatches it + # through this pool via a shared work queue. + makePool @1 (count :UInt32) -> (); } interface Thread { diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 3ab3d4b0..7e47ca8a 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -200,63 +200,95 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // asynchronously with getLocalServer(). const auto& params = server_context.call_context.getParams(); Context::Reader context_arg = Accessor::get(params); - auto thread_client = context_arg.getThread(); - auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) - .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { - // Assuming the thread object is found, pass it a pointer to the - // `invoke` lambda above which will invoke the function on that - // thread. - KJ_IF_MAYBE (thread_server, perhaps) { - auto& thread = static_cast&>(*thread_server); - MP_LOG(loop, Log::Debug) - << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; - return thread.template post(std::move(invoke)); - } else { - MP_LOG(loop, Log::Error) - << "IPC server error request #" << req << ", missing thread to execute request"; - throw std::runtime_error("invalid thread handle"); - } - }, [&loop, req](::kj::Exception&& e) -> kj::Promise { - // If you see the error "(remote):0: failed: remote exception: - // Called null capability" here, it probably means your Init class - // is missing a declaration like: - // - // construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap); - // - // which passes a ThreadMap reference from the client to the server, - // allowing the server to create threads to run IPC calls on the - // client, and also returns a ThreadMap reference from the server to - // the client, allowing the client to create threads on the server. - // (Typically the latter ThreadMap is used more often because there - // are more client-to-server calls.) - // - // If the other side of the connection did not previously get a - // ThreadMap reference from this side of the connection, when the - // other side calls `m_thread_map.makeThreadRequest()` in - // `BuildField` above, `m_thread_map` will be null, but that call - // will not fail immediately due to Cap'n Proto's request pipelining - // and delayed execution. Instead that call will return an invalid - // Thread reference, and when that reference is passed to this side - // of the connection as `thread_client` above, the - // `getLocalServer(thread_client)` call there will be the first - // thing to overtly fail, leading to an error here. - // - // Potentially there are also other things that could cause errors - // here, but this is the most likely cause. - // - // The log statement here is not strictly necessary since the same - // exception will also be logged in serverInvoke, but this logging - // may provide extra context that could be helpful for debugging. - MP_LOG(loop, Log::Info) - << "IPC server error request #" << req << " CapabilityServerSet::getLocalServer call failed, did you forget to provide a ThreadMap to the client prior to this IPC call?"; - return kj::mv(e); - }); - // Use connection m_canceler object to cancel the result promise if the - // connection is destroyed. (By default Cap'n Proto does not cancel requests - // on disconnect, since it's possible clients might want to make requests - // and immediately disconnect without waiting for results, but not want the - // requests to be canceled.) - return server.m_context.connection->m_canceler.wrap(kj::mv(result)); + if (!context_arg.hasThread()) { + // No client thread specified — dispatch through the server thread + // pool, picking the slot with the smallest in-flight depth. + auto* connection = server.m_context.connection; + auto& pool = connection->m_thread_pool; + if (pool.empty()) { + MP_LOG(loop, Log::Error) + << "IPC server error request #" << req << ", no thread specified and no pool configured"; + throw std::runtime_error("no thread specified and no pool configured"); + } + auto* slot = &pool[0]; + for (size_t i = 1; i < pool.size(); ++i) { + if (pool[i].depth < slot->depth) slot = &pool[i]; + } + ++slot->depth; + auto result = connection->m_threads.getLocalServer(slot->client) + .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable -> kj::Promise { + KJ_IF_MAYBE (thread_server, perhaps) { + auto& thread = static_cast&>(*thread_server); + MP_LOG(loop, Log::Debug) + << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; + return thread.template post(std::move(invoke)); + } else { + MP_LOG(loop, Log::Error) + << "IPC server error request #" << req << ", pool thread not found"; + throw std::runtime_error("pool thread not found"); + } + }) + .attach(kj::defer([slot] { --slot->depth; })); + return connection->m_canceler.wrap(kj::mv(result)); + } else { + auto thread_client = context_arg.getThread(); + auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) + .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { + // Assuming the thread object is found, pass it a pointer to the + // `invoke` lambda above which will invoke the function on that + // thread. + KJ_IF_MAYBE (thread_server, perhaps) { + auto& thread = static_cast&>(*thread_server); + MP_LOG(loop, Log::Debug) + << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; + return thread.template post(std::move(invoke)); + } else { + MP_LOG(loop, Log::Error) + << "IPC server error request #" << req << ", missing thread to execute request"; + throw std::runtime_error("invalid thread handle"); + } + }, [&loop, req](::kj::Exception&& e) -> kj::Promise { + // If you see the error "(remote):0: failed: remote exception: + // Called null capability" here, it probably means your Init class + // is missing a declaration like: + // + // construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap); + // + // which passes a ThreadMap reference from the client to the server, + // allowing the server to create threads to run IPC calls on the + // client, and also returns a ThreadMap reference from the server to + // the client, allowing the client to create threads on the server. + // (Typically the latter ThreadMap is used more often because there + // are more client-to-server calls.) + // + // If the other side of the connection did not previously get a + // ThreadMap reference from this side of the connection, when the + // other side calls `m_thread_map.makeThreadRequest()` in + // `BuildField` above, `m_thread_map` will be null, but that call + // will not fail immediately due to Cap'n Proto's request pipelining + // and delayed execution. Instead that call will return an invalid + // Thread reference, and when that reference is passed to this side + // of the connection as `thread_client` above, the + // `getLocalServer(thread_client)` call there will be the first + // thing to overtly fail, leading to an error here. + // + // Potentially there are also other things that could cause errors + // here, but this is the most likely cause. + // + // The log statement here is not strictly necessary since the same + // exception will also be logged in serverInvoke, but this logging + // may provide extra context that could be helpful for debugging. + MP_LOG(loop, Log::Info) + << "IPC server error request #" << req << " CapabilityServerSet::getLocalServer call failed, did you forget to provide a ThreadMap to the client prior to this IPC call?"; + return kj::mv(e); + }); + // Use connection m_canceler object to cancel the result promise if the + // connection is destroyed. (By default Cap'n Proto does not cancel requests + // on disconnect, since it's possible clients might want to make requests + // and immediately disconnect without waiting for results, but not want the + // requests to be canceled.) + return server.m_context.connection->m_canceler.wrap(kj::mv(result)); + } } } // namespace mp diff --git a/include/mp/type-threadmap.h b/include/mp/type-threadmap.h index 3005d9de..c38c2ac4 100644 --- a/include/mp/type-threadmap.h +++ b/include/mp/type-threadmap.h @@ -14,6 +14,7 @@ struct ProxyServer final : public virtual ThreadMap::Server public: ProxyServer(Connection& connection); kj::Promise makeThread(MakeThreadContext context) override; + kj::Promise makePool(MakePoolContext context) override; Connection& m_connection; }; diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 963050c3..c5f238a3 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include #include #include +#include namespace mp { @@ -415,6 +417,29 @@ kj::Promise ProxyServer::getName(GetNameContext context) ProxyServer::ProxyServer(Connection& connection) : m_connection(connection) {} +kj::Promise ProxyServer::makePool(MakePoolContext context) +{ + if (!m_connection.m_thread_pool.empty()) { + throw std::runtime_error("makePool called on connection with existing pool"); + } + EventLoop& loop{*m_connection.m_loop}; + const uint32_t count = context.getParams().getCount(); + for (uint32_t i = 0; i < count; ++i) { + const std::string thread_name = "pool/" + std::to_string(i); + std::promise thread_context; + std::thread thread([&loop, &thread_context, thread_name]() { + g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + thread_name + ")"; + g_thread_context.waiter = std::make_unique(); + Lock lock(g_thread_context.waiter->m_mutex); + thread_context.set_value(&g_thread_context); + g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); + }); + auto thread_server = kj::heap>(m_connection, *thread_context.get_future().get(), std::move(thread)); + m_connection.m_thread_pool.push_back({m_connection.m_threads.add(kj::mv(thread_server))}); + } + return kj::READY_NOW; +} + kj::Promise ProxyServer::makeThread(MakeThreadContext context) { EventLoop& loop{*m_connection.m_loop}; diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index eb4b5ec7..e7052c58 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -18,8 +18,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -524,5 +526,76 @@ KJ_TEST("Make simultaneous IPC calls on single remote thread") KJ_EXPECT(expected == 400); } +KJ_TEST("Call async IPC method dispatched to pool thread") +{ + TestSetup setup; + ProxyClient* foo = setup.client.get(); + + // Set up the thread map exchange so the client has the server's ThreadMap, + // then call makePool to pre-allocate two server threads. + foo->initThreadMap(); + setup.server->m_impl->m_int_fn = [](int n) { return n * 2; }; + + ThreadContext& tc{g_thread_context}; + std::atomic running{3}; + std::promise pool_ready; + foo->m_context.loop->sync([&] { + auto pool_req = foo->m_context.connection->m_thread_map.makePoolRequest(); + pool_req.setCount(2); + foo->m_context.loop->m_task_set->add( + pool_req.send().then([&](auto&&) { pool_ready.set_value(); })); + }); + pool_ready.get_future().get(); + + // Send three callIntFnAsync requests with no context.thread set. + // The server should dispatch each to a pool thread. + auto client{foo->m_client}; + foo->m_context.loop->sync([&] { + for (size_t i = 0; i < running; ++i) { + auto request{client.callIntFnAsyncRequest()}; + request.initContext(); // context present but thread unset + request.setArg(static_cast(i + 1)); + foo->m_context.loop->m_task_set->add(request.send().then( + [&running, &tc, i](auto&& results) { + assert(results.getResult() == static_cast((i + 1) * 2)); + running -= 1; + Lock lock(tc.waiter->m_mutex); + tc.waiter->m_cv.notify_all(); + })); + } + }); + { + Lock lock(tc.waiter->m_mutex); + tc.waiter->wait(lock, [&running] { return running == 0; }); + } +} + +KJ_TEST("Call async IPC method without thread or pool errors correctly") +{ + TestSetup setup; + ProxyClient* foo = setup.client.get(); + setup.server->m_impl->m_fn = [] {}; + + // Send a callFnAsync request with no context.thread and no pool configured. + // The server should throw the "no thread specified and no pool configured" error. + std::promise done; + bool error_thrown{false}; + foo->m_context.loop->sync([&] { + auto request{foo->m_client.callFnAsyncRequest()}; + request.initContext(); + foo->m_context.loop->m_task_set->add( + request.send().then( + [&](auto&&) { done.set_value(); }, + [&](kj::Exception&& e) { + error_thrown = true; + KJ_EXPECT(std::string_view{e.getDescription().cStr()}.find( + "no thread specified and no pool configured") != std::string_view::npos); + done.set_value(); + })); + }); + done.get_future().get(); + KJ_EXPECT(error_thrown); +} + } // namespace test } // namespace mp