File

src/plugins/plugin-grpc.ts

Index

Properties

Properties

Server
Server:
import {EventEmitter} from 'events';
import * as grpcModule from 'grpc'; // for types only.
import {
  Client,
  MethodDefinition,
  ServerReadableStream,
  ServerUnaryCall,
  StatusObject,
} from 'grpc';
import * as shimmer from 'shimmer';

import {
  Plugin,
  RootSpan,
  RootSpanOptions,
  Span,
  TraceContext,
  Tracer,
} from '../plugin-types';

// Re-definition of Metadata with private fields.
// `_internal_repr` is not part of the public typings; makeClientMethod probes
// for it (together with `getMap`) to recognise a Metadata instance among the
// arguments of a client call.
type Metadata = grpcModule.Metadata & {
  _internal_repr: {};
};
// Module exports of metadata.js. The plugin uses this value as a constructor
// (`new MetadataModuleValue()`) when a call was made without Metadata.
type MetadataModule = typeof grpcModule.Metadata;
// Type of makeClientConstructor as exported from client.js.
// `methods` maps each method name to its attributes; only the optional
// `originalName` attribute is read by this plugin.
type MakeClientConstructorFunction = (
  methods: {[key: string]: {originalName?: string}},
  serviceName: string,
  classOptions: never
) => typeof Client;
// Meta-type of client-side handlers: any of the four request functions,
// callable to obtain an EventEmitter, and additionally carrying the
// MethodDefinition attributes (the wrapper reads `path`, `requestStream` and
// `responseStream` off the function itself).
type ClientMethod<S, T> = (
  | typeof Client.prototype.makeUnaryRequest
  | typeof Client.prototype.makeClientStreamRequest
  | typeof Client.prototype.makeServerStreamRequest
  | typeof Client.prototype.makeBidiStreamRequest
) &
  (() => EventEmitter) &
  MethodDefinition<S, T>;
// Partial module exports of client.js (only the members this plugin touches).
interface ClientModule {
  Client: typeof Client;
  makeClientConstructor: MakeClientConstructorFunction;
}
// Callback type for unary calls/client streams on the server side.
type ServerUnaryCallback<T> = (
  err: Error,
  value: T,
  trailer: Metadata,
  flags: number
) => void;
// Re-definition of ServerWriteableStream with private fields.
// `status.code` is read in the 'finish' listener of wrapServerStream.
type ServerWriteableStream<S> = grpcModule.ServerWriteableStream<S> & {
  status: StatusObject;
};
// Re-definition of ServerDuplexStream with private fields.
// `status.code` is read in the 'finish' listener of wrapBidi.
type ServerDuplexStream<S, T> = grpcModule.ServerDuplexStream<S, T> & {
  status: StatusObject;
};
// Type of server-side unary call handler
type ServerUnaryCallHandler<S, T> = (
  call: ServerUnaryCall<S>,
  cb: ServerUnaryCallback<T>
) => void;
// Type of server-side server streaming handler
type ServerServerStreamingHandler<S> = (call: ServerWriteableStream<S>) => void;
// Type of server-side client streaming handler
type ServerClientStreamingHandler<S, T> = (
  call: ServerReadableStream<S>,
  cb: ServerUnaryCallback<T>
) => void;
// Type of server-side bidirectional streaming handler.
// NOTE: the alias name is misspelled ("Bidiectional"); it is kept as-is
// because other declarations in this file refer to it by this name.
type ServerBidiectionalStreamingHandler<S, T> = (
  call: ServerDuplexStream<S, T>
) => void;
// Meta-type for all server-side handlers
type ServerHandler<S, T> =
  | ServerUnaryCallHandler<S, T>
  | ServerServerStreamingHandler<S>
  | ServerClientStreamingHandler<S, T>
  | ServerBidiectionalStreamingHandler<S, T>;
