message-stream.js

"use strict";
/*!
 * Copyright 2018 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Object.defineProperty(exports, "__esModule", { value: true });
const promisify_1 = require("@google-cloud/promisify");
const grpc_js_1 = require("@grpc/grpc-js");
const isStreamEnded = require("is-stream-ended");
const stream_1 = require("stream");
const pull_retry_1 = require("./pull-retry");
/*!
 * Frequency to ping streams.
 */
const KEEP_ALIVE_INTERVAL = 30000;
/*!
 * Deadline for the stream.
 */
const PULL_TIMEOUT = require('./v1/subscriber_client_config.json').interfaces['google.pubsub.v1.Subscriber'].methods.StreamingPull.timeout_millis;
/*!
 * default stream options
 */
const DEFAULT_OPTIONS = {
    highWaterMark: 0,
    maxStreams: 5,
    timeout: 300000,
};
/**
 * Error wrapper for gRPC status objects.
 *
 * @class
 *
 * @param {object} status The gRPC status object.
 */
class StatusError extends Error {
    constructor(status) {
        super(status.details);
        this.code = status.code;
        this.details = status.details;
        this.metadata = status.metadata;
    }
}
exports.StatusError = StatusError;
/**
 * Error thrown when we fail to open a channel for the message stream.
 *
 * @class
 *
 * @param {Error} err The original error.
 */
class ChannelError extends Error {
    constructor(err) {
        super(`Failed to connect to channel. Reason: ${process.env.DEBUG_GRPC ? err.stack : err.message}`);
        this.code = err.message.includes('deadline')
            ? grpc_js_1.status.DEADLINE_EXCEEDED
            : grpc_js_1.status.UNKNOWN;
        this.details = err.message;
        this.metadata = new grpc_js_1.Metadata();
    }
}
exports.ChannelError = ChannelError;
/**
 * @typedef {object} MessageStreamOptions
 * @property {number} [highWaterMark=0] Configures the Buffer level for all
 *     underlying streams. See
 *     {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/} for
 *     more details.
 * @property {number} [maxStreams=5] Number of streaming connections to make.
 * @property {number} [timeout=300000] Timeout for establishing a connection.
 */
/**
 * Streaming class used to manage multiple StreamingPull requests.
 *
 * @private
 * @class
 *
 * @param {Subscriber} sub The parent subscriber.
 * @param {MessageStreamOptions} [options] The message stream options.
 */
