From f9dc4c68e6d2f8e9e3c1a6f0f44240b8deaa775d Mon Sep 17 00:00:00 2001 From: Megan Potter <57276408+feywind@users.noreply.github.com> Date: Mon, 7 Jun 2021 18:40:06 -0400 Subject: [PATCH] samples: add schema samples, typescript sample support (#1262) fixes: https://github.com/googleapis/nodejs-pubsub/issues/1220 The remainder of this PR is about adding schema sample snippets, as well as some preliminary TypeScript snippet support. There are still some linting issues with the generated samples, and I wanted to talk about the method used here before committing to it, so it's still marked as a work in progress. --- samples/createAvroSchema.js | 69 ++++ samples/createProtoSchema.js | 69 ++++ samples/createTopicWithSchema.js | 75 +++++ samples/deleteSchema.js | 59 ++++ samples/getSchema.js | 59 ++++ samples/listSchemas.js | 55 ++++ samples/listenForAvroRecords.js | 110 +++++++ samples/listenForProtobufMessages.js | 110 +++++++ samples/package.json | 24 +- samples/publishAvroRecords.js | 100 ++++++ samples/publishProtobufMessages.js | 102 ++++++ samples/synchronousPull.js | 2 +- .../fixtures/getSchema-expected.jstest | 57 ++++ .../fixtures/getSchema-expected.tstest | 53 +++ .../fixtures/getSchema-fixture.jstest | 53 +++ .../fixtures/getSchema-fixture.tstest | 53 +++ samples/system-test/fixtures/provinces.avsc | 18 + samples/system-test/fixtures/provinces.proto | 8 + samples/system-test/schema.test.ts | 311 ++++++++++++++++++ samples/system-test/typescript.test.ts | 68 ++++ samples/tsconfig.json | 19 ++ samples/typescript/README.md | 5 + samples/typescript/avro-js.d.ts | 16 + samples/typescript/createAvroSchema.ts | 66 ++++ samples/typescript/createProtoSchema.ts | 66 ++++ samples/typescript/createTopicWithSchema.ts | 75 +++++ samples/typescript/deleteSchema.ts | 55 ++++ samples/typescript/getSchema.ts | 55 ++++ samples/typescript/listSchemas.ts | 51 +++ samples/typescript/listenForAvroRecords.ts | 106 ++++++ .../typescript/listenForProtobufMessages.ts | 109 ++++++ samples/typescript/publishAvroRecords.ts | 101 ++++++ samples/typescript/publishProtobufMessages.ts | 103 ++++++ samples/typescript/tsconfig.json | 21 ++ samples/utils/simplifier.js | 95 ++++++ src/pubsub.ts | 3 +- src/schema.ts | 20 +- test/pubsub.ts | 9 +- test/schema.ts | 4 +- 39 files changed, 2412 insertions(+), 22 deletions(-) create mode 100644 samples/createAvroSchema.js create mode 100644 samples/createProtoSchema.js create mode 100644 samples/createTopicWithSchema.js create mode 100644 samples/deleteSchema.js create mode 100644 samples/getSchema.js create mode 100644 samples/listSchemas.js create mode 100644 samples/listenForAvroRecords.js create mode 100644 samples/listenForProtobufMessages.js create mode 100644 samples/publishAvroRecords.js create mode 100644 samples/publishProtobufMessages.js create mode 100644 samples/system-test/fixtures/getSchema-expected.jstest create mode 100644 samples/system-test/fixtures/getSchema-expected.tstest create mode 100644 samples/system-test/fixtures/getSchema-fixture.jstest create mode 100644 samples/system-test/fixtures/getSchema-fixture.tstest create mode 100644 samples/system-test/fixtures/provinces.avsc create mode 100644 samples/system-test/fixtures/provinces.proto create mode 100644 samples/system-test/schema.test.ts create mode 100644 samples/system-test/typescript.test.ts create mode 100644 samples/tsconfig.json create mode 100644 samples/typescript/README.md create mode 100644 samples/typescript/avro-js.d.ts create mode 100644 samples/typescript/createAvroSchema.ts create mode 100644 samples/typescript/createProtoSchema.ts create mode 100644 samples/typescript/createTopicWithSchema.ts create mode 100644 samples/typescript/deleteSchema.ts create mode 100644 samples/typescript/getSchema.ts create mode 100644 samples/typescript/listSchemas.ts create mode 100644 samples/typescript/listenForAvroRecords.ts create mode 100644 samples/typescript/listenForProtobufMessages.ts create mode 100644 samples/typescript/publishAvroRecords.ts create mode 100644 samples/typescript/publishProtobufMessages.ts create mode 100644 samples/typescript/tsconfig.json create mode 100644 samples/utils/simplifier.js diff --git a/samples/createAvroSchema.js b/samples/createAvroSchema.js new file mode 100644 index 000000000..e60fa3534 --- /dev/null +++ b/samples/createAvroSchema.js @@ -0,0 +1,69 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Create an Avro based Schema +// description: Creates a new schema definition on a project, using Avro +// usage: node createAvroSchema.js + +// [START pubsub_create_avro_schema] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; +// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json'; + +// Imports the Google Cloud client library +const {PubSub, SchemaTypes} = require('@google-cloud/pubsub'); +const fs = require('fs'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createAvroSchema(schemaName, avscFile) { + const definition = fs.readFileSync(avscFile).toString(); + const schema = await pubSubClient.createSchema( + schemaName, + SchemaTypes.Avro, + definition + ); + + const name = await schema.getName(); + console.log(`Schema ${name} created.`); +} +// [END pubsub_create_avro_schema] + +function main( + schemaName = 'YOUR_SCHEMA_NAME', + avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json' +) { + createAvroSchema(schemaName, avscFile).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/createProtoSchema.js b/samples/createProtoSchema.js new file mode 100644 index 000000000..23af653a8 --- /dev/null +++ b/samples/createProtoSchema.js @@ -0,0 +1,69 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Create a Proto based Schema +// description: Creates a new schema definition on a project, using Protos +// usage: node createProtoSchema.js + +// [START pubsub_create_proto_schema] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; +// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers'; + +// Imports the Google Cloud client library +const {PubSub, SchemaTypes} = require('@google-cloud/pubsub'); +const fs = require('fs'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createProtoSchema(schemaName, protoFile) { + const definition = fs.readFileSync(protoFile).toString(); + const schema = await pubSubClient.createSchema( + schemaName, + SchemaTypes.ProtocolBuffer, + definition + ); + + const fullName = await schema.getName(); + console.log(`Schema ${fullName} created.`); +} +// [END pubsub_create_proto_schema] + +function main( + schemaName = 'YOUR_SCHEMA_NAME', + protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers' +) { + createProtoSchema(schemaName, protoFile).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/createTopicWithSchema.js b/samples/createTopicWithSchema.js new file mode 100644 index 000000000..903f9e591 --- /dev/null +++ b/samples/createTopicWithSchema.js @@ -0,0 +1,75 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on topics with + * the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Create Topic With Schema +// description: Creates a new topic, with a schema definition. +// usage: node createTopicWithSchema.js [encoding-type] + +// [START pubsub_create_topic_with_schema] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicName = 'YOUR_TOPIC_NAME'; +// const schemaName = 'YOUR_SCHEMA_NAME'; +// const encodingType = 'BINARY'; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createTopicWithSchema(topicName, schemaName, encodingType) { + // Get the fully qualified schema name. + const schema = pubSubClient.schema(schemaName); + const fullName = await schema.getName(); + + // Creates a new topic with a schema. Note that you might also + // pass Encodings.Json or Encodings.Binary here. + await pubSubClient.createTopic({ + name: topicName, + schemaSettings: { + schema: fullName, + encoding: encodingType, + }, + }); + console.log(`Topic ${topicName} created with schema ${fullName}.`); +} +// [END pubsub_create_topic_with_schema] + +function main( + topicName = 'YOUR_TOPIC_NAME', + schemaName = 'YOUR_SCHEMA_NAME', + encodingType = 'BINARY' +) { + createTopicWithSchema(topicName, schemaName, encodingType).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/deleteSchema.js b/samples/deleteSchema.js new file mode 100644 index 000000000..e30a45627 --- /dev/null +++ b/samples/deleteSchema.js @@ -0,0 +1,59 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Delete a previously created schema +// description: Deletes a schema which was previously created in the project. +// usage: node deleteSchema.js + +// [START pubsub_delete_schema] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function deleteSchema(schemaName) { + const schema = pubSubClient.schema(schemaName); + const name = await schema.getName(); + await schema.delete(); + console.log(`Schema ${name} deleted.`); +} +// [END pubsub_delete_schema] + +function main(schemaName = 'YOUR_SCHEMA_NAME') { + deleteSchema(schemaName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/getSchema.js b/samples/getSchema.js new file mode 100644 index 000000000..5883e9d90 --- /dev/null +++ b/samples/getSchema.js @@ -0,0 +1,59 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Get a previously created schema +// description: Gets information about a schema which was previously created in the project. +// usage: node getSchema.js + +// [START pubsub_get_schema] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function getSchema(schemaName) { + const schema = pubSubClient.schema(schemaName); + const info = await schema.get(); + const fullName = await schema.getName(); + console.log(`Schema ${fullName} info: ${JSON.stringify(info, null, 4)}.`); +} +// [END pubsub_get_schema] + +function main(schemaName = 'YOUR_SCHEMA_NAME') { + getSchema(schemaName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/listSchemas.js b/samples/listSchemas.js new file mode 100644 index 000000000..724daeb96 --- /dev/null +++ b/samples/listSchemas.js @@ -0,0 +1,55 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: List schemas on a project +// description: Gets a list of schemas which were previously created in the project. +// usage: node listSchemas.js + +// [START pubsub_list_schemas] + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function listSchemas() { + for await (const s of pubSubClient.listSchemas()) { + console.log(await s.name); + } + console.log('Listed schemas.'); +} +// [END pubsub_list_schemas] + +function main() { + listSchemas().catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(); diff --git a/samples/listenForAvroRecords.js b/samples/listenForAvroRecords.js new file mode 100644 index 000000000..e3af46a3d --- /dev/null +++ b/samples/listenForAvroRecords.js @@ -0,0 +1,110 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * subscriptions with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Listen For Avro Records +// description: Listens for records in Avro encoding from a subscription. +// usage: node listenForAvroRecords.js [timeout-in-seconds] + +// [START pubsub_subscribe_avro_records] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME'; +// const timeout = 60; + +// Imports the Google Cloud client library +const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub'); + +// Node FS library, to load definitions +const fs = require('fs'); + +// And the Apache Avro library +const avro = require('avro-js'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +function listenForAvroRecords(subscriptionName, timeout) { + // References an existing subscription + const subscription = pubSubClient.subscription(subscriptionName); + + // Make an encoder using the official avro-js library. + const definition = fs + .readFileSync('system-test/fixtures/provinces.avsc') + .toString(); + const type = avro.parse(definition); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = async message => { + // "Ack" (acknowledge receipt of) the message + message.ack(); + + // Get the schema metadata from the message. + const schemaMetadata = Schema.metadataFromMessage(message.attributes); + + let result; + switch (schemaMetadata.encoding) { + case Encodings.Binary: + result = type.fromBuffer(message.data); + break; + case Encodings.Json: + result = type.fromString(message.data.toString()); + break; + default: + console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`); + break; + } + + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${JSON.stringify(result, null, 4)}`); + console.log(`\tAttributes: ${message.attributes}`); + messageCount += 1; + }; + + // Listen for new messages until timeout is hit + subscription.on('message', messageHandler); + + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_subscribe_avro_records] + +function main(subscriptionName = 'YOUR_SUBSCRIPTION_NAME', timeout = 60) { + timeout = Number(timeout); + + try { + listenForAvroRecords(subscriptionName, timeout); + } catch (err) { + console.error(err.message); + process.exitCode = 1; + } +} + +main(...process.argv.slice(2)); diff --git a/samples/listenForProtobufMessages.js b/samples/listenForProtobufMessages.js new file mode 100644 index 000000000..258a44057 --- /dev/null +++ b/samples/listenForProtobufMessages.js @@ -0,0 +1,110 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * subscriptions with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Listen For Protobuf Messages +// description: Listens for messages in protobuf encoding from a subscription. +// usage: node listenForProtobufMessages.js [timeout-in-seconds] + +// [START pubsub_subscribe_proto_messages] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME'; +// const timeout = 60; + +// Imports the Google Cloud client library +const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub'); + +// And the protobufjs library +const protobuf = require('protobufjs'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function listenForProtobufMessages(subscriptionName, timeout) { + // References an existing subscription + const subscription = pubSubClient.subscription(subscriptionName); + + // Make an decoder using the protobufjs library. + // + // Since we're providing the test message for a specific schema here, we'll + // also code in the path to a sample proto definition. + const root = protobuf.loadSync('system-test/fixtures/provinces.proto'); + const Province = root.lookupType('utilities.Province'); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = async message => { + // "Ack" (acknowledge receipt of) the message + message.ack(); + + // Get the schema metadata from the message. + const schemaMetadata = Schema.metadataFromMessage(message.attributes); + + let result; + switch (schemaMetadata.encoding) { + case Encodings.Binary: + result = Province.decode(message.data); + break; + case Encodings.Json: + // This doesn't require decoding with the protobuf library, + // since it's plain JSON. But you can still validate it against + // your schema. + result = JSON.parse(message.data.toString()); + console.log(`Validation of JSON: ${Province.verify(result)}`); + break; + default: + console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`); + break; + } + + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${JSON.stringify(result, null, 4)}`); + console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`); + messageCount += 1; + }; + + // Listen for new messages until timeout is hit + subscription.on('message', messageHandler); + + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_subscribe_proto_messages] + +function main(subscriptionName = 'YOUR_SUBSCRIPTION_NAME', timeout = 60) { + timeout = Number(timeout); + + listenForProtobufMessages(subscriptionName, timeout).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/package.json b/samples/package.json index 73cb82390..633176968 100644 --- a/samples/package.json +++ b/samples/package.json @@ -1,7 +1,8 @@ { "name": "nodejs-docs-samples-pubsub", "files": [ - "*.js" + "*.js", + "typescript/*.ts" ], "private": true, "license": "Apache-2.0", @@ -11,16 +12,33 @@ "node": ">=10" }, "scripts": { - "test": "mocha system-test --timeout 600000" + "test": "mocha build/system-test --timeout 600000", + "pretest": "npm run compile", + "tsc": "tsc -p .", + "simplifier": "node ./utils/simplifier.js post build/typescript/build/*.js", + "copybuilt": "cp *.js build/ && cp build/typescript/build/*.js build/", + "copytests": "cp -p system-test/*.js build/system-test", + "sampletsc": "cd typescript && tsc -p . && cd ..", + "compile": "npm run tsc && npm run sampletsc && npm run simplifier && npm run copybuilt && npm run copytests", + "clean": "gts clean && rm -rf build/", + "precompile": "npm run clean && node ./utils/simplifier.js commentify typescript/*.ts", + "updatejs": "cp build/*.js ." }, "dependencies": { "@google-cloud/pubsub": "^2.12.0", "@opentelemetry/api": "^0.18.1", - "@opentelemetry/tracing": "^0.18.2" + "@opentelemetry/tracing": "^0.18.2", + "avro-js": "^1.10.1", + "p-defer": "^3.0.0", + "protobufjs": "^6.11.2" }, "devDependencies": { + "@types/chai": "^4.2.16", + "@types/rimraf": "^3.0.0", "chai": "^4.2.0", + "gts": "^3.1.0", "mocha": "^8.0.0", + "rimraf": "^3.0.2", "uuid": "^8.0.0" } } diff --git a/samples/publishAvroRecords.js b/samples/publishAvroRecords.js new file mode 100644 index 000000000..1ff0a9563 --- /dev/null +++ b/samples/publishAvroRecords.js @@ -0,0 +1,100 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on topics with + * the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Publish Avro Records to a Topic +// description: Publishes a record in Avro to a topic with a schema. +// usage: node publishAvroRecords.js + +// [START pubsub_publish_avro_records] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const topicName = 'YOUR_TOPIC_NAME'; + +// Imports the Google Cloud client library +const {PubSub, Encodings} = require('@google-cloud/pubsub'); + +// And the Apache Avro library +const avro = require('avro-js'); +const fs = require('fs'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function publishAvroRecords(topicName) { + // Get the topic metadata to learn about its schema encoding. + const topic = pubSubClient.topic(topicName); + const [topicMetadata] = await topic.getMetadata(); + const topicSchemaMetadata = topicMetadata.schemaSettings; + + if (!topicSchemaMetadata) { + console.log(`Topic ${topicName} doesn't seem to have a schema.`); + return; + } + const schemaEncoding = topicSchemaMetadata.encoding; + + // Make an encoder using the official avro-js library. + const definition = fs + .readFileSync('system-test/fixtures/provinces.avsc') + .toString(); + const type = avro.parse(definition); + + // Encode the message. + const province = { + name: 'Ontario', + post_abbr: 'ON', + }; + let dataBuffer; + switch (schemaEncoding) { + case Encodings.Binary: + dataBuffer = type.toBuffer(province); + break; + case Encodings.Json: + dataBuffer = Buffer.from(type.toString(province)); + break; + default: + console.log(`Unknown schema encoding: ${schemaEncoding}`); + break; + } + if (!dataBuffer) { + console.log(`Invalid encoding ${schemaEncoding} on the topic.`); + return; + } + + const messageId = await topic.publish(dataBuffer); + console.log(`Avro record ${messageId} published.`); +} +// [END pubsub_publish_avro_records] + +function main(topicName = 'YOUR_TOPIC_NAME') { + publishAvroRecords(topicName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/publishProtobufMessages.js b/samples/publishProtobufMessages.js new file mode 100644 index 000000000..c022dc350 --- /dev/null +++ b/samples/publishProtobufMessages.js @@ -0,0 +1,102 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on topics with + * the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Publish Protobuf Messages to a Topic +// description: Publishes a message in protobuf form to a topic with a schema. +// usage: node publishProtobufMessages.js + +// [START pubsub_publish_proto_messages] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const topicName = 'YOUR_TOPIC_NAME'; + +// Imports the Google Cloud client library +const {PubSub, Encodings} = require('@google-cloud/pubsub'); + +// And the protobufjs library +const protobuf = require('protobufjs'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function publishProtobufMessages(topicName) { + // Get the topic metadata to learn about its schema. + const topic = pubSubClient.topic(topicName); + const [topicMetadata] = await topic.getMetadata(); + const topicSchemaMetadata = topicMetadata.schemaSettings; + + if (!topicSchemaMetadata) { + console.log(`Topic ${topicName} doesn't seem to have a schema.`); + return; + } + const schemaEncoding = topicSchemaMetadata.encoding; + + // Encode the message. + const province = { + name: 'Ontario', + postAbbr: 'ON', + }; + + // Make an encoder using the protobufjs library. + // + // Since we're providing the test message for a specific schema here, we'll + // also code in the path to a sample proto definition. + const root = await protobuf.load('system-test/fixtures/provinces.proto'); + const Province = root.lookupType('utilities.Province'); + const message = Province.create(province); + + let dataBuffer; + switch (schemaEncoding) { + case Encodings.Binary: + dataBuffer = Buffer.from(Province.encode(message).finish()); + break; + case Encodings.Json: + dataBuffer = Buffer.from(JSON.stringify(message.toJSON())); + break; + default: + console.log(`Unknown schema encoding: ${schemaEncoding}`); + break; + } + if (!dataBuffer) { + console.log(`Invalid encoding ${schemaEncoding} on the topic.`); + return; + } + + const messageId = await topic.publish(dataBuffer); + console.log(`Protobuf message ${messageId} published.`); +} +// [END pubsub_publish_proto_messages] + +function main(topicName = 'YOUR_TOPIC_NAME') { + publishProtobufMessages(topicName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/synchronousPull.js b/samples/synchronousPull.js index fbf9cf357..ea5f991df 100644 --- a/samples/synchronousPull.js +++ b/samples/synchronousPull.js @@ -69,7 +69,7 @@ function main( } if (ackIds.length !== 0) { - // Acknowledge all of the messages. You could also ackknowledge + // Acknowledge all of the messages. You could also acknowledge // these individually, but this is more efficient. const ackRequest = { subscription: formattedSubscription, diff --git a/samples/system-test/fixtures/getSchema-expected.jstest b/samples/system-test/fixtures/getSchema-expected.jstest new file mode 100644 index 000000000..7c89b0fbf --- /dev/null +++ b/samples/system-test/fixtures/getSchema-expected.jstest @@ -0,0 +1,57 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Get a previously created schema +// description: Gets information about a schema which was previously created in the project. +// usage: node getSchema.js + +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function getSchema(schemaName) { + const schema = pubSubClient.schema(schemaName); + const info = await schema.get(); + const fullName = await schema.getName(); + console.log(`Schema ${fullName} info: ${JSON.stringify(info, null, 4)}.`); +} + +function main(schemaName = 'YOUR_SCHEMA_NAME') { + getSchema(schemaName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/system-test/fixtures/getSchema-expected.tstest b/samples/system-test/fixtures/getSchema-expected.tstest new file mode 100644 index 000000000..1df0c5bd6 --- /dev/null +++ b/samples/system-test/fixtures/getSchema-expected.tstest @@ -0,0 +1,53 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//BLANK +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ +//BLANK +// sample-metadata: +// title: Get a previously created schema +// description: Gets information about a schema which was previously created in the project. +// usage: node getSchema.js +//BLANK +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; +//BLANK +// Imports the Google Cloud client library +import {PubSub} from '@google-cloud/pubsub'; +//BLANK +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); +//BLANK +async function getSchema(schemaName: string) { + const schema = pubSubClient.schema(schemaName); + const info = await schema.get(); + const fullName = await schema.getName(); + console.log(`Schema ${fullName} info: ${JSON.stringify(info, null, 4)}.`); +} +//BLANK +function main(schemaName = 'YOUR_SCHEMA_NAME') { + getSchema(schemaName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} +//BLANK +main(...process.argv.slice(2)); diff --git a/samples/system-test/fixtures/getSchema-fixture.jstest b/samples/system-test/fixtures/getSchema-fixture.jstest new file mode 100644 index 000000000..1e523a4f4 --- /dev/null +++ b/samples/system-test/fixtures/getSchema-fixture.jstest @@ -0,0 +1,53 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//BLANK +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ +//BLANK +// sample-metadata: +// title: Get a previously created schema +// description: Gets information about a schema which was previously created in the project. +// usage: node getSchema.js +//BLANK +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; +//BLANK +// Imports the Google Cloud client library +import { PubSub } from '@google-cloud/pubsub'; +//BLANK +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); +//BLANK +async function getSchema(schemaName) { + const schema = pubSubClient.schema(schemaName); + const info = await schema.get(); + const fullName = await schema.getName(); + console.log(`Schema ${fullName} info: ${JSON.stringify(info, null, 4)}.`); +} +//BLANK +function main(schemaName = 'YOUR_SCHEMA_NAME') { + getSchema(schemaName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} +//BLANK +main(...process.argv.slice(2)); diff --git a/samples/system-test/fixtures/getSchema-fixture.tstest b/samples/system-test/fixtures/getSchema-fixture.tstest new file mode 100644 index 000000000..cc670bf6f --- /dev/null +++ b/samples/system-test/fixtures/getSchema-fixture.tstest @@ -0,0 +1,53 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Get a previously created schema +// description: Gets information about a schema which was previously created in the project. +// usage: node getSchema.js + +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; + +// Imports the Google Cloud client library +import {PubSub} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function getSchema(schemaName: string) { + const schema = pubSubClient.schema(schemaName); + const info = await schema.get(); + const fullName = await schema.getName(); + console.log(`Schema ${fullName} info: ${JSON.stringify(info, null, 4)}.`); +} + +function main(schemaName = 'YOUR_SCHEMA_NAME') { + getSchema(schemaName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/system-test/fixtures/provinces.avsc b/samples/system-test/fixtures/provinces.avsc new file mode 100644 index 000000000..3997e71a8 --- /dev/null +++ b/samples/system-test/fixtures/provinces.avsc @@ -0,0 +1,18 @@ +{ + "type":"record", + "name":"Province", + "namespace":"utilities", + "doc":"A list of provinces in Canada.", + "fields":[ + { + "name":"name", + "type":"string", + "doc":"The common name of the province." + }, + { + "name":"post_abbr", + "type":"string", + "doc":"The postal code abbreviation of the province." + } + ] +} \ No newline at end of file diff --git a/samples/system-test/fixtures/provinces.proto b/samples/system-test/fixtures/provinces.proto new file mode 100644 index 000000000..08f05488e --- /dev/null +++ b/samples/system-test/fixtures/provinces.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package utilities; + +message Province { + string name = 1; + string post_abbr = 2; +} diff --git a/samples/system-test/schema.test.ts b/samples/system-test/schema.test.ts new file mode 100644 index 000000000..88840cfe9 --- /dev/null +++ b/samples/system-test/schema.test.ts @@ -0,0 +1,311 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { + Encodings, + Message, + PubSub, + Schema, + SchemaEncoding, + SchemaTypes, + Subscription, + Topic, +} from '@google-cloud/pubsub'; +import {assert} from 'chai'; +import {describe, it, afterEach} from 'mocha'; +import * as cp from 'child_process'; +import * as uuid from 'uuid'; +import * as path from 'path'; +import * as defer from 'p-defer'; + +// Tests run as Node 12. +// eslint-disable-next-line node/no-unsupported-features/node-builtins +import {promises as fs} from 'fs'; + +const execSync = (cmd: string) => cp.execSync(cmd, {encoding: 'utf-8'}); + +describe('schema', () => { + const projectId = process.env.GCLOUD_PROJECT; + const pubsub = new PubSub({projectId}); + const runId = uuid.v4(); + console.log(`Topics runId: ${runId}`); + const topicIdStem = `schema-top-${runId}`; + const subscriptionIdStem = `schema-sub-${runId}`; + const schemaIdStem = `schema-${runId}`; + + // Schemas have a delay between deletion and actual deletion, so + // we have to make unique schema names. It's simplest to do it for topics + // and subscriptions too. + let curTopic = 1; + function getTopicId(): string { + return topicIdStem + curTopic++; + } + + let curSub = 1; + function getSubId(): string { + return subscriptionIdStem + curSub++; + } + + let curSchema = 1; + function getSchemaId(): string { + return schemaIdStem + curSchema++; + } + + function fullSchemaName(base: string) { + return `projects/${projectId}/schemas/${base}`; + } + + const commandFor = (action: string) => `node ${action}.js`; + + async function cleanSchemas() { + const schemas = []; + for await (const s of pubsub.listSchemas()) { + schemas.push(pubsub.schema(s.name!).delete()); + } + await Promise.all(schemas); + } + + async function cleanTopics() { + const [topics] = await pubsub.getTopics(); + await Promise.all( + topics.filter(x => x.name.endsWith(runId)).map(x => x.delete()) + ); + } + + async function cleanSubs() { + const [subscriptions] = await pubsub.getSubscriptions(); + await Promise.all( + subscriptions.filter(x => x.name.endsWith(runId)).map(x => x.delete()) + ); + } + + afterEach(async () => { + await cleanSubs(); + await cleanTopics(); + await cleanSchemas(); + }); + + function fixturePath(fixture: string): string { + return path.join(__dirname, '..', '..', 'system-test', 'fixtures', fixture); + } + + async function createSchema(type: 'avro' | 'proto'): Promise { + const suffix = type === 'avro' ? 'avsc' : 'proto'; + const encoding = + type === 'avro' ? SchemaTypes.Avro : SchemaTypes.ProtocolBuffer; + const def = ( + await fs.readFile(fixturePath(`provinces.${suffix}`)) + ).toString(); + const schemaId = getSchemaId(); + const schema = await pubsub.createSchema(schemaId, encoding, def); + assert.ok(schema); + + return schema; + } + + async function createTopicWithSchema( + schemaName: string, + encodingType: SchemaEncoding + ): Promise { + const topicId = getTopicId(); + const [topic] = await pubsub.createTopic({ + name: topicId, + schemaSettings: { + schema: fullSchemaName(schemaName), + encoding: encodingType, + }, + }); + assert.ok(topic); + + return topic; + } + + async function createSub(topicName: string): Promise { + const subId = getSubId(); + const [sub] = await pubsub.createSubscription(topicName, subId); + assert.ok(sub); + + return sub; + } + + it('should create an avro schema', async () => { + const schemaId = getSchemaId(); + const output = execSync( + `${commandFor('createAvroSchema')} ${schemaId} ${fixturePath( + 'provinces.avsc' + )}` + ); + assert.include(output, schemaId); + assert.include(output, 'created.'); + + let found = false; + for await (const s of pubsub.listSchemas()) { + if (s.name?.endsWith(schemaId)) { + found = true; + break; + } + } + + assert.ok(found, 'created schema was not found'); + }); + + it('should create a proto schema', async () => { + const schemaId = getSchemaId(); + const output = execSync( + `${commandFor('createProtoSchema')} ${schemaId} ${fixturePath( + 'provinces.proto' + )}` + ); + assert.include(output, schemaId); + assert.include(output, 'created.'); + + let found = false; + for await (const s of pubsub.listSchemas()) { + if (s.name?.endsWith(schemaId)) { + found = true; + break; + } + } + + assert.ok(found, 'created schema was not found'); + }); + + it('should create a topic with a schema', async () => { + const schema = await createSchema('proto'); + const topicId = getTopicId(); + const output = execSync( + `${commandFor('createTopicWithSchema')} ${topicId} ${schema.id} BINARY` + ); + assert.include(output, topicId); + assert.include(output, schema.id); + assert.include(output, 'created with'); + + const [topic] = await pubsub.topic(topicId).get(); + assert.include(topic.metadata?.schemaSettings?.schema, schema.id); + }); + + it('should delete a schema', async () => { + const schema = await createSchema('proto'); + + const output = execSync(`${commandFor('deleteSchema')} ${schema.id}`); + assert.include(output, schema.id); + assert.include(output, 'deleted.'); + + try { + const got = await pubsub.schema(schema.id).get(); + assert.isNotOk(got, "schema hadn't been deleted"); + } catch (e) { + // This is expected. + } + }); + + it('should get a schema', async () => { + const schema = await createSchema('proto'); + + const output = execSync(`${commandFor('getSchema')} ${schema.id}`); + assert.include(output, schema.id); + assert.include(output, 'info:'); + assert.include(output, 'PROTO'); + }); + + it('should listen for avro records', async () => { + const schema = await createSchema('avro'); + const topic = await createTopicWithSchema(schema.id, Encodings.Json); + const sub = await createSub(topic.name); + + topic.publish( + Buffer.from( + JSON.stringify({ + name: 'Alberta', + post_abbr: 'AB', + }) + ) + ); + await topic.flush(); + + const output = execSync( + `${commandFor('listenForAvroRecords')} ${sub.name} 3` + ); + assert.include(output, 'Received message'); + assert.include(output, 'Alberta'); + assert.include(output, 'AB'); + }); + + it('should listen for protobuf messages', async () => { + const schema = await createSchema('proto'); + const topic = await createTopicWithSchema(schema.id, Encodings.Json); + const sub = await createSub(topic.name); + + topic.publish( + Buffer.from( + JSON.stringify({ + name: 'Quebec', + post_abbr: 'QC', + }) + ) + ); + await topic.flush(); + + const output = execSync( + `${commandFor('listenForProtobufMessages')} ${sub.name} 3` + ); + assert.include(output, 'Received message'); + assert.include(output, 'Quebec'); + assert.include(output, 'QC'); + }); + + it('should list schemas', async () => { + const schema = await createSchema('avro'); + const output = execSync(`${commandFor('listSchemas')}`); + assert.include(output, schema.id); + }); + + it('should publish avro records', async () => { + const schema = await createSchema('avro'); + const topic = await createTopicWithSchema(schema.id, Encodings.Binary); + const sub = await createSub(topic.name); + const deferred = defer(); + sub.on('message', deferred.resolve); + sub.on('error', deferred.reject); + + const output = execSync( + `${commandFor('publishAvroRecords')} ${topic.name}` + ); + assert.include(output, 'published.'); + + const result = (await deferred.promise) as Message; + assert.include(result.data.toString(), 'Ontario'); + + sub.close(); + }); + + it('should publish protobuf messages', async () => { + const schema = await createSchema('proto'); + const topic = await createTopicWithSchema(schema.id, Encodings.Binary); + const sub = await createSub(topic.name); + const deferred = defer(); + sub.on('message', deferred.resolve); + sub.on('error', deferred.reject); + + const output = execSync( + `${commandFor('publishProtobufMessages')} ${topic.name}` + ); + assert.include(output, 'published.'); + + const result = (await deferred.promise) as Message; + assert.include(result.data.toString(), 'Ontario'); + + sub.close(); + }); +}); diff --git a/samples/system-test/typescript.test.ts b/samples/system-test/typescript.test.ts new file mode 100644 index 000000000..f32e05f18 --- /dev/null +++ b/samples/system-test/typescript.test.ts @@ -0,0 +1,68 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {assert} from 'chai'; +import {describe, it, afterEach} from 'mocha'; +import * as cp from 'child_process'; +import * as fs from 'fs'; +import * as rimraf from 'rimraf'; + +const execSync = (cmd: string) => cp.execSync(cmd, {encoding: 'utf-8'}); +const fixtures = './system-test/fixtures'; +const buildDir = `${fixtures}/build`; + +describe('typescript', () => { + it('simplifier.js "commentify" mode works', () => { + if (!fs.existsSync(buildDir)) { + rimraf.sync(buildDir); + } + + execSync( + `node ./utils/simplifier.js commentify ${fixtures}/getSchema-fixture.tstest` + ); + const expected = fs + .readFileSync(`${fixtures}/getSchema-expected.tstest`) + .toString(); + const actual = fs + .readFileSync(`${buildDir}/getSchema-fixture.tstest`) + .toString(); + assert.strictEqual(actual, expected); + + rimraf.sync(`${fixtures}/build`); + }); + + it('simplifier.js "post" mode works', () => { + if (!fs.existsSync(buildDir)) { + rimraf.sync(buildDir); + } + fs.mkdirSync(buildDir); + fs.copyFileSync( + `${fixtures}/getSchema-fixture.jstest`, + `${buildDir}/getSchema-fixture.jstest` + ); + + execSync( + `node ./utils/simplifier.js post ${buildDir}/getSchema-fixture.jstest` + ); + const expected = fs + .readFileSync(`${fixtures}/getSchema-expected.jstest`) + .toString(); + const actual = fs + .readFileSync(`${buildDir}/getSchema-fixture.jstest`) + .toString(); + assert.strictEqual(actual, expected); + + rimraf.sync(`${fixtures}/build`); + }); +}); diff --git a/samples/tsconfig.json b/samples/tsconfig.json new file mode 100644 index 000000000..5923963b4 --- /dev/null +++ b/samples/tsconfig.json @@ -0,0 +1,19 @@ +{ + "extends": "./node_modules/gts/tsconfig-google.json", + "compilerOptions": { + "rootDir": ".", + "outDir": "build", + "resolveJsonModule": true, + "lib": [ + "es2018", + "dom" + ], + "moduleResolution": "node" + }, + "include": [ + "system-test/*.ts" + ], + "exclude": [ + "typescript/" + ] +} diff --git a/samples/typescript/README.md b/samples/typescript/README.md new file mode 100644 index 000000000..34a1f5cae --- /dev/null +++ b/samples/typescript/README.md @@ -0,0 +1,5 @@ +# About TypeScript samples + +In order to provide better samples for our TypeScript users, new samples going forward will be written in TypeScript, in this directory. `npm run compile` will take care of all the steps needed to merge the existing JavaScript samples into `build/` along with the compiled TypeScript samples. + +When you are ready to submit a PR for the updated sample, please run `npm run updatejs`. This will copy the built `*.js` files to the main sample directory, where they can be found by the docsite processors. You may also want to run `npm run lint` from the parent directory, because sometimes removing the typing annotations is enough to upset Prettier's formatting rules. The simplest fix for that is to just go ahead and run `gts fix`, also from the parent directory. diff --git a/samples/typescript/avro-js.d.ts b/samples/typescript/avro-js.d.ts new file mode 100644 index 000000000..1c6c80b44 --- /dev/null +++ b/samples/typescript/avro-js.d.ts @@ -0,0 +1,16 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This one doesn't seem to have typings. +declare module 'avro-js'; diff --git a/samples/typescript/createAvroSchema.ts b/samples/typescript/createAvroSchema.ts new file mode 100644 index 000000000..9e03b143f --- /dev/null +++ b/samples/typescript/createAvroSchema.ts @@ -0,0 +1,66 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Create an Avro based Schema +// description: Creates a new schema definition on a project, using Avro +// usage: node createAvroSchema.js + +// [START pubsub_create_avro_schema] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; +// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json'; + +// Imports the Google Cloud client library +import {PubSub, SchemaTypes} from '@google-cloud/pubsub'; + +import * as fs from 'fs'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createAvroSchema(schemaName: string, avscFile: string) { + const definition: string = fs.readFileSync(avscFile).toString(); + const schema = await pubSubClient.createSchema( + schemaName, + SchemaTypes.Avro, + definition + ); + + const name = await schema.getName(); + console.log(`Schema ${name} created.`); +} +// [END pubsub_create_avro_schema] + +function main( + schemaName = 'YOUR_SCHEMA_NAME', + avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json' +) { + createAvroSchema(schemaName, avscFile).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/createProtoSchema.ts b/samples/typescript/createProtoSchema.ts new file mode 100644 index 000000000..55a2b7b4e --- /dev/null +++ b/samples/typescript/createProtoSchema.ts @@ -0,0 +1,66 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Create a Proto based Schema +// description: Creates a new schema definition on a project, using Protos +// usage: node createProtoSchema.js + +// [START pubsub_create_proto_schema] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; +// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers'; + +// Imports the Google Cloud client library +import {PubSub, SchemaTypes} from '@google-cloud/pubsub'; + +import * as fs from 'fs'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createProtoSchema(schemaName: string, protoFile: string) { + const definition: string = fs.readFileSync(protoFile).toString(); + const schema = await pubSubClient.createSchema( + schemaName, + SchemaTypes.ProtocolBuffer, + definition + ); + + const fullName: string = await schema.getName(); + console.log(`Schema ${fullName} created.`); +} +// [END pubsub_create_proto_schema] + +function main( + schemaName = 'YOUR_SCHEMA_NAME', + protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers' +) { + createProtoSchema(schemaName, protoFile).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/createTopicWithSchema.ts b/samples/typescript/createTopicWithSchema.ts new file mode 100644 index 000000000..770084b56 --- /dev/null +++ b/samples/typescript/createTopicWithSchema.ts @@ -0,0 +1,75 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on topics with + * the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Create Topic With Schema +// description: Creates a new topic, with a schema definition. +// usage: node createTopicWithSchema.js [encoding-type] + +// [START pubsub_create_topic_with_schema] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicName = 'YOUR_TOPIC_NAME'; +// const schemaName = 'YOUR_SCHEMA_NAME'; +// const encodingType = 'BINARY'; + +// Imports the Google Cloud client library +import {PubSub} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createTopicWithSchema( + topicName: string, + schemaName: string, + encodingType: 'BINARY' | 'JSON' +) { + // Get the fully qualified schema name. + const schema = pubSubClient.schema(schemaName); + const fullName = await schema.getName(); + + // Creates a new topic with a schema. Note that you might also + // pass Encodings.Json or Encodings.Binary here. + await pubSubClient.createTopic({ + name: topicName, + schemaSettings: { + schema: fullName, + encoding: encodingType, + }, + }); + console.log(`Topic ${topicName} created with schema ${fullName}.`); +} +// [END pubsub_create_topic_with_schema] + +function main( + topicName = 'YOUR_TOPIC_NAME', + schemaName = 'YOUR_SCHEMA_NAME', + encodingType: 'BINARY' | 'JSON' = 'BINARY' +) { + createTopicWithSchema(topicName, schemaName, encodingType).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/deleteSchema.ts b/samples/typescript/deleteSchema.ts new file mode 100644 index 000000000..568e805f5 --- /dev/null +++ b/samples/typescript/deleteSchema.ts @@ -0,0 +1,55 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Delete a previously created schema +// description: Deletes a schema which was previously created in the project. +// usage: node deleteSchema.js + +// [START pubsub_delete_schema] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; + +// Imports the Google Cloud client library +import {PubSub} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function deleteSchema(schemaName: string) { + const schema = pubSubClient.schema(schemaName); + const name = await schema.getName(); + await schema.delete(); + console.log(`Schema ${name} deleted.`); +} +// [END pubsub_delete_schema] + +function main(schemaName = 'YOUR_SCHEMA_NAME') { + deleteSchema(schemaName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/getSchema.ts b/samples/typescript/getSchema.ts new file mode 100644 index 000000000..07d7e5f1b --- /dev/null +++ b/samples/typescript/getSchema.ts @@ -0,0 +1,55 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Get a previously created schema +// description: Gets information about a schema which was previously created in the project. +// usage: node getSchema.js + +// [START pubsub_get_schema] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const schemaName = 'YOUR_SCHEMA_NAME'; + +// Imports the Google Cloud client library +import {PubSub} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function getSchema(schemaName: string) { + const schema = pubSubClient.schema(schemaName); + const info = await schema.get(); + const fullName = await schema.getName(); + console.log(`Schema ${fullName} info: ${JSON.stringify(info, null, 4)}.`); +} +// [END pubsub_get_schema] + +function main(schemaName = 'YOUR_SCHEMA_NAME') { + getSchema(schemaName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/listSchemas.ts b/samples/typescript/listSchemas.ts new file mode 100644 index 000000000..9e8bbf6b4 --- /dev/null +++ b/samples/typescript/listSchemas.ts @@ -0,0 +1,51 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: List schemas on a project +// description: Gets a list of schemas which were previously created in the project. +// usage: node listSchemas.js + +// [START pubsub_list_schemas] + +// Imports the Google Cloud client library +import {PubSub} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function listSchemas() { + for await (const s of pubSubClient.listSchemas()) { + console.log(await s.name); + } + console.log('Listed schemas.'); +} +// [END pubsub_list_schemas] + +function main() { + listSchemas().catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(); diff --git a/samples/typescript/listenForAvroRecords.ts b/samples/typescript/listenForAvroRecords.ts new file mode 100644 index 000000000..6d52bce9a --- /dev/null +++ b/samples/typescript/listenForAvroRecords.ts @@ -0,0 +1,106 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * subscriptions with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Listen For Avro Records +// description: Listens for records in Avro encoding from a subscription. +// usage: node listenForAvroRecords.js [timeout-in-seconds] + +// [START pubsub_subscribe_avro_records] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME'; +// const timeout = 60; + +// Imports the Google Cloud client library +import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub'; + +// Node FS library, to load definitions +import * as fs from 'fs'; + +// And the Apache Avro library +import * as avro from 'avro-js'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +function listenForAvroRecords(subscriptionName: string, timeout: number) { + // References an existing subscription + const subscription = pubSubClient.subscription(subscriptionName); + + // Make an encoder using the official avro-js library. + const definition = fs + .readFileSync('system-test/fixtures/provinces.avsc') + .toString(); + const type = avro.parse(definition); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = async (message: Message) => { + // "Ack" (acknowledge receipt of) the message + message.ack(); + + // Get the schema metadata from the message. + const schemaMetadata = Schema.metadataFromMessage(message.attributes); + + let result: object | undefined; + switch (schemaMetadata.encoding) { + case Encodings.Binary: + result = type.fromBuffer(message.data); + break; + case Encodings.Json: + result = type.fromString(message.data.toString()); + break; + default: + console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`); + break; + } + + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${JSON.stringify(result, null, 4)}`); + console.log(`\tAttributes: ${message.attributes}`); + messageCount += 1; + }; + + // Listen for new messages until timeout is hit + subscription.on('message', messageHandler); + + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_subscribe_avro_records] + +function main(subscriptionName = 'YOUR_SUBSCRIPTION_NAME', timeout = 60) { + timeout = Number(timeout); + + try { + listenForAvroRecords(subscriptionName, timeout); + } catch (err) { + console.error(err.message); + process.exitCode = 1; + } +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/listenForProtobufMessages.ts b/samples/typescript/listenForProtobufMessages.ts new file mode 100644 index 000000000..490b2dc69 --- /dev/null +++ b/samples/typescript/listenForProtobufMessages.ts @@ -0,0 +1,109 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * subscriptions with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Listen For Protobuf Messages +// description: Listens for messages in protobuf encoding from a subscription. +// usage: node listenForProtobufMessages.js [timeout-in-seconds] + +// [START pubsub_subscribe_proto_messages] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME'; +// const timeout = 60; + +// Imports the Google Cloud client library +import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub'; + +// And the protobufjs library +import * as protobuf from 'protobufjs'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function listenForProtobufMessages( + subscriptionName: string, + timeout: number +) { + // References an existing subscription + const subscription = pubSubClient.subscription(subscriptionName); + + // Make an decoder using the protobufjs library. + // + // Since we're providing the test message for a specific schema here, we'll + // also code in the path to a sample proto definition. + const root = protobuf.loadSync('system-test/fixtures/provinces.proto'); + const Province = root.lookupType('utilities.Province'); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = async (message: Message) => { + // "Ack" (acknowledge receipt of) the message + message.ack(); + + // Get the schema metadata from the message. + const schemaMetadata = Schema.metadataFromMessage(message.attributes); + + let result; + switch (schemaMetadata.encoding) { + case Encodings.Binary: + result = Province.decode(message.data); + break; + case Encodings.Json: + // This doesn't require decoding with the protobuf library, + // since it's plain JSON. But you can still validate it against + // your schema. + result = JSON.parse(message.data.toString()); + console.log(`Validation of JSON: ${Province.verify(result)}`); + break; + default: + console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`); + break; + } + + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${JSON.stringify(result, null, 4)}`); + console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`); + messageCount += 1; + }; + + // Listen for new messages until timeout is hit + subscription.on('message', messageHandler); + + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_subscribe_proto_messages] + +function main(subscriptionName = 'YOUR_SUBSCRIPTION_NAME', timeout = 60) { + timeout = Number(timeout); + + listenForProtobufMessages(subscriptionName, timeout).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/publishAvroRecords.ts b/samples/typescript/publishAvroRecords.ts new file mode 100644 index 000000000..b7215a55b --- /dev/null +++ b/samples/typescript/publishAvroRecords.ts @@ -0,0 +1,101 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on topics with + * the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Publish Avro Records to a Topic +// description: Publishes a record in Avro to a topic with a schema. +// usage: node publishAvroRecords.js + +// [START pubsub_publish_avro_records] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const topicName = 'YOUR_TOPIC_NAME'; + +// Imports the Google Cloud client library +import {PubSub, Encodings} from '@google-cloud/pubsub'; + +// And the Apache Avro library +import * as avro from 'avro-js'; +import * as fs from 'fs'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +interface ProvinceObject { + name: string; + post_abbr: string; +} + +async function publishAvroRecords(topicName: string) { + // Get the topic metadata to learn about its schema encoding. + const topic = pubSubClient.topic(topicName); + const [topicMetadata] = await topic.getMetadata(); + const topicSchemaMetadata = topicMetadata.schemaSettings; + + if (!topicSchemaMetadata) { + console.log(`Topic ${topicName} doesn't seem to have a schema.`); + return; + } + const schemaEncoding = topicSchemaMetadata.encoding; + + // Make an encoder using the official avro-js library. + const definition = fs + .readFileSync('system-test/fixtures/provinces.avsc') + .toString(); + const type = avro.parse(definition); + + // Encode the message. + const province: ProvinceObject = { + name: 'Ontario', + post_abbr: 'ON', + }; + let dataBuffer: Buffer | undefined; + switch (schemaEncoding) { + case Encodings.Binary: + dataBuffer = type.toBuffer(province); + break; + case Encodings.Json: + dataBuffer = Buffer.from(type.toString(province)); + break; + default: + console.log(`Unknown schema encoding: ${schemaEncoding}`); + break; + } + if (!dataBuffer) { + console.log(`Invalid encoding ${schemaEncoding} on the topic.`); + return; + } + + const messageId = await topic.publish(dataBuffer); + console.log(`Avro record ${messageId} published.`); +} +// [END pubsub_publish_avro_records] + +function main(topicName = 'YOUR_TOPIC_NAME') { + publishAvroRecords(topicName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/publishProtobufMessages.ts b/samples/typescript/publishProtobufMessages.ts new file mode 100644 index 000000000..e66f109ca --- /dev/null +++ b/samples/typescript/publishProtobufMessages.ts @@ -0,0 +1,103 @@ +// Copyright 2019-2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on topics with + * the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Publish Protobuf Messages to a Topic +// description: Publishes a message in protobuf form to a topic with a schema. +// usage: node publishProtobufMessages.js + +// [START pubsub_publish_proto_messages] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const topicName = 'YOUR_TOPIC_NAME'; + +// Imports the Google Cloud client library +import {PubSub, Encodings} from '@google-cloud/pubsub'; + +// And the protobufjs library +import * as protobuf from 'protobufjs'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +interface ProvinceObject { + name: string; + postAbbr: string; +} + +async function publishProtobufMessages(topicName: string) { + // Get the topic metadata to learn about its schema. + const topic = pubSubClient.topic(topicName); + const [topicMetadata] = await topic.getMetadata(); + const topicSchemaMetadata = topicMetadata.schemaSettings; + + if (!topicSchemaMetadata) { + console.log(`Topic ${topicName} doesn't seem to have a schema.`); + return; + } + const schemaEncoding = topicSchemaMetadata.encoding; + + // Encode the message. + const province: ProvinceObject = { + name: 'Ontario', + postAbbr: 'ON', + }; + + // Make an encoder using the protobufjs library. + // + // Since we're providing the test message for a specific schema here, we'll + // also code in the path to a sample proto definition. + const root = await protobuf.load('system-test/fixtures/provinces.proto'); + const Province = root.lookupType('utilities.Province'); + const message = Province.create(province); + + let dataBuffer: Buffer | undefined; + switch (schemaEncoding) { + case Encodings.Binary: + dataBuffer = Buffer.from(Province.encode(message).finish()); + break; + case Encodings.Json: + dataBuffer = Buffer.from(JSON.stringify(message.toJSON())); + break; + default: + console.log(`Unknown schema encoding: ${schemaEncoding}`); + break; + } + if (!dataBuffer) { + console.log(`Invalid encoding ${schemaEncoding} on the topic.`); + return; + } + + const messageId = await topic.publish(dataBuffer); + console.log(`Protobuf message ${messageId} published.`); +} +// [END pubsub_publish_proto_messages] + +function main(topicName = 'YOUR_TOPIC_NAME') { + publishProtobufMessages(topicName).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/tsconfig.json b/samples/typescript/tsconfig.json new file mode 100644 index 000000000..7b0d858f1 --- /dev/null +++ b/samples/typescript/tsconfig.json @@ -0,0 +1,21 @@ +{ + "extends": "../node_modules/gts/tsconfig-google.json", + "compilerOptions": { + "rootDir": "..", + "outDir": "../build", + "resolveJsonModule": true, + "lib": [ + "es2018", + "dom" + ], + "module": "es2020", + "sourceMap": false, + "moduleResolution": "node" + }, + "include": [ + "./build/*.ts" + ], + "exclude": [ + "./*.ts" + ] +} diff --git a/samples/utils/simplifier.js b/samples/utils/simplifier.js new file mode 100644 index 000000000..24e25f64c --- /dev/null +++ b/samples/utils/simplifier.js @@ -0,0 +1,95 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* + +This little utility gets us over the last hop of converting from +TypeScript samples to pretty reasonable JavaScript (ES6) samples. It +just replaces blank lines with '//BLANK' and then removes it in the +output, so TypeScript doesn't throw away our blank links. Everything +else is more or less handled by setting the module type to 'es2020'. + +...which keeps 'import' statements, so we convert those require(). +We also add a comment warning about editing the generated samples. + +usage: + +node simplifier.js commentify typescript/*.ts + This rewrites all .ts files in 'typescript' into typescript/build/*.ts + +node simplifier.js post build/typescript/build/*.js + This undoes the commentifying on the built sample, and other finishing touches. + +*/ + +// We test with Node 12. +// eslint-disable-next-line node/no-unsupported-features/node-builtins +const fs = require('fs').promises; +const path = require('path'); + +async function main(args) { + const mode = args.shift(); + if (mode === 'commentify') { + for (const fn of args) { + // Read the whole file. + let contents = (await fs.readFile(fn)).toString(); + + // Replace all blank lines with a known comment. + const blankLines = /^\s*\r?\n/gm; + contents = contents.replace(blankLines, '//BLANK\n'); + + // Write out it out for tsc to find. + const outputFn = path.join(path.dirname(fn), 'build', path.basename(fn)); + await fs.mkdir(path.dirname(outputFn), { + recursive: true, + }); + await fs.writeFile(outputFn, contents); + } + } else if (mode === 'post') { + for (const fn of args) { + // Read the whole file. + let contents = (await fs.readFile(fn)).toString(); + + // Convert back the comments to blank lines. + const blankedLines = /^\s*\/\/BLANK\r?\n/gm; + contents = contents.replace(blankedLines, '\n'); + + // Plain JS samples need to use require(), not import. + const esImports = /^\s*import \{\s*([^}]+[^\s])\s*\} from '([^']+)';$/gm; + contents = contents.replace(esImports, "const {$1} = require('$2');"); + const esStarImports = /^\s*import \* as ([^\s]+) from '([^']+)';$/gm; + contents = contents.replace(esStarImports, "const $1 = require('$2');"); + + // Add a "generated" warning. + const sampleMeta = /^(\/\/ sample-metadata:)$/gm; + contents = contents.replace( + sampleMeta, + "// This is a generated sample. Please see typescript/README.md for more info.\n\n'use strict';\n\n$1" + ); + + // TypeScript shifts things to 4 spaces, move them back to 2. + const extraSpaces = /^(\/\/[^\w]*){0}(( {4})+)/gm; + contents = contents.replace(extraSpaces, match => + ' '.repeat(match.length / 2) + ); + + await fs.writeFile(fn, contents); + } + } +} + +main(Array.from(process.argv).slice(2)).catch(e => { + console.error(e); + process.exitCode = 1; +}); diff --git a/src/pubsub.ts b/src/pubsub.ts index 6ab3b526b..046a2e5ea 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -32,6 +32,7 @@ import { ICreateSchemaRequest, SchemaViews, ISchema, + SchemaView, } from './schema'; import {Snapshot} from './snapshot'; import { @@ -790,7 +791,7 @@ export class PubSub { * } */ async *listSchemas( - view: google.pubsub.v1.SchemaView = SchemaViews.Basic, + view: SchemaView = SchemaViews.Basic, options?: CallOptions ): AsyncIterable { const client = await this.getSchemaClient_(); diff --git a/src/schema.ts b/src/schema.ts index c5cd00759..a5ca74e0e 100644 --- a/src/schema.ts +++ b/src/schema.ts @@ -136,7 +136,7 @@ export class Schema { * @returns {Promise} */ async get( - view: google.pubsub.v1.SchemaView = SchemaViews.Full, + view: SchemaView = SchemaViews.Full, gaxOpts?: CallOptions ): Promise { const client = await this.pubsub.getSchemaClient_(); @@ -241,7 +241,7 @@ export class Schema { name: attributes['googclient_schemaname'], encoding: attributes[ 'googclient_schemaencoding' - ] as unknown as keyof typeof google.pubsub.v1.Encoding, + ] as unknown as SchemaEncoding, }; } } @@ -259,26 +259,26 @@ export interface SchemaMessageMetadata { /** * Encoding; this will be Encodings.Json or Encodings.Binary. */ - encoding: keyof typeof google.pubsub.v1.Encoding | undefined; + encoding: SchemaEncoding | undefined; } // Export all of these so that clients don't have to dig for them. export type CreateSchemaResponse = google.pubsub.v1.Schema; export type ISchema = google.pubsub.v1.ISchema; -export type SchemaType = google.pubsub.v1.Schema.Type; -export type SchemaView = google.pubsub.v1.SchemaView; +export type SchemaType = keyof typeof google.pubsub.v1.Schema.Type; +export type SchemaView = keyof typeof google.pubsub.v1.SchemaView; export type ICreateSchemaRequest = google.pubsub.v1.ICreateSchemaRequest; -export type SchemaEncoding = google.pubsub.v1.Encoding; +export type SchemaEncoding = keyof typeof google.pubsub.v1.Encoding; // Also export these for JavaScript compatible usage. export const SchemaTypes = { - ProtocolBuffer: google.pubsub.v1.Schema.Type.PROTOCOL_BUFFER, - Avro: google.pubsub.v1.Schema.Type.AVRO, + ProtocolBuffer: 'PROTOCOL_BUFFER' as const, + Avro: 'AVRO' as const, }; export const SchemaViews = { - Basic: google.pubsub.v1.SchemaView.BASIC, - Full: google.pubsub.v1.SchemaView.FULL, + Basic: 'BASIC' as const, + Full: 'FULL' as const, }; // These are not schema-specific, but this seems to be the diff --git a/test/pubsub.ts b/test/pubsub.ts index 44be60642..c7dff0ce3 100644 --- a/test/pubsub.ts +++ b/test/pubsub.ts @@ -1591,9 +1591,8 @@ describe('PubSub', () => { it('should close the schema client when it has been opened', async () => { // Force it to create a client. const client = await pubsub.getSchemaClient_(); - const stub = sandbox.stub(client, 'close').resolves(); - pubsub.close(); - await stub; + sandbox.stub(client, 'close').resolves(); + await pubsub.close(); }); // I feel like this ought to be a test, but something in getSchemaClient_() @@ -1635,7 +1634,7 @@ describe('PubSub', () => { sandbox.stub(client, 'listSchemasAsync').callsFake((req, gaxOpts) => { assert.strictEqual(req!.parent, pubsub.name); - assert.strictEqual(req!.view, google.pubsub.v1.SchemaView.BASIC); + assert.strictEqual(req!.view, 'BASIC'); assert.deepStrictEqual(gaxOpts, {}); return toAsync([ { @@ -1662,7 +1661,7 @@ describe('PubSub', () => { const client = await pubsub.getSchemaClient_(); sandbox.stub(client, 'listSchemasAsync').callsFake(req => { - assert.strictEqual(req!.view, google.pubsub.v1.SchemaView.BASIC); + assert.strictEqual(req!.view, 'BASIC'); // eslint-disable-next-line @typescript-eslint/no-explicit-any return toAsync([]) as any; }); diff --git a/test/schema.ts b/test/schema.ts index 93342aa50..578a1e61d 100644 --- a/test/schema.ts +++ b/test/schema.ts @@ -99,7 +99,7 @@ describe('Schema', () => { .callsFake(async (params, gaxOpts) => { const name = await schema.getName(); assert.strictEqual(params.name, name); - assert.strictEqual(params.view, google.pubsub.v1.SchemaView.FULL); + assert.strictEqual(params.view, 'FULL'); assert.deepStrictEqual(gaxOpts, {}); called = true; return [ischema]; @@ -115,7 +115,7 @@ describe('Schema', () => { it('defaults to FULL when get() is called', async () => { let called = false; sandbox.stub(schemaClient, 'getSchema').callsFake(async params => { - assert.strictEqual(params.view, google.pubsub.v1.SchemaView.FULL); + assert.strictEqual(params.view, 'FULL'); called = true; return [ischema]; });