File

src/grpc.ts

Index

Properties

Properties

clone
clone: function
Type : function
get
get: function
Type : function
set
set: function
Type : function
value
value: MetadataValue
Type : MetadataValue
import * as grpcProtoLoader from '@grpc/proto-loader';
import {execFile} from 'child_process';
import * as fs from 'fs';
import {GoogleAuth, GoogleAuthOptions} from 'google-auth-library';
import * as grpc from '@grpc/grpc-js';
import * as os from 'os';
import {join} from 'path';
import {OutgoingHttpHeaders} from 'http';
import * as path from 'path';
import * as protobuf from 'protobufjs';
import * as objectHash from 'object-hash';

import * as gax from './gax';
import {ClientOptions} from '@grpc/grpc-js/build/src/client';

const googleProtoFilesDir = path.join(__dirname, '..', '..', 'build', 'protos');

// INCLUDE_DIRS is passed to @grpc/proto-loader
const INCLUDE_DIRS: string[] = [];
INCLUDE_DIRS.push(googleProtoFilesDir);

// COMMON_PROTO_FILES logic is here for protobufjs loads (see
// GoogleProtoFilesRoot below)
import * as commonProtoFiles from './protosList.json';
import {google} from '../protos/http';
// use the correct path separator for the OS we are running on
const COMMON_PROTO_FILES: string[] = commonProtoFiles.map(file =>
  file.replace(/[/\\]/g, path.sep)
);

export interface GrpcClientOptions extends GoogleAuthOptions {
  auth?: GoogleAuth;
  grpc?: GrpcModule;
  protoJson?: protobuf.Root;
  httpRules?: Array<google.api.IHttpRule>;
  numericEnums?: boolean;
}

export interface MetadataValue {
  equals: Function;
}

/*
 * Async version of readFile.
 *
 * @returns {Promise} Contents of file at path.
 */
async function readFileAsync(path: string): Promise<string> {
  return new Promise((resolve, reject) => {
    fs.readFile(path, 'utf8', (err, content) => {
      if (err) return reject(err);
      else resolve(content);
    });
  });
}

/*
 * Async version of execFile.
 *
 * @returns {Promise} stdout from command execution.
 */
async function execFileAsync(command: string, args: string[]): Promise<string> {
  return new Promise((resolve, reject) => {
    execFile(command, args, (err, stdout) => {
      if (err) return reject(err);
      else resolve(stdout);
    });
  });
}

export interface Metadata {
  // eslint-disable-next-line @typescript-eslint/no-misused-new
  new (): Metadata;
  set: (key: {}, value?: {} | null) => void;
  clone: () => Metadata;
  value: MetadataValue;
  get: (key: {}) => {};
}

export type GrpcModule = typeof grpc;

export interface ClientStubOptions {
  protocol?: string;
  servicePath?: string;
  port?: number;
  sslCreds?: grpc.ChannelCredentials;
  [index: string]: string | number | undefined | {};
  // For mtls:
  cert?: string;
  key?: string;
}

export class ClientStub extends grpc.Client {
  [name: string]: Function;
}

export class GrpcClient {
  auth: GoogleAuth;
  grpc: GrpcModule;
  grpcVersion: string;
  fallback: boolean | 'rest' | 'proto';
  private static protoCache = new Map<string, grpc.GrpcObject>();
  httpRules?: Array<google.api.IHttpRule>;

  /**
   * Key for proto cache map. We are doing our best to make sure we respect
   * the options, so if the same proto file is loaded with different set of
   * options, the cache won't be used.  Since some of the options are
   * Functions (e.g. `enums: String` - see below in `loadProto()`),
   * they will be omitted from the cache key.  If the cache breaks anything
   * for you, use the `ignoreCache` parameter of `loadProto()` to disable it.
   */
  private static protoCacheKey(
    filename: string | string[],
    options: grpcProtoLoader.Options
  ) {
    if (
      !filename ||
      (Array.isArray(filename) && (filename.length === 0 || !filename[0]))
    ) {
      return undefined;
    }
    return JSON.stringify(filename) + ' ' + JSON.stringify(options);
  }

