File

src/longRunningCalls/longrunning.ts

Extends

EventEmitter

Index

Properties
Methods

Constructor

constructor(grpcOp: LROOperation, longrunningDescriptor: LongRunningDescriptor, backoffSettings: BackoffSettings, callOptions?: CallOptions)

Wrapper for a google.longrunnung.Operation.

operations service client and unpacking mechanisms for the operation. in polling the operation. requests.

Parameters :
Name Type Optional Description
grpcOp LROOperation No
  • The operation to be wrapped.
longrunningDescriptor LongRunningDescriptor No
  • This defines the operations service client and unpacking mechanisms for the operation.
backoffSettings BackoffSettings No
  • The backoff settings used in in polling the operation.
callOptions CallOptions Yes
  • CallOptions used in making get operation requests.

Properties

Optional _callOptions
Type : CallOptions
backoffSettings
Type : BackoffSettings
completeListeners
Type : number
Optional currentCallPromise_
Type : CancellablePromise<ResultTuple>
Optional done
Type : boolean
Optional error
Type : GoogleError
hasActiveListeners
Type : boolean
latestResponse
Type : LROOperation
longrunningDescriptor
Type : LongRunningDescriptor
metadata
Type : Metadata | null
Optional name
Type : string
Optional response
Type : literal type
result
Type : literal type | null

Methods

_listenForEvents
_listenForEvents()

Begin listening for events on the operation. This method keeps track of how many "complete" listeners are registered and removed, making sure polling is handled automatically.

As long as there is one active "complete" listener, the connection is open. When there are no more listeners, the polling stops.

Returns : void
_unpackResponse
_unpackResponse(op: LROOperation, callback?: GetOperationCallback)
Parameters :
Name Type Optional
op LROOperation No
callback GetOperationCallback Yes
Returns : void
cancel
cancel()

Cancels current polling api call and cancels the operation.

request.

Returns : any

the promise of the OperationsClient#cancelOperation api request.

getOperation
getOperation()

Get the updated status of the operation. If the Operation has previously completed, this will use the status of the cached completed operation.

  • callback(err): Operation failed
  • callback(null, result, metadata, rawResponse): Operation complete
  • callback(null, null, metadata, rawResponse): Operation incomplete

operation result and metadata. The promise resolves to an array where the first element is the unpacked result, the second element is the metadata, and the third element is the raw response of the api call. The promise rejects if the operation returns an error.

Returns : Promise<literal type>
  • This returns a promise if a callback is not specified. The promise resolves to an array where the first element is the unpacked result, the second element is the metadata, and the third element is the raw response of the api call. The promise rejects if the operation returns an error.
getOperation
getOperation(callback: GetOperationCallback)
Parameters :
Name Type Optional
callback GetOperationCallback No
Returns : void
getOperation
getOperation(callback?: GetOperationCallback)
Parameters :
Name Type Optional
callback GetOperationCallback Yes
Returns : Promise | void
promise
promise()

Wraps the complete and error events in a Promise.

on operation error.

Returns : any
  • Promise that resolves on operation completion and rejects on operation error.
startPolling_
startPolling_()

Poll getOperation to check the operation's status. This runs a loop to ping using the backoff strategy specified at initialization.

Note: This method is automatically called once a "complete" event handler is registered on the operation.

Returns : void
import {EventEmitter} from 'events';
import {Status} from '../status';

import {ResultTuple} from '../apitypes';
import {CancellablePromise} from '../call';
import {BackoffSettings, CallOptions} from '../gax';
import {GoogleError} from '../googleError';
import {Metadata} from '../grpc';
import {LongRunningDescriptor} from './longRunningDescriptor';
import * as operationProtos from '../../protos/operations';

/**
 * @callback GetOperationCallback
 * @param {?Error} error
 * @param {?Object} result
 * @param {?Object} metadata
 * @param {?google.longrunning.Operation} rawResponse
 */
export interface GetOperationCallback {
  (
    err?: Error | null,
    result?: {},
    metadata?: {},
    rawResponse?: LROOperation
  ): void;
}

type LROOperation = operationProtos.google.longrunning.Operation;

export class Operation extends EventEmitter {
  completeListeners: number;
  hasActiveListeners: boolean;
  latestResponse: LROOperation;
  longrunningDescriptor: LongRunningDescriptor;
  result: {} | null;
  metadata: Metadata | null;
  backoffSettings: BackoffSettings;
  _callOptions?: CallOptions;
  currentCallPromise_?: CancellablePromise<ResultTuple>;
  name?: string;
  done?: boolean;
  error?: GoogleError;
  response?: {};

