// publisher.js

"use strict";
/*!
 * Copyright 2017 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Object.defineProperty(exports, "__esModule", { value: true });
const promisify_1 = require("@google-cloud/promisify");
const arrify = require("arrify");
const each = require('async-each');
const extend = require("extend");
const is_1 = require("@sindresorhus/is");
/**
 * @typedef BatchPublishOptions
 * @property {number} [maxBytes=1024^2 * 5] The maximum number of bytes to
 *     buffer before sending a payload.
 * @property {number} [maxMessages=1000] The maximum number of messages to
 *     buffer before sending a payload.
 * @property {number} [maxMilliseconds=100] The maximum duration to wait before
 *     sending a payload.
 */
/**
 * @typedef PublishOptions
 * @property {BatchPublishOptions} [batching] Batching settings.
 * @property {object} [gaxOpts] Request configuration options, outlined
 *     here: https://googleapis.github.io/gax-nodejs/CallSettings.html.
 */
/**
 * A Publisher object allows you to publish messages to a specific topic.
 *
 * @private
 * @class
 *
 * @see [Topics: publish API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/publish}
 *
 * @param {Topic} topic The topic associated with this publisher.
 * @param {PublishOptions} [options] Configuration object.
 *
 * @example
 * const {PubSub} = require('@google-cloud/pubsub');
 * const pubsub = new PubSub();
 *
 * const topic = pubsub.topic('my-topic');
 * const publisher = topic.publisher();
 */
