// message-queues.js

"use strict";
/*!
 * Copyright 2018 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Object.defineProperty(exports, "__esModule", { value: true });
const defer = require("p-defer");
/**
 * Error raised when a whole batch of ack/modAck requests fails.
 *
 * The resulting message names the failed rpc, the number of affected
 * messages and the underlying reason. The reason is the original error's
 * stack when the DEBUG_GRPC environment variable is set, otherwise just its
 * message.
 *
 * @class
 *
 * @param {ServiceError} err The grpc service error that caused the failure.
 * @param {Array.<string>} ackIds The ackIds that were part of the failed batch.
 * @param {string} rpc Name of the rpc that failed.
 */
class BatchError extends Error {
    constructor(err, ackIds, rpc) {
        const total = ackIds.length;
        // Only expose the full stack when grpc debugging has been requested.
        const reason = process.env.DEBUG_GRPC ? err.stack : err.message;
        super(`Failed to "${rpc}" for ${total} message(s). Reason: ${reason}`);
        // Mirror the service error's diagnostic fields onto this error.
        const { code, details, metadata } = err;
        Object.assign(this, { ackIds, code, details, metadata });
    }
}
exports.BatchError = BatchError;
/**
 * @typedef {object} BatchOptions
 * @property {object} [callOptions] Request configuration option, outlined
 *     here: {@link https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html}.
 * @property {number} [maxMessages=3000] Maximum number of messages allowed in
 *     each batch sent.
 * @property {number} [maxMilliseconds=100] Maximum duration to wait before
 *     sending a batch. Batches can be sent earlier if the maxMessages option
 *     is met before the configured duration has passed.
 */
/**
 * Class for buffering ack/modAck requests.
 *
 * @private
 * @class
 *
 * @param {Subscriber} sub The subscriber we're queueing requests for.
 * @param {BatchOptions} options Batching options.
 */
class MessageQueue {
    /**
     * Subclasses are expected to provide an async `_sendBatch(batch)` method;
     * `flush()` calls it with the queued `[ackId, deadline]` pairs.
     *
     * @param {Subscriber} sub The subscriber we're queueing requests for.
     * @param {BatchOptions} [options] Batching options.
     */
    constructor(sub, options = {}) {
        // Count of requests queued via add() that flush() has not picked up yet.
        this.numPendingRequests = 0;
        this._requests = [];
        this._subscriber = sub;
        this.setOptions(options);
    }
    /**
     * Gets the default buffer time in ms.
     *
     * @returns {number}
     * @private
     */
    get maxMilliseconds() {
        return this._options.maxMilliseconds;
    }
    /**
     * Adds a message to the queue. The queue is flushed right away once it
     * holds `maxMessages` requests; otherwise a flush is scheduled
     * `maxMilliseconds` from now unless one is already scheduled.
     *
     * @param {Message} message The message to add; only its ackId is used.
     * @param {number} [deadline] The deadline.
     * @private
     */
    add({ ackId }, deadline) {
        const { maxMessages, maxMilliseconds } = this._options;
        this._requests.push([ackId, deadline]);
        this.numPendingRequests += 1;
        if (this._requests.length >= maxMessages) {
            this.flush();
        }
        else if (!this._timer) {
            this._timer = setTimeout(() => this.flush(), maxMilliseconds);
        }
    }
    /**
     * Sends a batch of messages.
     *
     * Cancels any scheduled flush, hands everything queued so far to
     * `_sendBatch` and emits an 'error' event on the subscriber if the send
     * fails. The promise handed out by `onFlush()` before this call (if any)
     * is resolved once the send attempt has finished, whatever its outcome.
     *
     * @returns {Promise}
     * @private
     */
    async flush() {
        if (this._timer) {
            clearTimeout(this._timer);
            delete this._timer;
        }
        // Detach the current batch and waiter before the first await so that
        // requests added while the send is in flight go into the next batch.
        const batch = this._requests;
        const batchSize = batch.length;
        const deferred = this._onFlush;
        this._requests = [];
        this.numPendingRequests -= batchSize;
        delete this._onFlush;
        try {
            await this._sendBatch(batch);
        }
        catch (e) {
            this._subscriber.emit('error', e);
        }
        finally {
            // Resolve in `finally` so that waiters are released even when
            // emit() itself throws (as Node's EventEmitter does when no
            // 'error' listener is attached); the exception still propagates.
            if (deferred) {
                deferred.resolve();
            }
        }
    }
    /**
     * Returns a promise that resolves after the next flush occurs.
     *
     * Repeated calls made before that flush share the same promise.
     *
     * @returns {Promise}
     * @private
     */
    onFlush() {
        if (!this._onFlush) {
            this._onFlush = defer();
        }
        return this._onFlush.promise;
    }
    /**
     * Set the batching options. Values that are not supplied fall back to
     * maxMessages=3000 and maxMilliseconds=100.
     *
     * @param {BatchOptions} options Batching options.
     * @private
     */
    setOptions(options) {
        const defaults = { maxMessages: 3000, maxMilliseconds: 100 };
        this._options = Object.assign(defaults, options);
    }
}
exports.MessageQueue = MessageQueue;
/**
 * Queues up Acknowledge (ack) requests.
 *
 * @private
 * @class
 */
class AckQueue extends MessageQueue {
    /**
     * Acknowledges every message of the batch with a single request. Only the
     * ackId of each entry is used; deadlines are ignored.
     *
     * @private
     *
     * @param {Array.<Array.<string|number>>} batch Array of ackIds and deadlines.
     * @return {Promise} Rejects with a BatchError if the request fails.
     */
    async _sendBatch(batch) {
        const client = await this._subscriber.getClient();
        const ackIds = batch.map((entry) => entry[0]);
        const { name: subscription } = this._subscriber;
        const request = { subscription, ackIds };
        try {
            await client.acknowledge(request, this._options.callOptions);
        }
        catch (cause) {
            // Wrap the failure so that callers know which ackIds were affected.
            throw new BatchError(cause, ackIds, 'acknowledge');
        }
    }
}
exports.AckQueue = AckQueue;
/**
 * Queues up ModifyAckDeadline requests and sends them out in batches.
 *
 * @private
 * @class
 */
class ModAckQueue extends MessageQueue {
    /**
     * Sends a batch of modAck requests. A single request can only carry one
     * deadline, so the ackIds are bucketed by deadline and one request is
     * issued per bucket; all requests run concurrently.
     *
     * @private
     *
     * @param {Array.<Array.<string|number>>} batch Array of ackIds and deadlines.
     * @return {Promise} Rejects with a BatchError as soon as any request fails.
     */
    async _sendBatch(batch) {
        const client = await this._subscriber.getClient();
        const subscription = this._subscriber.name;
        // Bucket the ackIds under their (stringified) deadline.
        const ackIdsByDeadline = {};
        for (const [ackId, deadline] of batch) {
            if (!ackIdsByDeadline[deadline]) {
                ackIdsByDeadline[deadline] = [];
            }
            ackIdsByDeadline[deadline].push(ackId);
        }
        const pending = Object.entries(ackIdsByDeadline).map(async ([deadline, ackIds]) => {
            // Object keys are strings, convert the deadline back to a number.
            const reqOpts = {
                subscription,
                ackIds,
                ackDeadlineSeconds: Number(deadline),
            };
            try {
                await client.modifyAckDeadline(reqOpts, this._options.callOptions);
            }
            catch (cause) {
                throw new BatchError(cause, ackIds, 'modifyAckDeadline');
            }
        });
        await Promise.all(pending);
    }
}
exports.ModAckQueue = ModAckQueue;
//# sourceMappingURL=message-queues.js.map