Skip to content

Commit

Permalink
feat(pubsub): per-call options in SchemaAdminClient (#8406)
Browse files Browse the repository at this point in the history
  • Loading branch information
coryan authored Feb 18, 2022
1 parent ff91e2e commit d6e0897
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 92 deletions.
58 changes: 36 additions & 22 deletions google/cloud/pubsub/schema_admin_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,85 +13,98 @@
// limitations under the License.

#include "google/cloud/pubsub/schema_admin_client.h"
#include "google/cloud/pubsub/internal/defaults.h"

namespace google {
namespace cloud {
namespace pubsub {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

SchemaAdminClient::SchemaAdminClient(
std::shared_ptr<SchemaAdminConnection> connection)
: connection_(std::move(connection)) {}
std::shared_ptr<SchemaAdminConnection> connection, Options opts)
: connection_(std::move(connection)),
options_(internal::MergeOptions(
std::move(opts),
pubsub_internal::DefaultCommonOptions(connection_->options()))) {}

StatusOr<google::pubsub::v1::Schema> SchemaAdminClient::CreateAvroSchema(
Schema const& schema, std::string schema_definition) {
Schema const& schema, std::string schema_definition, Options opts) {
google::pubsub::v1::CreateSchemaRequest request;
request.set_parent("projects/" + schema.project_id());
request.set_schema_id(schema.schema_id());
request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
request.mutable_schema()->set_definition(std::move(schema_definition));
return CreateSchema(request);
return CreateSchema(request, std::move(opts));
}

StatusOr<google::pubsub::v1::Schema> SchemaAdminClient::CreateProtobufSchema(
Schema const& schema, std::string schema_definition) {
Schema const& schema, std::string schema_definition, Options opts) {
google::pubsub::v1::CreateSchemaRequest request;
request.set_parent("projects/" + schema.project_id());
request.set_schema_id(schema.schema_id());
request.mutable_schema()->set_type(
google::pubsub::v1::Schema::PROTOCOL_BUFFER);
request.mutable_schema()->set_definition(std::move(schema_definition));
return CreateSchema(request);
return CreateSchema(request, std::move(opts));
}

StatusOr<google::pubsub::v1::Schema> SchemaAdminClient::CreateSchema(
google::pubsub::v1::CreateSchemaRequest const& request) {
google::pubsub::v1::CreateSchemaRequest const& request, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
return connection_->CreateSchema(request);
}

StatusOr<google::pubsub::v1::Schema> SchemaAdminClient::GetSchema(
Schema const& schema, google::pubsub::v1::SchemaView view) {
Schema const& schema, google::pubsub::v1::SchemaView view, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
google::pubsub::v1::GetSchemaRequest request;
request.set_name(schema.FullName());
request.set_view(view);
return connection_->GetSchema(request);
}

ListSchemasRange SchemaAdminClient::ListSchemas(
std::string const& project_id, google::pubsub::v1::SchemaView view) {
std::string const& project_id, google::pubsub::v1::SchemaView view,
Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
google::pubsub::v1::ListSchemasRequest request;
request.set_parent("projects/" + project_id);
request.set_view(view);
return connection_->ListSchemas(request);
}

Status SchemaAdminClient::DeleteSchema(Schema const& schema) {
Status SchemaAdminClient::DeleteSchema(Schema const& schema, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
google::pubsub::v1::DeleteSchemaRequest request;
request.set_name(schema.FullName());
return connection_->DeleteSchema(request);
}

StatusOr<google::pubsub::v1::ValidateSchemaResponse>
SchemaAdminClient::ValidateAvroSchema(std::string const& project_id,
std::string schema_definition) {
std::string schema_definition,
Options opts) {
google::pubsub::v1::Schema schema;
schema.set_definition(std::move(schema_definition));
schema.set_type(google::pubsub::v1::Schema::AVRO);
return ValidateSchema(project_id, std::move(schema));
return ValidateSchema(project_id, std::move(schema), std::move(opts));
}

StatusOr<google::pubsub::v1::ValidateSchemaResponse>
SchemaAdminClient::ValidateProtobufSchema(std::string const& project_id,
std::string schema_definition) {
std::string schema_definition,
Options opts) {
google::pubsub::v1::Schema schema;
schema.set_definition(std::move(schema_definition));
schema.set_type(google::pubsub::v1::Schema::PROTOCOL_BUFFER);
return ValidateSchema(project_id, std::move(schema));
return ValidateSchema(project_id, std::move(schema), std::move(opts));
}

StatusOr<google::pubsub::v1::ValidateSchemaResponse>
SchemaAdminClient::ValidateSchema(std::string const& project_id,
google::pubsub::v1::Schema schema) {
google::pubsub::v1::Schema schema,
Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
google::pubsub::v1::ValidateSchemaRequest request;
request.set_parent("projects/" + project_id);
*request.mutable_schema() = std::move(schema);
Expand All @@ -101,45 +114,46 @@ SchemaAdminClient::ValidateSchema(std::string const& project_id,
StatusOr<google::pubsub::v1::ValidateMessageResponse>
SchemaAdminClient::ValidateMessageWithNamedSchema(
google::pubsub::v1::Encoding encoding, std::string message,
Schema const& named_schema) {
Schema const& named_schema, Options opts) {
google::pubsub::v1::ValidateMessageRequest request;
request.set_parent("projects/" + named_schema.project_id());
request.set_message(std::move(message));
request.set_encoding(encoding);
request.set_name(named_schema.FullName());
return ValidateMessage(request);
return ValidateMessage(request, std::move(opts));
}

StatusOr<google::pubsub::v1::ValidateMessageResponse>
SchemaAdminClient::ValidateMessageWithAvro(
google::pubsub::v1::Encoding encoding, std::string message,
std::string project_id, std::string schema_definition) {
std::string project_id, std::string schema_definition, Options opts) {
google::pubsub::v1::ValidateMessageRequest request;
request.set_parent("projects/" + std::move(project_id));
request.set_message(std::move(message));
request.set_encoding(encoding);
request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
request.mutable_schema()->set_definition(std::move(schema_definition));
return ValidateMessage(request);
return ValidateMessage(request, std::move(opts));
}

StatusOr<google::pubsub::v1::ValidateMessageResponse>
SchemaAdminClient::ValidateMessageWithProtobuf(
google::pubsub::v1::Encoding encoding, std::string message,
std::string project_id, std::string schema_definition) {
std::string project_id, std::string schema_definition, Options opts) {
google::pubsub::v1::ValidateMessageRequest request;
request.set_parent("projects/" + std::move(project_id));
request.set_message(std::move(message));
request.set_encoding(encoding);
request.mutable_schema()->set_type(
google::pubsub::v1::Schema::PROTOCOL_BUFFER);
request.mutable_schema()->set_definition(std::move(schema_definition));
return ValidateMessage(request);
return ValidateMessage(request, std::move(opts));
}

StatusOr<google::pubsub::v1::ValidateMessageResponse>
SchemaAdminClient::ValidateMessage(
google::pubsub::v1::ValidateMessageRequest const& request) {
google::pubsub::v1::ValidateMessageRequest const& request, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
return connection_->ValidateMessage(request);
}

Expand Down
59 changes: 40 additions & 19 deletions google/cloud/pubsub/schema_admin_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
*/
class SchemaAdminClient {
public:
explicit SchemaAdminClient(std::shared_ptr<SchemaAdminConnection> connection);
explicit SchemaAdminClient(std::shared_ptr<SchemaAdminConnection> connection,
Options opts = {});

/**
* The default constructor is deleted.
Expand All @@ -71,22 +72,22 @@ class SchemaAdminClient {
SchemaAdminClient() = delete;

/**
* @copydoc CreateSchema(google::pubsub::v1::CreateSchemaRequest const&)
* @copydoc CreateSchema(google::pubsub::v1::CreateSchemaRequest const&,Options)
*
* @par Example
* @snippet samples.cc create-avro-schema
*/
StatusOr<google::pubsub::v1::Schema> CreateAvroSchema(
Schema const& schema, std::string schema_definition);
Schema const& schema, std::string schema_definition, Options opts = {});

/**
* @copydoc CreateSchema(google::pubsub::v1::CreateSchemaRequest const&)
* @copydoc CreateSchema(google::pubsub::v1::CreateSchemaRequest const&,Options)
*
* @par Example
* @snippet samples.cc create-protobuf-schema
*/
StatusOr<google::pubsub::v1::Schema> CreateProtobufSchema(
Schema const& schema, std::string schema_definition);
Schema const& schema, std::string schema_definition, Options opts = {});

/**
* Creates a new Cloud Pub/Sub schema.
Expand All @@ -97,7 +98,8 @@ class SchemaAdminClient {
* as a consequence of retrying a successful (but reported as failed) request.
*/
StatusOr<google::pubsub::v1::Schema> CreateSchema(
google::pubsub::v1::CreateSchemaRequest const& request);
google::pubsub::v1::CreateSchemaRequest const& request,
Options opts = {});

/**
* Gets information about an existing Cloud Pub/Sub schema.
Expand All @@ -111,10 +113,13 @@ class SchemaAdminClient {
* @param schema the full name of the schema
* @param view Use `BASIC` to include the name and type of the schema, but not
* the definition. Use `FULL` to include the definition.
* @param opts Override the class-level options, such as retry and backoff
* policies.
*/
StatusOr<google::pubsub::v1::Schema> GetSchema(
Schema const& schema,
google::pubsub::v1::SchemaView view = google::pubsub::v1::BASIC);
google::pubsub::v1::SchemaView view = google::pubsub::v1::BASIC,
Options opts = {});

/**
* Lists all the schemas for a given project id.
Expand All @@ -128,10 +133,13 @@ class SchemaAdminClient {
* @param project_id lists the schemas in this project
* @param view Use `BASIC` to include the name and type of each schema, but
* not the definition. Use `FULL` to include the definition.
* @param opts Override the class-level options, such as retry and backoff
* policies.
*/
ListSchemasRange ListSchemas(
std::string const& project_id,
google::pubsub::v1::SchemaView view = google::pubsub::v1::BASIC);
google::pubsub::v1::SchemaView view = google::pubsub::v1::BASIC,
Options opts = {});

/**
* Deletes an existing schema in Cloud Pub/Sub.
Expand All @@ -146,8 +154,10 @@ class SchemaAdminClient {
* @snippet samples.cc delete-schema
*
* @param schema the name of the schema to be deleted.
* @param opts Override the class-level options, such as retry and backoff
* policies.
*/
Status DeleteSchema(Schema const& schema);
Status DeleteSchema(Schema const& schema, Options opts = {});

/**
* Validates a schema definition.
Expand All @@ -159,7 +169,8 @@ class SchemaAdminClient {
* @snippet samples.cc validate-avro-schema
*/
StatusOr<google::pubsub::v1::ValidateSchemaResponse> ValidateAvroSchema(
std::string const& project_id, std::string schema_definition);
std::string const& project_id, std::string schema_definition,
Options opts = {});

/**
* Validates a schema definition.
Expand All @@ -171,7 +182,8 @@ class SchemaAdminClient {
* @snippet samples.cc validate-protobuf-schema
*/
StatusOr<google::pubsub::v1::ValidateSchemaResponse> ValidateProtobufSchema(
std::string const& project_id, std::string schema_definition);
std::string const& project_id, std::string schema_definition,
Options opts = {});

