lease-manager.js

"use strict";
/*!
 * Copyright 2018 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Object.defineProperty(exports, "__esModule", { value: true });
const events_1 = require("events");
const os_1 = require("os");
/**
 * @typedef {object} FlowControlOptions
 * @property {boolean} [allowExcessMessages=true] PubSub delivers messages in
 *     batches with no way to configure the batch size. Sometimes this can be
 *     overwhelming if you only want to process a few messages at a time.
 *     Setting this option to false will make the client manage any excess
 *     messages until you're ready for them. This will prevent them from being
 *     redelivered and make the maxMessages option behave more predictably.
 * @property {number} [maxBytes] The desired amount of memory to allow message
 *     data to consume, defaults to 20% of available memory. It's possible that
 *     this value will be exceeded since messages are received in batches.
 * @property {number} [maxExtension=Infinity] The maximum duration (in seconds)
 *      to extend the message deadline before redelivering.
 * @property {number} [maxMessages=100] The desired number of messages to allow
 *     in memory before pausing the message stream. Unless allowExcessMessages
 *     is set to false, it is very likely that this value will be exceeded since
 *     any given message batch could contain a greater number of messages than
 *     the desired amount of messages.
 */
/**
 * Manages a Subscriber's inventory while automatically extending the message
 * deadlines.
 *
 * @private
 * @class
 *
 * @param {Subscriber} sub The subscriber to manage leases for.
 * @param {FlowControlOptions} options Flow control options.
 */
