Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion example/asynchronous_echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class EchoServiceImpl : public example::EchoService {
// optional: set a callback function which is called after response is sent
// and before cntl/req/res is destructed.
cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3),
true);

// The purpose of following logs is to help you to understand
// how clients interact with servers more intuitively. You should
Expand Down
3 changes: 2 additions & 1 deletion example/echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class EchoServiceImpl : public EchoService {
// optional: set a callback function which is called after response is sent
// and before cntl/req/res is destructed.
cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3),
true);

// The purpose of following logs is to help you to understand
// how clients interact with servers more intuitively. You should
Expand Down
3 changes: 2 additions & 1 deletion example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class HttpServiceImpl : public HttpService {
// optional: set a callback function which is called after response is sent
// and before cntl/req/res is destructed.
cntl->set_after_rpc_resp_fn(std::bind(&HttpServiceImpl::CallAfterRpc,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3),
true);

// Fill response.
cntl->http_response().set_content_type("text/plain");
Expand Down
1 change: 1 addition & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,7 @@ void Controller::CallAfterRpcResp(const google::protobuf::Message* req, const go
if (_after_rpc_resp_fn) {
_after_rpc_resp_fn(this, req, res);
_after_rpc_resp_fn = nullptr;
clear_flag(FLAGS_MANAGE_AFTER_RPC_RESP);
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
static const uint32_t FLAGS_MANAGE_AFTER_RPC_RESP = (1 << 23);

public:
struct Inheritable {
Expand Down Expand Up @@ -621,7 +622,12 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
const google::protobuf::Message* req,
const google::protobuf::Message* res)>;

void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) { _after_rpc_resp_fn = fn; }
void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn,
bool manage_concurrency_remover = false) {
_after_rpc_resp_fn = fn;
set_flag(FLAGS_MANAGE_AFTER_RPC_RESP,
manage_concurrency_remover && !!_after_rpc_resp_fn);
}

void CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res);

Expand Down
12 changes: 12 additions & 0 deletions src/brpc/details/controller_private_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ class ControllerPrivateAccessor {

std::shared_ptr<Span> span() const;

bool has_after_rpc_resp_fn() const {
return !!_cntl->_after_rpc_resp_fn;
}

bool has_added_concurrency() const {
return _cntl->has_flag(Controller::FLAGS_ADDED_CONCURRENCY);
}

bool manages_after_rpc_resp() const {
return _cntl->has_flag(Controller::FLAGS_MANAGE_AFTER_RPC_RESP);
}

uint32_t pipelined_count() const { return _cntl->_pipelined_count; }
void set_pipelined_count(uint32_t count) { _cntl->_pipelined_count = count; }

Expand Down
24 changes: 19 additions & 5 deletions src/brpc/details/method_status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
// under the License.


#include <limits>
#include "butil/macros.h"
#include "brpc/controller.h"
#include "brpc/details/controller_private_accessor.h"
#include "brpc/details/server_private_accessor.h"
#include "brpc/details/method_status.h"

Expand Down Expand Up @@ -156,12 +155,27 @@ int HandleResponseWritten(bthread_id_t id, void* data, int /*error_code*/) {
return 0;
}

ConcurrencyRemover::~ConcurrencyRemover() {
ConcurrencyRemover::ConcurrencyRemover(
MethodStatus* status, Controller* c, int64_t received_us)
: _status(status)
, _c(c)
, _server(c->server())
, _added_concurrency(ControllerPrivateAccessor(c).has_added_concurrency())
, _received_us(received_us) {
}

void ConcurrencyRemover::OnResponded(int error_code) {
if (_status) {
_status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
_status->OnResponded(error_code, butil::cpuwide_time_us() - _received_us);
_status = NULL;
}
ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
}

ConcurrencyRemover::~ConcurrencyRemover() {
OnResponded(_c->ErrorCode());
if (_server) {
ServerPrivateAccessor(_server).RemoveConcurrency(_added_concurrency);
}
}

} // namespace brpc
7 changes: 5 additions & 2 deletions src/brpc/details/method_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,17 @@ int HandleResponseWritten(bthread_id_t id, void* data, int error_code);

class ConcurrencyRemover {
public:
ConcurrencyRemover(MethodStatus* status, Controller* c, int64_t received_us)
: _status(status) , _c(c) , _received_us(received_us) {}
ConcurrencyRemover(MethodStatus* status, Controller* c, int64_t received_us);
~ConcurrencyRemover();

void OnResponded(int error_code);

private:
DISALLOW_COPY_AND_ASSIGN(ConcurrencyRemover);
MethodStatus* _status;
Controller* _c;
const Server* _server;
bool _added_concurrency;
int64_t _received_us;
};

Expand Down
8 changes: 6 additions & 2 deletions src/brpc/details/server_private_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ class ServerPrivateAccessor {
<= _server->options().max_concurrency);
}

void RemoveConcurrency(const Controller* c) {
if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
void RemoveConcurrency(bool added_concurrency) {
if (added_concurrency) {
butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
}
}

void RemoveConcurrency(const Controller* c) {
RemoveConcurrency(c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY));
}