  /**
   * In rare cases users might need to deallocate all memory consumed by loaded protos.
   * This method will delete the proto cache content.
   */
  static clearProtoCache() {
    GrpcClient.protoCache.clear();
  }

  /**
   * A class which keeps the context of gRPC and auth for the gRPC.
   *
   * @param {Object=} options - The optional parameters. It will be directly
   *   passed to google-auth-library library, so parameters like keyFile or
   *   credentials will be valid.
   * @param {Object=} options.auth - An instance of google-auth-library.
   *   When specified, this auth instance will be used instead of creating
   *   a new one.
   * @param {Object=} options.grpc - When specified, this will be used
   *   for the 'grpc' module in this context. By default, it will load the grpc
   *   module in the standard way.
   * @constructor
   */
  constructor(options: GrpcClientOptions = {}) {
    this.auth = options.auth || new GoogleAuth(options);
    this.fallback = false;

    const minimumVersion = 10;
    const major = Number(process.version.match(/^v(\d+)/)?.[1]);
    if (Number.isNaN(major) || major < minimumVersion) {
      const errorMessage =
        `Node.js v${minimumVersion}.0.0 is a minimum requirement. To learn about legacy version support visit: ` +
        'https://github.com/googleapis/google-cloud-node#supported-nodejs-versions';
      throw new Error(errorMessage);
    }

    if ('grpc' in options) {
      this.grpc = options.grpc!;
      this.grpcVersion = '';
    } else {
      this.grpc = grpc;
      this.grpcVersion = require('@grpc/grpc-js/package.json').version;
    }
  }

  /**
   * Creates a gRPC credentials. It asks the auth data if necessary.
   * @private
   * @param {Object} opts - options values for configuring credentials.
   * @param {Object=} opts.sslCreds - when specified, this is used instead
   *   of default channel credentials.
   * @return {Promise} The promise which will be resolved to the gRPC credential.
   */
  async _getCredentials(opts: ClientStubOptions) {
    if (opts.sslCreds) {
      return opts.sslCreds;
    }
    const grpc = this.grpc;
    const sslCreds =
      opts.cert && opts.key
        ? grpc.credentials.createSsl(
            null,
            Buffer.from(opts.key),
            Buffer.from(opts.cert)
          )
        : grpc.credentials.createSsl();
    const client = await this.auth.getClient();
    const credentials = grpc.credentials.combineChannelCredentials(
      sslCreds,
      grpc.credentials.createFromGoogleCredential(client)
    );
    return credentials;
  }

  private static defaultOptions() {
    // This set of @grpc/proto-loader options
    // 'closely approximates the existing behavior of grpc.load'
    const includeDirs = INCLUDE_DIRS.slice();
    const options = {
      keepCase: false,
      longs: String,
      enums: String,
      defaults: true,
      oneofs: true,
      includeDirs,
    };
    return options;
  }

  /**
   * Loads the gRPC service from the proto file(s) at the given path and with the
   * given options. Caches the loaded protos so the subsequent loads don't do
   * any disk reads.
   * @param filename The path to the proto file(s).
   * @param options Options for loading the proto file.
   * @param ignoreCache Defaults to `false`. Set it to `true` if the caching logic
   *   incorrectly decides that the options object is the same, or if you want to
   *   re-read the protos from disk for any other reason.
   */
  loadFromProto(
    filename: string | string[],
    options: grpcProtoLoader.Options,
    ignoreCache = false
  ) {
    const cacheKey = GrpcClient.protoCacheKey(filename, options);
    let grpcPackage = cacheKey
      ? GrpcClient.protoCache.get(cacheKey)
      : undefined;
    if (ignoreCache || !grpcPackage) {
      const packageDef = grpcProtoLoader.loadSync(filename, options);
      grpcPackage = this.grpc.loadPackageDefinition(packageDef);
      if (cacheKey) {
        GrpcClient.protoCache.set(cacheKey, grpcPackage);
      }
    }
    return grpcPackage;
  }