class LeaseManager extends events_1.EventEmitter {
    /**
     * @param {Subscriber} sub The subscriber to manage leases for.
     * @param {FlowControlOptions} [options] Flow control options.
     */
    constructor(sub, options = {}) {
        super();
        // Running total of `message.length` for every message in inventory.
        this.bytes = 0;
        // True while the deadline-extension loop is active.
        this._isLeasing = false;
        // Every leased message, whether already dispensed or still pending.
        this._messages = new Set();
        // Messages held back (in arrival order) because inventory was full and
        // `allowExcessMessages` is false. These are also in `_messages`.
        this._pending = [];
        this._subscriber = sub;
        this.setOptions(options);
    }
    /**
     * The number of messages that are leased but have not been dispensed yet.
     *
     * @type {number}
     * @private
     */
    get pending() {
        return this._pending.length;
    }
    /**
     * The total number of messages in inventory (dispensed and pending).
     *
     * @type {number}
     * @private
     */
    get size() {
        return this._messages.size;
    }
    /**
     * Adds a message to the inventory, kicking off the deadline extender if it
     * isn't already running. The message is dispensed right away unless
     * `allowExcessMessages` is false and the inventory was already full, in
     * which case it is queued as pending.
     *
     * @fires LeaseManager#full
     *
     * @param {Message} message The message.
     * @private
     */
    add(message) {
        const { allowExcessMessages } = this._options;
        // Capture fullness BEFORE adding so the message that fills the
        // inventory is still dispensed and 'full' fires only on the transition.
        const wasFull = this.isFull();
        this._messages.add(message);
        this.bytes += message.length;
        if (allowExcessMessages || !wasFull) {
            this._dispense(message);
        }
        else {
            this._pending.push(message);
        }
        if (!this._isLeasing) {
            this._isLeasing = true;
            this._scheduleExtension();
        }
        if (!wasFull && this.isFull()) {
            this.emit('full');
        }
    }
    /**
     * Removes ALL messages from inventory (including pending ones) and stops
     * the deadline extender.
     *
     * @fires LeaseManager#free
     *
     * @private
     */
    clear() {
        const wasFull = this.isFull();
        this._pending = [];
        this._messages.clear();
        this.bytes = 0;
        if (wasFull) {
            process.nextTick(() => this.emit('free'));
        }
        this._cancelExtension();
    }
    /**
     * Indicates if we're at or over capacity, by message count or by bytes.
     *
     * @returns {boolean}
     * @private
     */
    isFull() {
        const { maxBytes, maxMessages } = this._options;
        return this.size >= maxMessages || this.bytes >= maxBytes;
    }
    /**
     * Removes a message from the inventory. Stopping the deadline extender if no
     * messages are left over.
     *
     * If the removed message was still pending it is dropped from the pending
     * queue so it can never be dispensed afterwards. Otherwise (a dispensed
     * message was removed) the oldest pending message, if any, is dispensed in
     * its place.
     *
     * @fires LeaseManager#free
     *
     * @param {Message} message The message to remove.
     * @private
     */
    remove(message) {
        if (!this._messages.has(message)) {
            return;
        }
        const wasFull = this.isFull();
        this._messages.delete(message);
        this.bytes -= message.length;
        // Pending bookkeeping must happen regardless of the full -> free
        // transition below; otherwise a removed message could linger in the
        // queue (and be dispensed later), or a queued message could be stranded.
        const pendingIndex = this._pending.indexOf(message);
        if (pendingIndex !== -1) {
            this._pending.splice(pendingIndex, 1);
        }
        else if (this.pending > 0) {
            this._dispense(this._pending.shift());
        }
        if (wasFull && !this.isFull()) {
            process.nextTick(() => this.emit('free'));
        }
        if (this.size === 0 && this._isLeasing) {
            this._cancelExtension();
        }
    }
    /**
     * Sets options for the LeaseManager. Any option that is not supplied falls
     * back to its default.
     *
     * @param {FlowControlOptions} [options] The options.
     * @private
     */
    setOptions(options) {
        const defaults = {
            allowExcessMessages: true,
            // 20% of the memory currently reported free by the OS.
            maxBytes: os_1.freemem() * 0.2,
            maxExtension: Infinity,
            maxMessages: 100,
        };
        this._options = Object.assign(defaults, options);
    }
    /**
     * Stops extending message deadlines and clears any scheduled timer.
     *
     * @private
     */
    _cancelExtension() {
        this._isLeasing = false;
        if (this._timer) {
            clearTimeout(this._timer);
            delete this._timer;
        }
    }
    /**
     * Emits the message. Emitting messages is very slow, so to avoid it acting
     * as a bottleneck, we're wrapping it in nextTick. Nothing is emitted when
     * the subscriber is not open.
     *
     * @private
     *
     * @fires Subscriber#message
     *
     * @param {Message} message The message to emit.
     */
    _dispense(message) {
        if (this._subscriber.isOpen) {
            process.nextTick(() => this._subscriber.emit('message', message));
        }
    }
    /**
     * Loops through inventory and extends the deadlines for any messages that
     * have not hit the max extension option. Messages that have hit it are
     * removed from inventory instead. Reschedules itself while leasing is
     * still active.
     *
     * @private
     */
    _extendDeadlines() {
        const deadline = this._subscriber.ackDeadline;
        // Deleting entries from a Set while iterating it with for...of is
        // well-defined, so calling remove() inside the loop is safe.
        for (const message of this._messages) {
            // `received` is a millisecond timestamp; maxExtension is in seconds.
            const lifespan = (Date.now() - message.received) / 1000;
            if (lifespan < this._options.maxExtension) {
                message.modAck(deadline);
            }
            else {
                this.remove(message);
            }
        }
        // remove() may have emptied the inventory and cancelled leasing.
        if (this._isLeasing) {
            this._scheduleExtension();
        }
    }
    /**
     * Creates a timeout(ms) that should allow us to extend any message deadlines
     * before they would be redelivered: a random fraction of 90% of the ack
     * deadline minus the observed modAck latency, never less than 0.
     *
     * @private
     *
     * @returns {number}
     */
    _getNextExtensionTimeoutMs() {
        const jitter = Math.random();
        // ackDeadline is multiplied by 1000 here and subtracted against
        // modAckLatency, i.e. seconds -> milliseconds.
        const deadline = this._subscriber.ackDeadline * 1000;
        const latency = this._subscriber.modAckLatency;
        // Clamp so a latency larger than the usable window never yields a
        // negative timer delay.
        return Math.max(0, (deadline * 0.9 - latency) * jitter);
    }
    /**
     * Schedules a deadline extension for all messages.
     *
     * @private
     */
    _scheduleExtension() {
        const timeout = this._getNextExtensionTimeoutMs();
        this._timer = setTimeout(() => this._extendDeadlines(), timeout);
    }
}
// Expose the LeaseManager class as a named CommonJS export.
exports.LeaseManager = LeaseManager;
//# sourceMappingURL=lease-manager.js.map