diff --git a/README.md b/README.md index 848c0e5ee..6f543e7ff 100644 --- a/README.md +++ b/README.md @@ -242,6 +242,8 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m | --------------------------- | --------------------------------- | ------ | | Native Image Pub Sub Sample | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) | | Publish Operations | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/utilities/PublishOperations.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/utilities/PublishOperations.java) | +| Commit Avro Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CommitAvroSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CommitAvroSchemaExample.java) | +| Commit Proto Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CommitProtoSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CommitProtoSchemaExample.java) | | Create Avro Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) | | Create Big Query Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java) | | Create Proto Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) | @@ -253,13 +255,17 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m | Create Subscription With Ordering | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithOrdering.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateSubscriptionWithOrdering.java) | | Create Topic Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateTopicExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateTopicExample.java) | | Create Topic With Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateTopicWithSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateTopicWithSchemaExample.java) | +| Create Topic With Schema Revisions Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateTopicWithSchemaRevisionsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateTopicWithSchemaRevisionsExample.java) | | Delete Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/DeleteSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/DeleteSchemaExample.java) | +| Delete Schema Revision Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/DeleteSchemaRevisionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/DeleteSchemaRevisionExample.java) | | Delete Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/DeleteSubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/DeleteSubscriptionExample.java) | | Delete Topic Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/DeleteTopicExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/DeleteTopicExample.java) | | Detach Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/DetachSubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/DetachSubscriptionExample.java) | | Get Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/GetSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/GetSchemaExample.java) | +| Get Schema Revision Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/GetSchemaRevisionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/GetSchemaRevisionExample.java) | | Get Subscription Policy Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/GetSubscriptionPolicyExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/GetSubscriptionPolicyExample.java) | | Get Topic Policy Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/GetTopicPolicyExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/GetTopicPolicyExample.java) | +| List Schema Revisions Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListSchemaRevisionsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListSchemaRevisionsExample.java) | | List Schemas Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListSchemasExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListSchemasExample.java) | | List Subscriptions In Project Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListSubscriptionsInProjectExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListSubscriptionsInProjectExample.java) | | List Subscriptions In Topic Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListSubscriptionsInTopicExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListSubscriptionsInTopicExample.java) | @@ -278,12 +284,14 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m | Receive Messages With Delivery Attempts Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java) | | Remove Dead Letter Policy Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java) | | Resume Publish With Ordering Keys | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ResumePublishWithOrderingKeys.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ResumePublishWithOrderingKeys.java) | +| Rollback Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/RollbackSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/RollbackSchemaExample.java) | | Set Subscription Policy Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SetSubscriptionPolicyExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SetSubscriptionPolicyExample.java) | | Set Topic Policy Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SetTopicPolicyExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SetTopicPolicyExample.java) | | Subscribe Async Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java) | | Subscribe Sync Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SubscribeSyncExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SubscribeSyncExample.java) | | Subscribe Sync With Lease Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SubscribeSyncWithLeaseExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SubscribeSyncWithLeaseExample.java) | | Subscribe With Avro Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SubscribeWithAvroSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SubscribeWithAvroSchemaExample.java) | +| Subscribe With Avro Schema Revisions Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SubscribeWithAvroSchemaRevisionsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SubscribeWithAvroSchemaRevisionsExample.java) | | Subscribe With Concurrency Control Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SubscribeWithConcurrencyControlExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SubscribeWithConcurrencyControlExample.java) | | Subscribe With Custom Attributes Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SubscribeWithCustomAttributesExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SubscribeWithCustomAttributesExample.java) | | Subscribe With Error Listener Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/SubscribeWithErrorListenerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/SubscribeWithErrorListenerExample.java) | @@ -294,6 +302,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m | Test Topic Permissions Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/TestTopicPermissionsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/TestTopicPermissionsExample.java) | | Update Dead Letter Policy Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java) | | Update Push Configuration Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/UpdatePushConfigurationExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/UpdatePushConfigurationExample.java) | +| Update Topic Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/UpdateTopicSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/UpdateTopicSchemaExample.java) | | State | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/utilities/State.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/utilities/State.java) | | State Proto | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/utilities/StateProto.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/utilities/StateProto.java) | diff --git a/samples/snippets/src/main/java/pubsub/CommitAvroSchemaExample.java b/samples/snippets/src/main/java/pubsub/CommitAvroSchemaExample.java new file mode 100644 index 000000000..e6ac8f278 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CommitAvroSchemaExample.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_commit_avro_schema] + +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.cloud.pubsub.v1.SchemaServiceClient; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.Schema; +import com.google.pubsub.v1.SchemaName; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +public class CommitAvroSchemaExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String schemaId = "your-schema-id"; + String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"; + + commitAvroSchemaExample(projectId, schemaId, avscFile); + } + + public static Schema commitAvroSchemaExample(String projectId, String schemaId, String avscFile) + throws IOException { + + ProjectName projectName = ProjectName.of(projectId); + SchemaName schemaName = SchemaName.of(projectId, schemaId); + + // Read an Avro schema file formatted in JSON as a string. + String avscSource = new String(Files.readAllBytes(Paths.get(avscFile))); + + try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) { + + Schema schema = + schemaServiceClient.commitSchema( + schemaName.toString(), + Schema.newBuilder() + .setName(schemaName.toString()) + .setType(Schema.Type.AVRO) + .setDefinition(avscSource) + .build()); + + System.out.println("Committed a schema using an Avro schema:\n" + schema); + return schema; + } catch (AlreadyExistsException e) { + System.out.println(schemaName + "already exists."); + return null; + } + } +} +// [END pubsub_commit_avro_schema] diff --git a/samples/snippets/src/main/java/pubsub/CommitProtoSchemaExample.java b/samples/snippets/src/main/java/pubsub/CommitProtoSchemaExample.java new file mode 100644 index 000000000..b32de29dc --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CommitProtoSchemaExample.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_commit_proto_schema] + +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.cloud.pubsub.v1.SchemaServiceClient; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.Schema; +import com.google.pubsub.v1.SchemaName; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +public class CommitProtoSchemaExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String schemaId = "your-schema-id"; + String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"; + + commitProtoSchemaExample(projectId, schemaId, protoFile); + } + + public static Schema commitProtoSchemaExample(String projectId, String schemaId, String protoFile) + throws IOException { + + ProjectName projectName = ProjectName.of(projectId); + SchemaName schemaName = SchemaName.of(projectId, schemaId); + + // Read a proto file as a string. + String protoSource = new String(Files.readAllBytes(Paths.get(protoFile))); + + try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) { + + Schema schema = + schemaServiceClient.commitSchema( + schemaName.toString(), + Schema.newBuilder() + .setName(schemaName.toString()) + .setType(Schema.Type.PROTOCOL_BUFFER) + .setDefinition(protoSource) + .build()); + + System.out.println("Committed a schema using a protobuf schema:\n" + schema); + return schema; + } catch (AlreadyExistsException e) { + System.out.println(schemaName + "already exists."); + return null; + } + } +} +// [END pubsub_commit_proto_schema] diff --git a/samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java b/samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java index 1b93b7fbe..393b128b3 100644 --- a/samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java +++ b/samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java @@ -38,7 +38,7 @@ public static void main(String... args) throws Exception { createAvroSchemaExample(projectId, schemaId, avscFile); } - public static void createAvroSchemaExample(String projectId, String schemaId, String avscFile) + public static Schema createAvroSchemaExample(String projectId, String schemaId, String avscFile) throws IOException { ProjectName projectName = ProjectName.of(projectId); @@ -60,8 +60,10 @@ public static void createAvroSchemaExample(String projectId, String schemaId, St schemaId); System.out.println("Created a schema using an Avro schema:\n" + schema); + return schema; } catch (AlreadyExistsException e) { System.out.println(schemaName + "already exists."); + return null; } } } diff --git a/samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java b/samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java index a8efdeb8e..e7b5bf113 100644 --- a/samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java +++ b/samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java @@ -38,7 +38,7 @@ public static void main(String... args) throws Exception { createProtoSchemaExample(projectId, schemaId, protoFile); } - public static void createProtoSchemaExample(String projectId, String schemaId, String protoFile) + public static Schema createProtoSchemaExample(String projectId, String schemaId, String protoFile) throws IOException { ProjectName projectName = ProjectName.of(projectId); @@ -60,8 +60,10 @@ public static void createProtoSchemaExample(String projectId, String schemaId, S schemaId); System.out.println("Created a schema using a protobuf schema:\n" + schema); + return schema; } catch (AlreadyExistsException e) { System.out.println(schemaName + "already exists."); + return null; } } } diff --git a/samples/snippets/src/main/java/pubsub/CreateTopicWithSchemaRevisionsExample.java b/samples/snippets/src/main/java/pubsub/CreateTopicWithSchemaRevisionsExample.java new file mode 100644 index 000000000..69322d927 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CreateTopicWithSchemaRevisionsExample.java @@ -0,0 +1,82 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_create_topic_with_schema_revisions] + +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.pubsub.v1.Encoding; +import com.google.pubsub.v1.SchemaName; +import com.google.pubsub.v1.SchemaSettings; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; + +public class CreateTopicWithSchemaRevisionsExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String topicId = "your-topic-id"; + // Use an existing schema. + String schemaId = "your-schema-id"; + // Choose either BINARY or JSON message serialization in this topic. + Encoding encoding = Encoding.BINARY; + // Set the minimum and maximum revsion ID + String firstRevisionId = "your-revision-id"; + String lastRevisionId = "your-revision-id"; + + createTopicWithSchemaRevisionsExample( + projectId, topicId, schemaId, firstRevisionId, lastRevisionId, encoding); + } + + public static void createTopicWithSchemaRevisionsExample( + String projectId, + String topicId, + String schemaId, + String firstRevisionid, + String lastRevisionId, + Encoding encoding) + throws IOException { + TopicName topicName = TopicName.of(projectId, topicId); + SchemaName schemaName = SchemaName.of(projectId, schemaId); + + SchemaSettings schemaSettings = + SchemaSettings.newBuilder() + .setSchema(schemaName.toString()) + .setFirstRevisionId(firstRevisionid) + .setLastRevisionId(lastRevisionId) + .setEncoding(encoding) + .build(); + + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + + Topic topic = + topicAdminClient.createTopic( + Topic.newBuilder() + .setName(topicName.toString()) + .setSchemaSettings(schemaSettings) + .build()); + + System.out.println("Created topic with schema: " + topic.getName()); + } catch (AlreadyExistsException e) { + System.out.println(schemaName + "already exists."); + } + } +} +// [END pubsub_create_topic_with_schema_revisions] diff --git a/samples/snippets/src/main/java/pubsub/DeleteSchemaRevisionExample.java b/samples/snippets/src/main/java/pubsub/DeleteSchemaRevisionExample.java new file mode 100644 index 000000000..30aa65a53 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/DeleteSchemaRevisionExample.java @@ -0,0 +1,55 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_delete_schema_revision] + +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.pubsub.v1.SchemaServiceClient; +import com.google.pubsub.v1.DeleteSchemaRevisionRequest; +import com.google.pubsub.v1.SchemaName; +import java.io.IOException; + +public class DeleteSchemaRevisionExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String schemaId = "your-schema-id@your-revision-id"; + + deleteSchemaRevisionExample(projectId, schemaId); + } + + public static void deleteSchemaRevisionExample(String projectId, String schemaId) + throws IOException { + SchemaName schemaName = SchemaName.of(projectId, schemaId); + + try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) { + + DeleteSchemaRevisionRequest request = + DeleteSchemaRevisionRequest.newBuilder().setName(schemaName.toString()).build(); + + schemaServiceClient.deleteSchemaRevision(request); + + System.out.println("Deleted a schema revision:" + schemaName); + + } catch (NotFoundException e) { + System.out.println(schemaName + "not found."); + } + } +} +// [END pubsub_delete_schema_revision] diff --git a/samples/snippets/src/main/java/pubsub/GetSchemaRevisionExample.java b/samples/snippets/src/main/java/pubsub/GetSchemaRevisionExample.java new file mode 100644 index 000000000..e811ae9ff --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/GetSchemaRevisionExample.java @@ -0,0 +1,52 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_get_schema_revision] + +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.pubsub.v1.SchemaServiceClient; +import com.google.pubsub.v1.Schema; +import com.google.pubsub.v1.SchemaName; +import java.io.IOException; + +public class GetSchemaRevisionExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String schemaId = "your-schema-id@your-schema-revision"; + + getSchemaRevisionExample(projectId, schemaId); + } + + public static void getSchemaRevisionExample(String projectId, String schemaId) + throws IOException { + SchemaName schemaName = SchemaName.of(projectId, schemaId); + + try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) { + + Schema schema = schemaServiceClient.getSchema(schemaName); + + System.out.println("Got a schema:\n" + schema); + + } catch (NotFoundException e) { + System.out.println(schemaName + "not found."); + } + } +} +// [END pubsub_get_schema_revision] diff --git a/samples/snippets/src/main/java/pubsub/ListSchemaRevisionsExample.java b/samples/snippets/src/main/java/pubsub/ListSchemaRevisionsExample.java new file mode 100644 index 000000000..69cfa59ab --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/ListSchemaRevisionsExample.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_list_schema_revisions] +import com.google.cloud.pubsub.v1.SchemaServiceClient; +import com.google.pubsub.v1.Schema; +import com.google.pubsub.v1.SchemaName; +import java.io.IOException; + +public class ListSchemaRevisionsExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String schemaId = "your-schema-id"; + + listSchemaRevisionsExample(projectId, schemaId); + } + + public static void listSchemaRevisionsExample(String projectId, String schemaId) + throws IOException { + SchemaName schemaName = SchemaName.of(projectId, schemaId); + + try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) { + for (Schema schema : schemaServiceClient.listSchemaRevisions(schemaName).iterateAll()) { + System.out.println(schema); + } + System.out.println("Listed schema revisions."); + } + } +} +// [END pubsub_list_schema_revisions] diff --git a/samples/snippets/src/main/java/pubsub/RollbackSchemaExample.java b/samples/snippets/src/main/java/pubsub/RollbackSchemaExample.java new file mode 100644 index 000000000..795ba33b5 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/RollbackSchemaExample.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_rollback_schema] + +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.pubsub.v1.SchemaServiceClient; +import com.google.pubsub.v1.Schema; +import com.google.pubsub.v1.SchemaName; +import java.io.IOException; + +public class RollbackSchemaExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project"; + String schemaId = "your-schema@your-revision"; + String revisionId = "your-revision"; + + rollbackSchemaExample(projectId, schemaId, revisionId); + } + + public static void rollbackSchemaExample(String projectId, String schemaId, String revisionId) + throws IOException { + SchemaName schemaName = SchemaName.of(projectId, schemaId); + + try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) { + + Schema schema = schemaServiceClient.rollbackSchema(schemaName, revisionId); + + System.out.println("Rolled back a schema:" + schema); + + } catch (NotFoundException e) { + System.out.println(schemaName + "not found."); + } + } +} +// [END pubsub_rollback_schema] diff --git a/samples/snippets/src/main/java/pubsub/SubscribeWithAvroSchemaRevisionsExample.java b/samples/snippets/src/main/java/pubsub/SubscribeWithAvroSchemaRevisionsExample.java new file mode 100644 index 000000000..f86f05107 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/SubscribeWithAvroSchemaRevisionsExample.java @@ -0,0 +1,158 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_subscribe_avro_records_with_revisions] + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.SchemaServiceClient; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Schema; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import utilities.State; + +public class SubscribeWithAvroSchemaRevisionsExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + // Use an existing subscription. + String subscriptionId = "your-subscription-id"; + + subscribeWithAvroSchemaRevisionsExample(projectId, subscriptionId); + } + + static SchemaServiceClient getSchemaServiceClient() { + try { + return SchemaServiceClient.create(); + } catch (IOException e) { + System.out.println("Could not get schema client: " + e); + return null; + } + } + + public static void subscribeWithAvroSchemaRevisionsExample( + String projectId, String subscriptionId) { + // Used to get the schemas for revsions. + final SchemaServiceClient schemaServiceClient = getSchemaServiceClient(); + if (schemaServiceClient == null) { + return; + } + + // Cache for the readers for different revision IDs. + Map> revisionReaders = + new HashMap>(); + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Get the schema encoding type. + String name = message.getAttributesMap().get("googclient_schemaname"); + String revision = message.getAttributesMap().get("googclient_schemarevisionid"); + + SpecificDatumReader reader = null; + synchronized (revisionReaders) { + reader = revisionReaders.get(revision); + } + if (reader == null) { + // This is the first time we are seeing this revision. We need to + // fetch the schema and cache its decoder. It would be more typical + // to do this asynchronously, but it shown here in a synchronous + // way to ease readability. + try { + Schema schema = schemaServiceClient.getSchema(name + "@" + revision); + org.apache.avro.Schema avroSchema = + new org.apache.avro.Schema.Parser().parse(schema.getDefinition()); + reader = new SpecificDatumReader(State.getClassSchema(), avroSchema); + synchronized (revisionReaders) { + revisionReaders.put(revision, reader); + } + } catch (Exception e) { + System.out.println("Could not get schema: " + e); + // Without the schema, we cannot read the message, so nack it. + consumer.nack(); + return; + } + } + + ByteString data = message.getData(); + // Send the message data to a byte[] input stream. + InputStream inputStream = new ByteArrayInputStream(data.toByteArray()); + + String encoding = message.getAttributesMap().get("googclient_schemaencoding"); + + Decoder decoder = null; + + // Prepare an appropriate decoder for the message data in the input stream + // based on the schema encoding type. + try { + switch (encoding) { + case "BINARY": + decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /*reuse=*/ null); + System.out.println("Receiving a binary-encoded message:"); + break; + case "JSON": + decoder = DecoderFactory.get().jsonDecoder(State.getClassSchema(), inputStream); + System.out.println("Receiving a JSON-encoded message:"); + break; + default: + System.out.println("Unknown message type; nacking."); + consumer.nack(); + break; + } + + // Obtain an object of the generated Avro class using the decoder. + State state = reader.read(null, decoder); + System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr()); + + // Ack the message. + consumer.ack(); + } catch (IOException e) { + System.err.println(e); + // If we failed to process the message, nack it. + consumer.nack(); + } + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + subscriber.awaitTerminated(30, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + subscriber.stopAsync(); + } + } +} +// [END pubsub_subscribe_avro_records_with_revisions] diff --git a/samples/snippets/src/main/java/pubsub/UpdateTopicSchemaExample.java b/samples/snippets/src/main/java/pubsub/UpdateTopicSchemaExample.java new file mode 100644 index 000000000..d0412650b --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/UpdateTopicSchemaExample.java @@ -0,0 +1,82 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_update_topic_schema] + +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.protobuf.FieldMask; +import com.google.pubsub.v1.SchemaSettings; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.UpdateTopicRequest; +import java.io.IOException; + +public class UpdateTopicSchemaExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + // This is an existing topic that has schema settings attached to it. + String topicId = "your-topic-id"; + // Set the minimum and maximum revsion ID + String firstRevisionId = "your-revision-id"; + String lastRevisionId = "your-revision-id"; + + UpdateTopicSchemaExample.updateTopicSchemaExample( + projectId, topicId, firstRevisionId, lastRevisionId); + } + + public static void updateTopicSchemaExample( + String projectId, String topicId, String firstRevisionid, String lastRevisionId) + throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + + TopicName topicName = TopicName.of(projectId, topicId); + + // Construct the dead letter policy you expect to have after the update. + SchemaSettings schemaSettings = + SchemaSettings.newBuilder() + .setFirstRevisionId(firstRevisionid) + .setLastRevisionId(lastRevisionId) + .build(); + + // Construct the subscription with the dead letter policy you expect to have + // after the update. Here, values in the required fields (name, topic) help + // identify the subscription. + Topic topic = + Topic.newBuilder() + .setName(topicName.toString()) + .setSchemaSettings(schemaSettings) + .build(); + + // Construct a field mask to indicate which field to update in the subscription. + FieldMask updateMask = + FieldMask.newBuilder() + .addPaths("schema_settings.first_revision_id") + .addPaths("schema_settings.last_revision_id") + .build(); + + UpdateTopicRequest request = + UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build(); + + Topic response = topicAdminClient.updateTopic(request); + + System.out.println("Updated topic with schema: " + topic.getName()); + } + } +} +// [END pubsub_update_topic_schema] diff --git a/samples/snippets/src/main/resources/us-states-plus.avsc b/samples/snippets/src/main/resources/us-states-plus.avsc new file mode 100644 index 000000000..74225ae7e --- /dev/null +++ b/samples/snippets/src/main/resources/us-states-plus.avsc @@ -0,0 +1,24 @@ +{ + "type":"record", + "name":"State", + "namespace":"utilities", + "doc":"A list of states in the United States of America.", + "fields":[ + { + "name":"name", + "type":"string", + "doc":"The common name of the state." + }, + { + "name":"post_abbr", + "type":"string", + "doc":"The postal code abbreviation of the state." + }, + { + "name":"population", + "type":"long", + "default":0, + "doc":"The population of the state." + } + ] +} diff --git a/samples/snippets/src/main/resources/us-states-plus.proto b/samples/snippets/src/main/resources/us-states-plus.proto new file mode 100644 index 000000000..646c7dcb6 --- /dev/null +++ b/samples/snippets/src/main/resources/us-states-plus.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package utilities; +option java_outer_classname = "StateProto"; + +message State { + string name = 1; + string post_abbr = 2; + int64 population = 3; +} diff --git a/samples/snippets/src/test/java/pubsub/SchemaIT.java b/samples/snippets/src/test/java/pubsub/SchemaIT.java index bec908800..31685db56 100644 --- a/samples/snippets/src/test/java/pubsub/SchemaIT.java +++ b/samples/snippets/src/test/java/pubsub/SchemaIT.java @@ -26,6 +26,7 @@ import com.google.cloud.testing.junit4.MultipleAttemptsRule; import com.google.pubsub.v1.Encoding; import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.Schema; import com.google.pubsub.v1.SchemaName; import com.google.pubsub.v1.TopicName; import java.io.ByteArrayOutputStream; @@ -46,6 +47,7 @@ public class SchemaIT { private static String _suffix; private static String avroTopicId; private static String protoTopicId; + private static String protoTopicWithRevisionsId; private static String avroSubscriptionId; private static String protoSubscriptionId; private static String avroSchemaId; @@ -54,12 +56,17 @@ public class SchemaIT { ClassLoader classLoader = getClass().getClassLoader(); File avscFile = new File(classLoader.getResource("us-states.avsc").getFile()); String absoluteAvscFilePath = avscFile.getAbsolutePath(); + File avscRevisionFile = new File(classLoader.getResource("us-states-plus.avsc").getFile()); + String absoluteAvscRevisionFilePath = avscRevisionFile.getAbsolutePath(); File protoFile = new File(classLoader.getResource("us-states.proto").getFile()); String absoluteProtoFilePath = protoFile.getAbsolutePath(); + File protoRevisionFile = new File(classLoader.getResource("us-states-plus.proto").getFile()); + String absoluteProtoRevisionFilePath = protoFile.getAbsolutePath(); private static TopicName avroTopicName; private static TopicName protoTopicName; + private static TopicName protoTopicWithRevisionsName; private static ProjectSubscriptionName avroSubscriptionName; private static ProjectSubscriptionName protoSubscriptionName; private static SchemaName avroSchemaName; @@ -79,12 +86,14 @@ public void setUp() { _suffix = UUID.randomUUID().toString(); avroTopicId = "avro-topic-" + _suffix; protoTopicId = "proto-topic-" + _suffix; + protoTopicWithRevisionsId = "proto-topic-with-revisions-" + _suffix; avroSubscriptionId = "avro-subscription-" + _suffix; protoSubscriptionId = "proto-subscription-" + _suffix; avroSchemaId = "avro-schema-" + _suffix; protoSchemaId = "proto-schema-" + _suffix; avroTopicName = TopicName.of(projectId, avroTopicId); protoTopicName = TopicName.of(projectId, protoTopicId); + protoTopicWithRevisionsName = TopicName.of(projectId, protoTopicWithRevisionsId); avroSubscriptionName = ProjectSubscriptionName.of(projectId, avroSubscriptionId); protoSubscriptionName = ProjectSubscriptionName.of(projectId, protoSubscriptionId); avroSchemaName = SchemaName.of(projectId, avroSchemaId); @@ -98,26 +107,49 @@ public void setUp() { public void tearDown() throws Exception { // Delete the schemas if they have not been cleaned up. try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) { - schemaServiceClient.deleteSchema(protoSchemaName); - schemaServiceClient.deleteSchema(avroSchemaName); - } catch (NotFoundException ignored) { - // Ignore this as resources may have already been cleaned up. + try { + schemaServiceClient.deleteSchema(protoSchemaName); + } catch (NotFoundException ignored) { + // Ignore this as resources may have already been cleaned up. + } + try { + schemaServiceClient.deleteSchema(avroSchemaName); + } catch (NotFoundException ignored) { + // Ignore this as resources may have already been cleaned up. + } } // Delete the subscriptions. try (SubscriptionAdminClient subscriptionAdmin = SubscriptionAdminClient.create()) { - subscriptionAdmin.deleteSubscription(avroSubscriptionName.toString()); - subscriptionAdmin.deleteSubscription(protoSubscriptionName.toString()); - } catch (NotFoundException ignored) { - // Ignore this as resources may have already been cleaned up. + try { + subscriptionAdmin.deleteSubscription(avroSubscriptionName.toString()); + } catch (NotFoundException ignored) { + // Ignore this as resources may have already been cleaned up. + } + try { + subscriptionAdmin.deleteSubscription(protoSubscriptionName.toString()); + } catch (NotFoundException ignored) { + // Ignore this as resources may have already been cleaned up. + } } // Delete the topics. try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { - topicAdminClient.deleteTopic(avroTopicName.toString()); - topicAdminClient.deleteTopic(protoTopicName.toString()); - } catch (NotFoundException ignored) { - // Ignore this as resources may have already been cleaned up. + try { + topicAdminClient.deleteTopic(avroTopicName.toString()); + } catch (NotFoundException ignored) { + // Ignore this as resources may have already been cleaned up. + } + try { + topicAdminClient.deleteTopic(protoTopicName.toString()); + } catch (NotFoundException ignored) { + // Ignore this as resources may have already been cleaned up. + } + try { + topicAdminClient.deleteTopic(protoTopicWithRevisionsName.toString()); + } catch (NotFoundException ignored) { + // Ignore this as resources may have already been cleaned up. + } } System.setOut(null); } @@ -125,28 +157,69 @@ public void tearDown() throws Exception { @Test public void testSchema() throws Exception { // Test creating Avro schema. - CreateAvroSchemaExample.createAvroSchemaExample(projectId, avroSchemaId, absoluteAvscFilePath); + Schema avroSchema = + CreateAvroSchemaExample.createAvroSchemaExample( + projectId, avroSchemaId, absoluteAvscFilePath); assertThat(bout.toString()).contains("Created a schema using an Avro schema:"); assertThat(bout.toString()).contains(avroSchemaName.toString()); + bout.reset(); + // Test committing Avro schema. + CommitAvroSchemaExample.commitAvroSchemaExample( + projectId, avroSchemaId, absoluteAvscRevisionFilePath); + assertThat(bout.toString()).contains("Committed a schema using an Avro schema:"); + assertThat(bout.toString()).contains(avroSchemaName.toString()); + + bout.reset(); // Test creating Proto schema. - CreateProtoSchemaExample.createProtoSchemaExample( - projectId, protoSchemaId, absoluteProtoFilePath); + final Schema protoSchema = + CreateProtoSchemaExample.createProtoSchemaExample( + projectId, protoSchemaId, absoluteProtoFilePath); assertThat(bout.toString()).contains("Created a schema using a protobuf schema:"); assertThat(bout.toString()).contains(protoSchemaName.toString()); + bout.reset(); + // Test committing Proto schema. + final Schema protoSchemaRevision = + CommitProtoSchemaExample.commitProtoSchemaExample( + projectId, protoSchemaId, absoluteProtoRevisionFilePath); + assertThat(bout.toString()).contains("Committed a schema using a protobuf schema:"); + assertThat(bout.toString()).contains(protoSchemaName.toString()); + + bout.reset(); + // Test rolling back a schema. + RollbackSchemaExample.rollbackSchemaExample( + projectId, + protoSchemaId + "@" + protoSchema.getRevisionId(), + protoSchemaRevision.getRevisionId()); + assertThat(bout.toString()).contains("Rolled back a schema:"); + assertThat(bout.toString()).contains(protoSchemaName.toString()); + bout.reset(); // Test getting a schema. GetSchemaExample.getSchemaExample(projectId, avroSchemaId); assertThat(bout.toString()).contains("Got a schema:"); assertThat(bout.toString()).contains(avroSchemaName.toString()); + bout.reset(); + // Test getting a schema revision. + GetSchemaRevisionExample.getSchemaRevisionExample( + projectId, protoSchemaId + "@" + protoSchemaRevision.getRevisionId()); + assertThat(bout.toString()).contains("Got a schema:"); + assertThat(bout.toString()).contains(protoSchemaName.toString()); + bout.reset(); // Test listing schemas. ListSchemasExample.listSchemasExample(projectId); assertThat(bout.toString()).contains("Listed schemas."); assertThat(bout.toString()).contains(avroSchemaName.toString()); + bout.reset(); + // Test listing schema revisions. + ListSchemaRevisionsExample.listSchemaRevisionsExample(projectId, protoSchemaId); + assertThat(bout.toString()).contains("Listed schema revisions."); + assertThat(bout.toString()).contains(protoSchemaName.toString()); + bout.reset(); // Test creating a topic with an Avro schema with BINARY encoding. CreateTopicWithSchemaExample.createTopicWithSchemaExample( @@ -159,6 +232,19 @@ public void testSchema() throws Exception { projectId, protoTopicId, protoSchemaId, Encoding.JSON); assertThat(bout.toString()).contains("Created topic with schema: " + protoTopicName.toString()); + bout.reset(); + // Test creating a topic with a proto schema with revisions specified. + CreateTopicWithSchemaRevisionsExample.createTopicWithSchemaRevisionsExample( + projectId, + protoTopicWithRevisionsId, + protoSchemaId, + protoSchema.getRevisionId(), + protoSchemaRevision.getRevisionId(), + Encoding.BINARY); + assertThat(bout.toString()) + .contains("Created topic with schema: " + protoTopicWithRevisionsName.toString()); + + bout.reset(); // Attach a default pull subscription to each topic. CreatePullSubscriptionExample.createPullSubscriptionExample( projectId, avroSubscriptionId, avroTopicId); @@ -179,7 +265,8 @@ public void testSchema() throws Exception { bout.reset(); // Test receiving BINARY-encoded Avro records. - SubscribeWithAvroSchemaExample.subscribeWithAvroSchemaExample(projectId, avroSubscriptionId); + SubscribeWithAvroSchemaRevisionsExample.subscribeWithAvroSchemaRevisionsExample( + projectId, avroSubscriptionId); assertThat(bout.toString()).contains("Receiving a binary-encoded message:"); assertThat(bout.toString()).contains(" is abbreviated as "); @@ -189,6 +276,23 @@ public void testSchema() throws Exception { assertThat(bout.toString()).contains("Received a JSON-formatted message:"); assertThat(bout.toString()).contains("Ack'ed the message"); + bout.reset(); + // Test updating a topic schema settings + UpdateTopicSchemaExample.updateTopicSchemaExample( + projectId, + protoTopicWithRevisionsId, + protoSchemaRevision.getRevisionId(), + protoSchemaRevision.getRevisionId()); + assertThat(bout.toString()) + .contains("Updated topic with schema: " + protoTopicWithRevisionsName.toString()); + + bout.reset(); + // Test deleting a schema revision. + DeleteSchemaRevisionExample.deleteSchemaRevisionExample( + projectId, protoSchemaId + "@" + protoSchemaRevision.getRevisionId()); + assertThat(bout.toString()).contains("Deleted a schema revision:"); + assertThat(bout.toString()).contains(protoSchemaName.toString()); + bout.reset(); // Test deleting a schema. DeleteSchemaExample.deleteSchemaExample(projectId, avroSchemaId);