  /**
   * Load gRPC proto service from a filename looking in googleapis common protos
   * when necessary. Caches the loaded protos so the subsequent loads don't do
   * any disk reads.
   * @param {String} protoPath - The directory to search for the protofile.
   * @param {String|String[]} filename - The filename(s) of the proto(s) to be loaded.
   *   If omitted, protoPath will be treated as a file path to load.
   * @param ignoreCache Defaults to `false`. Set it to `true` if the caching logic
   *   incorrectly decides that the options object is the same, or if you want to
   *   re-read the protos from disk for any other reason.
   * @return {Object<string, *>} The gRPC loaded result (the toplevel namespace
   *   object).
   */
  loadProto(
    protoPath: string,
    filename?: string | string[],
    ignoreCache = false
  ) {
    if (!filename) {
      filename = path.basename(protoPath);
      protoPath = path.dirname(protoPath);
    }

    if (Array.isArray(filename) && filename.length === 0) {
      return {};
    }
    const options = GrpcClient.defaultOptions();
    options.includeDirs.unshift(protoPath);
    return this.loadFromProto(filename, options, ignoreCache);
  }

  static _resolveFile(protoPath: string, filename: string) {
    if (fs.existsSync(path.join(protoPath, filename))) {
      return path.join(protoPath, filename);
    } else if (COMMON_PROTO_FILES.indexOf(filename) > -1) {
      return path.join(googleProtoFilesDir, filename);
    }
    throw new Error(filename + ' could not be found in ' + protoPath);
  }

  loadProtoJSON(json: protobuf.INamespace, ignoreCache = false) {
    const hash = objectHash(JSON.stringify(json)).toString();
    const cached = GrpcClient.protoCache.get(hash);
    if (cached && !ignoreCache) {
      return cached;
    }
    const options = GrpcClient.defaultOptions();
    const packageDefinition = grpcProtoLoader.fromJSON(json, options);
    const grpcPackage = this.grpc.loadPackageDefinition(packageDefinition);
    GrpcClient.protoCache.set(hash, grpcPackage);
    return grpcPackage;
  }

  metadataBuilder(headers: OutgoingHttpHeaders) {
    const Metadata = this.grpc.Metadata;
    const baseMetadata = new Metadata();
    for (const key in headers) {
      const value = headers[key];
      if (Array.isArray(value)) {
        value.forEach(v => baseMetadata.add(key, v));
      } else {
        baseMetadata.set(key, `${value}`);
      }
    }
    return function buildMetadata(
      abTests?: {},
      moreHeaders?: OutgoingHttpHeaders
    ) {
      // TODO: bring the A/B testing info into the metadata.
      let copied = false;
      let metadata = baseMetadata;
      if (moreHeaders) {
        for (const key in moreHeaders) {
          if (key.toLowerCase() !== 'x-goog-api-client') {
            if (!copied) {
              copied = true;
              metadata = metadata.clone();
            }
            const value = moreHeaders[key];
            if (Array.isArray(value)) {
              value.forEach(v => metadata.add(key, v));
            } else {
              metadata.set(key, `${value}`);
            }
          }
        }
      }
      return metadata;
    };
  }

  /**
   * A wrapper of {@link constructSettings} function under the gRPC context.
   *
   * Most of parameters are common among constructSettings, please take a look.
   * @param {string} serviceName - The fullly-qualified name of the service.
   * @param {Object} clientConfig - A dictionary of the client config.
   * @param {Object} configOverrides - A dictionary of overriding configs.
   * @param {Object} headers - A dictionary of additional HTTP header name to
   *   its value.
   * @return {Object} A mapping of method names to CallSettings.
   */
  constructSettings(
    serviceName: string,
    clientConfig: gax.ClientConfig,
    configOverrides: gax.ClientConfig,
    headers: OutgoingHttpHeaders
  ) {
    return gax.constructSettings(
      serviceName,
      clientConfig,
      configOverrides,
      this.grpc.status,
      {metadataBuilder: this.metadataBuilder(headers)}
    );
  }