// Private representation of functions used on the server side.
// `func` is the user-implemented handler; it is the only member this plugin
// replaces.
interface ServerHandlerFunctions<HandlerFunctionType> {
  func: HandlerFunctionType;
  serialize: () => never;
  deserialize: () => never;
  type: string;
}
// Re-definition of Server with private fields.
// `handlers` is indexed by the name passed to Server#register.
type Server = grpcModule.Server & {
  handlers: {
    [key: string]: ServerHandlerFunctions<ServerHandler<never, never>>;
  };
};
// Partial module exports of server.js
interface ServerModule {
  Server: typeof grpcModule.Server;
}
// Convenience type representing Server#register
type ServerRegisterFunction = typeof grpcModule.Server.prototype.register;
// Convenience type for ordinary (error-first) callbacks
type Callback<T> = (err: Error | null, value: T) => void;

// Passed as `skipFrames` in the options of every server-side root span
// created by this plugin.
// NOTE(review): presumably the number of plugin-internal stack frames to omit
// from the stack trace recorded on the span — confirm against the Tracer API.
const SKIP_FRAMES = 1;

// Required for adding distributed tracing metadata to outgoing gRPC requests.
// This value is assigned in patchMetadata, and used in patchClient (to
// construct a Metadata object when the caller did not supply one).
// patchMetadata is guaranteed to be called before patchClient because Client
// depends on Metadata.
// TODO(kjin): This could cause bugs if there are multiple gRPC modules being
// used at once.
let MetadataModuleValue: MetadataModule;

/**
 * Patch hook for gRPC's metadata module. Nothing is wrapped here: the module
 * exports are only stored in MetadataModuleValue so that client-side tracing
 * can construct Metadata objects later on.
 * @param metadata The value of module.exports of gRPC's metadata.js.
 */
function patchMetadata(metadata: MetadataModule) {
  MetadataModuleValue = metadata;
}

/**
 * Unpatch hook for gRPC's metadata module.
 * Deliberately a no-op: patchMetadata never modifies the module exports of
 * metadata.js, so there is nothing to restore.
 */
function unpatchMetadata() {
  // Intentionally empty.
}

