Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions google/cloud/storage/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "google/cloud/storage/idempotency_policy.h"
#include "google/cloud/storage/internal/base64.h"
#include "google/cloud/storage/internal/connection_factory.h"
#include "google/cloud/storage/internal/feature_tracker.h"
#include "google/cloud/storage/options.h"
#include "google/cloud/internal/curl_handle.h"
#include "google/cloud/internal/curl_options.h"
Expand Down Expand Up @@ -624,8 +625,8 @@ Options DefaultOptions(Options opts) {
rest_defaults.set<rest::CAPathOption>(o.get<internal::CAPathOption>());
}

return google::cloud::internal::MergeOptions(std::move(o),
std::move(rest_defaults));
return internal::SetupFeatureTracker(google::cloud::internal::MergeOptions(
std::move(o), std::move(rest_defaults)));
}

Options DefaultOptionsWithCredentials(Options opts) {
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/storage/google_cloud_cpp_storage.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ google_cloud_cpp_storage_hdrs = [
"internal/curl/request_builder.h",
"internal/default_object_acl_requests.h",
"internal/empty_response.h",
"internal/feature_tracker.h",
"internal/generate_message_boundary.h",
"internal/generic_object_request.h",
"internal/generic_request.h",
Expand Down Expand Up @@ -173,6 +174,7 @@ google_cloud_cpp_storage_srcs = [
"internal/crc32c.cc",
"internal/default_object_acl_requests.cc",
"internal/empty_response.cc",
"internal/feature_tracker.cc",
"internal/generate_message_boundary.cc",
"internal/generic_stub_adapter.cc",
"internal/generic_stub_factory.cc",
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/storage/google_cloud_cpp_storage.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ add_library(
internal/default_object_acl_requests.h
internal/empty_response.cc
internal/empty_response.h
internal/feature_tracker.cc
internal/feature_tracker.h
internal/generate_message_boundary.cc
internal/generate_message_boundary.h
internal/generic_object_request.h
Expand Down Expand Up @@ -437,6 +439,7 @@ if (BUILD_TESTING)
internal/const_buffer_test.cc
internal/crc32c_test.cc
internal/default_object_acl_requests_test.cc
internal/feature_tracker_test.cc
internal/generate_message_boundary_test.cc
internal/generic_request_test.cc
internal/hash_function_impl_test.cc
Expand Down
15 changes: 10 additions & 5 deletions google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "google/cloud/storage/async/retry_policy.h"
#include "google/cloud/storage/internal/async/default_options.h"
#include "google/cloud/storage/internal/async/handle_redirect_error.h"
#include "google/cloud/storage/internal/feature_tracker.h"
#include "google/cloud/storage/internal/async/insert_object.h"
#include "google/cloud/storage/internal/async/object_descriptor_impl.h"
#include "google/cloud/storage/internal/async/open_object.h"
Expand Down Expand Up @@ -242,19 +243,22 @@ AsyncConnectionImpl::Open(OpenParams p) {
auto backoff =
std::shared_ptr<storage::BackoffPolicy>(backoff_policy(*current));
auto const* function_name = __func__;
auto feature_tracker = std::make_shared<storage::internal::FeatureTracker>();
auto factory = OpenStreamFactory(
[stub = stub_, cq = cq_, retry = std::move(retry),
backoff = std::move(backoff), current = std::move(current),
function_name](google::storage::v2::BidiReadObjectRequest request) {
function_name,
feature_tracker](google::storage::v2::BidiReadObjectRequest request) {
struct DummyRequest {};

auto call = [stub, request = std::move(request)](
auto call = [stub, request = std::move(request), feature_tracker](
CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
DummyRequest const&) mutable {
auto open = std::make_shared<OpenObject>(
*stub, cq, std::move(context), std::move(options), request);
*stub, cq, std::move(context), std::move(options), request,
feature_tracker);
// Extend the lifetime of the coroutine until it finishes.
return open->Call().then([open, &request](auto f) mutable {
open.reset();
Expand All @@ -275,7 +279,8 @@ AsyncConnectionImpl::Open(OpenParams p) {
using ReturnType = std::shared_ptr<storage::ObjectDescriptorConnection>;
return pending.then([rp = std::move(resume_policy), fa = std::move(factory),
rs = std::move(p.read_spec),
options = std::move(p.options), refresh = refresh_](
options = std::move(p.options), refresh = refresh_,
feature_tracker](
auto f) mutable -> StatusOr<ReturnType> {
auto result = f.get();
if (!result) return std::move(result).status();
Expand All @@ -296,7 +301,7 @@ AsyncConnectionImpl::Open(OpenParams p) {
};
auto impl = std::make_shared<ObjectDescriptorImpl>(
std::move(rp), std::move(fa), std::move(rs), std::move(result->stream),
std::move(options), std::move(transport_ok));
std::move(options), std::move(transport_ok), feature_tracker);
impl->Start(std::move(result->first_response));
return ReturnType(impl);
});
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/storage/internal/async/default_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "google/cloud/storage/async/resume_policy.h"
#include "google/cloud/storage/async/retry_policy.h"
#include "google/cloud/storage/async/writer_connection.h"
#include "google/cloud/storage/internal/feature_tracker.h"
#include "google/cloud/storage/internal/grpc/default_options.h"
#include <limits>

Expand Down Expand Up @@ -77,7 +78,8 @@ Options DefaultOptionsAsync(Options opts) {
.set<storage::EnableCrc32cValidationOption>(true)
.set<storage::MaximumRangeSizeOption>(128 * 1024 * 1024L)
.set<storage::EnableMultiStreamOptimizationOption>(true));
return Adjust(DefaultOptionsGrpc(std::move(opts)));
return storage::internal::SetupFeatureTracker(
Adjust(DefaultOptionsGrpc(std::move(opts))));
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
10 changes: 8 additions & 2 deletions google/cloud/storage/internal/async/object_descriptor_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
OpenStreamFactory make_stream,
google::storage::v2::BidiReadObjectSpec read_object_spec,
std::shared_ptr<OpenStream> stream, Options options,
std::function<bool()> transport_ok)
std::function<bool()> transport_ok,
std::shared_ptr<storage::internal::FeatureTracker> feature_tracker)
: resume_policy_prototype_(std::move(resume_policy)),
make_stream_(std::move(make_stream)),
read_object_spec_(std::move(read_object_spec)),
options_(std::move(options)),
transport_ok_(std::move(transport_ok)) {
transport_ok_(std::move(transport_ok)),
feature_tracker_(std::move(feature_tracker)) {
stream_manager_ = std::make_unique<StreamManager>(
[]() -> std::shared_ptr<ReadStream> { return nullptr; }, // NOLINT
std::make_shared<ReadStream>(std::move(stream),
Expand Down Expand Up @@ -97,6 +99,10 @@ void ObjectDescriptorImpl::AssurePendingStreamQueued(
auto request = google::storage::v2::BidiReadObjectRequest{};

*request.mutable_read_object_spec() = read_object_spec_;
if (feature_tracker_ && stream_manager_ && stream_manager_->Size() >= 2) {
feature_tracker_->RegisterFeature(
storage::internal::TrackedFeature::kMultiStreamInMRD);
}
pending_stream_ = make_stream_(std::move(request));
}

Expand Down
14 changes: 9 additions & 5 deletions google/cloud/storage/internal/async/object_descriptor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "google/cloud/storage/async/object_descriptor_connection.h"
#include "google/cloud/storage/async/resume_policy.h"
#include "google/cloud/storage/internal/feature_tracker.h"
#include "google/cloud/storage/internal/async/multi_stream_manager.h"
#include "google/cloud/storage/internal/async/object_descriptor_reader.h"
#include "google/cloud/storage/internal/async/open_stream.h"
Expand Down Expand Up @@ -57,11 +58,13 @@ class ObjectDescriptorImpl
: public storage::ObjectDescriptorConnection,
public std::enable_shared_from_this<ObjectDescriptorImpl> {
public:
ObjectDescriptorImpl(std::unique_ptr<storage::ResumePolicy> resume_policy,
OpenStreamFactory make_stream,
google::storage::v2::BidiReadObjectSpec read_object_spec,
std::shared_ptr<OpenStream> stream, Options options = {},
std::function<bool()> transport_ok = {});
ObjectDescriptorImpl(
std::unique_ptr<storage::ResumePolicy> resume_policy,
OpenStreamFactory make_stream,
google::storage::v2::BidiReadObjectSpec read_object_spec,
std::shared_ptr<OpenStream> stream, Options options = {},
std::function<bool()> transport_ok = {},
std::shared_ptr<storage::internal::FeatureTracker> feature_tracker = {});
~ObjectDescriptorImpl() override;

// Start the read loop.
Expand Down Expand Up @@ -132,6 +135,7 @@ class ObjectDescriptorImpl
pending_stream_;
bool cancelled_ = false;
std::function<bool()> transport_ok_;
std::shared_ptr<storage::internal::FeatureTracker> feature_tracker_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ using ::google::protobuf::TextFormat;
using ::testing::_;
using ::testing::AtMost;
using ::testing::ElementsAre;
using ::testing::Eq;
using ::testing::NotNull;
using ::testing::Optional;
using ::testing::ResultOf;
Expand Down Expand Up @@ -2294,6 +2295,73 @@ TEST(ObjectDescriptorImpl, PartialReadChecksumValidationBypassed) {
next.first.set_value(true);
}

TEST(ObjectDescriptorImpl, MultiStreamFeatureTracking) {
auto feature_tracker = std::make_shared<storage::internal::FeatureTracker>();
EXPECT_THAT(feature_tracker->GetMask(), Eq(0));

auto s1 = std::make_unique<MockStream>();
EXPECT_CALL(*s1, Cancel).Times(AtMost(1));
EXPECT_CALL(*s1, Write).WillRepeatedly([](Request const&, grpc::WriteOptions) {
return make_ready_future(true);
});
EXPECT_CALL(*s1, Read).WillRepeatedly([] {
return make_ready_future(absl::optional<Response>{});
});

auto s2 = std::make_unique<MockStream>();
EXPECT_CALL(*s2, Cancel).Times(AtMost(1));
EXPECT_CALL(*s2, Write).WillRepeatedly([](Request const&, grpc::WriteOptions) {
return make_ready_future(true);
});
EXPECT_CALL(*s2, Read).WillRepeatedly([] {
return make_ready_future(absl::optional<Response>{});
});
auto open_stream2 = std::make_shared<OpenStream>(std::move(s2));

auto s3 = std::make_unique<MockStream>();
EXPECT_CALL(*s3, Cancel).Times(AtMost(1));
EXPECT_CALL(*s3, Write).WillRepeatedly([](Request const&, grpc::WriteOptions) {
return make_ready_future(true);
});
EXPECT_CALL(*s3, Read).WillRepeatedly([] {
return make_ready_future(absl::optional<Response>{});
});
auto open_stream3 = std::make_shared<OpenStream>(std::move(s3));

MockFactory factory;
EXPECT_CALL(factory, Call)
.WillOnce([&](Request const&) {
return make_ready_future(StatusOr<OpenStreamResult>(
OpenStreamResult{open_stream2, Response{}}));
})
.WillRepeatedly([&](Request const&) {
return make_ready_future(StatusOr<OpenStreamResult>(
OpenStreamResult{open_stream3, Response{}}));
});

Options options;
options.set<storage::EnableMultiStreamOptimizationOption>(true);

auto tested = std::make_shared<ObjectDescriptorImpl>(
NoResume(), factory.AsStdFunction(),
google::storage::v2::BidiReadObjectSpec{},
std::make_shared<OpenStream>(std::move(s1)), options, [] { return true; },
feature_tracker);

tested->Start(Response{});
// After Start, we have 1 open stream and 1 pending stream. Mask should still be 0.
EXPECT_THAT(feature_tracker->GetMask(), Eq(0));

// Make the first stream busy by initiating a read.
auto reader1 = tested->Read({0, 100});

// Now that the first stream is busy, MakeSubsequentStream will consume the
// pending stream (adding it as the 2nd active stream) and trigger
// AssurePendingStreamQueued for the 3rd stream, registering the feature.
tested->MakeSubsequentStream();
EXPECT_THAT(feature_tracker->GetMask(), Eq(1));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
25 changes: 18 additions & 7 deletions google/cloud/storage/internal/async/open_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ std::string RequestParams(
return absl::StrCat("bucket=", read_spec.bucket());
}

OpenObject::OpenObject(storage_internal::StorageStub& stub, CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::storage::v2::BidiReadObjectRequest request)
: rpc_(std::make_shared<OpenStream>(CreateRpc(
stub, cq, std::move(context), std::move(options), request))),
OpenObject::OpenObject(
storage_internal::StorageStub& stub, CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::storage::v2::BidiReadObjectRequest request,
std::shared_ptr<storage::internal::FeatureTracker> tracker)
: rpc_(std::make_shared<OpenStream>(
CreateRpc(stub, cq, std::move(context), std::move(options), request,
tracker))),
initial_request_(std::move(request)) {}

future<StatusOr<OpenStreamResult>> OpenObject::Call() {
Expand All @@ -56,9 +59,17 @@ std::unique_ptr<OpenStream::StreamingRpc> OpenObject::CreateRpc(
storage_internal::StorageStub& stub, CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::storage::v2::BidiReadObjectRequest const& request) {
google::storage::v2::BidiReadObjectRequest const& request,
std::shared_ptr<storage::internal::FeatureTracker> const& tracker) {
auto p = RequestParams(request);
if (!p.empty()) context->AddMetadata("x-goog-request-params", std::move(p));
if (tracker) {
auto v = tracker->HeaderValue();
if (!v.empty()) {
context->AddMetadata(storage::internal::kFeatureTrackerHeaderName,
std::move(v));
}
}
return stub.AsyncBidiReadObject(cq, std::move(context), std::move(options));
}

Expand Down
14 changes: 9 additions & 5 deletions google/cloud/storage/internal/async/open_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_OPEN_OBJECT_H

#include "google/cloud/storage/internal/async/open_stream.h"
#include "google/cloud/storage/internal/feature_tracker.h"
#include "google/cloud/storage/internal/storage_stub.h"
#include "google/cloud/completion_queue.h"
#include "google/cloud/future.h"
Expand Down Expand Up @@ -80,10 +81,12 @@ std::string RequestParams(
class OpenObject : public std::enable_shared_from_this<OpenObject> {
public:
/// Create a coroutine to create an open a bidi streaming read RPC.
OpenObject(storage_internal::StorageStub& stub, CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::storage::v2::BidiReadObjectRequest request);
OpenObject(
storage_internal::StorageStub& stub, CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::storage::v2::BidiReadObjectRequest request,
std::shared_ptr<storage::internal::FeatureTracker> tracker = {});

/// Start the coroutine.
future<StatusOr<OpenStreamResult>> Call();
Expand All @@ -95,7 +98,8 @@ class OpenObject : public std::enable_shared_from_this<OpenObject> {
storage_internal::StorageStub& stub, CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::storage::v2::BidiReadObjectRequest const& request);
google::storage::v2::BidiReadObjectRequest const& request,
std::shared_ptr<storage::internal::FeatureTracker> const& tracker);

void OnStart(bool ok);
void OnWrite(bool ok);
Expand Down
Loading
Loading