class MessageStream extends stream_1.PassThrough {
    constructor(sub, options = {}) {
        options = Object.assign({}, DEFAULT_OPTIONS, options);
        super({ objectMode: true, highWaterMark: options.highWaterMark });
        this.destroyed = false;
        this._options = options;
        this._retrier = new pull_retry_1.PullRetry();
        this._streams = new Map();
        this._subscriber = sub;
        this._fillStreamPool();
        this._keepAliveHandle = setInterval(() => this._keepAlive(), KEEP_ALIVE_INTERVAL);
        this._keepAliveHandle.unref();
    }
    /**
     * Destroys the stream and any underlying streams.
     *
     * @param {error?} err An error to emit, if any.
     * @private
     */
    destroy(err) {
        if (this.destroyed) {
            return;
        }
        this.destroyed = true;
        clearInterval(this._keepAliveHandle);
        for (const stream of this._streams.keys()) {
            this._removeStream(stream);
            stream.cancel();
        }
        if (typeof super.destroy === 'function') {
            return super.destroy(err);
        }
        process.nextTick(() => {
            if (err) {
                this.emit('error', err);
            }
            this.emit('close');
        });
    }
    /**
     * Adds a StreamingPull stream to the combined stream.
     *
     * @private
     *
     * @param {stream} stream The StreamingPull stream.
     */
    _addStream(stream) {
        this._setHighWaterMark(stream);
        this._streams.set(stream, false);
        stream
            .on('error', err => this._onError(stream, err))
            .once('status', status => this._onStatus(stream, status))
            .pipe(this, { end: false });
    }
    /**
     * Attempts to create and cache the desired number of StreamingPull requests.
     * gRPC does not supply a way to confirm that a stream is connected, so our
     * best bet is to open the streams and use the client.waitForReady() method to
     * confirm everything is ok.
     *
     * @private
     *
     * @returns {Promise}
     */
    async _fillStreamPool() {
        let client;
        try {
            client = await this._getClient();
        }
        catch (e) {
            this.destroy(e);
        }
        if (this.destroyed) {
            return;
        }
        const deadline = Date.now() + PULL_TIMEOUT;
        const request = {
            subscription: this._subscriber.name,
            streamAckDeadlineSeconds: this._subscriber.ackDeadline,
        };
        delete this._fillHandle;
        for (let i = this._streams.size; i < this._options.maxStreams; i++) {
            const stream = client.streamingPull({ deadline });
            this._addStream(stream);
            stream.write(request);
        }
        try {
            await this._waitForClientReady(client);
        }
        catch (e) {
            this.destroy(e);
        }
    }
    /**
     * It is critical that we keep as few `PullResponse` objects in memory as
     * possible to reduce the number of potential redeliveries. Because of this we
     * want to bypass gax for StreamingPull requests to avoid creating a Duplexify
     * stream, doing so essentially doubles the size of our readable buffer.
     *
     * @private
     *
     * @returns {Promise.<object>}
     */
    async _getClient() {
        const client = await this._subscriber.getClient();
        return client.getSubscriberStub();
    }
    /**
     * Since we do not use the streams to ack/modAck messages, they will close
     * by themselves unless we periodically send empty messages.
     *
     * @private
     */
    _keepAlive() {
        this._streams.forEach((receivedStatus, stream) => {
            // its possible that a status event fires off (signaling the rpc being
            // closed) but the stream hasn't drained yet, writing to this stream will
            // result in a `write after end` error
            if (!receivedStatus) {
                stream.write({});
            }
        });
    }
    /**
     * Once the stream has nothing left to read, we'll remove it and attempt to
     * refill our stream pool if needed.
     *
     * @private
     *
     * @param {Duplex} stream The ended stream.
     * @param {object} status The stream status.
     */
    _onEnd(stream, status) {
        this._removeStream(stream);
        if (this._fillHandle) {
            return;
        }
        if (this._retrier.retry(status)) {
            const delay = this._retrier.createTimeout();
            this._fillHandle = setTimeout(() => this._fillStreamPool(), delay);
        }
        else if (!this._streams.size) {
            this.destroy(new StatusError(status));
        }
    }
    /**
     * gRPC will usually emit a status as a ServiceError via `error` event before
     * it emits the status itself. In order to cut back on emitted errors, we'll
     * wait a tick on error and ignore it if the status has been received.
     *
     * @private
     *
     * @param {stream} stream The stream that errored.
     * @param {Error} err The error.
     */
    async _onError(stream, err) {
        await promisify_1.promisify(setImmediate)();
        const code = err.code;
        const receivedStatus = this._streams.get(stream) !== false;
        if (typeof code !== 'number' || !receivedStatus) {
            this.emit('error', err);
        }
    }
    /**
     * gRPC streams will emit a status event once the connection has been
     * terminated. This is preferable to end/close events because we'll receive
     * information as to why the stream closed and if it is safe to open another.
     *
     * @private
     *
     * @param {stream} stream The stream that was closed.
     * @param {object} status The status message stating why it was closed.
     */
    _onStatus(stream, status) {
        if (this.destroyed) {
            return;
        }
        this._streams.set(stream, true);
        if (isStreamEnded(stream)) {
            this._onEnd(stream, status);
        }
        else {
            stream.once('end', () => this._onEnd(stream, status));
            stream.push(null);
        }
    }
    /**
     * Removes a stream from the combined stream.
     *
     * @private
     *
     * @param {stream} stream The stream to remove.
     */
    _removeStream(stream) {
        stream.unpipe(this);
        this._streams.delete(stream);
    }
    /**
     * Neither gRPC or gax allow for the highWaterMark option to be specified.
     * However using the default value (16) it is possible to end up with a lot of
     * PullResponse objects stored in internal buffers. If this were to happen
     * and the client were slow to process messages, we could potentially see a
     * very large number of redeliveries happen before the messages even made it
     * to the client.
     *
     * @private
     *
     * @param {Duplex} stream The duplex stream to adjust the
     *     highWaterMarks for.
     */
    _setHighWaterMark(stream) {
        stream._readableState.highWaterMark = this._options.highWaterMark;
    }
    /**
     * Promisified version of gRPCs Client#waitForReady function.
     *
     * @private
     *
     * @param {object} client The gRPC client to wait for.
     * @returns {Promise}
     */
    async _waitForClientReady(client) {
        const deadline = Date.now() + this._options.timeout;
        try {
            await promisify_1.promisify(client.waitForReady).call(client, deadline);
        }
        catch (e) {
            throw new ChannelError(e);
        }
    }
}
exports.MessageStream = MessageStream;
//# sourceMappingURL=message-stream.js.map