  /**
   * Wrapper for a google.longrunnung.Operation.
   *
   * @constructor
   *
   * @param {google.longrunning.Operation} grpcOp - The operation to be wrapped.
   * @param {LongRunningDescriptor} longrunningDescriptor - This defines the
   * operations service client and unpacking mechanisms for the operation.
   * @param {BackoffSettings} backoffSettings - The backoff settings used in
   * in polling the operation.
   * @param {CallOptions} callOptions - CallOptions used in making get operation
   * requests.
   */
  constructor(
    grpcOp: LROOperation,
    longrunningDescriptor: LongRunningDescriptor,
    backoffSettings: BackoffSettings,
    callOptions?: CallOptions
  ) {
    super();
    this.completeListeners = 0;
    this.hasActiveListeners = false;
    this.latestResponse = grpcOp;
    this.name = this.latestResponse.name;
    this.done = this.latestResponse.done;
    this.error = this.latestResponse.error as unknown as GoogleError;
    this.longrunningDescriptor = longrunningDescriptor;
    this.result = null;
    this.metadata = null;
    this.backoffSettings = backoffSettings;
    this._unpackResponse(grpcOp);
    this._listenForEvents();
    this._callOptions = callOptions;
  }

  /**
   * Begin listening for events on the operation. This method keeps track of how
   * many "complete" listeners are registered and removed, making sure polling
   * is handled automatically.
   *
   * As long as there is one active "complete" listener, the connection is open.
   * When there are no more listeners, the polling stops.
   *
   * @private
   */
  _listenForEvents() {
    this.on('newListener', event => {
      if (event === 'complete') {
        this.completeListeners++;

        if (!this.hasActiveListeners) {
          this.hasActiveListeners = true;
          this.startPolling_();
        }
      }
    });

    this.on('removeListener', event => {
      if (event === 'complete' && --this.completeListeners === 0) {
        this.hasActiveListeners = false;
      }
    });
  }

  /**
   * Cancels current polling api call and cancels the operation.
   *
   * @return {Promise} the promise of the OperationsClient#cancelOperation api
   * request.
   */
  cancel() {
    if (this.currentCallPromise_) {
      this.currentCallPromise_.cancel();
    }
    const operationsClient = this.longrunningDescriptor.operationsClient;
    const cancelRequest =
      new operationProtos.google.longrunning.CancelOperationRequest();
    cancelRequest.name = this.latestResponse.name;
    return operationsClient.cancelOperation(cancelRequest);
  }

  /**
   * Get the updated status of the operation. If the Operation has previously
   * completed, this will use the status of the cached completed operation.
   *
   *   - callback(err): Operation failed
   *   - callback(null, result, metadata, rawResponse): Operation complete
   *   - callback(null, null, metadata, rawResponse): Operation incomplete
   *
   * @param {getOperationCallback} callback - Callback to handle the polled
   * operation result and metadata.
   * @return {Promise|undefined} - This returns a promise if a callback is not specified.
   * The promise resolves to an array where the first element is the unpacked
   * result, the second element is the metadata, and the third element is the
   * raw response of the api call. The promise rejects if the operation returns
   * an error.
   */
  getOperation(): Promise<{}>;
  getOperation(callback: GetOperationCallback): void;
  getOperation(callback?: GetOperationCallback): Promise<{}> | void {
    // eslint-disable-next-line @typescript-eslint/no-this-alias
    const self = this;
    const operationsClient = this.longrunningDescriptor.operationsClient;

    function promisifyResponse() {
      if (!callback) {
        return new Promise((resolve, reject) => {
          if (self.latestResponse.error) {
            const error = new GoogleError(self.latestResponse.error.message!);
            error.code = self.latestResponse.error.code!;
            reject(error);
          } else {
            resolve([self.result, self.metadata, self.latestResponse]);
          }
        });
      }
      return;
    }

    if (this.latestResponse.done) {
      this._unpackResponse(this.latestResponse, callback);
      return promisifyResponse() as Promise<{}>;
    }
    const request =
      new operationProtos.google.longrunning.GetOperationRequest();
    request.name = this.latestResponse.name;
    this.currentCallPromise_ = operationsClient.getOperationInternal(
      request,
      this._callOptions!
    );

    const noCallbackPromise = this.currentCallPromise_.then(
      responses => {
        self.latestResponse = responses[0] as LROOperation;
        self._unpackResponse(responses[0] as LROOperation, callback);
        return promisifyResponse()!;
      },
      (err: Error) => {
        if (callback) {
          callback(err);
          return;
        }
        return Promise.reject(err);
      }
    );

    if (!callback) {
      return noCallbackPromise as Promise<{}>;
    }
  }