  /**
   * Creates a gRPC stub with current gRPC and auth.
   * @param {function} CreateStub - The constructor function of the stub.
   * @param {Object} options - The optional arguments to customize
   *   gRPC connection. This options will be passed to the constructor of
   *   gRPC client too.
   * @param {string} options.servicePath - The name of the server of the service.
   * @param {number} options.port - The port of the service.
   * @param {grpcTypes.ClientCredentials=} options.sslCreds - The credentials to be used
   *   to set up gRPC connection.
   * @param {string} defaultServicePath - The default service path.
   * @return {Promise} A promise which resolves to a gRPC stub instance.
   */
  async createStub(
    CreateStub: typeof ClientStub,
    options: ClientStubOptions,
    customServicePath?: boolean
  ) {
    // The following options are understood by grpc-gcp and need a special treatment
    // (should be passed without a `grpc.` prefix)
    const grpcGcpOptions = [
      'grpc.callInvocationTransformer',
      'grpc.channelFactoryOverride',
      'grpc.gcpApiConfig',
    ];
    const [cert, key] = await this._detectClientCertificate(options);
    const servicePath = this._mtlsServicePath(
      options.servicePath,
      customServicePath,
      cert && key
    );
    const opts = Object.assign({}, options, {cert, key, servicePath});
    const serviceAddress = servicePath + ':' + opts.port;
    const creds = await this._getCredentials(opts);
    const grpcOptions: ClientOptions = {};
    // @grpc/grpc-js limits max receive/send message length starting from v0.8.0
    // https://github.com/grpc/grpc-node/releases/tag/%40grpc%2Fgrpc-js%400.8.0
    // To keep the existing behavior and avoid libraries breakage, we pass -1 there as suggested.
    grpcOptions['grpc.max_receive_message_length'] = -1;
    grpcOptions['grpc.max_send_message_length'] = -1;
    grpcOptions['grpc.initial_reconnect_backoff_ms'] = 1000;
    Object.keys(opts).forEach(key => {
      const value = options[key];
      // the older versions had a bug which required users to call an option
      // grpc.grpc.* to make it actually pass to gRPC as grpc.*, let's handle
      // this here until the next major release
      if (key.startsWith('grpc.grpc.')) {
        key = key.replace(/^grpc\./, '');
      }
      if (key.startsWith('grpc.')) {
        if (grpcGcpOptions.includes(key)) {
          key = key.replace(/^grpc\./, '');
        }
        grpcOptions[key] = value as string | number;
      }
      if (key.startsWith('grpc-node.')) {
        grpcOptions[key] = value as string | number;
      }
    });
    const stub = new CreateStub(
      serviceAddress,
      creds,
      grpcOptions as ClientOptions
    );
    return stub;
  }

  /**
   * Detect mTLS client certificate based on logic described in
   * https://google.aip.dev/auth/4114.
   *
   * @param {object} [options] - The configuration object.
   * @returns {Promise} Resolves array of strings representing cert and key.
   */
  async _detectClientCertificate(opts?: ClientOptions) {
    const certRegex =
      /(?<cert>-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----)/s;
    const keyRegex =
      /(?<key>-----BEGIN PRIVATE KEY-----.*?-----END PRIVATE KEY-----)/s;
    // If GOOGLE_API_USE_CLIENT_CERTIFICATE is true...:
    if (
      typeof process !== 'undefined' &&
      process?.env?.GOOGLE_API_USE_CLIENT_CERTIFICATE === 'true'
    ) {
      if (opts?.cert && opts?.key) {
        return [opts.cert, opts.key];
      }
      // If context aware metadata exists, run the cert provider command,
      // parse the output to extract cert and key, and use this cert/key.
      const metadataPath = join(
        os.homedir(),
        '.secureConnect',
        'context_aware_metadata.json'
      );
      const metadata = JSON.parse(await readFileAsync(metadataPath));
      if (!metadata.cert_provider_command) {
        throw Error('no cert_provider_command found');
      }
      const stdout = await execFileAsync(
        metadata.cert_provider_command[0],
        metadata.cert_provider_command.slice(1)
      );
      const matchCert = stdout.toString().match(certRegex);
      const matchKey = stdout.toString().match(keyRegex);
      if (!(matchCert?.groups && matchKey?.groups)) {
        throw Error('unable to parse certificate and key');
      } else {
        return [matchCert.groups.cert, matchKey.groups.key];
      }
    }
    // If GOOGLE_API_USE_CLIENT_CERTIFICATE is not set or false,
    // use no cert or key:
    return [undefined, undefined];
  }

