diff --git a/lib/pubsub/iam.js b/lib/pubsub/iam.js index 67c1b76171f..66b77164303 100644 --- a/lib/pubsub/iam.js +++ b/lib/pubsub/iam.js @@ -20,13 +20,22 @@ 'use strict'; -var is = require('is'); var arrify = require('arrify'); +var is = require('is'); +var nodeutil = require('util'); + +/** + * @type {module:common/serviceObject} + * @private + */ +var ServiceObject = require('../common/service-object.js'); /*! Developer Documentation * - * @param {module:pubsub} pubsub - PubSub Object - * @param {string} resource - topic or subscription name + * @param {module:pubsub} pubsub - PubSub Object. + * @param {object} config - Configuration object. + * @param {string} config.baseUrl - The base URL to apply to API requests. + * @param {string} config.id - The name of the topic or subscription. */ /** * [IAM (Identity and Access Management)](https://cloud.google.com/pubsub/access_control) @@ -66,11 +75,19 @@ var arrify = require('arrify'); * var subscription = pubsub.subscription('my-subscription'); * // subscription.iam */ -function IAM(pubsub, resource) { - this.resource = resource; - this.makeReq_ = pubsub.makeReq_.bind(pubsub); +function IAM(pubsub, scope) { + ServiceObject.call(this, { + parent: pubsub, + baseUrl: scope.baseUrl, + id: scope.id, + methods: { + // Nothing needed other than the `request` method. + } + }); } +nodeutil.inherits(IAM, ServiceObject); + /** * Get the IAM policy * @@ -90,9 +107,9 @@ function IAM(pubsub, resource) { * subscription.iam.getPolicy(function(err, policy, apiResponse) {}); */ IAM.prototype.getPolicy = function(callback) { - var path = this.resource + ':getIamPolicy'; - - this.makeReq_('GET', path, null, null, function(err, resp) { + this.request({ + uri: ':getIamPolicy' + }, function(err, resp) { if (err) { callback(err, null, resp); return; @@ -138,15 +155,16 @@ IAM.prototype.getPolicy = function(callback) { */ IAM.prototype.setPolicy = function(policy, callback) { if (!is.object(policy)) { - throw new Error('A policy is required'); + throw new Error('A policy object is required.'); } - var path = this.resource + ':setIamPolicy'; - var body = { - policy: policy - }; - - this.makeReq_('POST', path, null, body, function(err, resp) { + this.request({ + method: 'POST', + uri: ':setIamPolicy', + json: { + policy: policy + } + }, function(err, resp) { if (err) { callback(err, null, resp); return; @@ -207,23 +225,26 @@ IAM.prototype.setPolicy = function(policy, callback) { */ IAM.prototype.testPermissions = function(permissions, callback) { if (!is.array(permissions) && !is.string(permissions)) { - throw new Error('Permissions are required'); + throw new Error('Permissions are required.'); } - var path = this.resource + ':testIamPermissions'; - var body = { - permissions: arrify(permissions) - }; + permissions = arrify(permissions); - this.makeReq_('POST', path, null, body, function(err, resp) { + this.request({ + method: 'POST', + uri: ':testIamPermissions', + json: { + permissions: permissions + } + }, function(err, resp) { if (err) { callback(err, null, resp); return; } - var availablePermissions = resp.permissions || []; + var availablePermissions = arrify(resp.permissions); - var permissionsHash = body.permissions.reduce(function(acc, permission) { + var permissionsHash = permissions.reduce(function(acc, permission) { acc[permission] = availablePermissions.indexOf(permission) > -1; return acc; }, {}); diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 9a9cf551716..672f2d1e7c8 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -20,8 +20,15 @@ 'use strict'; -var format = require('string-format-obj'); +var arrify = require('arrify'); var is = require('is'); +var nodeutil = require('util'); + +/** + * @type {module:common/service} + * @private + */ +var Service = require('../common/service.js'); /** * @type {module:pubsub/subscription} @@ -47,21 +54,6 @@ var Topic = require('./topic.js'); */ var util = require('../common/util.js'); -/** - * @const {string} Base URL for Pub/Sub API. - * @private - */ -var PUBSUB_BASE_URL = 'https://pubsub.googleapis.com/v1/'; - -/** - * @const {array} Required scopes for Pub/Sub API. - * @private - */ -var SCOPES = [ - 'https://www.googleapis.com/auth/pubsub', - 'https://www.googleapis.com/auth/cloud-platform' -]; - /** * [Google Cloud Pub/Sub](https://developers.google.com/pubsub/overview) is a * reliable, many-to-many, asynchronous messaging service from Google Cloud @@ -86,17 +78,191 @@ function PubSub(options) { return new PubSub(options); } - this.makeAuthenticatedRequest_ = util.makeAuthenticatedRequestFactory({ - credentials: options.credentials, - keyFile: options.keyFilename, - scopes: SCOPES, - email: options.email - }); + var config = { + baseUrl: 'https://pubsub.googleapis.com/v1', + scopes: [ + 'https://www.googleapis.com/auth/pubsub', + 'https://www.googleapis.com/auth/cloud-platform' + ] + }; - this.projectId = options.projectId; - this.projectName = 'projects/' + this.projectId; + Service.call(this, config, options); } +nodeutil.inherits(PubSub, Service); + +/** + * Create a topic with the given name. + * + * @resource [Topics: create API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create} + * + * @param {string} name - Name of the topic. + * @param {function=} callback - The callback function. + * @param {?error} callback.err - An error from the API call, may be null. + * @param {module:pubsub/topic} callback.topic - The newly created topic. + * @param {object} callback.apiResponse - The full API response from the + * service. + * + * @example + * pubsub.createTopic('my-new-topic', function(err, topic, apiResponse) { + * if (!err) { + * // The topic was created successfully. + * } + * }); + */ +PubSub.prototype.createTopic = function(name, callback) { + var self = this; + + callback = callback || util.noop; + + this.request({ + method: 'PUT', + uri: '/topics/' + name, + }, function(err, resp) { + if (err) { + callback(err, null, resp); + return; + } + + var topic = self.topic(name); + topic.metadata = resp; + + callback(null, topic, resp); + }); +}; + +/** + * Get a list of the subscriptions registered to all of your project's topics. + * You may optionally provide a query object as the first argument to customize + * the response. + * + * Your provided callback will be invoked with an error object if an API error + * occurred or an array of {@linkcode module:pubsub/subscription} objects. + * + * To get subscriptions for a topic, see {module:pubsub/topic}. + * + * @resource [Subscriptions: list API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list} + * + * @param {object=} options - Configuration object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: true. + * @param {string|module:pubsub/topic} options.topic - The name of the topic to + * list subscriptions from. + * @param {number} options.pageSize - Maximum number of results to return. + * @param {string} options.pageToken - Page token. + * @param {function} callback - The callback function. + * @param {?error} callback.err - An error from the API call, may be null. + * @param {module:pubsub/subscription[]} callback.subscriptions - The list of + * subscriptions returned. + * @param {?object} callback.nextQuery - A query object representing the next + * page of topics. + * @param {object} callback.apiResponse - The full API response from the + * service. + * + * @example + * pubsub.getSubscriptions(function(err, subscriptions) { + * if (!err) { + * // subscriptions is an array of Subscription objects. + * } + * }); + * + * //- + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. + * //- + * var callback = function(err, subscriptions, nextQuery, apiResponse) { + * if (nextQuery) { + * // More results exist. + * pubsub.getSubscriptions(nextQuery, callback); + * } + * }; + * + * pubsub.getSubscriptions({ + * autoPaginate: false + * }, callback); + * + * //- + * // Get the subscriptions as a readable object stream. + * //- + * pubsub.getSubscriptions() + * .on('error', console.error) + * .on('data', function(subscription) { + * // subscription is a Subscription object. + * }) + * .on('end', function() { + * // All subscriptions retrieved. + * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * pubsub.getSubscriptions() + * .on('data', function(topic) { + * this.end(); + * }); + */ +PubSub.prototype.getSubscriptions = function(options, callback) { + var self = this; + + if (is.fn(options)) { + callback = options; + options = {}; + } + + options = options || {}; + + var topicName; + + if (is.string(options.topic)) { + topicName = options.topic; + } else if (options.topic instanceof Topic) { + topicName = options.topic.unformattedName; + } + + var query = {}; + + if (options.pageSize) { + query.pageSize = options.pageSize; + } + + if (options.pageToken) { + query.pageToken = options.pageToken; + } + + this.request({ + uri: (topicName ? '/topics/' + topicName : '') + '/subscriptions', + qs: query + }, function(err, resp) { + if (err) { + callback(err, null, null, resp); + return; + } + + var subscriptions = arrify(resp.subscriptions).map(function(sub) { + // Depending on if we're using a subscriptions.list or + // topics.subscriptions.list API endpoint, we will get back a + // Subscription resource or just the name of the subscription. + var subscriptionInstance = self.subscription(sub.name || sub); + + if (sub.name) { + subscriptionInstance.metadata = sub; + } + + return subscriptionInstance; + }); + + var nextQuery = null; + + if (resp.nextPageToken) { + nextQuery = options; + nextQuery.pageToken = resp.nextPageToken; + } + + callback(null, subscriptions, nextQuery, resp); + }); +}; + /** * Get a list of the topics registered to your project. You may optionally * provide a query object as the first argument to customize the response. @@ -168,59 +334,34 @@ function PubSub(options) { */ PubSub.prototype.getTopics = function(query, callback) { var self = this; + if (!callback) { callback = query; query = {}; } - var path = this.projectName + '/topics'; - this.makeReq_('GET', path, query, true, function(err, result) { + + this.request({ + uri: '/topics', + qs: query + }, function(err, result) { if (err) { callback(err, null, null, result); return; } - var topics = (result.topics || []).map(function(item) { - var topicInstance = self.topic(item.name); - topicInstance.metadata = item; + + var topics = arrify(result.topics).map(function(topic) { + var topicInstance = self.topic(topic.name); + topicInstance.metadata = topic; return topicInstance; }); + var nextQuery = null; if (result.nextPageToken) { nextQuery = query; nextQuery.pageToken = result.nextPageToken; } - callback(null, topics, nextQuery, result); - }); -}; -/** - * Create a topic with the given name. - * - * @resource [Topics: create API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create} - * - * @param {string} name - Name of the topic. - * @param {function=} callback - The callback function. - * @param {?error} callback.err - An error from the API call, may be null. - * @param {module:pubsub/topic} callback.topic - The newly created topic. - * @param {object} callback.apiResponse - The full API response from the - * service. - * - * @example - * pubsub.createTopic('my-new-topic', function(err, topic, apiResponse) { - * if (!err) { - * // The topic was created successfully. - * } - * }); - */ -PubSub.prototype.createTopic = function(name, callback) { - callback = callback || util.noop; - var topic = this.topic(name); - var path = this.projectName + '/topics/' + name; - this.makeReq_('PUT', path, null, null, function(err, result) { - if (err) { - callback(err, null, result); - return; - } - callback(null, topic, result); + callback(null, topics, nextQuery, result); }); }; @@ -285,6 +426,8 @@ PubSub.prototype.createTopic = function(name, callback) { * pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {}); */ PubSub.prototype.subscribe = function(topic, subName, options, callback) { + var self = this; + if (!is.string(topic) && !(topic instanceof Topic)) { throw new Error('A Topic is required for a new subscription.'); } @@ -312,23 +455,25 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) { body.ackDeadlineSeconds = options.ackDeadlineSeconds; } - var subscription = this.subscription(subName, options); - - this.makeReq_('PUT', subscription.name, null, body, function(err, result) { + this.request({ + method: 'PUT', + uri: '/subscriptions/' + subName, + json: body + }, function(err, resp) { if (err && !(err.code === 409 && options.reuseExisting)) { - callback(err, null, result); + callback(err, null, resp); return; } - callback(null, subscription, result); + var subscription = self.subscription(resp.name, options); + callback(null, subscription, resp); }); }; /** - * Create a Subscription object in reference to an existing subscription. This - * command by itself will not run any API requests. You will receive a - * {@linkcode module:pubsub/subscription} object, which will allow you to - * interact with your subscription. + * Create a Subscription object. This command by itself will not run any API + * requests. You will receive a {@linkcode module:pubsub/subscription} object, + * which will allow you to interact with a subscription. * * @throws {Error} If a name is not provided. * @@ -341,7 +486,7 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) { * @return {module:pubsub/subscription} * * @example - * var subscription = pubsub.subscription('my-existing-subscription'); + * var subscription = pubsub.subscription('my-subscription'); * * // Register a listener for `message` events. * subscription.on('message', function(message) { @@ -362,8 +507,7 @@ PubSub.prototype.subscription = function(name, options) { }; /** - * Create a Topic object to reference an existing topic. See - * {module:pubsub/createTopic} to create a topic. + * Create a Topic object. See {module:pubsub/createTopic} to create a topic. * * @throws {Error} If a name is not provided. * @@ -371,7 +515,7 @@ PubSub.prototype.subscription = function(name, options) { * @return {module:pubsub/topic} * * @example - * var topic = pubsub.topic('my-existing-topic'); + * var topic = pubsub.topic('my-topic'); * * topic.publish({ * data: 'New message!' @@ -385,162 +529,6 @@ PubSub.prototype.topic = function(name) { return new Topic(this, name); }; -/** - * Get a list of the subscriptions registered to all of your project's topics. - * You may optionally provide a query object as the first argument to customize - * the response. - * - * Your provided callback will be invoked with an error object if an API error - * occurred or an array of {@linkcode module:pubsub/subscription} objects. - * - * To get subscriptions for a topic, see {module:pubsub/topic}. - * - * @resource [Subscriptions: list API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list} - * - * @param {object=} options - Configuration object. - * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: true. - * @param {string|module:pubsub/topic} options.topic - The name of the topic to - * list subscriptions from. - * @param {number} options.pageSize - Maximum number of results to return. - * @param {string} options.pageToken - Page token. - * @param {function} callback - The callback function. - * @param {?error} callback.err - An error from the API call, may be null. - * @param {module:pubsub/subscription[]} callback.subscriptions - The list of - * subscriptions returned. - * @param {?object} callback.nextQuery - A query object representing the next - * page of topics. - * @param {object} callback.apiResponse - The full API response from the - * service. - * - * @example - * pubsub.getSubscriptions(function(err, subscriptions) { - * if (!err) { - * // subscriptions is an array of Subscription objects. - * } - * }); - * - * //- - * // To control how many API requests are made and page through the results - * // manually, set `autoPaginate` to `false`. - * //- - * var callback = function(err, subscriptions, nextQuery, apiResponse) { - * if (nextQuery) { - * // More results exist. - * pubsub.getSubscriptions(nextQuery, callback); - * } - * }; - * - * pubsub.getSubscriptions({ - * autoPaginate: false - * }, callback); - * - * //- - * // Get the subscriptions as a readable object stream. - * //- - * pubsub.getSubscriptions() - * .on('error', console.error) - * .on('data', function(subscription) { - * // subscription is a Subscription object. - * }) - * .on('end', function() { - * // All subscriptions retrieved. - * }); - * - * //- - * // If you anticipate many results, you can end a stream early to prevent - * // unnecessary processing and API requests. - * //- - * pubsub.getSubscriptions() - * .on('data', function(topic) { - * this.end(); - * }); - */ -PubSub.prototype.getSubscriptions = function(options, callback) { - var self = this; - - if (is.fn(options)) { - callback = options; - options = {}; - } - - options = options || {}; - - var topicName; - - if (is.string(options.topic)) { - topicName = options.topic; - } else if (options.topic instanceof Topic) { - topicName = options.topic.unformattedName; - } - - var query = {}; - - if (options.pageSize) { - query.pageSize = options.pageSize; - } - - if (options.pageToken) { - query.pageToken = options.pageToken; - } - - var apiPath = format('{projectPath}{topicPath}/subscriptions', { - projectPath: 'projects/' + this.projectId, - topicPath: topicName ? '/topics/' + topicName : '' - }); - - this.makeReq_('GET', apiPath, query, null, function(err, result) { - if (err) { - callback(err, null, null, result); - return; - } - - var subscriptions = (result.subscriptions || []).map(function(sub) { - return new Subscription(self, { - // Depending on if we're using a subscriptions.list or - // topics.subscriptions.list API endpoint, we will get back a - // Subscription resource or just the name of the subscription. - name: sub.name || sub - }); - }); - - var nextQuery = null; - - if (result.nextPageToken) { - nextQuery = options; - nextQuery.pageToken = result.nextPageToken; - } - - callback(null, subscriptions, nextQuery, result); - }); -}; - -/** - * Make a new request object from the provided arguments and wrap the callback - * to intercept non-successful responses. - * - * @private - * - * @param {string} method - Action. - * @param {string} path - Request path. - * @param {*} query - Request query object. - * @param {*} body - Request body contents. - * @param {function} callback - The callback function. - */ -PubSub.prototype.makeReq_ = function(method, path, q, body, callback) { - var reqOpts = { - method: method, - qs: q, - uri: PUBSUB_BASE_URL + path - }; - - if (body) { - reqOpts.json = body; - } - - this.makeAuthenticatedRequest_(reqOpts, callback); -}; - /*! Developer Documentation * * These methods can be used with either a callback or as a readable object diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index 4ff223f6514..ec0a146a794 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -23,19 +23,26 @@ var arrify = require('arrify'); var events = require('events'); var is = require('is'); -var nodeutil = require('util'); +var modelo = require('modelo'); +var prop = require('propprop'); /** - * @type {module:common/util} + * @type {module:pubsub/iam} * @private */ -var util = require('../common/util.js'); +var IAM = require('./iam.js'); /** - * @type {module:pubsub/iam} + * @type {module:common/serviceObject} * @private */ -var IAM = require('./iam'); +var ServiceObject = require('../common/service-object.js'); + +/** + * @type {module:common/util} + * @private + */ +var util = require('../common/util.js'); /*! Developer Documentation * @@ -103,7 +110,7 @@ var IAM = require('./iam'); * // From {@linkcode module:pubsub/topic#subscription}: * //- * var topic = pubsub.topic('my-topic'); - * var subscription = topic.subscription('my-existing-subscription'); + * var subscription = topic.subscription('my-subscription'); * // `subscription` is a Subscription object. * * //- @@ -135,20 +142,95 @@ var IAM = require('./iam'); * subscription.removeListener('message', onMessage); */ function Subscription(pubsub, options) { - events.EventEmitter.call(this); + var baseUrl = '/subscriptions'; + var unformattedName = options.name.split('/').pop(); + + var methods = { + /** + * Check if the subscription exists. + * + * @param {function} callback - The callback function. + * @param {?error} callback.err - An error returned while making this + * request. + * @param {boolean} callback.exists - Whether the subscription exists or + * not. + * + * @example + * subscription.exists(function(err, exists) {}); + */ + exists: true, + + /** + * Get a subscription if it exists. + * + * You may optionally use this to "get or create" an object by providing an + * object with `autoCreate` set to `true`. Any extra configuration that is + * normally required for the `create` method must be contained within this + * object as well. + * + * **`autoCreate` is only available if you accessed this object + * through {module:pubsub/topic#subscription}.** + * + * @param {options=} options - Configuration object. + * @param {boolean} options.autoCreate - Automatically create the object if + * it does not exist. Default: `false` + * + * @example + * subscription.get(function(err, subscription, apiResponse) { + * // `subscription.metadata` has been populated. + * }); + */ + get: true, + + /** + * Get the metadata for the subscription. + * + * @resource [Subscriptions: get API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/get} + * + * @param {function} callback - The callback function. + * @param {?error} callback.err - An API error. + * @param {?object} callback.metadata - Metadata of the subscription from + * the API. + * @param {object} callback.apiResponse - Raw API response. + * + * @example + * subscription.getMetadata(function(err, metadata, apiResponse) {}); + */ + getMetadata: true + }; - this.name = Subscription.formatName_(pubsub.projectId, options.name); + var config = { + parent: pubsub, + baseUrl: baseUrl, + id: unformattedName, + methods: methods + }; - this.makeReq_ = pubsub.makeReq_.bind(pubsub); + if (options.topic) { + // Only a subscription with knowledge of its topic can be created. + config.createMethod = pubsub.subscribe.bind(pubsub, options.topic); + delete options.topic; + + /** + * Create a subscription. + * + * **This is only available if you accessed this object through + * {module:pubsub/topic#subscription}.** + * + * @param {object} config - See {module:pubsub#subscribe}. + * + * @example + * subscription.create(function(err, subscription, apiResponse) { + * if (!err) { + * // The subscription was created successfully. + * } + * }); + */ + config.methods.create = true; + } - this.autoAck = is.boolean(options.autoAck) ? options.autoAck : false; - this.closed = true; - this.interval = is.number(options.interval) ? options.interval : 10; - this.inProgressAckIds = {}; - this.maxInProgress = - is.number(options.maxInProgress) ? options.maxInProgress : Infinity; - this.messageListeners = 0; - this.paused = false; + ServiceObject.call(this, config); + events.EventEmitter.call(this); /** * [IAM (Identity and Access Management)](https://cloud.google.com/pubsub/access_control) @@ -176,26 +258,26 @@ function Subscription(pubsub, options) { * console.log(policy); * }); */ - this.iam = new IAM(pubsub, this.name); + this.iam = new IAM(pubsub, { + baseUrl: baseUrl, + id: unformattedName + }); + + this.name = Subscription.formatName_(pubsub.projectId, options.name); + + this.autoAck = is.boolean(options.autoAck) ? options.autoAck : false; + this.closed = true; + this.interval = is.number(options.interval) ? options.interval : 10; + this.inProgressAckIds = {}; + this.maxInProgress = + is.number(options.maxInProgress) ? options.maxInProgress : Infinity; + this.messageListeners = 0; + this.paused = false; this.listenForEvents_(); } -nodeutil.inherits(Subscription, events.EventEmitter); - -/** - * Format the name of a subscription. A subscription's full name is in the - * format of projects/{projectId}/subscriptions/{subName}. - * - * @private - */ -Subscription.formatName_ = function(projectId, name) { - // Simple check if the name is already formatted. - if (name.indexOf('/') > -1) { - return name; - } - return 'projects/' + projectId + '/subscriptions/' + name; -}; +modelo.inherits(Subscription, ServiceObject, events.EventEmitter); /** * Simplify a message from an API response to have three properties, `id`, @@ -229,84 +311,18 @@ Subscription.formatMessage_ = function(msg) { }; /** - * Begin listening for events on the subscription. This method keeps track of - * how many message listeners are assigned, and then removed, making sure - * polling is handled automatically. - * - * As long as there is one active message listener, the connection is open. As - * soon as there are no more message listeners, the connection is closed. - * - * @private - * - * @example - * subscription.listenForEvents_(); - */ -Subscription.prototype.listenForEvents_ = function() { - var self = this; - - this.on('newListener', function(event) { - if (event === 'message') { - self.messageListeners++; - if (self.closed) { - self.closed = false; - self.startPulling_(); - } - } - }); - - this.on('removeListener', function(event) { - if (event === 'message' && --self.messageListeners === 0) { - self.closed = true; - } - }); -}; - -/** - * Poll the backend for new messages. This runs a loop to ping the API at the - * provided interval from the subscription's instantiation. If one wasn't - * provided, the default value is 10 milliseconds. - * - * If messages are received, they are emitted on the `message` event. - * - * Note: This method is automatically called once a message event handler is - * assigned to the description. - * - * To stop pulling, see {@linkcode module:pubsub/subscription#close}. + * Format the name of a subscription. A subscription's full name is in the + * format of projects/{projectId}/subscriptions/{subName}. * * @private - * - * @example - * subscription.startPulling_(); */ -Subscription.prototype.startPulling_ = function() { - var self = this; - - if (this.closed || this.paused) { - return; - } - - var maxResults; - - if (this.maxInProgress < Infinity) { - maxResults = this.maxInProgress - Object.keys(this.inProgressAckIds).length; +Subscription.formatName_ = function(projectId, name) { + // Simple check if the name is already formatted. + if (name.indexOf('/') > -1) { + return name; } - this.pull({ - returnImmediately: false, - maxResults: maxResults - }, function(err, messages, apiResponse) { - if (err) { - self.emit('error', err, apiResponse); - } - - if (messages) { - messages.forEach(function(message) { - self.emit('message', message, apiResponse); - }); - } - - setTimeout(self.startPulling_.bind(self), self.interval); - }); + return 'projects/' + projectId + '/subscriptions/' + name; }; /** @@ -326,26 +342,28 @@ Subscription.prototype.startPulling_ = function() { Subscription.prototype.ack = function(ackIds, callback) { var self = this; - if (!ackIds || ackIds.length === 0) { - throw new Error( - 'At least one ID must be specified before it can be acknowledged.'); - } - ackIds = arrify(ackIds); - var body = { - ackIds: ackIds - }; + if (ackIds.length === 0) { + throw new Error([ + 'At least one ID must be specified before it can be acknowledged.' + ].join('')); + } callback = callback || util.noop; - var path = this.name + ':acknowledge'; - - this.makeReq_('POST', path, null, body, function(err, resp) { + this.request({ + method: 'POST', + uri: ':acknowledge', + json: { + ackIds: ackIds + } + }, function(err, resp) { if (!err) { ackIds.forEach(function(ackId) { delete self.inProgressAckIds[ackId]; }); + self.refreshPausedStatus_(); } @@ -353,6 +371,36 @@ Subscription.prototype.ack = function(ackIds, callback) { }); }; +/** + * Add functionality on top of a message returned from the API, including the + * ability to `ack` and `skip` the message. + * + * This also records the message as being "in progress". See + * {module:subscription#refreshPausedStatus_}. + * + * @private + * + * @param {object} message - A message object. + * @return {object} message - The original message after being decorated. + * @param {function} message.ack - Ack the message. + * @param {function} message.skip - Increate the number of available messages to + * simultaneously receive. + */ +Subscription.prototype.decorateMessage_ = function(message) { + var self = this; + + this.inProgressAckIds[message.ackId] = true; + + message.ack = self.ack.bind(self, message.ackId); + + message.skip = function() { + delete self.inProgressAckIds[message.ackId]; + self.refreshPausedStatus_(); + }; + + return message; +}; + /** * Delete the subscription. Pull requests from the current subscription will be * errored once unsubscription is complete. @@ -366,15 +414,19 @@ Subscription.prototype.ack = function(ackIds, callback) { */ Subscription.prototype.delete = function(callback) { var self = this; + callback = callback || util.noop; - this.makeReq_('DELETE', this.name, null, true, function(err, result) { + + ServiceObject.prototype.delete.call(this, function(err, resp) { if (err) { - callback(err, result); + callback(err, resp); return; } + self.closed = true; self.removeAllListeners(); - callback(null, result); + + callback(null, resp); }); }; @@ -440,29 +492,27 @@ Subscription.prototype.pull = function(options, callback) { options.maxResults = MAX_EVENTS_LIMIT; } - var body = { - returnImmediately: !!options.returnImmediately, - maxMessages: options.maxResults - }; - - var path = this.name + ':pull'; - this.makeReq_('POST', path, null, body, function(err, response) { + this.request({ + method: 'POST', + uri: ':pull', + json: { + returnImmediately: !!options.returnImmediately, + maxMessages: options.maxResults + } + }, function(err, response) { if (err) { callback(err, null, response); return; } - var messages = response.receivedMessages || []; - messages = messages + var messages = arrify(response.receivedMessages) .map(Subscription.formatMessage_) .map(self.decorateMessage_.bind(self)); self.refreshPausedStatus_(); if (self.autoAck && messages.length !== 0) { - var ackIds = messages.map(function(message) { - return message.ackId; - }); + var ackIds = messages.map(prop('ackId')); self.ack(ackIds, function(err) { callback(err, messages, response); @@ -496,45 +546,51 @@ Subscription.prototype.pull = function(options, callback) { * subscription.setAckDeadline(options, function(err, apiResponse) {}); */ Subscription.prototype.setAckDeadline = function(options, callback) { - var body = { - ackIds: arrify(options.ackIds), - ackDeadlineSeconds: options.seconds - }; - callback = callback || util.noop; - var path = this.name + ':modifyAckDeadline'; - this.makeReq_('POST', path, null, body, callback); + this.request({ + method: 'POST', + uri: ':modifyAckDeadline', + json: { + ackIds: arrify(options.ackIds), + ackDeadlineSeconds: options.seconds + } + }, function(err, resp) { + callback(err, resp); + }); }; /** - * Add functionality on top of a message returned from the API, including the - * ability to `ack` and `skip` the message. + * Begin listening for events on the subscription. This method keeps track of + * how many message listeners are assigned, and then removed, making sure + * polling is handled automatically. * - * This also records the message as being "in progress". See - * {module:subscription#refreshPausedStatus_}. + * As long as there is one active message listener, the connection is open. As + * soon as there are no more message listeners, the connection is closed. * * @private * - * @param {object} message - A message object. - * @return {object} message - The original message after being decorated. - * @param {function} message.ack - Ack the message. - * @param {function} message.skip - Increate the number of available messages to - * simultaneously receive. + * @example + * subscription.listenForEvents_(); */ -Subscription.prototype.decorateMessage_ = function(message) { +Subscription.prototype.listenForEvents_ = function() { var self = this; - this.inProgressAckIds[message.ackId] = true; - - message.ack = self.ack.bind(self, message.ackId); - - message.skip = function() { - delete self.inProgressAckIds[message.ackId]; - self.refreshPausedStatus_(); - }; + this.on('newListener', function(event) { + if (event === 'message') { + self.messageListeners++; + if (self.closed) { + self.closed = false; + self.startPulling_(); + } + } + }); - return message; + this.on('removeListener', function(event) { + if (event === 'message' && --self.messageListeners === 0) { + self.closed = true; + } + }); }; /** @@ -551,6 +607,7 @@ Subscription.prototype.decorateMessage_ = function(message) { Subscription.prototype.refreshPausedStatus_ = function() { var isCurrentlyPaused = this.paused; var inProgress = Object.keys(this.inProgressAckIds).length; + this.paused = inProgress >= this.maxInProgress; if (isCurrentlyPaused && !this.paused && this.messageListeners > 0) { @@ -558,4 +615,52 @@ Subscription.prototype.refreshPausedStatus_ = function() { } }; +/** + * Poll the backend for new messages. This runs a loop to ping the API at the + * provided interval from the subscription's instantiation. If one wasn't + * provided, the default value is 10 milliseconds. + * + * If messages are received, they are emitted on the `message` event. + * + * Note: This method is automatically called once a message event handler is + * assigned to the description. + * + * To stop pulling, see {@linkcode module:pubsub/subscription#close}. + * + * @private + * + * @example + * subscription.startPulling_(); + */ +Subscription.prototype.startPulling_ = function() { + var self = this; + + if (this.closed || this.paused) { + return; + } + + var maxResults; + + if (this.maxInProgress < Infinity) { + maxResults = this.maxInProgress - Object.keys(this.inProgressAckIds).length; + } + + this.pull({ + returnImmediately: false, + maxResults: maxResults + }, function(err, messages, apiResponse) { + if (err) { + self.emit('error', err, apiResponse); + } + + if (messages) { + messages.forEach(function(message) { + self.emit('message', message, apiResponse); + }); + } + + setTimeout(self.startPulling_.bind(self), self.interval); + }); +}; + module.exports = Subscription; diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js index 20df59b5123..81508fa272c 100644 --- a/lib/pubsub/topic.js +++ b/lib/pubsub/topic.js @@ -22,6 +22,7 @@ var arrify = require('arrify'); var is = require('is'); +var nodeutil = require('util'); var prop = require('propprop'); /** @@ -36,6 +37,12 @@ var util = require('../common/util.js'); */ var IAM = require('./iam'); +/** + * @type {module:common/serviceObject} + * @private + */ +var ServiceObject = require('../common/service-object.js'); + /*! Developer Documentation * * @param {module:pubsub} pubsub - PubSub object. @@ -55,13 +62,88 @@ var IAM = require('./iam'); * var topic = pubsub.topic('my-topic'); */ function Topic(pubsub, name) { - this.name = Topic.formatName_(pubsub.projectId, name); + var baseUrl = '/topics'; - this.projectId = pubsub.projectId; - this.pubsub = pubsub; - this.unformattedName = name; + var methods = { + /** + * Create a topic. + * + * @param {object=} config - See {module:pubsub#createTopic}. + * + * @example + * topic.create(function(err, topic, apiResponse) { + * if (!err) { + * // The topic was created successfully. + * } + * }); + */ + create: true, + + /** + * Delete the topic. This will not delete subscriptions to this topic. + * + * @resource [Topics: delete API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/delete} + * + * @param {function=} callback - The callback function. + * + * @example + * topic.delete(function(err, apiResponse) {}); + */ + delete: true, + + /** + * Check if the topic exists. + * + * @param {function} callback - The callback function. + * @param {?error} callback.err - An error returned while making this + * request. + * @param {boolean} callback.exists - Whether the topic exists or not. + * + * @example + * topic.exists(function(err, exists) {}); + */ + exists: true, + + /** + * Get a topic if it exists. + * + * You may optionally use this to "get or create" an object by providing an + * object with `autoCreate` set to `true`. Any extra configuration that is + * normally required for the `create` method must be contained within this + * object as well. + * + * @param {options=} options - Configuration object. + * @param {boolean} options.autoCreate - Automatically create the object if + * it does not exist. Default: `false` + * + * @example + * topic.get(function(err, topic, apiResponse) { + * // `topic.metadata` has been populated. + * }); + */ + get: true, + + /** + * Get the official representation of this topic from the API. + * + * @resource [Topics: get API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get} + * + * @param {function} callback - The callback function. + * @param {?error} callback.err - An error returned while making this + * request. + * @param {object} callback.metadata - The metadata of the Topic. + * @param {object} callback.apiResponse - The full API response. + */ + getMetadata: true + }; - this.makeReq_ = this.pubsub.makeReq_.bind(this.pubsub); + ServiceObject.call(this, { + parent: pubsub, + baseUrl: baseUrl, + id: name, + createMethod: pubsub.createTopic.bind(pubsub), + methods: methods + }); /** * [IAM (Identity and Access Management)](https://cloud.google.com/pubsub/access_control) @@ -89,9 +171,18 @@ function Topic(pubsub, name) { * console.log(policy); * }); */ - this.iam = new IAM(pubsub, this.name); + this.iam = new IAM(pubsub, { + baseUrl: baseUrl, + id: name + }); + + this.name = Topic.formatName_(pubsub.projectId, name); + this.pubsub = pubsub; + this.unformattedName = name; } +nodeutil.inherits(Topic, ServiceObject); + /** * Format a message object as the upstream API expects it. * @@ -125,6 +216,67 @@ Topic.formatName_ = function(projectId, name) { return 'projects/' + projectId + '/topics/' + name; }; +/** + * Get a list of the subscriptions registered to this topic. You may optionally + * provide a query object as the first argument to customize the response. + * + * Your provided callback will be invoked with an error object if an API error + * occurred or an array of {@linkcode module:pubsub/subscription} objects. + * + * @resource [Subscriptions: list API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list} + * + * @param {object=} options - Configuration object. + * @param {number=} options.pageSize - Maximum number of results to return. + * @param {string=} options.pageToken - Page token. + * @param {function} callback - The callback function. + * + * @example + * var callback = function(err, subscriptions, nextQuery, apiResponse) { + * // If `nextQuery` is non-null, there may be more results to fetch. To do + * // so, run `topic.getSubscriptions(nextQuery, callback);`. + * }; + * + * // Get all subscriptions for this topic. + * topic.getSubscriptions(callback); + * + * // Customize the query. + * topic.getSubscriptions({ + * pageSize: 3 + * }, callback); + * + * //- + * // Get the subscriptions for this topic as a readable object stream. + * //- + * topic.getSubscriptions() + * .on('error', console.error) + * .on('data', function(subscription) { + * // subscription is a Subscription object. + * }) + * .on('end', function() { + * // All subscriptions retrieved. + * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * topic.getSubscriptions() + * .on('data', function(subscription) { + * this.end(); + * }); + */ +Topic.prototype.getSubscriptions = function(options, callback) { + if (is.fn(options)) { + callback = options; + options = {}; + } + + options = options || {}; + options.topic = this; + + return this.pubsub.getSubscriptions(options, callback); +}; + /** * Publish the provided message or array of messages. On success, an array of * messageIds is returned in the response. @@ -191,120 +343,22 @@ Topic.prototype.publish = function(messages, callback) { callback = callback || util.noop; - var body = { - messages: messages.map(Topic.formatMessage_) - }; - - var path = this.name + ':publish'; - this.makeReq_('POST', path, null, body, function(err, result) { - if (err) { - callback(err, null, result); - return; + this.request({ + method: 'POST', + uri: ':publish', + json: { + messages: messages.map(Topic.formatMessage_) } - callback(null, result && result.messageIds || [], result); - }); -}; - -/** - * Delete the topic. This will not delete subscriptions to this topic. - * - * @resource [Topics: delete API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/delete} - * - * @param {function=} callback - The callback function. - * - * @example - * topic.delete(function(err, apiResponse) {}); - */ -Topic.prototype.delete = function(callback) { - callback = callback || util.noop; - this.makeReq_('DELETE', this.name, null, null, callback); -}; - -/** - * Get the official representation of this topic from the API. - * - * @resource [Topics: get API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get} - * - * @param {function} callback - The callback function. - * @param {?error} callback.err - An error returned while making this request. - * @param {object} callback.metadata - The metadata of the Topic. - * @param {object} callback.apiResponse - The full API response. - */ -Topic.prototype.getMetadata = function(callback) { - var self = this; - - this.makeReq_('GET', this.name, null, null, function(err, resp) { + }, function(err, result) { if (err) { - callback(err, null, resp); + callback(err, null, result); return; } - self.metadata = resp; - callback(null, self.metadata, resp); + callback(null, arrify(result.messageIds), result); }); }; -/** - * Get a list of the subscriptions registered to this topic. You may optionally - * provide a query object as the first argument to customize the response. - * - * Your provided callback will be invoked with an error object if an API error - * occurred or an array of {@linkcode module:pubsub/subscription} objects. - * - * @resource [Subscriptions: list API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list} - * - * @param {object=} options - Configuration object. - * @param {number=} options.pageSize - Maximum number of results to return. - * @param {string=} options.pageToken - Page token. - * @param {function} callback - The callback function. - * - * @example - * var callback = function(err, subscriptions, nextQuery, apiResponse) { - * // If `nextQuery` is non-null, there may be more results to fetch. To do - * // so, run `topic.getSubscriptions(nextQuery, callback);`. - * }; - * - * // Get all subscriptions for this topic. - * topic.getSubscriptions(callback); - * - * // Customize the query. - * topic.getSubscriptions({ - * pageSize: 3 - * }, callback); - * - * //- - * // Get the subscriptions for this topic as a readable object stream. - * //- - * topic.getSubscriptions() - * .on('error', console.error) - * .on('data', function(subscription) { - * // subscription is a Subscription object. - * }) - * .on('end', function() { - * // All subscriptions retrieved. - * }); - * - * //- - * // If you anticipate many results, you can end a stream early to prevent - * // unnecessary processing and API requests. - * //- - * topic.getSubscriptions() - * .on('data', function(subscription) { - * this.end(); - * }); - */ -Topic.prototype.getSubscriptions = function(options, callback) { - if (is.fn(options)) { - callback = options; - options = {}; - } - - options = options || {}; - options.topic = this; - - return this.pubsub.getSubscriptions(options, callback); -}; - /** * Create a subscription to this topic. You may optionally provide an object to * customize the subscription. @@ -344,10 +398,9 @@ Topic.prototype.subscribe = function(subName, options, callback) { }; /** - * Create a Subscription object in reference to an existing subscription. This - * command by itself will not run any API requests. You will receive a - * {@linkcode module:pubsub/subscription} object, which will allow you to - * interact with your subscription. + * Create a Subscription object. This command by itself will not run any API + * requests. You will receive a {@linkcode module:pubsub/subscription} object, + * which will allow you to interact with a subscription. * * @param {string} name - Name of the subscription. * @param {object=} options - Configuration object. @@ -358,7 +411,7 @@ Topic.prototype.subscribe = function(subName, options, callback) { * @return {module:pubsub/subscription} * * @example - * var subscription = topic.subscription('my-existing-subscription'); + * var subscription = topic.subscription('my-subscription'); * * // Register a listener for `message` events. * subscription.on('message', function(message) { @@ -369,6 +422,9 @@ Topic.prototype.subscribe = function(subName, options, callback) { * }); */ Topic.prototype.subscription = function(name, options) { + options = options || {}; + options.topic = this; + return this.pubsub.subscription(name, options); }; diff --git a/package.json b/package.json index 601a807a954..d9fc91d8235 100644 --- a/package.json +++ b/package.json @@ -96,6 +96,7 @@ "is": "^3.0.1", "methmeth": "^1.0.0", "mime-types": "^2.0.8", + "modelo": "^4.2.0", "once": "^1.3.1", "prop-assign": "^1.0.0", "propprop": "^0.3.0", diff --git a/system-test/pubsub.js b/system-test/pubsub.js index 4fbf4c91431..5ae8b14a183 100644 --- a/system-test/pubsub.js +++ b/system-test/pubsub.js @@ -42,16 +42,22 @@ describe('pubsub', function() { generateTopicName() ]; - var TOPICS = TOPIC_NAMES.map(pubsub.topic.bind(pubsub)); + var TOPICS = [ + pubsub.topic(TOPIC_NAMES[0]), + pubsub.topic(TOPIC_NAMES[1]), + pubsub.topic(TOPIC_NAMES[2]) + ]; - var TOPIC_FULL_NAMES = TOPICS.map(function(topic) { - return topic.name; - }); + var TOPIC_FULL_NAMES = [ + TOPICS[0].name, + TOPICS[1].name, + TOPICS[2].name + ]; before(function(done) { // create all needed topics - async.each(TOPIC_NAMES, function(name, cb) { - pubsub.createTopic(name, cb); + async.each(TOPICS, function(topic, cb) { + topic.create(cb); }, done); }); @@ -136,6 +142,7 @@ describe('pubsub', function() { describe('Subscription', function() { var TOPIC_NAME = generateTopicName(); + var topic = pubsub.topic(TOPIC_NAME); var SUB_NAMES = [ generateSubName(), @@ -143,25 +150,16 @@ describe('pubsub', function() { ]; var SUBSCRIPTIONS = [ - { - name: SUB_NAMES[0], - options: { ackDeadlineSeconds: 30 } - }, - { - name: SUB_NAMES[1], - options: { ackDeadlineSeconds: 60 } - } + topic.subscription(SUB_NAMES[0], { ackDeadlineSeconds: 30 }), + topic.subscription(SUB_NAMES[1], { ackDeadlineSeconds: 60 }) ]; - var topic; - before(function(done) { - pubsub.createTopic(TOPIC_NAME, function(err, newTopic) { + topic.create(function(err) { assert.ifError(err); - topic = newTopic; function createSubscription(subscription, callback) { - topic.subscribe(subscription.name, subscription.options, callback); + subscription.create(callback); } async.each(SUBSCRIPTIONS, createSubscription, function(err) { @@ -178,12 +176,8 @@ describe('pubsub', function() { }); after(function(done) { - var SUBS = SUB_NAMES.map(function(name) { - return topic.subscription(name); - }); - // Delete subscriptions - async.each(SUBS, function(sub, callback) { + async.each(SUBSCRIPTIONS, function(sub, callback) { sub.delete(callback); }, function(err) { assert.ifError(err); @@ -194,8 +188,8 @@ describe('pubsub', function() { it('should list all subscriptions registered to the topic', function(done) { topic.getSubscriptions(function(err, subs) { assert.ifError(err); - assert(subs[0] instanceof Subscription); assert.equal(subs.length, SUBSCRIPTIONS.length); + assert(subs[0] instanceof Subscription); done(); }); }); @@ -322,7 +316,6 @@ describe('pubsub', function() { }); describe('IAM', function() { - it('should get a policy', function(done) { var topic = pubsub.topic(TOPIC_NAMES[0]); @@ -365,6 +358,5 @@ describe('pubsub', function() { done(); }); }); - }); }); diff --git a/test/pubsub/iam.js b/test/pubsub/iam.js index fff0a3cb05a..90a2b874b32 100644 --- a/test/pubsub/iam.js +++ b/test/pubsub/iam.js @@ -17,36 +17,66 @@ 'use strict'; var assert = require('assert'); -var IAM = require('../../lib/pubsub/iam'); -var noop = function() {}; +var mockery = require('mockery'); +var nodeutil = require('util'); + +var ServiceObject = require('../../lib/common/service-object.js'); +var util = require('../../lib/common/util.js'); + +function FakeServiceObject() { + this.calledWith_ = arguments; + ServiceObject.apply(this, arguments); +} + +nodeutil.inherits(FakeServiceObject, ServiceObject); describe('IAM', function() { - var RESOURCE = 'projects/test-project/topics/test-topic'; - var pubsubMock = { - makeReq_: noop - }; + var IAM; var iam; + var PUBSUB = {}; + var CONFIG = { + baseUrl: '/baseurl', + id: 'id' + }; + + before(function() { + mockery.registerMock('../common/service-object.js', FakeServiceObject); + + mockery.enable({ + useCleanCache: true, + warnOnUnregistered: false + }); + + IAM = require('../../lib/pubsub/iam.js'); + }); + + after(function() { + mockery.deregisterAll(); + mockery.disable(); + }); + beforeEach(function() { - iam = new IAM(pubsubMock, RESOURCE); + iam = new IAM(PUBSUB, CONFIG); }); describe('initialization', function() { - it('should localize the resource', function() { - assert.strictEqual(iam.resource, RESOURCE); + it('should inherit from ServiceObject', function() { + assert(iam instanceof ServiceObject); + + var calledWith = iam.calledWith_[0]; + + assert.strictEqual(calledWith.parent, PUBSUB); + assert.strictEqual(calledWith.baseUrl, CONFIG.baseUrl); + assert.strictEqual(calledWith.id, CONFIG.id); + assert.deepEqual(calledWith.methods, {}); }); }); describe('getPolicy', function() { it('should make the correct API request', function(done) { - iam.makeReq_ = function(method, path, q, body) { - assert.strictEqual(method, 'GET'); - - var expectedPath = RESOURCE + ':getIamPolicy'; - assert.strictEqual(path, expectedPath); - - assert.strictEqual(q, null); - assert.strictEqual(body, null); + iam.request = function(reqOpts) { + assert.strictEqual(reqOpts.uri, ':getIamPolicy'); done(); }; @@ -54,37 +84,35 @@ describe('IAM', function() { iam.getPolicy(assert.ifError); }); - it('should pass the callback the expected params', function(done) { - var _policy = { - bindings: [{ yo: 'yo' }] - }; + it('should handle errors properly', function(done) { + var apiResponse = {}; + var error = new Error('Error.'); - iam.makeReq_ = function(method, path, q, body, callback) { - callback(null, _policy, _policy); + iam.request = function(reqOpts, callback) { + callback(error, apiResponse); }; - iam.getPolicy(function(err, policy, apiResponse) { - assert.ifError(err); - assert.deepEqual(policy, _policy); - assert.deepEqual(apiResponse, _policy); + iam.getPolicy(function(err, policy, apiResponse_) { + assert.strictEqual(err, error); + assert.strictEqual(policy, null); + assert.strictEqual(apiResponse_, apiResponse); done(); }); }); - it('should handle errors properly', function(done) { - var fakeResponse = { - error: 'Ohnoes' + it('should pass the callback the expected params', function(done) { + var apiResponse = { + bindings: [{ yo: 'yo' }] }; - var error = new Error(fakeResponse.error); - iam.makeReq_ = function(method, path, q, body, callback) { - callback(error, fakeResponse); + iam.request = function(reqOpts, callback) { + callback(null, apiResponse); }; - iam.getPolicy(function(err, policy, apiResponse) { - assert.strictEqual(err, error); - assert.strictEqual(policy, null); - assert.strictEqual(apiResponse, fakeResponse); + iam.getPolicy(function(err, policy, apiResponse_) { + assert.ifError(err); + assert.strictEqual(policy, apiResponse); + assert.strictEqual(apiResponse_, apiResponse); done(); }); }); @@ -93,23 +121,17 @@ describe('IAM', function() { describe('setPolicy', function() { it('should throw an error if a policy is not supplied', function() { assert.throws(function() { - iam.setPolicy(noop); - }, /A policy is required/); + iam.setPolicy(util.noop); + }, /A policy object is required/); }); it('should make the correct API request', function(done) { var policy = { etag: 'ACAB' }; - iam.makeReq_ = function(method, path, q, body) { - assert.strictEqual(method, 'POST'); - - var expectedPath = RESOURCE + ':setIamPolicy'; - assert.strictEqual(path, expectedPath); - - assert.strictEqual(q, null); - - var expectedBody = { policy: policy }; - assert.deepEqual(body, expectedBody); + iam.request = function(reqOpts) { + assert.strictEqual(reqOpts.method, 'POST'); + assert.strictEqual(reqOpts.uri, ':setIamPolicy'); + assert.deepEqual(reqOpts.json, { policy: policy }); done(); }; @@ -117,37 +139,35 @@ describe('IAM', function() { iam.setPolicy(policy, assert.ifError); }); - it('should pass the callback the expected params', function(done) { - var _policy = { - bindings: [{ yo: 'yo' }] - }; + it('should handle errors properly', function(done) { + var apiResponse = {}; + var error = new Error('Error.'); - iam.makeReq_ = function(method, path, q, body, callback) { - callback(null, body.policy, body.policy); + iam.request = function(reqOpts, callback) { + callback(error, apiResponse); }; - iam.setPolicy(_policy, function(err, policy, apiResponse) { - assert.ifError(err); - assert.deepEqual(_policy, policy); - assert.deepEqual(_policy, apiResponse); + iam.setPolicy({}, function(err, policy, apiResponse_) { + assert.strictEqual(err, error); + assert.strictEqual(policy, null); + assert.strictEqual(apiResponse_, apiResponse); done(); }); }); - it('should handle errors properly', function(done) { - var fakeResponse = { - error: 'Ohnoes' + it('should pass the callback the expected params', function(done) { + var apiResponse = { + bindings: [{ yo: 'yo' }] }; - var error = new Error(fakeResponse.error); - iam.makeReq_ = function(method, path, q, body, callback) { - callback(error, fakeResponse); + iam.request = function(reqOpts, callback) { + callback(null, apiResponse); }; - iam.setPolicy({}, function(err, policy, apiResponse) { - assert.strictEqual(err, error); - assert.strictEqual(policy, null); - assert.strictEqual(apiResponse, fakeResponse); + iam.setPolicy({}, function(err, policy, apiResponse_) { + assert.ifError(err); + assert.strictEqual(policy, apiResponse); + assert.strictEqual(apiResponse_, apiResponse); done(); }); }); @@ -156,23 +176,17 @@ describe('IAM', function() { describe('testPermissions', function() { it('should throw an error if permissions are missing', function() { assert.throws(function() { - iam.testPermissions(noop); + iam.testPermissions(util.noop); }, /Permissions are required/); }); it('should make the correct API request', function(done) { var permissions = 'storage.bucket.list'; - iam.makeReq_ = function(method, path, q, body) { - assert.strictEqual(method, 'POST'); - - var expectedPath = RESOURCE + ':testIamPermissions'; - assert.strictEqual(path, expectedPath); - - assert.strictEqual(q, null); - - var expectedBody = { permissions: [permissions] }; - assert.deepEqual(body, expectedBody); + iam.request = function(reqOpts) { + assert.strictEqual(reqOpts.method, 'POST'); + assert.strictEqual(reqOpts.uri, ':testIamPermissions'); + assert.deepEqual(reqOpts.json, { permissions: [permissions] }); done(); }; @@ -183,16 +197,16 @@ describe('IAM', function() { it('should send an error back if the request fails', function(done) { var permissions = ['storage.bucket.list']; var error = new Error('Error.'); - var fakeResponse = {}; + var apiResponse = {}; - iam.makeReq_ = function(method, path, q, body, callback) { - callback(error, fakeResponse); + iam.request = function(reqOpts, callback) { + callback(error, apiResponse); }; - iam.testPermissions(permissions, function(err, perms, resp) { + iam.testPermissions(permissions, function(err, permissions, apiResp) { assert.strictEqual(err, error); - assert.strictEqual(perms, null); - assert.strictEqual(resp, fakeResponse); + assert.strictEqual(permissions, null); + assert.strictEqual(apiResp, apiResponse); done(); }); }); @@ -202,21 +216,22 @@ describe('IAM', function() { 'storage.bucket.list', 'storage.bucket.consume' ]; - var fakeResponse = { + var apiResponse = { permissions: ['storage.bucket.consume'] }; - iam.makeReq_ = function(method, path, q, body, callback) { - callback(null, fakeResponse); + iam.request = function(reqOpts, callback) { + callback(null, apiResponse); }; - iam.testPermissions(permissions, function(err, perms, resp) { + iam.testPermissions(permissions, function(err, permissions, apiResp) { assert.ifError(err); - assert.deepEqual(perms, { + assert.deepEqual(permissions, { 'storage.bucket.list': false, 'storage.bucket.consume': true }); - assert.strictEqual(resp, fakeResponse); + assert.strictEqual(apiResp, apiResponse); + done(); }); }); diff --git a/test/pubsub/index.js b/test/pubsub/index.js index 18e0dbf21c4..bd9bde925a8 100644 --- a/test/pubsub/index.js +++ b/test/pubsub/index.js @@ -18,11 +18,14 @@ var arrify = require('arrify'); var assert = require('assert'); +var extend = require('extend'); var mockery = require('mockery'); +var nodeutil = require('util'); var request = require('request'); -var extend = require('extend'); -var util = require('../../lib/common/util'); + +var Service = require('../../lib/common/service.js'); var Topic = require('../../lib/pubsub/topic.js'); +var util = require('../../lib/common/util.js'); var SubscriptionCached = require('../../lib/pubsub/subscription.js'); var SubscriptionOverride; @@ -45,6 +48,13 @@ fakeRequest.defaults = function() { var fakeUtil = extend({}, util); +function FakeService() { + this.calledWith_ = arguments; + Service.apply(this, arguments); +} + +nodeutil.inherits(FakeService, Service); + var extended = false; var fakeStreamRouter = { extend: function(Class, methods) { @@ -65,15 +75,18 @@ describe('PubSub', function() { var pubsub; before(function() { + mockery.registerMock('../common/service.js', FakeService); mockery.registerMock('../common/stream-router.js', fakeStreamRouter); mockery.registerMock('../common/util.js', fakeUtil); mockery.registerMock('./subscription.js', Subscription); mockery.registerMock('./topic.js', Topic); mockery.registerMock('request', fakeRequest); + mockery.enable({ useCleanCache: true, warnOnUnregistered: false }); + PubSub = require('../../lib/pubsub'); }); @@ -86,7 +99,7 @@ describe('PubSub', function() { SubscriptionOverride = null; requestOverride = null; pubsub = new PubSub({ projectId: PROJECT_ID }); - pubsub.makeReq_ = function(method, path, q, body, callback) { + pubsub.request = function(method, path, q, body, callback) { callback(); }; }); @@ -114,130 +127,92 @@ describe('PubSub', function() { fakeUtil.normalizeArguments = normalizeArguments; }); - }); - describe('getTopics', function() { - var topicName = 'fake-topic'; - var apiResponse = { topics: [{ name: topicName }]}; + it('should inherit from Service', function() { + assert(pubsub instanceof Service); - beforeEach(function() { - pubsub.makeReq_ = function(method, path, q, body, callback) { - callback(null, apiResponse); - }; - }); + var calledWith = pubsub.calledWith_[0]; - it('should accept a query and a callback', function(done) { - pubsub.getTopics({}, done); + var baseUrl = 'https://pubsub.googleapis.com/v1'; + assert.strictEqual(calledWith.baseUrl, baseUrl); + assert.deepEqual(calledWith.scopes, [ + 'https://www.googleapis.com/auth/pubsub', + 'https://www.googleapis.com/auth/cloud-platform' + ]); }); + }); - it('should accept just a callback', function(done) { - pubsub.getTopics(done); - }); + describe('createTopic', function() { + it('should make the correct API request', function(done) { + var topicName = 'new-topic-name'; - it('should build the right request', function() { - pubsub.makeReq_ = function(method, path) { - assert.equal(method, 'GET'); - assert.equal(path, 'projects/' + PROJECT_ID + '/topics'); + pubsub.request = function(reqOpts) { + assert.strictEqual(reqOpts.method, 'PUT'); + assert.strictEqual(reqOpts.uri, '/topics/' + topicName); + done(); }; - pubsub.getTopics(function() {}); - }); - it('should return Topic instances with metadata', function(done) { - var topic = {}; + pubsub.createTopic(topicName, function() {}); + }); - pubsub.topic = function(name) { - assert.strictEqual(name, topicName); - return topic; - }; + describe('error', function() { + var error = new Error('Error.'); + var apiResponse = {}; - pubsub.getTopics(function(err, topics) { - assert.ifError(err); - assert.strictEqual(topics[0], topic); - assert.strictEqual(topics[0].metadata, apiResponse.topics[0]); - done(); + beforeEach(function() { + pubsub.request = function(reqOpts, callback) { + callback(error, apiResponse); + }; }); - }); - it('should return a query if more results exist', function() { - var token = 'next-page-token'; - pubsub.makeReq_ = function(method, path, q, body, callback) { - callback(null, { nextPageToken: token }); - }; - var query = { pageSize: 1 }; - pubsub.getTopics(query, function(err, topics, nextQuery) { - assert.ifError(err); - assert.strictEqual(query.pageSize, nextQuery.pageSize); - assert.equal(query.pageToken, token); + it('should return an error & API response', function(done) { + pubsub.createTopic('new-topic', function(err, topic, apiResponse_) { + assert.strictEqual(err, error); + assert.strictEqual(topic, null); + assert.strictEqual(apiResponse_, apiResponse); + done(); + }); }); }); - it('should pass error if api returns an error', function() { - var error = new Error('Error'); - pubsub.makeReq_ = function(method, path, q, body, callback) { - callback(error); - }; - pubsub.getTopics(function(err) { - assert.equal(err, error); - }); - }); + describe('success', function() { + var apiResponse = {}; - it('should pass apiResponse to callback', function(done) { - var resp = { success: true }; - pubsub.makeReq_ = function(method, path, q, body, callback) { - callback(null, resp); - }; - pubsub.getTopics(function(err, topics, nextQuery, apiResponse) { - assert.equal(resp, apiResponse); - done(); + beforeEach(function() { + pubsub.request = function(reqOpts, callback) { + callback(null, apiResponse); + }; }); - }); - }); - describe('createTopic', function() { - it('should create a topic', function() { - var topicName = 'new-topic-name'; - pubsub.makeReq_ = function(method, path, q, body) { - assert.equal(method, 'PUT'); - assert.equal(path, 'projects/' + PROJECT_ID + '/topics/' + topicName); - assert.equal(body, null); - }; - pubsub.createTopic(topicName, function() {}); - }); + it('should return a Topic object', function(done) { + var topicName = 'new-topic'; + var topicInstance = {}; - it('should return a Topic object', function() { - pubsub.createTopic('new-topic', function(err, topic) { - assert.ifError(err); - assert(topic instanceof Topic); - }); - }); + pubsub.topic = function(name) { + assert.strictEqual(name, topicName); + return topicInstance; + }; - it('should pass apiResponse to callback', function(done) { - var resp = { success: true }; - pubsub.makeReq_ = function(method, path, q, body, callback) { - callback(null, resp); - }; - pubsub.createTopic('new-topic', function(err, topic, apiResponse) { - assert.equal(resp, apiResponse); - done(); + pubsub.createTopic(topicName, function(err, topic) { + assert.ifError(err); + assert.strictEqual(topic, topicInstance); + done(); + }); }); - }); - }); - describe('topic', function() { - it('should throw if a name is not provided', function() { - assert.throws(function() { - pubsub.topic(); - }, /name must be specified/); - }); - - it('should return a Topic object', function() { - assert(pubsub.topic('new-topic') instanceof Topic); + it('should pass apiResponse to callback', function(done) { + pubsub.createTopic('new-topic', function(err, topic, apiResponse_) { + assert.ifError(err); + assert.strictEqual(apiResponse_, apiResponse); + done(); + }); + }); }); }); describe('getSubscriptions', function() { beforeEach(function() { - pubsub.makeReq_ = function(method, path, q, body, callback) { + pubsub.request = function(reqOpts, callback) { callback(null, { subscriptions: [{ name: 'fake-subscription' }] }); }; }); @@ -250,11 +225,11 @@ describe('PubSub', function() { pubsub.getSubscriptions(done); }); - it('should pass the correct arguments to the API', function() { - pubsub.makeReq_ = function(method, path, query) { - assert.equal(method, 'GET'); - assert.equal(path, 'projects/' + PROJECT_ID + '/subscriptions'); - assert.equal(query.query, undefined); + it('should pass the correct arguments to the API', function(done) { + pubsub.request = function(reqOpts) { + assert.strictEqual(reqOpts.uri, '/subscriptions'); + assert.deepEqual(reqOpts.qs, {}); + done(); }; pubsub.getSubscriptions(assert.ifError); @@ -263,16 +238,15 @@ describe('PubSub', function() { describe('topics', function() { var TOPIC; var TOPIC_NAME = 'topic'; - var TOPIC_SUBCRIPTION_NAME = - 'projects/' + PROJECT_ID + '/topics/' + TOPIC_NAME + '/subscriptions'; + var TOPIC_SUBCRIPTION_NAME = '/topics/' + TOPIC_NAME + '/subscriptions'; before(function() { TOPIC = new Topic(pubsub, TOPIC_NAME); }); it('should subscribe to a topic by string', function(done) { - pubsub.makeReq_ = function(method, path) { - assert.equal(path, TOPIC_SUBCRIPTION_NAME); + pubsub.request = function(reqOpts) { + assert.equal(reqOpts.uri, TOPIC_SUBCRIPTION_NAME); done(); }; @@ -280,8 +254,8 @@ describe('PubSub', function() { }); it('should subscribe to a topic by Topic instance', function(done) { - pubsub.makeReq_ = function(method, path) { - assert.equal(path, TOPIC_SUBCRIPTION_NAME); + pubsub.request = function(reqOpts) { + assert.strictEqual(reqOpts.uri, TOPIC_SUBCRIPTION_NAME); done(); }; @@ -292,9 +266,9 @@ describe('PubSub', function() { it('should pass options to API request', function(done) { var opts = { pageSize: 10, pageToken: 'abc' }; - pubsub.makeReq_ = function(method, path, query) { - assert.equal(query.pageSize, opts.pageSize); - assert.equal(query.pageToken, opts.pageToken); + pubsub.request = function(reqOpts) { + assert.strictEqual(reqOpts.qs.pageSize, opts.pageSize); + assert.strictEqual(reqOpts.qs.pageToken, opts.pageToken); done(); }; @@ -305,7 +279,7 @@ describe('PubSub', function() { var error = new Error('Error'); var resp = { error: true }; - pubsub.makeReq_ = function(method, path, q, body, callback) { + pubsub.request = function(reqOpts, callback) { callback(error, resp); }; @@ -330,7 +304,7 @@ describe('PubSub', function() { var subFullName = 'projects/' + PROJECT_ID + '/subscriptions/' + subName; - pubsub.makeReq_ = function(method, path, query, body, callback) { + pubsub.request = function(reqOpts, callback) { callback(null, { subscriptions: [subName] }); }; @@ -346,7 +320,7 @@ describe('PubSub', function() { it('should return a query if more results exist', function() { var token = 'next-page-token'; - pubsub.makeReq_ = function(method, path, q, body, callback) { + pubsub.request = function(reqOpts, callback) { callback(null, { nextPageToken: token }); }; @@ -362,7 +336,7 @@ describe('PubSub', function() { it('should pass apiResponse to callback', function(done) { var resp = { success: true }; - pubsub.makeReq_ = function(method, path, q, body, callback) { + pubsub.request = function(reqOpts, callback) { callback(null, resp); }; @@ -373,15 +347,96 @@ describe('PubSub', function() { }); }); + describe('getTopics', function() { + var topicName = 'fake-topic'; + var apiResponse = { topics: [{ name: topicName }]}; + + beforeEach(function() { + pubsub.request = function(reqOpts, callback) { + callback(null, apiResponse); + }; + }); + + it('should accept a query and a callback', function(done) { + pubsub.getTopics({}, done); + }); + + it('should accept just a callback', function(done) { + pubsub.getTopics(done); + }); + + it('should build the right request', function(done) { + pubsub.request = function(reqOpts) { + assert.equal(reqOpts.uri, '/topics'); + done(); + }; + pubsub.getTopics(function() {}); + }); + + it('should return Topic instances with metadata', function(done) { + var topic = {}; + + pubsub.topic = function(name) { + assert.strictEqual(name, topicName); + return topic; + }; + + pubsub.getTopics(function(err, topics) { + assert.ifError(err); + assert.strictEqual(topics[0], topic); + assert.strictEqual(topics[0].metadata, apiResponse.topics[0]); + done(); + }); + }); + + it('should return a query if more results exist', function() { + var token = 'next-page-token'; + pubsub.request = function(reqOpts, callback) { + callback(null, { nextPageToken: token }); + }; + var query = { pageSize: 1 }; + pubsub.getTopics(query, function(err, topics, nextQuery) { + assert.ifError(err); + assert.strictEqual(query.pageSize, nextQuery.pageSize); + assert.equal(query.pageToken, token); + }); + }); + + it('should pass error if api returns an error', function() { + var error = new Error('Error'); + pubsub.request = function(reqOpts, callback) { + callback(error); + }; + pubsub.getTopics(function(err) { + assert.equal(err, error); + }); + }); + + it('should pass apiResponse to callback', function(done) { + var resp = { success: true }; + pubsub.request = function(reqOpts, callback) { + callback(null, resp); + }; + pubsub.getTopics(function(err, topics, nextQuery, apiResponse) { + assert.equal(resp, apiResponse); + done(); + }); + }); + }); + describe('subscribe', function() { var TOPIC_NAME = 'topic'; var TOPIC = { - name: 'projects/' + PROJECT_ID + '/topics/' + TOPIC_NAME + name: '/topics/' + TOPIC_NAME }; var SUB_NAME = 'subscription'; var SUBSCRIPTION = { - name: 'projects/' + PROJECT_ID + '/subscriptions/' + SUB_NAME + name: '/subscriptions/' + SUB_NAME + }; + + var apiResponse = { + name: 'subscription-name' }; it('should throw if no Topic is provided', function() { @@ -397,119 +452,133 @@ describe('PubSub', function() { }); it('should not require configuration options', function(done) { - pubsub.makeReq_ = function(method, path, qs, body, callback) { - callback(); + pubsub.request = function(reqOpts, callback) { + callback(null, apiResponse); }; pubsub.subscribe(TOPIC_NAME, SUB_NAME, done); }); - it('should cretae a topic object from a string', function(done) { + it('should create a topic object from a string', function(done) { + pubsub.request = util.noop; + pubsub.topic = function(topicName) { - assert.equal(topicName, TOPIC_NAME); - done(); + assert.strictEqual(topicName, TOPIC_NAME); + setImmediate(done); return TOPIC; }; pubsub.subscribe(TOPIC_NAME, SUB_NAME, assert.ifError); }); - it('should create a subscription object from a string', function(done) { - var opts = {}; + it('should send correct request', function(done) { + pubsub.topic = function(topicName) { + return { + name: topicName + }; + }; - pubsub.subscription = function(subName, options) { - assert.equal(subName, SUB_NAME); - assert.deepEqual(options, opts); + pubsub.request = function(reqOpts) { + assert.strictEqual(reqOpts.method, 'PUT'); + assert.strictEqual(reqOpts.uri, SUBSCRIPTION.name); + assert.strictEqual(reqOpts.json.topic, TOPIC_NAME); done(); - return SUBSCRIPTION; }; - pubsub.subscribe(TOPIC_NAME, SUB_NAME, opts, assert.ifError); + pubsub.subscribe(TOPIC_NAME, SUB_NAME, assert.ifError); }); - it('should pass options to a created subscription object', function(done) { - var opts = { a: 'b', c: 'd' }; + it('should pass options to the api request', function(done) { + var opts = { ackDeadlineSeconds: 90 }; - pubsub.subscription = function(subName, options) { - assert.equal(subName, SUB_NAME); - assert.deepEqual(options, opts); + pubsub.request = function(reqOpts) { + assert.strictEqual(reqOpts.json.ackDeadlineSeconds, 90); done(); - return SUBSCRIPTION; }; pubsub.subscribe(TOPIC_NAME, SUB_NAME, opts, assert.ifError); }); - it('should send correct request', function(done) { - pubsub.makeReq_ = function(method, path, query, body) { - assert.equal(method, 'PUT'); - assert.equal(path, SUBSCRIPTION.name); - assert.equal(body.topic, TOPIC.name); - done(); - }; + describe('error', function() { + var error = new Error('Error.'); + var apiResponse = { name: SUB_NAME }; - pubsub.subscribe(TOPIC_NAME, SUB_NAME, assert.ifError); - }); + beforeEach(function() { + pubsub.request = function(reqOpts, callback) { + callback(error, apiResponse); + }; + }); - it('should re-use existing subscription if specified', function(done) { - pubsub.subscription = function() { - return SUBSCRIPTION; - }; + it('should re-use existing subscription if specified', function(done) { + pubsub.subscription = function() { + return SUBSCRIPTION; + }; - pubsub.makeReq_ = function(method, path, query, body, callback) { - callback({ code: 409 }); - }; + pubsub.request = function(reqOpts, callback) { + callback({ code: 409 }, apiResponse); + }; - // Don't re-use an existing subscription (error if one exists). - pubsub.subscribe(TOPIC_NAME, SUB_NAME, function(err) { - assert.equal(err.code, 409); + // Don't re-use an existing subscription (error if one exists). + pubsub.subscribe(TOPIC_NAME, SUB_NAME, function(err) { + assert.equal(err.code, 409); + }); + + // Re-use an existing subscription (ignore error if one exists). + var opts = { reuseExisting: true }; + pubsub.subscribe(TOPIC_NAME, SUB_NAME, opts, function(err, sub) { + assert.ifError(err); + assert.deepEqual(sub, SUBSCRIPTION); + + done(); + }); }); - // Re-use an existing subscription (ignore error if one exists). - var opts = { reuseExisting: true }; - pubsub.subscribe(TOPIC_NAME, SUB_NAME, opts, function(err, sub) { - assert.ifError(err); - assert.deepEqual(sub, SUBSCRIPTION); + it('should return error & API response to the callback', function(done) { + pubsub.request = function(reqOpts, callback) { + callback(error, apiResponse); + }; - done(); + pubsub.subscribe(TOPIC_NAME, SUB_NAME, function(err, sub, resp) { + assert.strictEqual(err, error); + assert.strictEqual(sub, null); + assert.strictEqual(resp, apiResponse); + done(); + }); }); }); - it('should return an api error to the callback', function(done) { - var error = new Error('Error.'); - - pubsub.makeReq_ = function(method, path, query, body, callback) { - callback(error); - }; + describe('success', function() { + var apiResponse = { name: SUB_NAME }; - pubsub.subscribe(TOPIC_NAME, SUB_NAME, function(err) { - assert.equal(err, error); - done(); + beforeEach(function() { + pubsub.request = function(reqOpts, callback) { + callback(null, apiResponse); + }; }); - }); - it('should return apiResponse to the callback', function(done) { - var resp = { success: true }; + it('should pass options to a new subscription object', function(done) { + var opts = { a: 'b', c: 'd' }; - pubsub.makeReq_ = function(method, path, query, body, callback) { - callback(null, resp); - }; + pubsub.subscription = function(subName, options) { + assert.strictEqual(subName, SUB_NAME); + assert.deepEqual(options, opts); + setImmediate(done); + return SUBSCRIPTION; + }; - pubsub.subscribe(TOPIC_NAME, SUB_NAME, function(err, sub, apiResponse) { - assert.deepEqual(resp, apiResponse); - done(); + pubsub.subscribe(TOPIC_NAME, SUB_NAME, opts, assert.ifError); }); - }); - - it('should pass options to the api request', function(done) { - var opts = { ackDeadlineSeconds: 90 }; - pubsub.makeReq_ = function(method, path, query, body) { - assert.strictEqual(body.ackDeadlineSeconds, opts.ackDeadlineSeconds); - done(); - }; + it('should return apiResponse to the callback', function(done) { + pubsub.request = function(reqOpts, callback) { + callback(null, apiResponse); + }; - pubsub.subscribe(TOPIC_NAME, SUB_NAME, opts, assert.ifError); + pubsub.subscribe(TOPIC_NAME, SUB_NAME, function(err, sub, resp) { + assert.strictEqual(resp, apiResponse); + done(); + }); + }); }); }); @@ -552,15 +621,15 @@ describe('PubSub', function() { }); }); - describe('makeReq_', function() { - it('should pass network requests to the connection object', function(done) { - var pubsub = new PubSub({ projectId: PROJECT_ID }); - - pubsub.makeAuthenticatedRequest_ = function() { - done(); - }; + describe('topic', function() { + it('should throw if a name is not provided', function() { + assert.throws(function() { + pubsub.topic(); + }, /name must be specified/); + }); - pubsub.makeReq_(null, null, null, null, assert.ifError); + it('should return a Topic object', function() { + assert(pubsub.topic('new-topic') instanceof Topic); }); }); }); diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js index 9da29ab0113..25fbffb1da6 100644 --- a/test/pubsub/subscription.js +++ b/test/pubsub/subscription.js @@ -17,21 +17,33 @@ 'use strict'; var assert = require('assert'); +var extend = require('extend'); var mockery = require('mockery'); +var nodeutil = require('util'); + +var ServiceObject = require('../../lib/common/service-object.js'); var util = require('../../lib/common/util.js'); -var Subscription; function FakeIAM() { this.calledWith_ = [].slice.call(arguments); } +function FakeServiceObject() { + this.calledWith_ = arguments; + ServiceObject.apply(this, arguments); +} + +nodeutil.inherits(FakeServiceObject, ServiceObject); + describe('Subscription', function() { + var Subscription; + var subscription; + var PROJECT_ID = 'test-project'; var SUB_NAME = 'test-subscription'; var SUB_FULL_NAME = 'projects/' + PROJECT_ID + '/subscriptions/' + SUB_NAME; - var pubsubMock = { - projectId: PROJECT_ID, - makeReq_: util.noop + var PUBSUB = { + projectId: PROJECT_ID }; var message = 'howdy'; var messageBuffer = new Buffer(message).toString('base64'); @@ -49,19 +61,26 @@ describe('Subscription', function() { data: message, id: 7 }; - var subscription; before(function() { - mockery.registerMock('./iam', FakeIAM); + mockery.registerMock('../common/service-object.js', FakeServiceObject); + mockery.registerMock('./iam.js', FakeIAM); + mockery.enable({ useCleanCache: true, warnOnUnregistered: false }); + Subscription = require('../../lib/pubsub/subscription.js'); }); + after(function() { + mockery.deregisterAll(); + mockery.disable(); + }); + beforeEach(function() { - subscription = new Subscription(pubsubMock, { name: SUB_NAME }); + subscription = new Subscription(PUBSUB, { name: SUB_NAME }); }); describe('initialization', function() { @@ -71,7 +90,7 @@ describe('Subscription', function() { Subscription.formatName_ = formatName_; done(); }; - new Subscription(pubsubMock, { name: SUB_NAME }); + new Subscription(PUBSUB, { name: SUB_NAME }); }); it('should honor configuration settings', function() { @@ -81,7 +100,7 @@ describe('Subscription', function() { interval: 100, maxInProgress: 3 }; - var sub = new Subscription(pubsubMock, CONFIG); + var sub = new Subscription(PUBSUB, CONFIG); assert.strictEqual(sub.autoAck, CONFIG.autoAck); assert.strictEqual(sub.interval, CONFIG.interval); assert.strictEqual(sub.maxInProgress, 3); @@ -92,28 +111,23 @@ describe('Subscription', function() { }); it('should default autoAck to false if not specified', function() { - var sub = new Subscription(pubsubMock, { name: SUB_NAME }); - assert.strictEqual(sub.autoAck, false); + assert.strictEqual(subscription.autoAck, false); }); it('should set default interval if one is not specified', function() { - var sub = new Subscription(pubsubMock, { name: SUB_NAME }); - assert.equal(sub.interval, 10); + assert.equal(subscription.interval, 10); }); it('should start inProgressAckIds as an empty object', function() { - var sub = new Subscription(pubsubMock, { name: SUB_NAME }); - assert.equal(Object.keys(sub.inProgressAckIds).length, 0); + assert.deepEqual(subscription.inProgressAckIds, {}); }); it('should default maxInProgress to Infinity if not specified', function() { - var sub = new Subscription(pubsubMock, { name: SUB_NAME }); - assert.strictEqual(sub.maxInProgress, Infinity); + assert.strictEqual(subscription.maxInProgress, Infinity); }); it('should set messageListeners to 0', function() { - var sub = new Subscription(pubsubMock, { name: SUB_NAME }); - assert.strictEqual(sub.messageListeners, 0); + assert.strictEqual(subscription.messageListeners, 0); }); it('should not be paused', function() { @@ -122,73 +136,100 @@ describe('Subscription', function() { it('should create an iam object', function() { assert.deepEqual(subscription.iam.calledWith_, [ - pubsubMock, - SUB_FULL_NAME + PUBSUB, + { + baseUrl: '/subscriptions', + id: SUB_NAME + } ]); }); - }); - describe('formatName_', function() { - it('should format name', function() { - var formattedName = Subscription.formatName_(PROJECT_ID, SUB_NAME); - assert.equal(formattedName, SUB_FULL_NAME); - }); + it('should inherit from ServiceObject', function() { + assert(subscription instanceof ServiceObject); - it('should format name when given a complete name', function() { - var formattedName = Subscription.formatName_(PROJECT_ID, SUB_FULL_NAME); - assert.equal(formattedName, SUB_FULL_NAME); - }); - }); + var calledWith = subscription.calledWith_[0]; - describe('listenForEvents_', function() { - afterEach(function() { - subscription.removeAllListeners(); - }); - - it('should start pulling once a message listener is bound', function(done) { - subscription.startPulling_ = function() { - done(); - }; - subscription.on('message', util.noop); + assert.strictEqual(calledWith.parent, PUBSUB); + assert.strictEqual(calledWith.baseUrl, '/subscriptions'); + assert.strictEqual(calledWith.id, SUB_NAME); + assert.deepEqual(calledWith.methods, { + exists: true, + get: true, + getMetadata: true + }); }); - it('should track the number of listeners', function() { - subscription.startPulling_ = util.noop; - - assert.strictEqual(subscription.messageListeners, 0); + it('should allow creating if it is a Topic', function(done) { + var topicInstance = {}; - subscription.on('message', util.noop); - assert.strictEqual(subscription.messageListeners, 1); + var pubSubInstance = extend({}, PUBSUB, { + subscribe: { + bind: function(context, topic) { + assert.strictEqual(context, pubSubInstance); + assert.strictEqual(topic, topicInstance); + done(); + } + } + }); - subscription.removeListener('message', util.noop); - assert.strictEqual(subscription.messageListeners, 0); + var subscription = new Subscription(pubSubInstance, { + name: SUB_NAME, + topic: topicInstance + }); + assert(subscription instanceof ServiceObject); + + var calledWith = subscription.calledWith_[0]; + + assert.strictEqual(calledWith.parent, pubSubInstance); + assert.strictEqual(calledWith.baseUrl, '/subscriptions'); + assert.strictEqual(calledWith.id, SUB_NAME); + assert.deepEqual(calledWith.methods, { + create: true, + exists: true, + get: true, + getMetadata: true + }); }); + }); - it('should only run a single pulling loop', function() { - var startPullingCallCount = 0; + describe('formatMessage_', function() { + it('should decode stringified JSON to object', function() { + var obj = { hi: 'there' }; + var stringified = new Buffer(JSON.stringify(obj)).toString('base64'); + var attributes = {}; - subscription.startPulling_ = function() { - startPullingCallCount++; - }; + var msg = Subscription.formatMessage_({ + ackId: 3, + message: { + data: stringified, + messageId: 7, + attributes: attributes + } + }); - subscription.on('message', util.noop); - subscription.on('message', util.noop); + assert.deepEqual(msg, { + ackId: 3, + id: 7, + data: obj, + attributes: attributes + }); + }); - assert.strictEqual(startPullingCallCount, 1); + it('should decode buffer to string', function() { + var msg = Subscription.formatMessage_(messageObj.receivedMessages[0]); + assert.deepEqual(msg, expectedMessage); }); + }); - it('should close when no more message listeners are bound', function() { - subscription.startPulling_ = util.noop; - subscription.on('message', util.noop); - subscription.on('message', util.noop); - // 2 listeners: sub should be open. - assert.strictEqual(subscription.closed, false); - subscription.removeListener('message', util.noop); - // 1 listener: sub should be open. - assert.strictEqual(subscription.closed, false); - subscription.removeListener('message', util.noop); - // 0 listeners: sub should be closed. - assert.strictEqual(subscription.closed, true); + describe('formatName_', function() { + it('should format name', function() { + var formattedName = Subscription.formatName_(PROJECT_ID, SUB_NAME); + assert.equal(formattedName, SUB_FULL_NAME); + }); + + it('should format name when given a complete name', function() { + var formattedName = Subscription.formatName_(PROJECT_ID, SUB_FULL_NAME); + assert.equal(formattedName, SUB_FULL_NAME); }); }); @@ -204,20 +245,22 @@ describe('Subscription', function() { it('should accept a single id', function() { assert.doesNotThrow(function() { + subscription.request = util.noop; subscription.ack(1, util.noop); }); }); it('should accept an array of ids', function() { assert.doesNotThrow(function() { + subscription.request = util.noop; subscription.ack([1], util.noop); }); }); it('should make an array out of ids', function(done) { var ID = 1; - subscription.makeReq_ = function(method, path, qs, body) { - assert.deepEqual(body.ackIds, [ID]); + subscription.request = function(reqOpts) { + assert.deepEqual(reqOpts.json.ackIds, [ID]); done(); }; subscription.ack(ID, assert.ifError); @@ -225,16 +268,16 @@ describe('Subscription', function() { it('should make correct api request', function(done) { var IDS = [1, 2, 3]; - subscription.makeReq_ = function(method, path, qs, body) { - assert.equal(path, SUB_FULL_NAME + ':acknowledge'); - assert.deepEqual(body.ackIds, IDS); + subscription.request = function(reqOpts) { + assert.strictEqual(reqOpts.uri, ':acknowledge'); + assert.deepEqual(reqOpts.json.ackIds, IDS); done(); }; subscription.ack(IDS, assert.ifError); }); it('should unmark the ack ids as being in progress', function(done) { - subscription.makeReq_ = function(method, path, query, body, callback) { + subscription.request = function(reqOpts, callback) { callback(); }; @@ -253,7 +296,7 @@ describe('Subscription', function() { }); it('should not unmark if there was an error', function(done) { - subscription.makeReq_ = function(method, path, query, body, callback) { + subscription.request = function(reqOpts, callback) { callback(new Error('Error.')); }; @@ -270,7 +313,7 @@ describe('Subscription', function() { }); it('should refresh paused status', function(done) { - subscription.makeReq_ = function(method, path, query, body, callback) { + subscription.request = function(reqOpts, callback) { callback(); }; @@ -282,7 +325,7 @@ describe('Subscription', function() { it('should pass error to callback', function(done) { var error = new Error('Error.'); - subscription.makeReq_ = function(method, path, query, body, callback) { + subscription.request = function(reqOpts, callback) { callback(error); }; @@ -294,7 +337,7 @@ describe('Subscription', function() { it('should pass apiResponse to callback', function(done) { var resp = { success: true }; - subscription.makeReq_ = function(method, path, qs, body, callback) { + subscription.request = function(reqOpts, callback) { callback(null, resp); }; subscription.ack(1, function(err, apiResponse) { @@ -304,10 +347,68 @@ describe('Subscription', function() { }); }); + describe('delete', function() { + it('should delete a subscription', function(done) { + FakeServiceObject.prototype.delete = function() { + assert.strictEqual(this, subscription); + done(); + }; + subscription.delete(); + }); + + it('should close a subscription once deleted', function() { + FakeServiceObject.prototype.delete = function(callback) { + callback(); + }; + subscription.closed = false; + subscription.delete(); + assert.strictEqual(subscription.closed, true); + }); + + it('should remove all listeners', function(done) { + FakeServiceObject.prototype.delete = function(callback) { + callback(); + }; + subscription.removeAllListeners = function() { + done(); + }; + subscription.delete(); + }); + + it('should execute callback when deleted', function(done) { + FakeServiceObject.prototype.delete = function(callback) { + callback(); + }; + subscription.delete(done); + }); + + it('should execute callback with an api error', function(done) { + var error = new Error('Error.'); + FakeServiceObject.prototype.delete = function(callback) { + callback(error); + }; + subscription.delete(function(err) { + assert.equal(err, error); + done(); + }); + }); + + it('should execute callback with apiResponse', function(done) { + var resp = { success: true }; + FakeServiceObject.prototype.delete = function(callback) { + callback(null, resp); + }; + subscription.delete(function(err, apiResponse) { + assert.deepEqual(resp, apiResponse); + done(); + }); + }); + }); + describe('pull', function() { beforeEach(function() { subscription.ack = util.noop; - subscription.makeReq_ = function(method, path, qs, body, callback) { + subscription.request = function(reqOpts, callback) { callback(null, messageObj); }; }); @@ -317,27 +418,27 @@ describe('Subscription', function() { }); it('should default returnImmediately to false', function(done) { - subscription.makeReq_ = function(method, path, qs, body) { - assert.strictEqual(body.returnImmediately, false); + subscription.request = function(reqOpts) { + assert.strictEqual(reqOpts.json.returnImmediately, false); done(); }; subscription.pull({}, assert.ifError); }); it('should honor options', function(done) { - subscription.makeReq_ = function(method, path, qs, body) { - assert.strictEqual(body.returnImmediately, true); + subscription.request = function(reqOpts) { + assert.strictEqual(reqOpts.json.returnImmediately, true); done(); }; subscription.pull({ returnImmediately: true }, assert.ifError); }); it('should make correct api request', function(done) { - subscription.makeReq_ = function(method, path, qs, body) { - assert.equal(method, 'POST'); - assert.equal(path, SUB_FULL_NAME + ':pull'); - assert.strictEqual(body.returnImmediately, false); - assert.equal(body.maxMessages, 1); + subscription.request = function(reqOpts) { + assert.strictEqual(reqOpts.method, 'POST'); + assert.strictEqual(reqOpts.uri, ':pull'); + assert.strictEqual(reqOpts.json.returnImmediately, false); + assert.strictEqual(reqOpts.json.maxMessages, 1); done(); }; @@ -346,7 +447,7 @@ describe('Subscription', function() { it('should pass error to callback', function(done) { var error = new Error('Error.'); - subscription.makeReq_ = function(method, path, qs, body, callback) { + subscription.request = function(reqOpts, callback) { callback(error); }; subscription.pull(function(err) { @@ -417,7 +518,7 @@ describe('Subscription', function() { }); it('should not autoAck if no messages returned', function(done) { - subscription.makeReq_ = function(method, path, qs, body, callback) { + subscription.request = function(reqOpts, callback) { callback(null, { receivedMessages: [] }); }; subscription.ack = function() { @@ -470,7 +571,7 @@ describe('Subscription', function() { callback(null, { success: true }); }; - subscription.makeReq_ = function(method, path, qs, body, callback) { + subscription.request = function(reqOpts, callback) { callback(null, resp); }; @@ -482,6 +583,170 @@ describe('Subscription', function() { }); }); + describe('setAckDeadline', function() { + it('should set the ack deadline', function(done) { + subscription.request = function(reqOpts) { + assert.strictEqual(reqOpts.method, 'POST'); + assert.strictEqual(reqOpts.uri, ':modifyAckDeadline'); + assert.deepEqual(reqOpts.json, { + ackIds: [123], + ackDeadlineSeconds: 10 + }); + done(); + }; + subscription.setAckDeadline({ ackIds: [123], seconds: 10 }, done); + }); + + it('should execute the callback', function(done) { + subscription.request = function(reqOpts, callback) { + callback(); + }; + subscription.setAckDeadline({}, done); + }); + + it('should execute the callback with apiResponse', function(done) { + var resp = { success: true }; + subscription.request = function(reqOpts, callback) { + callback(null, resp); + }; + subscription.setAckDeadline({}, function(err, apiResponse) { + assert.deepEqual(resp, apiResponse); + done(); + }); + }); + }); + + describe('decorateMessage_', function() { + var message = { + ackId: 'b' + }; + + it('should return the message', function() { + var decoratedMessage = subscription.decorateMessage_(message); + assert.strictEqual(decoratedMessage.ackId, message.ackId); + }); + + it('should mark the message as being in progress', function() { + subscription.decorateMessage_(message); + assert.strictEqual(subscription.inProgressAckIds[message.ackId], true); + }); + + describe('ack', function() { + it('should add an ack function to ack', function() { + var decoratedMessage = subscription.decorateMessage_(message); + assert.equal(typeof decoratedMessage.ack, 'function'); + }); + + it('should pass the ackId to subscription.ack', function(done) { + subscription.ack = function(ackId, callback) { + assert.strictEqual(ackId, message.ackId); + callback(); + }; + + subscription.decorateMessage_(message).ack(done); + }); + }); + + describe('skip', function() { + it('should add a skip function', function() { + var decoratedMessage = subscription.decorateMessage_(message); + assert.equal(typeof decoratedMessage.skip, 'function'); + }); + + it('should unmark the message as being in progress', function() { + subscription.decorateMessage_(message).skip(); + + var inProgressAckIds = subscription.inProgressAckIds; + assert.strictEqual(inProgressAckIds[message.ackId], undefined); + }); + + it('should refresh the paused status', function(done) { + subscription.refreshPausedStatus_ = done; + subscription.decorateMessage_(message).skip(); + }); + }); + }); + + describe('refreshPausedStatus_', function() { + it('should pause if the ackIds in progress is too high', function() { + subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; + + subscription.maxInProgress = 2; + subscription.refreshPausedStatus_(); + assert.strictEqual(subscription.paused, true); + + subscription.maxInProgress = 3; + subscription.refreshPausedStatus_(); + assert.strictEqual(subscription.paused, true); + + subscription.maxInProgress = Infinity; + subscription.refreshPausedStatus_(); + assert.strictEqual(subscription.paused, false); + }); + + it('should start pulling if paused and listeners exist', function(done) { + subscription.startPulling_ = done; + + subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; + subscription.paused = true; + subscription.maxInProgress = Infinity; + subscription.messageListeners = 1; + subscription.refreshPausedStatus_(); + }); + }); + + describe('listenForEvents_', function() { + afterEach(function() { + subscription.removeAllListeners(); + }); + + it('should start pulling once a message listener is bound', function(done) { + subscription.startPulling_ = function() { + done(); + }; + subscription.on('message', util.noop); + }); + + it('should track the number of listeners', function() { + subscription.startPulling_ = util.noop; + + assert.strictEqual(subscription.messageListeners, 0); + + subscription.on('message', util.noop); + assert.strictEqual(subscription.messageListeners, 1); + + subscription.removeListener('message', util.noop); + assert.strictEqual(subscription.messageListeners, 0); + }); + + it('should only run a single pulling loop', function() { + var startPullingCallCount = 0; + + subscription.startPulling_ = function() { + startPullingCallCount++; + }; + + subscription.on('message', util.noop); + subscription.on('message', util.noop); + + assert.strictEqual(startPullingCallCount, 1); + }); + + it('should close when no more message listeners are bound', function() { + subscription.startPulling_ = util.noop; + subscription.on('message', util.noop); + subscription.on('message', util.noop); + // 2 listeners: sub should be open. + assert.strictEqual(subscription.closed, false); + subscription.removeListener('message', util.noop); + // 1 listener: sub should be open. + assert.strictEqual(subscription.closed, false); + subscription.removeListener('message', util.noop); + // 0 listeners: sub should be closed. + assert.strictEqual(subscription.closed, true); + }); + }); + describe('startPulling_', function() { beforeEach(function() { subscription.pull = util.noop; @@ -625,202 +890,4 @@ describe('Subscription', function() { subscription.startPulling_(); }); }); - - describe('delete', function() { - it('should delete a subscription', function(done) { - subscription.makeReq_ = function(method, path) { - assert.equal(method, 'DELETE'); - assert.equal(path, subscription.name); - done(); - }; - subscription.delete(); - }); - - it('should close a subscription once deleted', function() { - subscription.makeReq_ = function(method, path, qs, body, callback) { - callback(); - }; - subscription.closed = false; - subscription.delete(); - assert.strictEqual(subscription.closed, true); - }); - - it('should remove all listeners', function(done) { - subscription.makeReq_ = function(method, path, qs, body, callback) { - callback(); - }; - subscription.removeAllListeners = function() { - done(); - }; - subscription.delete(); - }); - - it('should execute callback when deleted', function(done) { - subscription.makeReq_ = function(method, path, qs, body, callback) { - callback(); - }; - subscription.delete(done); - }); - - it('should execute callback with an api error', function(done) { - var error = new Error('Error.'); - subscription.makeReq_ = function(method, path, qs, body, callback) { - callback(error); - }; - subscription.delete(function(err) { - assert.equal(err, error); - done(); - }); - }); - - it('should execute callback with apiResponse', function(done) { - var resp = { success: true }; - subscription.makeReq_ = function(method, path, qs, body, callback) { - callback(null, resp); - }; - subscription.delete(function(err, apiResponse) { - assert.deepEqual(resp, apiResponse); - done(); - }); - }); - }); - - describe('setAckDeadline', function() { - it('should set the ack deadline', function(done) { - subscription.makeReq_ = function(method, path, qs, body) { - assert.equal(method, 'POST'); - assert.equal(path, this.name + ':modifyAckDeadline'); - assert.equal(qs, null); - assert.deepEqual(body, { ackIds: [123], ackDeadlineSeconds: 10 }); - done(); - }; - subscription.setAckDeadline({ ackIds: [123], seconds: 10 }, done); - }); - - it('should execute the callback', function(done) { - subscription.makeReq_ = function(method, path, qs, body, callback) { - callback(); - }; - subscription.setAckDeadline({}, done); - }); - - it('should execute the callback with apiResponse', function(done) { - var resp = { success: true }; - subscription.makeReq_ = function(method, path, qs, body, callback) { - callback(null, resp); - }; - subscription.setAckDeadline({}, function(err, apiResponse) { - assert.deepEqual(resp, apiResponse); - done(); - }); - }); - }); - - describe('decorateMessage_', function() { - var message = { - ackId: 'b' - }; - - it('should return the message', function() { - var decoratedMessage = subscription.decorateMessage_(message); - assert.strictEqual(decoratedMessage.ackId, message.ackId); - }); - - it('should mark the message as being in progress', function() { - subscription.decorateMessage_(message); - assert.strictEqual(subscription.inProgressAckIds[message.ackId], true); - }); - - describe('ack', function() { - it('should add an ack function to ack', function() { - var decoratedMessage = subscription.decorateMessage_(message); - assert.equal(typeof decoratedMessage.ack, 'function'); - }); - - it('should pass the ackId to subscription.ack', function(done) { - subscription.ack = function(ackId, callback) { - assert.strictEqual(ackId, message.ackId); - callback(); - }; - - subscription.decorateMessage_(message).ack(done); - }); - }); - - describe('skip', function() { - it('should add a skip function', function() { - var decoratedMessage = subscription.decorateMessage_(message); - assert.equal(typeof decoratedMessage.skip, 'function'); - }); - - it('should unmark the message as being in progress', function() { - subscription.decorateMessage_(message).skip(); - - var inProgressAckIds = subscription.inProgressAckIds; - assert.strictEqual(inProgressAckIds[message.ackId], undefined); - }); - - it('should refresh the paused status', function(done) { - subscription.refreshPausedStatus_ = done; - subscription.decorateMessage_(message).skip(); - }); - }); - }); - - describe('refreshPausedStatus_', function() { - it('should pause if the ackIds in progress is too high', function() { - subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; - - subscription.maxInProgress = 2; - subscription.refreshPausedStatus_(); - assert.strictEqual(subscription.paused, true); - - subscription.maxInProgress = 3; - subscription.refreshPausedStatus_(); - assert.strictEqual(subscription.paused, true); - - subscription.maxInProgress = Infinity; - subscription.refreshPausedStatus_(); - assert.strictEqual(subscription.paused, false); - }); - - it('should start pulling if paused and listeners exist', function(done) { - subscription.startPulling_ = done; - - subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; - subscription.paused = true; - subscription.maxInProgress = Infinity; - subscription.messageListeners = 1; - subscription.refreshPausedStatus_(); - }); - }); - - describe('formatMessage_', function() { - it('should decode stringified JSON to object', function() { - var obj = { hi: 'there' }; - var stringified = new Buffer(JSON.stringify(obj)).toString('base64'); - var attributes = {}; - - var msg = Subscription.formatMessage_({ - ackId: 3, - message: { - data: stringified, - messageId: 7, - attributes: attributes - } - }); - - assert.deepEqual(msg, { - ackId: 3, - id: 7, - data: obj, - attributes: attributes - }); - }); - - it('should decode buffer to string', function() { - var msg = Subscription.formatMessage_(messageObj.receivedMessages[0]); - assert.deepEqual(msg, expectedMessage); - }); - }); }); diff --git a/test/pubsub/topic.js b/test/pubsub/topic.js index 5e97b402008..a83a5cf81c3 100644 --- a/test/pubsub/topic.js +++ b/test/pubsub/topic.js @@ -17,60 +17,105 @@ 'use strict'; var assert = require('assert'); +var extend = require('extend'); var mockery = require('mockery'); +var nodeutil = require('util'); + var util = require('../../lib/common/util.js'); -var Topic; +var ServiceObject = require('../../lib/common/service-object.js'); function FakeIAM() { this.calledWith_ = [].slice.call(arguments); } +function FakeServiceObject() { + this.calledWith_ = arguments; + ServiceObject.apply(this, arguments); +} + +nodeutil.inherits(FakeServiceObject, ServiceObject); + describe('Topic', function() { + var Topic; + var topic; + var PROJECT_ID = 'test-project'; var TOPIC_NAME = 'test-topic'; - var TOPIC_FULL_NAME = 'projects/' + PROJECT_ID + '/topics/' + TOPIC_NAME; - var pubsubMock = { + var PUBSUB = { projectId: PROJECT_ID, - makeReq_: util.noop + createTopic: util.noop }; - var topic; before(function() { + mockery.registerMock('../common/service-object.js', FakeServiceObject); mockery.registerMock('./iam', FakeIAM); + mockery.enable({ useCleanCache: true, warnOnUnregistered: false }); + Topic = require('../../lib/pubsub/topic'); }); + after(function() { + mockery.deregisterAll(); + mockery.disable(); + }); + beforeEach(function() { - topic = new Topic(pubsubMock, TOPIC_NAME); + topic = new Topic(PUBSUB, TOPIC_NAME); }); describe('initialization', function() { + it('should inherit from ServiceObject', function(done) { + var pubsubInstance = extend({}, PUBSUB, { + createTopic: { + bind: function(context) { + assert.strictEqual(context, pubsubInstance); + done(); + } + } + }); + + var topic = new Topic(pubsubInstance, TOPIC_NAME); + assert(topic instanceof ServiceObject); + + var calledWith = topic.calledWith_[0]; + + assert.strictEqual(calledWith.parent, pubsubInstance); + assert.strictEqual(calledWith.baseUrl, '/topics'); + assert.strictEqual(calledWith.id, TOPIC_NAME); + assert.deepEqual(calledWith.methods, { + create: true, + delete: true, + exists: true, + get: true, + getMetadata: true + }); + }); + + it('should create an iam object', function() { + assert.deepEqual(topic.iam.calledWith_, [ + PUBSUB, + { + baseUrl: '/topics', + id: TOPIC_NAME + } + ]); + }); + it('should format name', function(done) { var formatName_ = Topic.formatName_; Topic.formatName_ = function() { Topic.formatName_ = formatName_; done(); }; - new Topic(pubsubMock, TOPIC_NAME); - }); - - it('should assign projectId to `this`', function() { - assert.equal(topic.projectId, PROJECT_ID); + new Topic(PUBSUB, TOPIC_NAME); }); it('should assign pubsub object to `this`', function() { - assert.deepEqual(topic.pubsub, pubsubMock); - }); - - it('should create an iam object', function() { - assert.deepEqual(topic.iam.calledWith_, [ - pubsubMock, - TOPIC_FULL_NAME - ]); + assert.deepEqual(topic.pubsub, PUBSUB); }); }); @@ -110,6 +155,31 @@ describe('Topic', function() { }); }); + describe('getSubscriptions', function() { + it('should accept just a callback', function(done) { + topic.pubsub.getSubscriptions = function(options, callback) { + assert.deepEqual(options, { topic: topic }); + callback(); + }; + + topic.getSubscriptions(done); + }); + + it('should pass correct args to pubsub#getSubscriptions', function(done) { + var opts = { a: 'b', c: 'd' }; + + topic.pubsub = { + getSubscriptions: function(options, callback) { + assert.deepEqual(options, opts); + assert.deepEqual(options.topic, topic); + callback(); + } + }; + + topic.getSubscriptions(opts, done); + }); + }); + describe('publish', function() { var message = 'howdy'; var messageObject = { data: message }; @@ -131,11 +201,10 @@ describe('Topic', function() { }); it('should send correct api request', function(done) { - topic.makeReq_ = function(method, path, query, body) { - assert.equal(method, 'POST'); - assert.equal(path, topic.name + ':publish'); - assert.strictEqual(query, null); - assert.deepEqual(body, { + topic.request = function(reqOpts) { + assert.strictEqual(reqOpts.method, 'POST'); + assert.strictEqual(reqOpts.uri, ':publish'); + assert.deepEqual(reqOpts.json, { messages: [ { data: new Buffer(JSON.stringify(message)).toString('base64') } ] @@ -147,142 +216,44 @@ describe('Topic', function() { }); it('should execute callback', function(done) { - topic.makeReq_ = function(method, path, query, body, callback) { - callback(); + topic.request = function(reqOpts, callback) { + callback(null, {}); }; topic.publish(messageObject, done); }); - it('should execute callback with apiResponse', function(done) { - var resp = { success: true }; - topic.makeReq_ = function(method, path, query, body, callback) { - callback(null, resp); - }; - - topic.publish(messageObject, function(err, ackIds, apiResponse) { - assert.deepEqual(resp, apiResponse); - done(); - }); - }); - }); + it('should execute callback with error', function(done) { + var error = new Error('Error.'); + var apiResponse = {}; - describe('delete', function() { - it('should delete a topic', function(done) { - topic.makeReq_ = function(method, path) { - assert.equal(method, 'DELETE'); - assert.equal(path, topic.name); - done(); + topic.request = function(reqOpts, callback) { + callback(error, apiResponse); }; - topic.delete(); - }); - it('should call the callback', function(done) { - topic.makeReq_ = function(method, path, q, body, callback) { - callback(); - }; - topic.delete(done); - }); + topic.publish(messageObject, function(err, ackIds, apiResponse_) { + assert.strictEqual(err, error); + assert.strictEqual(ackIds, null); + assert.strictEqual(apiResponse_, apiResponse); - it('should call the callback with apiResponse', function(done) { - var resp = { success: true }; - topic.makeReq_ = function(method, path, q, body, callback) { - callback(null, resp); - }; - topic.delete(function(err, apiResponse) { - assert.deepEqual(resp, apiResponse); done(); }); }); - }); - describe('getMetadata', function() { - it('should make the correct API request', function(done) { - topic.makeReq_ = function(method, path, query, body) { - assert.strictEqual(method, 'GET'); - assert.strictEqual(path, topic.name); - assert.strictEqual(query, null); - assert.strictEqual(body, null); + it('should execute callback with apiResponse', function(done) { + var resp = { success: true }; - done(); + topic.request = function(reqOpts, callback) { + callback(null, resp); }; - topic.getMetadata(assert.ifError); - }); - - describe('error', function() { - var error = new Error('Error.'); - var apiResponse = { a: 'b', c: 'd' }; - - beforeEach(function() { - topic.makeReq_ = function(method, path, query, body, callback) { - callback(error, apiResponse); - }; - }); - - it('should execute callback with error & API response', function(done) { - topic.getMetadata(function(err, metadata, apiResponse_) { - assert.strictEqual(err, error); - assert.strictEqual(metadata, null); - assert.strictEqual(apiResponse_, apiResponse); - done(); - }); - }); - }); - - describe('success', function() { - var apiResponse = { a: 'b', c: 'd' }; - - beforeEach(function() { - topic.makeReq_ = function(method, path, query, body, callback) { - callback(null, apiResponse); - }; - }); - - it('should assign the response to the metadata property', function(done) { - topic.getMetadata(function(err) { - assert.ifError(err); - assert.strictEqual(topic.metadata, apiResponse); - done(); - }); - }); - - it('should exec callback with metadata & API response', function(done) { - topic.getMetadata(function(err, metadata, apiResponse_) { - assert.ifError(err); - assert.strictEqual(metadata, apiResponse); - assert.strictEqual(apiResponse_, apiResponse); - done(); - }); + topic.publish(messageObject, function(err, ackIds, apiResponse) { + assert.deepEqual(resp, apiResponse); + done(); }); }); }); - describe('getSubscriptions', function() { - it('should accept just a callback', function(done) { - topic.pubsub.getSubscriptions = function(options, callback) { - assert.deepEqual(options, { topic: topic }); - callback(); - }; - - topic.getSubscriptions(done); - }); - - it('should pass correct args to pubsub#getSubscriptions', function(done) { - var opts = { a: 'b', c: 'd' }; - - topic.pubsub = { - getSubscriptions: function(options, callback) { - assert.deepEqual(options, opts); - assert.deepEqual(options.topic, topic); - callback(); - } - }; - - topic.getSubscriptions(opts, done); - }); - }); - describe('subscribe', function() { it('should pass correct arguments to pubsub#subscribe', function(done) { var subscriptionName = 'subName'; @@ -313,6 +284,15 @@ describe('Topic', function() { topic.subscription(subscriptionName, opts); }); + it('should attach the topic instance to the options', function(done) { + topic.pubsub.subscription = function(name, options) { + assert.strictEqual(options.topic, topic); + done(); + }; + + topic.subscription(); + }); + it('should return the result', function(done) { topic.pubsub.subscription = function() { return done;