class Publisher {
    /**
     * Creates a publisher bound to `topic` with an empty message inventory.
     *
     * @param {Topic} topic The topic associated with this publisher.
     * @param {PublishOptions} [options] Configuration object.
     */
    constructor(topic, options) {
        if (topic.Promise) {
            this.Promise = topic.Promise;
        }
        this.setOptions(options);
        this.topic = topic;
        // this object keeps track of all messages scheduled to be published
        // queued is essentially the `messages` field for the publish rpc req opts
        // bytes is used to track the size of the combined payload
        // callbacks is an array of callbacks - each callback is associated with a
        // specific message (callbacks[i] belongs to queued[i]).
        this.inventory_ = {
            callbacks: [],
            queued: [],
            bytes: 0,
        };
    }
    /**
     * Adds a message to the current batch. The batch is sent immediately when
     * it reaches the configured byte or message limit; otherwise it is sent
     * once `batching.maxMilliseconds` has elapsed since the first message of
     * the batch was queued.
     *
     * @throws {TypeError} If `data` is not a Buffer.
     * @throws {TypeError} If any attribute value is not a string.
     *
     * @param {Buffer} data The message payload.
     * @param {object|function} [attributesOrCallback] Either the message
     *     attributes (string values only) or the callback. `null` and
     *     `undefined` are treated as "no attributes".
     * @param {function} [callback] Invoked with `(err, messageId)` once the
     *     batch containing this message has been sent.
     */
    publish(data, attributesOrCallback, callback) {
        if (!(data instanceof Buffer)) {
            throw new TypeError('Data must be in the form of a Buffer.');
        }
        // `typeof null` is 'object', so null has to be ruled out explicitly;
        // otherwise Object.keys(null) below would throw an unrelated TypeError.
        const hasAttributes = attributesOrCallback !== null &&
            typeof attributesOrCallback === 'object';
        const attributes = hasAttributes ? attributesOrCallback : {};
        const done = typeof attributesOrCallback === 'function'
            ? attributesOrCallback
            : callback;
        // Ensure the `attributes` object only has string values
        for (const key of Object.keys(attributes)) {
            const value = attributes[key];
            if (!is_1.default.string(value)) {
                throw new TypeError(`All attributes must be in the form of a string.
\nInvalid value of type "${typeof value}" provided for "${key}".`);
            }
        }
        const opts = this.settings.batching;
        // if this message puts us over the maxBytes option, then let's ship
        // what we have and add it to the next batch
        if (this.inventory_.bytes > 0 &&
            this.inventory_.bytes + data.length > opts.maxBytes) {
            this.publish_();
        }
        // add it to the queue!
        this.queue_(data, attributes, done);
        // next lets check if this message brings us to the message cap or if we
        // hit the max byte limit. `>=` (rather than `===`) keeps the cap
        // effective even when the queue is already past it, e.g. because
        // setOptions() lowered maxMessages after messages were queued.
        const hasMaxMessages = this.inventory_.queued.length >= opts.maxMessages;
        if (this.inventory_.bytes >= opts.maxBytes || hasMaxMessages) {
            this.publish_();
            return;
        }
        // otherwise let's set a timeout to send the next batch
        if (!this.timeoutHandle_) {
            this.timeoutHandle_ = setTimeout(this.publish_.bind(this), opts.maxMilliseconds);
        }
    }
    /**
     * Sets the Publisher options.
     *
     * User-supplied values are merged over the defaults, then `maxBytes` is
     * capped at 9 MiB and `maxMessages` at 1000.
     *
     * @private
     *
     * @param {PublishOptions} options The publisher options.
     */
    setOptions(options = {}) {
        const defaults = {
            batching: {
                maxBytes: Math.pow(1024, 2) * 5,
                maxMessages: 1000,
                maxMilliseconds: 100,
            },
        };
        const { batching, gaxOpts } = extend(true, defaults, options);
        this.settings = {
            batching: {
                maxBytes: Math.min(batching.maxBytes, Math.pow(1024, 2) * 9),
                maxMessages: Math.min(batching.maxMessages, 1000),
                maxMilliseconds: batching.maxMilliseconds,
            },
            gaxOpts,
        };
    }
    /**
     * This publishes a batch of messages and should never be called directly.
     *
     * The inventory is reset and any pending flush timer is cancelled before
     * the request is issued, so messages published while the request is in
     * flight start a fresh batch.
     *
     * @private
     */
    publish_() {
        // Take ownership of the current batch, then reset the shared inventory.
        const { callbacks, queued: messages } = this.inventory_;
        this.inventory_.callbacks = [];
        this.inventory_.queued = [];
        this.inventory_.bytes = 0;
        if (this.timeoutHandle_) {
            clearTimeout(this.timeoutHandle_);
            delete this.timeoutHandle_;
        }
        const reqOpts = {
            topic: this.topic.name,
            messages,
        };
        this.topic.request({
            client: 'PublisherClient',
            method: 'publish',
            reqOpts,
            gaxOpts: this.settings.gaxOpts,
        }, (err, resp) => {
            const messageIds = arrify(resp && resp.messageIds);
            // Pair each callback with its message ID by position. Looking the
            // position up with indexOf() would hand the first ID to every
            // message that shares the same callback function.
            callbacks.forEach((callback, index) => {
                callback(err, messageIds[index]);
            });
        });
    }
    /**
     * Appends a message and its callback to the pending batch and adds the
     * payload size to the running byte count.
     *
     * @private
     *
     * @param {Buffer} data The message payload.
     * @param {object} attrs The message attributes.
     * @param {function} callback The callback associated with this message.
     */
    queue_(data, attrs, callback) {
        this.inventory_.queued.push({
            data,
            attributes: attrs,
        });
        this.inventory_.bytes += data.length;
        this.inventory_.callbacks.push(callback);
    }
}
// Expose the class as a named export of this CommonJS module.
exports.Publisher = Publisher;
/*! Developer Documentation
 *
 * All async methods (except for streams) will return a Promise in the event
 * that a callback is omitted.
 */
// Wrap Publisher's methods with promisifyAll. `setOptions` is excluded: it is
// synchronous and takes no callback.
// NOTE(review): `singular: true` presumably makes the returned promise resolve
// with a single value rather than an array of callback arguments - confirm
// against the @google-cloud/promisify documentation.
promisify_1.promisifyAll(Publisher, {
    singular: true,
    exclude: ['setOptions'],
});
//# sourceMappingURL=publisher.js.map