/**
* Validates a schema definition.
Expand All @@ -180,10 +192,11 @@ class SchemaAdminClient {
* This is a read-only operation and therefore always idempotent and retried.
*/
StatusOr<google::pubsub::v1::ValidateSchemaResponse> ValidateSchema(
std::string const& project_id, google::pubsub::v1::Schema schema);
std::string const& project_id, google::pubsub::v1::Schema schema,
Options opts = {});

/**
* @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&)
* @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&,Options)
*
* @par Example
* @snippet samples.cc validate-message-named-schema
Expand All @@ -192,14 +205,16 @@ class SchemaAdminClient {
* support some encodings.
* @param message the message to validate
* @param named_schema the name of an existing schema to validate against
* @param opts Override the class-level options, such as retry and backoff
* policies.
*/
StatusOr<google::pubsub::v1::ValidateMessageResponse>
ValidateMessageWithNamedSchema(google::pubsub::v1::Encoding encoding,
std::string message,
Schema const& named_schema);
Schema const& named_schema, Options opts = {});

/**
* @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&)
* @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&,Options)
*
* @par Example
* @snippet samples.cc validate-message-avro
Expand All @@ -209,13 +224,15 @@ class SchemaAdminClient {
* @param message the message to validate
* @param project_id the project used to perform the validation
* @param schema_definition the schema definition, in AVRO format
* @param opts Override the class-level options, such as retry and backoff
* policies.
*/
StatusOr<google::pubsub::v1::ValidateMessageResponse> ValidateMessageWithAvro(
google::pubsub::v1::Encoding encoding, std::string message,
std::string project_id, std::string schema_definition);
std::string project_id, std::string schema_definition, Options opts = {});

/**
* @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&)
* @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&,Options)
*
* @par Example
* @snippet samples.cc validate-message-protobuf
Expand All @@ -225,11 +242,13 @@ class SchemaAdminClient {
* @param message the message to validate
* @param project_id the project used to perform the validation
* @param schema_definition the schema definition, in protocol buffers format
* @param opts Override the class-level options, such as retry and backoff
* policies.
*/
StatusOr<google::pubsub::v1::ValidateMessageResponse>
ValidateMessageWithProtobuf(google::pubsub::v1::Encoding encoding,
std::string message, std::string project_id,
std::string schema_definition);
std::string schema_definition, Options opts = {});

/**
* Validates a message against a schema.
Expand All @@ -238,10 +257,12 @@ class SchemaAdminClient {
* This is a read-only operation and therefore always idempotent and retried.
*/
StatusOr<google::pubsub::v1::ValidateMessageResponse> ValidateMessage(
google::pubsub::v1::ValidateMessageRequest const& request);
google::pubsub::v1::ValidateMessageRequest const& request,
Options opts = {});

private:
std::shared_ptr<SchemaAdminConnection> connection_;
Options options_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
Loading

0 comments on commit d6e0897

Please sign in to comment.