function patchClient(client: ClientModule, api: Tracer) {
  /**
   * Attaches a trace context to a Metadata object, when one is available.
   * The context is encoded as a byte array and stored under the key
   * `api.constants.TRACE_CONTEXT_GRPC_METADATA_NAME`.
   * @param metadata The Metadata object that should carry the trace context.
   * @param traceContext The trace context to attach. If this is a falsy
   * value (e.g. null), metadata is left unmodified.
   */
  function setTraceContextFromString(
    metadata: Metadata,
    traceContext: TraceContext | null
  ): void {
    if (!traceContext) {
      return;
    }
    const encoded = api.traceContextUtils.encodeAsByteArray(traceContext);
    metadata.set(api.constants.TRACE_CONTEXT_GRPC_METADATA_NAME, encoded);
  }

  /**
   * Produces a replacement for a client callback that ends the given span
   * right before handing control to the original callback. When enhanced
   * database reporting is enabled, a truthy error is recorded under the
   * 'error' label and a truthy result (JSON-serialized) under 'result'.
   * The replacement is passed through api.wrap before being returned.
   * @param span - The span that should end when the callback fires.
   * @param done - The callback to be wrapped.
   */
  function wrapCallback<T>(span: Span, done: Callback<T>) {
    const endSpanThenForward: Callback<T> = (err, res) => {
      const verbose = api.enhancedDatabaseReportingEnabled();
      if (verbose && err) {
        span.addLabel('error', err);
      }
      if (verbose && res) {
        span.addLabel('result', JSON.stringify(res));
      }
      span.endSpan();
      done(err, res);
    };
    return api.wrap(endSpanThenForward);
  }

  /**
   * Produces the traced replacement for a single gRPC client method. This
   * function is passed to shimmer.massWrap in makeClientConstructorWrap below.
   *
   * The replacement starts a child span immediately before the client method
   * is invoked, injects the span's trace context into the call's Metadata
   * (creating and inserting a Metadata argument if the caller passed none),
   * and ends the span either in the wrapped callback (methods without a
   * response stream) or on the first 'error'/'status' event of the returned
   * call (methods with a response stream).
   *
   * If no real span can be created, the original method is invoked with the
   * caller's arguments untouched.
   * @param method The original client method. Its `path`, `requestStream` and
   * `responseStream` attributes are read to name the span and to decide where
   * the Metadata argument and the end-of-span hook belong.
   * @returns The tracing wrapper, typed as the original method. Note that
   * properties of `method` are not copied onto the wrapper (see TODO below).
   */
  function makeClientMethod<S, T>(
    method: ClientMethod<S, T>
  ): ClientMethod<S, T> {
    // TODO(kjin): When we upgrade to TypeScript 2.8, make the return type
    // ReturnType<ClientMethod<S, T>>
    function clientMethodTrace(this: Client): EventEmitter {
      // The span name will be of form "grpc:/[Service]/[MethodName]".
      const span = api.createChildSpan({name: 'grpc:' + method.path});
      if (!api.isRealSpan(span)) {
        // Span couldn't be created, either by policy or because a root span
        // doesn't exist.
        // eslint-disable-next-line prefer-rest-params
        return method.apply(this, arguments);
      }
      // Work on a copy of the arguments so that the callback can be replaced
      // and a Metadata object inserted without touching `arguments` itself.
      const args: Array<
        Metadata | Callback<T> | undefined | never
        // eslint-disable-next-line prefer-rest-params
      > = Array.prototype.slice.call(arguments);
      // Check if the response is through a stream or a callback.
      if (!method.responseStream) {
        // We need to wrap the callback with the context, to propagate it.
        // The callback is always required. It should be the only function in
        // the arguments, since we cannot send a function as an argument through
        // gRPC.
        const cbIndex = args.findIndex(arg => {
          return typeof arg === 'function';
        });
        // If no function argument is present nothing is wrapped, and this
        // wrapper will not end the span itself.
        if (cbIndex !== -1) {
          args[cbIndex] = wrapCallback(span, args[cbIndex] as Callback<T>);
        }
      }
      // This finds an instance of Metadata among the arguments.
      // A possible issue that could occur is if the 'options' parameter from
      // the user contains an '_internal_repr' as well as a 'getMap' function,
      // but this is an extremely rare case.
      let metaIndex = args.findIndex(arg => {
        return (
          !!arg &&
          typeof arg === 'object' &&
          arg._internal_repr &&
          typeof arg.getMap === 'function'
        );
      });
      if (metaIndex === -1) {
        // The caller supplied no Metadata: create an empty one and insert it
        // at the position expected for this kind of method.
        const metadata = new MetadataModuleValue() as Metadata;
        if (!method.requestStream) {
          // unary or server stream
          if (args.length === 0) {
            // No argument (for the gRPC call) was provided, so we will have to
            // provide one, since metadata cannot be the first argument.
            // The internal representation of argument defaults to undefined
            // in its non-presence.
            // Note that we can't pass null instead of undefined because the
            // serializer within gRPC doesn't accept it.
            args.push(undefined);
          }
          // Metadata follows the request argument.
          metaIndex = 1;
        } else {
          // client stream or bidi: there is no request argument, so Metadata
          // comes first.
          metaIndex = 0;
        }
        args.splice(metaIndex, 0, metadata);
      }
      // TS: Safe cast as we either found the index of the Metadata argument
      //     or spliced it in at metaIndex.
      const metadata = args[metaIndex] as Metadata;
      setTraceContextFromString(metadata, span.getTraceContext());
      const call: EventEmitter = method.apply(this, args);
      // Add extra data only when call successfully goes through. At this point
      // we know that the arguments are correct.
      if (api.enhancedDatabaseReportingEnabled()) {
        span.addLabel('metadata', JSON.stringify(metadata.getMap()));
        if (!method.requestStream) {
          // For unary/server-stream methods args[0] is the request argument.
          span.addLabel('argument', JSON.stringify(args[0]));
        }
      }
      // The user might need the current context in listeners to this stream.
      api.wrapEmitter(call);
      if (method.responseStream) {
        // Either event may arrive first (or both may arrive); the flag makes
        // sure the span is ended exactly once. Labels are still added on
        // every event, even after the span has ended.
        let spanEnded = false;
        call.on('error', (err: Error) => {
          if (api.enhancedDatabaseReportingEnabled()) {
            span.addLabel('error', err);
          }
          if (!spanEnded) {
            span.endSpan();
            spanEnded = true;
          }
        });
        call.on('status', (status: StatusObject) => {
          if (api.enhancedDatabaseReportingEnabled()) {
            span.addLabel('status', JSON.stringify(status));
          }
          if (!spanEnded) {
            span.endSpan();
            spanEnded = true;
          }
        });
      }
      return call;
    }
    // TODO(kjin): Investigate whether we need to copy properties of
    // method onto clientMethodTrace.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    return clientMethodTrace as any as ClientMethod<S, T>;
  }

  /**
   * Modifies `makeClientConstructor` so that all of the methods available
   * through the client are wrapped upon calling the client object constructor.
   *
   * Every key of `methods` is wrapped on the prototype of the returned class.
   * In addition, each entry's `originalName` is wrapped when it is an own
   * property of that prototype. The combined list of names is de-duplicated
   * (first occurrence wins, order preserved) before wrapping: if an
   * `originalName` equals a key or another `originalName`, the same prototype
   * property would otherwise be wrapped more than once. The outer wrapper
   * would then receive a `method` that is itself a wrapper, which does not
   * carry `path`, `requestStream` or `responseStream`.
   * @param makeClientConstructor The original makeClientConstructor function.
   * @returns A function with the same signature that returns the same class,
   * with its prototype methods wrapped by makeClientMethod.
   */
  function makeClientConstructorWrap(
    makeClientConstructor: MakeClientConstructorFunction
  ): MakeClientConstructorFunction {
    return function makeClientConstructorTrace(this: never, methods) {
      // Client is a class.
      // eslint-disable-next-line prefer-rest-params
      const Client = makeClientConstructor.apply(this, arguments);
      const methodNames = Object.keys(methods);
      // Aliases that actually exist on the prototype.
      const aliasNames = methodNames
        .map(methodName => methods[methodName].originalName)
        .filter(
          (originalName): originalName is string =>
            !!originalName &&
            Object.prototype.hasOwnProperty.call(Client.prototype, originalName)
        );
      // Drop repeated names so that no prototype property is wrapped twice.
      const methodsToWrap = [...methodNames, ...aliasNames].filter(
        (name, index, all) => all.indexOf(name) === index
      );
      shimmer.massWrap([Client.prototype], methodsToWrap, makeClientMethod);
      return Client;
    };
  }

  shimmer.wrap(client, 'makeClientConstructor', makeClientConstructorWrap);
}

