Subscription

A Subscription object will give you access to your Cloud Pub/Sub subscription.

Subscriptions are sometimes retrieved when using various methods: PubSub#getSubscriptions and Topic#getSubscriptions.

Subscription objects may be created directly with Topic#createSubscription or Topic#subscription.

All Subscription objects are instances of an EventEmitter. The subscription will pull for messages automatically as long as there is at least one listener assigned for the message event. Available events:

Upon receipt of a message: on(event: 'message', listener: (message: Message) => void): this;

Upon receipt of an error: on(event: 'error', listener: (error: Error) => void): this;

Upon the closing of the subscriber: on(event: 'close', listener: Function): this;

By default Subscription objects allow you to process 100 messages at the same time. You can fine tune this value by adjusting the options.flowControl.maxMessages option.

If your subscription is seeing more re-deliveries than preferable, you might try increasing your options.ackDeadline value or decreasing the options.streamingOptions.maxStreams value.
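
As a rough illustration of where these settings live, the sketch below assembles them into a single options object. The helper name buildSubscriberOptions and the values passed to it are assumptions made for illustration; only the option paths (flowControl.maxMessages, ackDeadline, streamingOptions.maxStreams) come from the text above.

```javascript
// Hypothetical helper: assembles the options discussed above into one object.
// The numbers used below are illustrative, not recommended values.
function buildSubscriberOptions({maxMessages, ackDeadline, maxStreams}) {
  return {
    // Upper bound on messages processed at the same time.
    flowControl: {maxMessages},
    // Ack deadline; raising it may reduce re-deliveries.
    ackDeadline,
    // Number of streams opened for this subscription.
    streamingOptions: {maxStreams},
  };
}

const options = buildSubscriberOptions({
  maxMessages: 50,
  ackDeadline: 30,
  maxStreams: 2,
});
```

An object of this shape could be passed as the second argument to topic.subscription('my-subscription', options) or to subscription.setOptions(options).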

Subscription objects handle ack management automatically: the ack deadline is extended while a message is being processed, and the ack or nack is issued once processing is done. Note: message redelivery is still possible.

By default, each PubSub instance can handle 100 open streams. With default options, this translates to fewer than 20 Subscriptions per PubSub instance. If you wish to create more Subscriptions than that, you can either create multiple PubSub instances or lower the options.streamingOptions.maxStreams value on each Subscription object.
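
The arithmetic behind this limit can be sketched as follows. The 100-stream figure is taken from the paragraph above; the value of 5 streams per Subscription used in the example call is an assumption that this document does not state, so check the default for your library version. Treat the result as an upper bound rather than a target.

```javascript
// Limit of open streams per PubSub instance, as stated above.
const STREAMS_PER_PUBSUB_INSTANCE = 100;

// Upper bound on the Subscriptions one PubSub instance can serve, given the
// maxStreams value configured on each Subscription.
function maxSubscriptionsPerInstance(maxStreams) {
  if (!Number.isInteger(maxStreams) || maxStreams < 1) {
    throw new RangeError('maxStreams must be a positive integer');
  }
  return Math.floor(STREAMS_PER_PUBSUB_INSTANCE / maxStreams);
}

// Assumed value of 5 streams per Subscription (not stated in this document).
console.log(maxSubscriptionsPerInstance(5)); // 20
// Lowering maxStreams to 1 raises the bound.
console.log(maxSubscriptionsPerInstance(1)); // 100
```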

Constructor

new Subscription(pubsub, name, [options])

Parameters:
Name Type Attributes Description
pubsub PubSub

PubSub object.

name string

The name of the subscription.

options SubscriberOptions <optional>

Options for handling messages.

Examples

From PubSub#getSubscriptions

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

pubsub.getSubscriptions((err, subscriptions) => {
  // `subscriptions` is an array of Subscription objects.
});

From Topic#getSubscriptions

const topic = pubsub.topic('my-topic');
topic.getSubscriptions((err, subscriptions) => {
  // `subscriptions` is an array of Subscription objects.
});

Topic#createSubscription

const topic = pubsub.topic('my-topic');
topic.createSubscription('new-subscription', (err, subscription) => {
  // `subscription` is a Subscription object.
});

Topic#subscription

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
// `subscription` is a Subscription object.

Once you have obtained a subscription object, you may begin to register listeners. This will automatically trigger pulling for messages.

// Register an error handler.
subscription.on('error', (err) => {});

// Register a close handler in case the subscriber closes unexpectedly
subscription.on('close', () => {});

// Register a listener for `message` events.
function onMessage(message) {
  // Called every time a message is received.

  // message.id = ID of the message.
  // message.ackId = ID used to acknowledge receipt of the message.
  // message.data = Contents of the message.
  // message.attributes = Attributes of the message.
  // message.publishTime = Date when Pub/Sub received the message.

  // Ack the message:
  // message.ack();

  // This doesn't ack the message, but allows more messages to be retrieved
  // if your limit was hit or if you don't want to ack the message.
  // message.nack();
}
subscription.on('message', onMessage);

// Remove the listener from receiving `message` events.
subscription.removeListener('message', onMessage);
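
One possible way to structure a message listener is to tie ack and nack to the outcome of your own processing. The sketch below is an illustration only: createMessageHandler and processMessage are hypothetical names, and the only library calls it relies on are message.ack() and message.nack() as shown above.

```javascript
// Hypothetical helper: wraps an async processing function so that the message
// is acked when processing succeeds and nacked when it throws.
function createMessageHandler(processMessage) {
  return async function onMessage(message) {
    try {
      await processMessage(message);
      message.ack();
    } catch (err) {
      // Processing failed, so do not ack; nack the message instead.
      message.nack();
    }
  };
}
```

It could then be registered with subscription.on('message', createMessageHandler(async message => { /* your work */ })).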

To apply a fine level of flow control, consider the following configuration:

const subscription = topic.subscription('my-sub', {
  flowControl: {
    maxMessages: 1,
    // this tells the client to manage and lock any excess messages
    allowExcessMessages: false
  }
});

Members

iam

IAM (Identity and Access Management) allows you to set permissions on individual resources and offers a wider range of roles: editor, owner, publisher, subscriber, and viewer. This gives you greater flexibility and allows you to set more fine-grained access control.

The IAM access control features described in this document are Beta, including the API methods to get and set IAM policies, and to test IAM permissions. Cloud Pub/Sub's use of IAM features is not covered by any SLA or deprecation policy, and may be subject to backward-incompatible changes.

Example
//-
// Get the IAM policy for your subscription.
//-
subscription.iam.getPolicy((err, policy) => {
  console.log(policy);
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.iam.getPolicy().then((data) => {
  const policy = data[0];
  const apiResponse = data[1];
});

isOpen

Indicates if the Subscription is open and receiving messages.

projectId

Methods

close([callback])

Closes the Subscription. Once this is called, you will no longer receive message events unless you call Subscription#open or add new message listeners.

Parameters:
Name Type Attributes Description
callback function <optional>

The callback function.

Properties
Name Type Attributes Description
err error <nullable>

An error returned while closing the Subscription.

Example
subscription.close(err => {
  if (err) {
    // Error handling omitted.
  }
});

// If the callback is omitted a Promise will be returned.
subscription.close().then(() => {});
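
When an application holds several Subscription objects, it may want to close them together, for example during shutdown. The following is a minimal sketch under that assumption; closeAll is a hypothetical helper, and it relies only on close() returning a Promise when the callback is omitted.

```javascript
// Hypothetical helper: closes every Subscription and resolves once all of
// them have settled. Returns the errors from any close() calls that failed,
// so one failing close does not hide the others.
async function closeAll(subscriptions) {
  const results = await Promise.allSettled(
    subscriptions.map(subscription => subscription.close())
  );
  return results
    .filter(result => result.status === 'rejected')
    .map(result => result.reason);
}
```

An empty array means every Subscription closed without error.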

create(name, [options], [callback]) → {Promise.<CreateSubscriptionResponse>}

Create a subscription.

Parameters:
Name Type Attributes Description
name string

The name of the subscription.

options CreateSubscriptionRequest <optional>

See a Subscription resource.

callback CreateSubscriptionCallback <optional>

Callback function.

Returns:
Type Description
Promise.<CreateSubscriptionResponse>
Throws:

If subscription name is omitted.

Type
Error
Examples
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('newMessages');
const callback = function(err, subscription, apiResponse) {};

subscription.create(callback);

With options

subscription.create({
  ackDeadlineSeconds: 90
}, callback);

If the callback is omitted, we'll return a Promise.

const [sub, apiResponse] = await subscription.create();

createSnapshot(name, [gaxOpts], [callback]) → {Promise.<CreateSnapshotResponse>}

Create a snapshot with the given name.

Parameters:
Name Type Attributes Description
name string

Name of the snapshot.

gaxOpts object <optional>

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

callback CreateSnapshotCallback <optional>

Callback function.

Returns:
Type Description
Promise.<CreateSnapshotResponse>
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

const callback = (err, snapshot, apiResponse) => {
  if (!err) {
    // The snapshot was created successfully.
  }
};

subscription.createSnapshot('my-snapshot', callback);

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.createSnapshot('my-snapshot').then((data) => {
  const snapshot = data[0];
  const apiResponse = data[1];
});

delete([gaxOpts], [callback])

Delete the subscription. Once deletion is complete, pull requests from the current subscription will fail with an error.

Parameters:
Name Type Attributes Description
gaxOpts object <optional>

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

callback function <optional>

The callback function.

Properties
Name Type Attributes Description
err error <nullable>

An error returned while making this request.

apiResponse object

Raw API response.

Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.delete((err, apiResponse) => {});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.delete().then((data) => {
  const apiResponse = data[0];
});

detached([callback]) → {Promise.<SubscriptionDetachedResponse>}

Check if a subscription is detached.

Parameters:
Name Type Attributes Description
callback SubscriptionDetachedCallback <optional>

Callback function.

Returns:
Type Description
Promise.<SubscriptionDetachedResponse>
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.detached((err, detached) => {});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.detached().then((data) => {
  const detached = data[0];
});

exists([callback]) → {Promise.<SubscriptionExistsResponse>}

Check if a subscription exists.

Parameters:
Name Type Attributes Description
callback SubscriptionExistsCallback <optional>

Callback function.

Returns:
Type Description
Promise.<SubscriptionExistsResponse>
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.exists((err, exists) => {});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.exists().then((data) => {
  const exists = data[0];
});

get([gaxOpts], [callback]) → {Promise.<GetSubscriptionResponse>}

Get a subscription if it exists.

Parameters:
Name Type Attributes Description
gaxOpts object <optional>

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Properties
Name Type Attributes Default Description
autoCreate boolean <optional>
false

Automatically create the subscription if it does not already exist.

callback GetSubscriptionCallback <optional>

Callback function.

Returns:
Type Description
Promise.<GetSubscriptionResponse>
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.get((err, subscription, apiResponse) => {
  // The `subscription` data has been populated.
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.get().then((data) => {
  const subscription = data[0];
  const apiResponse = data[1];
});
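
The autoCreate property listed above has no example of its own, so here is a small sketch of how it might be used. getOrCreate is a hypothetical helper name; the call itself uses only get() with the autoCreate property and the Promise form described in this section.

```javascript
// Hypothetical helper: resolves with the Subscription, asking get() to create
// it first when it does not already exist.
async function getOrCreate(subscription) {
  // get() resolves with [subscription, apiResponse] when no callback is given.
  const [sub] = await subscription.get({autoCreate: true});
  return sub;
}
```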

getMetadata([gaxOpts], [callback]) → {Promise.<GetSubscriptionMetadataResponse>}

Fetches the subscription's metadata.

Parameters:
Name Type Attributes Description
gaxOpts object <optional>

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

callback GetSubscriptionMetadataCallback <optional>

Callback function.

Returns:
Type Description
Promise.<GetSubscriptionMetadataResponse>
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.getMetadata((err, apiResponse) => {
  if (err) {
    // Error handling omitted.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.getMetadata().then((data) => {
  const apiResponse = data[0];
});

modifyPushConfig(config, [gaxOpts], [callback]) → {Promise.<ModifyPushConfigResponse>}

Modify the push config for the subscription.

Parameters:
Name Type Attributes Description
config object

The push config.

Properties
Name Type Description
pushEndpoint string

A URL locating the endpoint to which messages should be published.

attributes object

PushConfig attributes.

oidcToken object

If specified, Pub/Sub will generate and attach an OIDC JWT token as an Authorization header in the HTTP request for every pushed message. This object should have the same structure as OidcToken.

gaxOpts object <optional>

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

callback ModifyPushConfigCallback <optional>

Callback function.

Returns:
Type Description
Promise.<ModifyPushConfigResponse>
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

const pushConfig = {
  pushEndpoint: 'https://mydomain.com/push',
  attributes: {
    key: 'value'
  },
  oidcToken: {
    serviceAccountEmail: 'myproject@appspot.gserviceaccount.com',
    audience: 'myaudience'
  }
};

subscription.modifyPushConfig(pushConfig, (err, apiResponse) => {
  if (err) {
    // Error handling omitted.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.modifyPushConfig(pushConfig).then((data) => {
  const apiResponse = data[0];
});
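
If push configs are built in more than one place, a small builder can keep their shape consistent. The sketch below is illustrative only: buildPushConfig is a hypothetical helper, and the https check is an assumption about what your endpoint requires rather than something this page states. The field names match the example above.

```javascript
// Hypothetical helper: builds a push config shaped like the example above.
function buildPushConfig(pushEndpoint, {serviceAccountEmail, audience} = {}) {
  // new URL() throws a TypeError on malformed input.
  const url = new URL(pushEndpoint);
  // Assumption: only https endpoints are accepted by this helper.
  if (url.protocol !== 'https:') {
    throw new Error(`push endpoint must use https: ${pushEndpoint}`);
  }
  const config = {pushEndpoint};
  // Only attach oidcToken when a service account was provided.
  if (serviceAccountEmail) {
    config.oidcToken = {serviceAccountEmail, audience};
  }
  return config;
}
```

The result could be passed directly to subscription.modifyPushConfig(config).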

open()

Opens the Subscription to receive messages. In general this method shouldn't need to be called, unless you wish to receive messages after calling Subscription#close. Alternatively one could just assign a new message event listener which will also re-open the Subscription.

Example
subscription.on('message', message => message.ack());

// Close the subscription.
subscription.close(err => {
  if (err) {
    // Error handling omitted.
  }

  // The subscription has been closed and messages will no longer be received.
});

// Resume receiving messages.
subscription.open();

seek(snapshot, [gaxOpts], [callback]) → {Promise.<SeekResponse>}

Seeks an existing subscription to a point in time or a given snapshot.

Parameters:
Name Type Attributes Description
snapshot string | Date

The point to seek to. This will accept the name of the snapshot or a Date object.

gaxOpts object <optional>

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

callback SeekCallback <optional>

Callback function.

Returns:
Type Description
Promise.<SeekResponse>
Example
const callback = (err, resp) => {
  if (!err) {
    // Seek was successful.
  }
};

subscription.seek('my-snapshot', callback);

//-
// Alternatively, to specify a certain point in time, you can provide a Date
// object.
//-
const date = new Date('October 21 2015');

subscription.seek(date, callback);
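
A common variation is seeking relative to the current time rather than to a fixed date. The sketch below shows one way to compute such a Date; seekBack is a hypothetical helper, and it relies only on seek() accepting a Date object as described above.

```javascript
// Hypothetical helper: seeks a subscription back by the given number of
// minutes. `now` is injectable so the date arithmetic can be tested.
function seekBack(subscription, minutes, now = new Date()) {
  const target = new Date(now.getTime() - minutes * 60 * 1000);
  // Returns whatever seek() returns (a Promise when no callback is passed).
  return subscription.seek(target);
}
```

For example, seekBack(subscription, 10) would request a seek to ten minutes before the moment of the call.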

setMetadata(metadata, [gaxOpts], [callback]) → {Promise.<SetSubscriptionMetadataResponse>}

Update the subscription object.

Parameters:
Name Type Attributes Description
metadata object

The subscription metadata.

gaxOpts object <optional>

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

callback SetSubscriptionMetadataCallback <optional>

Callback function.

Returns:
Type Description
Promise.<SetSubscriptionMetadataResponse>
Example
const metadata = {
  key: 'value'
};

subscription.setMetadata(metadata, (err, apiResponse) => {
  if (err) {
    // Error handling omitted.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.setMetadata(metadata).then((data) => {
  const apiResponse = data[0];
});

setOptions(options)

Sets the Subscription options.

Parameters:
Name Type Description
options SubscriberOptions

The options.
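
As an illustration, setOptions could be used to change flow control on a Subscription that already exists. This is a sketch only: limitConcurrency is a hypothetical helper, and the option names are the ones used in the flow control example near the top of this page. This page does not say whether options omitted from the call keep their previous values, so it may be safest to pass every option you depend on.

```javascript
// Hypothetical helper: applies a new concurrency limit to a Subscription.
function limitConcurrency(subscription, maxMessages) {
  subscription.setOptions({
    flowControl: {
      maxMessages,
      allowExcessMessages: false,
    },
  });
}
```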

snapshot(name) → {Snapshot}

Create a Snapshot object. See Subscription#createSnapshot to create a snapshot.

Parameters:
Name Type Description
name string

The name of the snapshot.

Returns:
Type Description
Snapshot
Throws:

If a name is not provided.

Type
Error
Example
const snapshot = subscription.snapshot('my-snapshot');