"use strict";
/*!
* Copyright 2018 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Object.defineProperty(exports, "__esModule", { value: true });
const precise_date_1 = require("@google-cloud/precise-date");
const projectify_1 = require("@google-cloud/projectify");
const promisify_1 = require("@google-cloud/promisify");
const events_1 = require("events");
const histogram_1 = require("./histogram");
const lease_manager_1 = require("./lease-manager");
const message_queues_1 = require("./message-queues");
const message_stream_1 = require("./message-stream");
/**
* Date object with nanosecond precision. Supports all standard Date arguments
* in addition to several custom types.
*
* @external PreciseDate
* @see {@link https://github.com/googleapis/nodejs-precise-date|PreciseDate}
*/
/**
* Message objects provide a simple interface for users to get message data and
* acknowledge the message.
*
* @example
* subscription.on('message', message => {
* // {
* // ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjA',
* // attributes: {key: 'value'},
* // data: Buffer.from('Hello, world!),
* // id: '1551297743043',
* // orderingKey: 'ordering-key',
* // publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'),
* // received: 1551297743043,
* // length: 13
* // }
* });
*/
class Message {
/**
* @hideconstructor
*
* @param {Subscriber} sub The parent subscriber.
* @param {object} message The raw message response.
*/
constructor(sub, { ackId, message }) {
/**
* This ID is used to acknowledge the message.
*
* @name Message#ackId
* @type {string}
*/
this.ackId = ackId;
/**
* Optional attributes for this message.
*
* @name Message#attributes
* @type {object}
*/
this.attributes = message.attributes || {};
/**
* The message data as a Buffer.
*
* @name Message#data
* @type {Buffer}
*/
this.data = message.data;
/**
* ID of the message, assigned by the server when the message is published.
* Guaranteed to be unique within the topic.
*
* @name Message#id
* @type {string}
*/
this.id = message.messageId;
/**
* Identifies related messages for which publish order should be respected.
* If a `Subscription` has `enableMessageOrdering` set to `true`, messages
* published with the same `orderingKey` value will be delivered to
* subscribers in the order in which they are received by the Pub/Sub
* system.
*
* **EXPERIMENTAL:** This feature is part of a closed alpha release. This
* API might be changed in backward-incompatible ways and is not recommended
* for production use. It is not subject to any SLA or deprecation policy.
*
* @name Message#orderingKey
* @type {string}
*/
this.orderingKey = message.orderingKey;
/**
* The time at which the message was published.
*
* @name Message#publishTime
* @type {external:PreciseDate}
*/
this.publishTime = new precise_date_1.PreciseDate(message.publishTime);
/**
* The time at which the message was recieved by the subscription.
*
* @name Message#received
* @type {number}
*/
this.received = Date.now();
this._handled = false;
this._length = this.data.length;
this._subscriber = sub;
}
/**
* The length of the message data.
*
* @type {number}
*/
get length() {
return this._length;
}
/**
* Acknowledges the message.
*
* @example
* subscription.on('message', message => {
* message.ack();
* });
*/
ack() {
if (!this._handled) {
this._handled = true;
this._subscriber.ack(this);
}
}
/**
* Modifies the ack deadline.
*
* @param {number} deadline The number of seconds to extend the deadline.
* @private
*/
modAck(deadline) {
if (!this._handled) {
this._subscriber.modAck(this, deadline);
}
}
/**
* Removes the message from our inventory and schedules it to be redelivered.
*
* @example
* subscription.on('message', message => {
* message.nack();
* });
*/
nack() {
if (!this._handled) {
this._handled = true;
this._subscriber.nack(this);
}
}
}
exports.Message = Message;
/**
* @typedef {object} SubscriberOptions
* @property {number} [ackDeadline=10] Acknowledge deadline in seconds. If left
* unset the initial value will be 10 seconds, but it will evolve into the
* 99th percentile time it takes to acknowledge a message.
* @property {BatchOptions} [batching] Request batching options.
* @property {FlowControlOptions} [flowControl] Flow control options.
* @property {MessageStreamOptions} [streamingOptions] Streaming options.
*/
/**
* Subscriber class is used to manage all message related functionality.
*
* @private
* @class
*
* @param {Subscription} subscription The corresponding subscription.
* @param {SubscriberOptions} options The subscriber options.
*/
class Subscriber extends events_1.EventEmitter {
constructor(subscription, options = {}) {
super();
this.ackDeadline = 10;
this.isOpen = false;
this._isUserSetDeadline = false;
this._histogram = new histogram_1.Histogram({ min: 10, max: 600 });
this._latencies = new histogram_1.Histogram();
this._subscription = subscription;
this.setOptions(options);
}
/**
* The 99th percentile of request latencies.
*
* @type {number}
* @private
*/
get modAckLatency() {
const latency = this._latencies.percentile(99);
let bufferTime = 0;
if (this._modAcks) {
bufferTime = this._modAcks.maxMilliseconds;
}
return latency * 1000 + bufferTime;
}
/**
* The full name of the Subscription.
*
* @type {string}
* @private
*/
get name() {
if (!this._name) {
const { name, projectId } = this._subscription;
this._name = projectify_1.replaceProjectIdToken(name, projectId);
}
return this._name;
}
/**
* Acknowledges the supplied message.
*
* @param {Message} message The message to acknowledge.
* @returns {Promise}
* @private
*/
async ack(message) {
if (!this._isUserSetDeadline) {
const ackTimeSeconds = (Date.now() - message.received) / 1000;
this._histogram.add(ackTimeSeconds);
this.ackDeadline = this._histogram.percentile(99);
}
this._acks.add(message);
await this._acks.onFlush();
this._inventory.remove(message);
}
/**
* Closes the subscriber. The returned promise will resolve once any pending
* acks/modAcks are finished.
*
* @returns {Promise}
* @private
*/
async close() {
if (!this.isOpen) {
return;
}
this.isOpen = false;
this._stream.destroy();
this._inventory.clear();
await this._waitForFlush();
this.emit('close');
}
/**
* Gets the subscriber client instance.
*
* @returns {Promise<object>}
* @private
*/
async getClient() {
const pubsub = this._subscription.pubsub;
const [client] = await promisify_1.promisify(pubsub.getClient_).call(pubsub, {
client: 'SubscriberClient',
});
return client;
}
/**
* Modifies the acknowledge deadline for the provided message.
*
* @param {Message} message The message to modify.
* @param {number} deadline The deadline.
* @returns {Promise}
* @private
*/
async modAck(message, deadline) {
const startTime = Date.now();
this._modAcks.add(message, deadline);
await this._modAcks.onFlush();
const latency = (Date.now() - startTime) / 1000;
this._latencies.add(latency);
}
/**
* Modfies the acknowledge deadline for the provided message and then removes
* it from our inventory.
*
* @param {Message} message The message.
* @return {Promise}
* @private
*/
async nack(message) {
await this.modAck(message, 0);
this._inventory.remove(message);
}
/**
* Starts pulling messages.
* @private
*/
open() {
const { batching, flowControl, streamingOptions } = this._options;
this._acks = new message_queues_1.AckQueue(this, batching);
this._modAcks = new message_queues_1.ModAckQueue(this, batching);
this._inventory = new lease_manager_1.LeaseManager(this, flowControl);
this._stream = new message_stream_1.MessageStream(this, streamingOptions);
this._stream
.on('error', err => this.emit('error', err))
.on('data', (data) => this._onData(data))
.once('close', () => this.close());
this._inventory
.on('full', () => this._stream.pause())
.on('free', () => this._stream.resume());
this.isOpen = true;
}
/**
* Sets subscriber options.
*
* @param {SubscriberOptions} options The options.
* @private
*/
setOptions(options) {
this._options = options;
if (options.ackDeadline) {
this.ackDeadline = options.ackDeadline;
this._isUserSetDeadline = true;
}
// in the event that the user has specified the maxMessages option, we want
// to make sure that the maxStreams option isn't higher
// it doesn't really make sense to open 5 streams if the user only wants
// 1 message at a time.
if (options.flowControl) {
const { maxMessages = 100 } = options.flowControl;
if (!options.streamingOptions) {
options.streamingOptions = {};
}
const { maxStreams = 5 } = options.streamingOptions;
options.streamingOptions.maxStreams = Math.min(maxStreams, maxMessages);
}
}
/**
* Callback to be invoked when a new message is available.
*
* New messages will be added to the subscribers inventory, which in turn will
* automatically extend the messages ack deadline until either:
* a. the user acks/nacks it
* b. the maxExtension option is hit
*
* If the message puts us at/over capacity, then we'll pause our message
* stream until we've freed up some inventory space.
*
* New messages must immediately issue a ModifyAckDeadline request
* (aka receipt) to confirm with the backend that we did infact receive the
* message and its ok to start ticking down on the deadline.
*
* @private
*/
_onData({ receivedMessages }) {
for (const data of receivedMessages) {
const message = new Message(this, data);
if (this.isOpen) {
message.modAck(this.ackDeadline);
this._inventory.add(message);
}
else {
message.nack();
}
}
}
/**
* Returns a promise that will resolve once all pending requests have settled.
*
* @private
*
* @returns {Promise}
*/
async _waitForFlush() {
const promises = [];
if (this._acks.numPendingRequests) {
promises.push(this._acks.onFlush());
this._acks.flush();
}
if (this._modAcks.numPendingRequests) {
promises.push(this._modAcks.onFlush());
this._modAcks.flush();
}
await Promise.all(promises);
}
}
exports.Subscriber = Subscriber;
//# sourceMappingURL=subscriber.js.map