  /**
   * Return service path, taking into account mTLS logic.
   * See: https://google.aip.dev/auth/4114
   *
   * @param {string|undefined} servicePath - The path of the service.
   * @param {string|undefined} customServicePath - Did the user provide a custom service URL.
   * @param {boolean} hasCertificate - Was a certificate found.
   * @returns {string} The DNS address for this service.
   */
  _mtlsServicePath(
    servicePath: string | undefined,
    customServicePath: boolean | undefined,
    hasCertificate: boolean
  ): string | undefined {
    // If user provides a custom service path, return the current service
    // path and do not attempt to add mtls subdomain:
    if (customServicePath || !servicePath) return servicePath;
    if (
      typeof process !== 'undefined' &&
      process?.env?.GOOGLE_API_USE_MTLS_ENDPOINT === 'never'
    ) {
      // It was explicitly asked that mtls endpoint not be used:
      return servicePath;
    } else if (
      (typeof process !== 'undefined' &&
        process?.env?.GOOGLE_API_USE_MTLS_ENDPOINT === 'always') ||
      hasCertificate
    ) {
      // Either auto-detect or explicit setting of endpoint:
      return servicePath.replace('googleapis.com', 'mtls.googleapis.com');
    }
    return servicePath;
  }

  /**
   * Creates a 'bytelength' function for a given proto message class.
   *
   * See {@link BundleDescriptor} about the meaning of the return value.
   *
   * @param {function} message - a constructor function that is generated by
   *   protobuf.js. Assumes 'encoder' field in the message.
   * @return {function(Object):number} - a function to compute the byte length
   *   for an object.
   */
  static createByteLengthFunction(message: typeof protobuf.Message) {
    return gax.createByteLengthFunction(message);
  }
}

export class GoogleProtoFilesRoot extends protobuf.Root {
  constructor(...args: Array<{}>) {
    super(...args);
  }

  // Causes the loading of an included proto to check if it is a common
  // proto. If it is a common proto, use the bundled proto.
  resolvePath(originPath: string, includePath: string) {
    originPath = path.normalize(originPath);
    includePath = path.normalize(includePath);

    // Fully qualified paths don't need to be resolved.
    if (path.isAbsolute(includePath)) {
      if (!fs.existsSync(includePath)) {
        throw new Error('The include `' + includePath + '` was not found.');
      }
      return includePath;
    }

    if (COMMON_PROTO_FILES.indexOf(includePath) > -1) {
      return path.join(googleProtoFilesDir, includePath);
    }

    return GoogleProtoFilesRoot._findIncludePath(originPath, includePath);
  }

  static _findIncludePath(originPath: string, includePath: string) {
    originPath = path.normalize(originPath);
    includePath = path.normalize(includePath);

    let current = originPath;
    let found = fs.existsSync(path.join(current, includePath));
    while (!found && current.length > 0) {
      current = current.substring(0, current.lastIndexOf(path.sep));
      found = fs.existsSync(path.join(current, includePath));
    }
    if (!found) {
      throw new Error('The include `' + includePath + '` was not found.');
    }
    return path.join(current, includePath);
  }
}

results matching ""

    No results matching ""