// Find by MethodDescriptor::full_name
const Server::MethodProperty*
FindMethodPropertyByFullName(const butil::StringPiece &fullname) {
Expand Down
39 changes: 25 additions & 14 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
RpcPBMessages* messages, const Server* server,
MethodStatus* method_status, int64_t received_us,
std::shared_ptr<Span> span) {
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ControllerPrivateAccessor accessor(cntl);
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
Expand All @@ -282,21 +283,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,

const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request();
const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response();
const bool has_after_rpc_resp_fn = accessor.has_after_rpc_resp_fn();
const bool manages_after_rpc_resp = accessor.manages_after_rpc_resp();
std::unique_ptr<ConcurrencyRemover> concurrency_remover(
new ConcurrencyRemover(method_status, cntl, received_us));

// Recycle resources at the end of this function.
BRPC_SCOPE_EXIT {
{
// Remove concurrency and record latency at first.
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
}

std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);

if (NULL == messages) {
return;
}

cntl->CallAfterRpcResp(req, res);
if (NULL == server->options().baidu_master_service) {
server->options().rpc_pb_message_factory->Return(messages);
} else {
Expand Down Expand Up @@ -390,8 +385,11 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,

ResponseWriteInfo args;
bthread_id_t response_id = INVALID_BTHREAD_ID;
const bool wait_for_response = (span || has_after_rpc_resp_fn);
if (span) {
span->set_response_size(res_buf.size());
}
if (wait_for_response) {
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
}

Expand Down Expand Up @@ -451,12 +449,25 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
}
}

if (span) {
if (wait_for_response) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
if (span) {
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
}
}
const int responded_error_code = cntl->ErrorCode();
if (!manages_after_rpc_resp) {
concurrency_remover.reset();
}
if (has_after_rpc_resp_fn) {
cntl->CallAfterRpcResp(req, res);
}
if (manages_after_rpc_resp) {
concurrency_remover->OnResponded(responded_error_code);
}
concurrency_remover.reset();
}

namespace {
Expand Down
53 changes: 38 additions & 15 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -812,13 +812,7 @@ friend class HttpResponseSenderAsDone;
class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
explicit HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
void Run() override {
if (NULL != _sender._messages) {
_sender._cntl->CallAfterRpcResp(_sender._messages->Request(),
_sender._messages->Response());
}
delete this;
}
void Run() override { delete this; }

private:
HttpResponseSender _sender;
Expand All @@ -840,7 +834,10 @@ HttpResponseSender::~HttpResponseSender() {
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
const bool has_after_rpc_resp_fn = accessor.has_after_rpc_resp_fn();
const bool manages_after_rpc_resp = accessor.manages_after_rpc_resp();
std::unique_ptr<ConcurrencyRemover> concurrency_remover(
new ConcurrencyRemover(_method_status, cntl, _received_us));
Socket* socket = accessor.get_sending_socket();
const google::protobuf::Message* res = NULL != _messages ? _messages->Response() : NULL;

Expand All @@ -851,6 +848,14 @@ HttpResponseSender::~HttpResponseSender() {

const HttpHeader* req_header = &cntl->http_request();
HttpHeader* res_header = &cntl->http_response();
HttpHeader original_response_header;
butil::IOBuf original_response_attachment;
if (has_after_rpc_resp_fn) {
original_response_header.Swap(*res_header);
original_response_attachment.swap(cntl->response_attachment());
res_header->Swap(original_response_header);
cntl->response_attachment().swap(original_response_attachment);
}
res_header->set_version(req_header->major_version(),
req_header->minor_version());

Expand Down Expand Up @@ -1000,7 +1005,8 @@ HttpResponseSender::~HttpResponseSender() {
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
bthread_id_t response_id = INVALID_BTHREAD_ID;
if (span) {
const bool wait_for_response = (span || has_after_rpc_resp_fn);
if (wait_for_response) {
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
wopt.id_wait = response_id;
wopt.notify_on_success = true;
Expand All @@ -1020,8 +1026,10 @@ HttpResponseSender::~HttpResponseSender() {
if (FLAGS_http_verbose) {
LOG(INFO) << '\n' << *h2_response;
}
if (span) {
span->set_response_size(h2_response->EstimatedByteSize());
if (span || has_after_rpc_resp_fn) {
if (span) {
span->set_response_size(h2_response->EstimatedByteSize());
}
}
rc = socket->Write(h2_response, &wopt);
}
Expand Down Expand Up @@ -1050,12 +1058,27 @@ HttpResponseSender::~HttpResponseSender() {
return;
}

if (span) {
if (wait_for_response) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
if (span) {
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
}
}
const int responded_error_code = cntl->ErrorCode();
if (!manages_after_rpc_resp) {
concurrency_remover.reset();
}
if (has_after_rpc_resp_fn && NULL != _messages) {
cntl->http_response().Swap(original_response_header);
cntl->response_attachment().swap(original_response_attachment);
cntl->CallAfterRpcResp(_messages->Request(), _messages->Response());
}
if (manages_after_rpc_resp) {
concurrency_remover->OnResponded(responded_error_code);
}
concurrency_remover.reset();
}

// Normalize the sub string of `uri_path' covered by `splitter' and
Expand Down
Loading
Loading