  _unpackResponse(op: LROOperation, callback?: GetOperationCallback) {
    const responseDecoder = this.longrunningDescriptor.responseDecoder;
    const metadataDecoder = this.longrunningDescriptor.metadataDecoder;
    let response: {};
    let metadata: Metadata;

    if (op.done) {
      if (op.result === 'error') {
        const error = new GoogleError(op.error!.message!);
        error.code = op.error!.code!;
        this.error = error;
        if (callback) {
          callback(error);
        }
        return;
      }

      if (responseDecoder && op.response) {
        this.response = op.response;
        response = responseDecoder(op.response.value!);
        this.result = response;
        this.done = true;
      }
    }

    if (metadataDecoder && op.metadata) {
      metadata = metadataDecoder(op.metadata.value!) as unknown as Metadata;
      this.metadata = metadata;
    }
    if (callback) {
      callback(null, response!, metadata!, op);
    }
  }

  /**
   * Poll `getOperation` to check the operation's status. This runs a loop to
   * ping using the backoff strategy specified at initialization.
   *
   * Note: This method is automatically called once a "complete" event handler
   * is registered on the operation.
   *
   * @private
   */
  startPolling_() {
    // eslint-disable-next-line @typescript-eslint/no-this-alias
    const self = this;

    let now = new Date();
    const delayMult = this.backoffSettings.retryDelayMultiplier;
    const maxDelay = this.backoffSettings.maxRetryDelayMillis;
    let delay = this.backoffSettings.initialRetryDelayMillis;
    let deadline = Infinity;
    if (this.backoffSettings.totalTimeoutMillis) {
      deadline = now.getTime() + this.backoffSettings.totalTimeoutMillis;
    }
    let previousMetadataBytes: Uint8Array;
    if (this.latestResponse.metadata) {
      previousMetadataBytes = this.latestResponse.metadata.value!;
    }

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    function emit(event: string | symbol, ...args: any[]) {
      self.emit(event, ...args);
    }

    // Helper function to replace nodejs buffer's equals()
    function arrayEquals(a: Uint8Array, b: Uint8Array): boolean {
      if (a.byteLength !== b.byteLength) {
        return false;
      }
      for (let i = 0; i < a.byteLength; ++i) {
        if (a[i] !== b[i]) return false;
      }
      return true;
    }

    function retry() {
      if (!self.hasActiveListeners) {
        return;
      }

      if (now.getTime() >= deadline) {
        const error = new GoogleError(
          'Total timeout exceeded before any response was received'
        );
        error.code = Status.DEADLINE_EXCEEDED;
        setImmediate(emit, 'error', error);
        return;
      }

      self.getOperation((err, result, metadata, rawResponse) => {
        if (err) {
          setImmediate(emit, 'error', err);
          return;
        }

        if (!result) {
          if (
            rawResponse!.metadata &&
            (!previousMetadataBytes ||
              (rawResponse &&
                !arrayEquals(
                  rawResponse.metadata.value!,
                  previousMetadataBytes
                )))
          ) {
            setImmediate(emit, 'progress', metadata, rawResponse);
            previousMetadataBytes = rawResponse!.metadata!.value!;
          }
          // special case: some APIs fail to set either result or error
          // but set done = true (e.g. speech with silent file).
          // Some APIs just use this for the normal completion
          // (e.g. nodejs-contact-center-insights), so let's just return
          // an empty response in this case.
          if (rawResponse!.done) {
            setImmediate(emit, 'complete', {}, metadata, rawResponse);
            return;
          }
          setTimeout(() => {
            now = new Date();
            delay = Math.min(delay * delayMult, maxDelay);
            retry();
          }, delay);
          return;
        }

        setImmediate(emit, 'complete', result, metadata, rawResponse);
      });
    }
    retry();
  }

  /**
   * Wraps the `complete` and `error` events in a Promise.
   *
   * @return {promise} - Promise that resolves on operation completion and rejects
   * on operation error.
   */
  promise() {
    return new Promise((resolve, reject) => {
      this.on('error', reject).on(
        'complete',
        (result, metadata, rawResponse) => {
          resolve([result, metadata, rawResponse]);
        }
      );
    });
  }
}

/**
 * Method used to create Operation objects.
 *
 * @constructor
 *
 * @param {google.longrunning.Operation} op - The operation to be wrapped.
 * @param {LongRunningDescriptor} longrunningDescriptor - This defines the
 * operations service client and unpacking mechanisms for the operation.
 * @param {BackoffSettings} backoffSettings - The backoff settings used in
 * in polling the operation.
 * @param {CallOptions=} callOptions - CallOptions used in making get operation
 * requests.
 */
export function operation(
  op: LROOperation,
  longrunningDescriptor: LongRunningDescriptor,
  backoffSettings: BackoffSettings,
  callOptions?: CallOptions
) {
  return new Operation(op, longrunningDescriptor, backoffSettings, callOptions);
}

results matching ""

    No results matching ""