/**
 * Unpatch hook for gRPC's client module.
 *
 * Only `makeClientConstructor` is restored, so client classes created after
 * this point (e.g. by a later grpc.load) are no longer instrumented. Client
 * classes that were created while the patch was active keep their wrapped
 * prototype methods and therefore continue tracing.
 * @param client The module exports of gRPC's client.js.
 */
function unpatchClient(client: ClientModule) {
  shimmer.unwrap(client, 'makeClientConstructor');
}

function patchServer(server: ServerModule, api: Tracer) {
  /**
   * Reads the trace context carried by a Metadata object.
   * The value stored under `api.constants.TRACE_CONTEXT_GRPC_METADATA_NAME`
   * is looked up in the metadata map and decoded from its byte-array form.
   * @param metadata The Metadata object from which trace context should be
   * retrieved.
   * @returns null when the metadata has no (truthy) entry under that key;
   * otherwise whatever `decodeFromByteArray` returns for the entry.
   */
  function getTraceContext(metadata: grpcModule.Metadata): TraceContext | null {
    const key = api.constants.TRACE_CONTEXT_GRPC_METADATA_NAME;
    const encoded = metadata.getMap()[key] as Buffer;
    return encoded ? api.traceContextUtils.decodeFromByteArray(encoded) : null;
  }

  /**
   * Builds a shimmer-style wrapper for `sendMetadata` that records the
   * outgoing response metadata on a span. sendMetadata is a member of each of
   * ServerUnaryCall, ServerWriteableStream, ServerReadableStream, and
   * ServerDuplexStream.
   *
   * The wrapped sendMetadata stores the JSON-serialized metadata map under the
   * 'metadata' label and then forwards the call, with the same `this` and all
   * arguments, to the original sendMetadata.
   * @param rootSpan The span object to which the metadata should be added.
   * @returns A function that takes the original sendMetadata and returns its
   * wrapped form.
   */
  function sendMetadataWrapper(rootSpan: RootSpan) {
    return (sendMetadata: (responseMetadata: Metadata) => void) => {
      function sendMetadataTrace(
        this: never,
        responseMetadata: Metadata
      ): void {
        const serializedMetadata = JSON.stringify(responseMetadata.getMap());
        rootSpan.addLabel('metadata', serializedMetadata);
        // eslint-disable-next-line prefer-rest-params
        return sendMetadata.apply(this, arguments);
      }
      return sendMetadataTrace;
    };
  }

  /**
   * Instruments a unary handler so that each invocation runs inside a root
   * span. `handlerSet.func` is replaced in place.
   *
   * The span is started right before the user's implementation is invoked
   * and ended when the callback handed to that implementation is called.
   * With enhanced database reporting enabled, the request, any metadata sent
   * via sendMetadata, the error or result, and the trailing metadata are
   * recorded as labels.
   * @param handlerSet An object containing references to the function
   * handle.
   * @param requestName The human-friendly name of the request.
   */
  function wrapUnary<S, T>(
    handlerSet: ServerHandlerFunctions<ServerUnaryCallHandler<S, T>>,
    requestName: string
  ) {
    // TODO(kjin): shimmer cannot wrap AsyncFunction objects.
    // Once shimmer introduces this functionality, change this code to use it
    // here, and in other server wrap* methods.
    // See also https://github.com/othiym23/shimmer/pull/14.
    const userHandler = handlerSet.func;
    handlerSet.func = function serverMethodTrace(
      this: Server,
      call: ServerUnaryCall<S>,
      callback: ServerUnaryCallback<T>
    ) {
      const rootSpanOptions = {
        name: requestName,
        url: requestName,
        traceContext: getTraceContext(call.metadata),
        skipFrames: SKIP_FRAMES,
      };
      return api.runInRootSpan(rootSpanOptions, rootSpan => {
        // Not tracing this request: behave exactly like the original handler.
        if (!api.isRealSpan(rootSpan)) {
          return userHandler.call(this, call, callback);
        }
        if (api.enhancedDatabaseReportingEnabled()) {
          shimmer.wrap(call, 'sendMetadata', sendMetadataWrapper(rootSpan));
          rootSpan.addLabel('argument', JSON.stringify(call.request));
        }
        rootSpan.addLabel(api.labels.HTTP_METHOD_LABEL_KEY, 'POST');
        // The user's implementation receives this callback instead of the
        // real one; it closes the span and then forwards every argument.
        const tracedCallback: ServerUnaryCallback<T> = (
          err,
          result,
          trailer,
          flags
        ) => {
          if (api.enhancedDatabaseReportingEnabled()) {
            if (err) {
              rootSpan.addLabel('error', err);
            } else {
              rootSpan.addLabel('result', JSON.stringify(result));
            }
            if (trailer) {
              const trailerMap = trailer.getMap();
              rootSpan.addLabel('trailing_metadata', JSON.stringify(trailerMap));
            }
          }
          rootSpan.endSpan();
          return callback(err, result, trailer, flags);
        };
        return userHandler.call(this, call, tracedCallback);
      });
    };
  }

  /**
   * Instruments a server-streaming handler so that each invocation runs
   * inside a root span. `handlerSet.func` is replaced in place.
   *
   * The span is started right before the user's implementation is invoked
   * and ended once the stream signals that no more data will be sent: on
   * 'finish' when the stream's status code is 0, or on 'error'.
   * @param handlerSet An object containing references to the function
   * handle.
   * @param requestName The human-friendly name of the request.
   */
  function wrapServerStream<S>(
    handlerSet: ServerHandlerFunctions<ServerServerStreamingHandler<S>>,
    requestName: string
  ) {
    const userHandler = handlerSet.func;
    handlerSet.func = function serverMethodTrace(
      this: Server,
      stream: ServerWriteableStream<S>
    ) {
      // TODO(kjin): Is it possible for a metadata value to be a buffer?
      const rootSpanOptions = {
        name: requestName,
        url: requestName,
        traceContext: getTraceContext(stream.metadata),
        skipFrames: SKIP_FRAMES,
      } as RootSpanOptions;
      return api.runInRootSpan(rootSpanOptions, rootSpan => {
        // Not tracing this request: behave exactly like the original handler.
        if (!api.isRealSpan(rootSpan)) {
          return userHandler.call(this, stream);
        }
        if (api.enhancedDatabaseReportingEnabled()) {
          shimmer.wrap(stream, 'sendMetadata', sendMetadataWrapper(rootSpan));
          rootSpan.addLabel('argument', JSON.stringify(stream.request));
        }
        rootSpan.addLabel(api.labels.HTTP_METHOD_LABEL_KEY, 'POST');
        // 'finish' and 'error' may both fire; end the span only once.
        let alreadyEnded = false;
        const endSpanOnce = () => {
          if (alreadyEnded) {
            return;
          }
          alreadyEnded = true;
          rootSpan.endSpan();
        };
        // Propagate context to stream event handlers.
        api.wrapEmitter(stream);
        stream.on('finish', () => {
          // A non-zero status is left to the 'error' listener, so that the
          // 'error' label can be applied before the span ends.
          if (stream.status.code === 0) {
            endSpanOnce();
          }
        });
        stream.on('error', err => {
          // Note: the label is added on every 'error' event, whether or not
          // the span has already been ended.
          if (api.enhancedDatabaseReportingEnabled()) {
            rootSpan.addLabel('error', err);
          }
          endSpanOnce();
        });
        return userHandler.call(this, stream);
      });
    };
  }

  /**
   * Instruments a client-streaming handler so that each invocation runs
   * inside a root span. `handlerSet.func` is replaced in place.
   *
   * The span is started right before the user's implementation is invoked
   * and ended when the callback handed to that implementation is called.
   * @param handlerSet An object containing references to the function
   * handle.
   * @param requestName The human-friendly name of the request.
   */
  function wrapClientStream<S, T>(
    handlerSet: ServerHandlerFunctions<ServerClientStreamingHandler<S, T>>,
    requestName: string
  ) {
    const userHandler = handlerSet.func;
    handlerSet.func = function serverMethodTrace(
      this: Server,
      stream: ServerReadableStream<S>,
      callback: ServerUnaryCallback<T>
    ) {
      const rootSpanOptions = {
        name: requestName,
        url: requestName,
        traceContext: getTraceContext(stream.metadata),
        skipFrames: SKIP_FRAMES,
      } as RootSpanOptions;
      return api.runInRootSpan(rootSpanOptions, rootSpan => {
        // Not tracing this request: behave exactly like the original handler.
        if (!api.isRealSpan(rootSpan)) {
          return userHandler.call(this, stream, callback);
        }
        if (api.enhancedDatabaseReportingEnabled()) {
          shimmer.wrap(stream, 'sendMetadata', sendMetadataWrapper(rootSpan));
        }
        rootSpan.addLabel(api.labels.HTTP_METHOD_LABEL_KEY, 'POST');
        // Propagate context to stream event handlers (stream is a
        // ReadableStream). Unlike server streams, the span's length is not
        // tied to the lifetime of the stream: it measures the time until the
        // server sends its response, not the time until all client data has
        // been received.
        api.wrapEmitter(stream);
        // The user's implementation receives this callback instead of the
        // real one; it closes the span and then forwards every argument.
        const tracedCallback: ServerUnaryCallback<T> = (
          err,
          result,
          trailer,
          flags
        ) => {
          if (api.enhancedDatabaseReportingEnabled()) {
            if (err) {
              rootSpan.addLabel('error', err);
            } else {
              rootSpan.addLabel('result', JSON.stringify(result));
            }
            if (trailer) {
              const trailerMap = trailer.getMap();
              rootSpan.addLabel('trailing_metadata', JSON.stringify(trailerMap));
            }
          }
          rootSpan.endSpan();
          return callback(err, result, trailer, flags);
        };
        return userHandler.call(this, stream, tracedCallback);
      });
    };
  }

  /**
   * Instruments a bidirectional-streaming handler so that each invocation
   * runs inside a root span. `handlerSet.func` is replaced in place.
   *
   * The span is started right before the user's implementation is invoked
   * and ended once the stream signals that no more data will be sent: on
   * 'finish' when the stream's status code is 0, or on 'error'.
   * @param handlerSet An object containing references to the function
   * handle.
   * @param requestName The human-friendly name of the request.
   */
  function wrapBidi<S, T>(
    handlerSet: ServerHandlerFunctions<
      ServerBidiectionalStreamingHandler<S, T>
    >,
    requestName: string
  ) {
    const userHandler = handlerSet.func;
    handlerSet.func = function serverMethodTrace(
      this: Server,
      stream: ServerDuplexStream<S, T>
    ) {
      const rootSpanOptions = {
        name: requestName,
        url: requestName,
        traceContext: getTraceContext(stream.metadata),
        skipFrames: SKIP_FRAMES,
      } as RootSpanOptions;
      return api.runInRootSpan(rootSpanOptions, rootSpan => {
        // Not tracing this request: behave exactly like the original handler.
        if (!api.isRealSpan(rootSpan)) {
          return userHandler.call(this, stream);
        }
        if (api.enhancedDatabaseReportingEnabled()) {
          shimmer.wrap(stream, 'sendMetadata', sendMetadataWrapper(rootSpan));
        }
        rootSpan.addLabel(api.labels.HTTP_METHOD_LABEL_KEY, 'POST');
        // 'finish' and 'error' may both fire; end the span only once.
        let alreadyEnded = false;
        const endSpanOnce = () => {
          if (alreadyEnded) {
            return;
          }
          alreadyEnded = true;
          rootSpan.endSpan();
        };
        // Propagate context in stream event handlers.
        api.wrapEmitter(stream);
        // stream is a Duplex. As with client streams, the span measures the
        // time until the server has finished sending data back to the
        // client, not the time until all client data has been received.
        stream.on('finish', () => {
          // A non-zero status is left to the 'error' listener.
          if (stream.status.code === 0) {
            endSpanOnce();
          }
        });
        stream.on('error', (err: Error) => {
          // The label is only added while the span is still open.
          if (!alreadyEnded && api.enhancedDatabaseReportingEnabled()) {
            rootSpan.addLabel('error', err);
          }
          endSpanOnce();
        });
        return userHandler.call(this, stream);
      });
    };
  }

  /**
   * Returns a function that wraps the gRPC server register function in order
   * to create trace spans for gRPC service methods.
   *
   * register(n, h, s, d, m) is called in addService once for each service
   * method. Its role is to assign the serialize, deserialize, and user logic
   * handlers for each exposed service method. After a successful
   * registration, the stored handler (`this.handlers[name].func`) is wrapped
   * according to the method type.
   *
   * If the underlying register call returns `false`, nothing was registered
   * (gRPC reports `false` when a handler already exists under that name). In
   * that case the existing handler is left untouched; wrapping it again would
   * instrument an already-instrumented handler, possibly with the wrapper of
   * a different method type.
   * @param register The function Server.prototype.register
   * @returns registerTrace The new wrapper function.
   */
  function serverRegisterWrap<S, T>(
    register: ServerRegisterFunction
  ): ServerRegisterFunction {
    return function registerTrace(
      this: Server,
      name,
      handler,
      serialize,
      deserialize,
      methodType
    ) {
      // eslint-disable-next-line prefer-rest-params
      const result = register.apply(this, arguments);
      if (result === false) {
        // Registration was rejected; do not touch the existing handler.
        return result;
      }
      const handlerSet = this.handlers[name] as ServerHandlerFunctions<
        ServerHandler<S, T>
      >;
      const requestName = 'grpc:' + name;
      // Proceed to wrap methods that are invoked when a gRPC service call is
      // made. In every case, the function 'func' is the user-implemented
      // handling function.
      switch (methodType) {
        case 'unary':
          wrapUnary(
            handlerSet as ServerHandlerFunctions<ServerUnaryCallHandler<S, T>>,
            requestName
          );
          break;
        case 'server_stream':
          wrapServerStream(
            handlerSet as ServerHandlerFunctions<
              ServerServerStreamingHandler<S>
            >,
            requestName
          );
          break;
        case 'client_stream':
          wrapClientStream(
            handlerSet as ServerHandlerFunctions<
              ServerClientStreamingHandler<S, T>
            >,
            requestName
          );
          break;
        case 'bidi':
          wrapBidi(
            handlerSet as ServerHandlerFunctions<
              ServerBidiectionalStreamingHandler<S, T>
            >,
            requestName
          );
          break;
        default:
          // Not expected. gRPC does not assign methodType to anything other
          // than the values above.
          break;
      }
      return result;
    };
  }

  // Wrap Server.prototype.register
  shimmer.wrap(server.Server.prototype, 'register', serverRegisterWrap);
}

