Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions builtin-functions/kphp-light/stdlib/rpc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ final class KphpRpcRequestsExtraInfo {
public function get ();
}

/** @kphp-extern-func-info interruptible */
function rpc_send_requests($actor ::: string,
$arr ::: array,
$timeout ::: ?float,
$ignore_answer ::: bool,
\KphpRpcRequestsExtraInfo $requests_extra_info,
$need_responses_extra_info ::: bool) ::: int[];

/** @kphp-extern-func-info tl_common_h_dep interruptible */
/** @kphp-extern-func-info tl_common_h_dep */
function rpc_send_typed_query_requests($actor ::: string, @tl\RpcFunction[] $query_functions,
$timeout ::: ?float,
$ignore_answer ::: bool,
Expand Down
24 changes: 24 additions & 0 deletions runtime-light/k2-platform/k2-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,30 @@ inline int32_t component_access(std::string_view component_name) noexcept {
return k2_component_access(component_name.size(), component_name.data());
}

inline std::expected<uint64_t, int32_t> rpc_send_request(std::string_view actor_name, std::span<const std::byte> request_buffer) noexcept {
uint64_t rpc_d{};
if (auto error_code{k2_rpc_send(actor_name.data(), actor_name.size(), request_buffer.data(), request_buffer.size(), RpcKind::TL_RPC, std::addressof(rpc_d))};
error_code != k2::errno_ok) {
return std::unexpected{error_code};
}
return {rpc_d};
}

inline std::expected<size_t, int32_t> rpc_get_response_size(uint64_t rpc_d) noexcept {
size_t size{};
if (auto error_code{k2_rpc_get_response_size(rpc_d, std::addressof(size))}; error_code != k2::errno_ok) {
return std::unexpected{error_code};
}
return {size};
}

inline std::expected<void, int32_t> rpc_fetch_response(uint64_t rpc_d, std::span<std::byte> buffer) noexcept {
if (auto error_code{k2_rpc_fetch_response(rpc_d, buffer.data(), buffer.size())}; error_code != errno_ok) {
return std::unexpected{error_code};
}
return {};
}

inline void stream_status(k2::descriptor descriptor, StreamStatus* status) noexcept {
k2_stream_status(descriptor, status);
}
Expand Down
40 changes: 40 additions & 0 deletions runtime-light/k2-platform/k2-header.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ enum UpdateStatus {
NewDescriptor = 2,
};

enum RpcKind {
TL_RPC = 0,
};

struct ImageInfo {
// Base
const char* image_name;
Expand Down Expand Up @@ -316,6 +320,42 @@ int32_t k2_unlink(const char* path, size_t path_len);
*/
int32_t k2_component_access(size_t name_len, const char* name);

/**
* Try to send rpc request. In case of success write descriptor of rpc request to `rpc_d`, otherwise return `errno` != 0,
* which should be later used to call `k2_rpc_fetch_response`.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fix grammar

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

*
* @return return `0` on success. libc-like `errno` otherwise
*
* Possible `errno` values:
* `EAI_MEMORY` => max descriptors count achieved
* `EINVAL` => invalid `actor_name` or request, or connection pool is empty for this actor.
*/
int32_t k2_rpc_send(const char* actor_name, size_t actor_name_len, const void* request_ptr, size_t request_size, enum RpcKind rpc_kind, uint64_t* rpc_d);

/**
* Get response size for corresponding request of this `rpc_d`. Write 0 to `response_size` and return `EAGAIN` if response is not ready.
* Write response size value to `response_size` if response is ready.
*
* @return return `0` on success. libc-like `errno` otherwise
*
* Possible `errno` values:
* `EINVAL` => invalid `rpc_d` descriptor, for example, it is unknown descriptor, or not rpc descriptor.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add something like * Possible errno values:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

* `EAGAIN` => response is not ready yet.
*/
int32_t k2_rpc_get_response_size(uint64_t rpc_d, size_t* response_size);

/**
* Write response for corresponding request of this `rpc_d` to `buf`. Return `EAGAIN` if response is not ready.
* If `buf_size` < response size, then write first `buf_size` bytes of response to `buf`.
*
* @return return `0` on success. libc-like `errno` otherwise
*
* Possible `errno` values:
* `EINVAL` => invalid `rpc_d` descriptor, for example, it is unknown descriptor, or not rpc descriptor.
* `EAGAIN` => response is not ready yet.
*/
int32_t k2_rpc_fetch_response(uint64_t rpc_d, void* buf, size_t buf_size);

/**
* If the write or read status is `Blocked` - then the platform ensures that
* the component receives this `stream_d` via `k2_take_update` when the status is
Expand Down
20 changes: 0 additions & 20 deletions runtime-light/state/instance-state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,26 +221,6 @@ kphp::coro::task<> InstanceState::run_instance_epilogue() noexcept {
}
shutdown_state_ = shutdown_state::finished;

/*
* Unlike regular RPC requests whose results the user code waits for via rpc_fetch_responses,
* thereby guaranteeing they are sent, the user code does not wait for requests sent with the
* ignore_answer flag. Therefore, we can’t guarantee that the coroutines responsible for
* sending ignore_answer requests have finished. This means the requests might not be sent
* if the instance terminates.
*
* This await suspends the current coroutine until all pending ignore_answer requests are
* fully sent. While suspended, other forks and coroutines may continue running.
*
* After this call completes, delivery of all ignore_answer requests is guaranteed.
*/
{
auto& rpc_client_instance_st{RpcClientInstanceState::get()};
auto ignore_answer_request_await_set{std::exchange(rpc_client_instance_st.ignore_answer_request_awaiter_tasks, kphp::coro::await_set<void>{})};
while (!ignore_answer_request_await_set.empty()) {
co_await ignore_answer_request_await_set.next();
}
}

// Stop session with internal Web component
if (auto& web_state{WebInstanceState::get()}; web_state.session.has_value()) {
web_state.session_is_finished = true;
Expand Down
Loading
Loading