/**
 * Unpatch hook for gRPC's server module: restores the original
 * Server.prototype.register that patchServer wrapped.
 * @param server The module exports of gRPC's server.js.
 */
function unpatchServer(server: ServerModule) {
  const serverPrototype = server.Server.prototype;
  shimmer.unwrap(serverPrototype, 'register');
}

// # Exports

// Version ranges covered by this plugin. The same three modules (client,
// metadata, server) are patched in both ranges; only the file paths differ.
const NESTED_LAYOUT_VERSIONS = '0.13 - 1.6';
const FLAT_LAYOUT_VERSIONS = '^1.7';

const plugin: Plugin = [
  // Modules located under src/node/src.
  {
    file: 'src/node/src/client.js',
    versions: NESTED_LAYOUT_VERSIONS,
    patch: patchClient,
    unpatch: unpatchClient,
  },
  {
    file: 'src/node/src/metadata.js',
    versions: NESTED_LAYOUT_VERSIONS,
    patch: patchMetadata,
    unpatch: unpatchMetadata,
  },
  {
    file: 'src/node/src/server.js',
    versions: NESTED_LAYOUT_VERSIONS,
    patch: patchServer,
    unpatch: unpatchServer,
  },
  // Modules located directly under src.
  {
    file: 'src/client.js',
    versions: FLAT_LAYOUT_VERSIONS,
    patch: patchClient,
    unpatch: unpatchClient,
  },
  {
    file: 'src/metadata.js',
    versions: FLAT_LAYOUT_VERSIONS,
    patch: patchMetadata,
    unpatch: unpatchMetadata,
  },
  {
    file: 'src/server.js',
    versions: FLAT_LAYOUT_VERSIONS,
    patch: patchServer,
    unpatch: unpatchServer,
  },
];

export = plugin;

results matching ""

    No results matching ""