import {
Abortable,
BodyResponseCallback,
DecorateRequestOptions,
Service,
ServiceConfig,
util,
} from '@google-cloud/common';
import {replaceProjectIdToken} from '@google-cloud/projectify';
import {
loadSync,
PackageDefinition,
ServiceDefinition,
} from '@grpc/proto-loader';
import * as duplexify from 'duplexify';
import {EventEmitter} from 'events';
import * as extend from 'extend';
import * as grpc from '@grpc/grpc-js';
import * as is from 'is';
import {Request, Response} from 'teeny-request';
import * as retryRequest from 'retry-request';
import {Duplex, PassThrough} from 'stream';
export interface ServiceRequestCallback {
(err: Error | null, apiResponse?: Response): void;
}
interface RetryOptions {
objectMode?: boolean;
request?: any;
retries?: number;
noResponseRetries?: number;
currentRetryAttempt?: number;
shouldRetryFn?: (response: Response) => boolean;
}
export interface ProtoOpts {
service: string;
method: string;
timeout?: number;
retryOpts?: RetryOptions;
stream?: Duplex;
}
interface GrpcOptions {
deadline?: Date;
}
export interface GrpcServiceConfig extends ServiceConfig {
grpc?: typeof grpc;
grpcVersion?: string;
grpcMetadata: grpc.Metadata;
protosDir: string;
protoServices: {
[serviceName: string]: {path: string; service: string; baseUrl: string};
};
customEndpoint: boolean;
}
const GRPC_ERROR_CODE_TO_HTTP = {
0: {
code: 200,
message: 'OK',
},
1: {
code: 499,
message: 'Client Closed Request',
},
2: {
code: 500,
message: 'Internal Server Error',
},
3: {
code: 400,
message: 'Bad Request',
},
4: {
code: 504,
message: 'Gateway Timeout',
},
5: {
code: 404,
message: 'Not Found',
},
6: {
code: 409,
message: 'Conflict',
},
7: {
code: 403,
message: 'Forbidden',
},
8: {
code: 429,
message: 'Too Many Requests',
},
9: {
code: 412,
message: 'Precondition Failed',
},
10: {
code: 409,
message: 'Conflict',
},
11: {
code: 400,
message: 'Bad Request',
},
12: {
code: 501,
message: 'Not Implemented',
},
13: {
code: 500,
message: 'Internal Server Error',
},
14: {
code: 503,
message: 'Service Unavailable',
},
15: {
code: 500,
message: 'Internal Server Error',
},
16: {
code: 401,
message: 'Unauthorized',
},
};
const GRPC_SERVICE_OPTIONS = {
'grpc.max_send_message_length': -1,
'grpc.max_receive_message_length': -1,
'grpc.initial_reconnect_backoff_ms': 5000,
};
export interface ObjectToStructConverterConfig {
removeCircular?: boolean;
stringify?: boolean;
}
export class ObjectToStructConverter {
seenObjects: Set<{}>;
removeCircular: boolean;
stringify?: boolean;
constructor(options?: ObjectToStructConverterConfig) {
options = options || {};
this.seenObjects = new Set();
this.removeCircular = options.removeCircular === true;
this.stringify = options.stringify === true;
}
convert(obj: {}) {
const convertedObject = {
fields: {},
};
this.seenObjects.add(obj);
for (const prop in obj) {
if (Object.prototype.hasOwnProperty.call(obj, prop)) {
const value = obj[prop];
if (is.undefined(value)) {
continue;
}
convertedObject.fields[prop] = this.encodeValue_(value);
}
}
this.seenObjects.delete(obj);
return convertedObject;
}
encodeValue_(value: {}) {
let convertedValue;
if (is.null(value)) {
convertedValue = {
nullValue: 0,
};
} else if (is.number(value)) {
convertedValue = {
numberValue: value,
};
} else if (is.string(value)) {
convertedValue = {
stringValue: value,
};
} else if (is.boolean(value)) {
convertedValue = {
boolValue: value,
};
} else if (Buffer.isBuffer(value)) {
convertedValue = {
blobValue: value,
};
} else if (is.object(value)) {
if (this.seenObjects.has(value)) {
if (!this.removeCircular) {
throw new Error(
[
'This object contains a circular reference. To automatically',
'remove it, set the `removeCircular` option to true.',
].join(' ')
);
}
convertedValue = {
stringValue: '[Circular]',
};
} else {
convertedValue = {
structValue: this.convert(value),
};
}
} else if (is.array(value)) {
convertedValue = {
listValue: {
values: (value as Array<{}>).map(this.encodeValue_.bind(this)),
},
};
} else {
if (!this.stringify) {
throw new Error('Value of type ' + typeof value + ' not recognized.');
}
convertedValue = {
stringValue: String(value),
};
}
return convertedValue;
}
}
export class GrpcService extends Service {
grpc?: typeof grpc;
grpcVersion?: string;
grpcCredentials?: {};
grpcMetadata?: {add: Function};
maxRetries?: number;
userAgent?: string;
activeServiceMap_ = new Map();
protos = {};
private static protoObjectCache: {[name: string]: PackageDefinition} = {};
static readonly GRPC_SERVICE_OPTIONS = GRPC_SERVICE_OPTIONS;
static readonly GRPC_ERROR_CODE_TO_HTTP = GRPC_ERROR_CODE_TO_HTTP;
static readonly ObjectToStructConverter = ObjectToStructConverter;
constructor(config: GrpcServiceConfig, options) {
super(config, options);
if (global['GCLOUD_SANDBOX_ENV']) {
return global['GCLOUD_SANDBOX_ENV'];
}
if (config.grpc) {
this.grpc = config.grpc;
this.grpcVersion = config.grpcVersion || 'grpc/unknown';
} else {
this.grpc = grpc;
this.grpcVersion =
'grpc-js/' + require('@grpc/grpc-js/package.json').version;
}
if (config.customEndpoint) {
this.grpcCredentials = this.grpc.credentials.createInsecure();
}
this.grpcMetadata = new this.grpc.Metadata();
this.grpcMetadata.add(
'x-goog-api-client',
[
'gl-node/' + process.versions.node,
'gccl/' + config.packageJson.version,
this.grpcVersion,
].join(' ')
);
if (config.grpcMetadata) {
for (const prop in config.grpcMetadata) {
if (config.grpcMetadata.hasOwnProperty(prop)) {
this.grpcMetadata.add(prop, config.grpcMetadata[prop]);
}
}
}
this.maxRetries = options.maxRetries;
this.userAgent = util.getUserAgentFromPackageJson(config.packageJson);
this.activeServiceMap_ = new Map();
this.protos = {};
const protoServices = config.protoServices;
Object.keys(protoServices).forEach(name => {
const protoConfig = protoServices[name];
const services = this.loadProtoFile(protoConfig.path, config);
const serviceKey = ['google', protoConfig.service, name]
.filter(x => x)
.join('.');
const service = services[serviceKey] as ServiceDefinition & {
baseUrl?: string;
};
this.protos[name] = service;
if (protoConfig.baseUrl) {
service.baseUrl = protoConfig.baseUrl;
}
});
}
request(reqOpts: DecorateRequestOptions): Promise<Response>;
request(
reqOpts: DecorateRequestOptions,
callback: BodyResponseCallback
): void;
request(
reqOpts: DecorateRequestOptions,
callback?: BodyResponseCallback
): void | Promise<Response>;
request(
protoOpts: ProtoOpts,
reqOpts: DecorateRequestOptions,
callback: ServiceRequestCallback
): Abortable | void;
request(
pOpts: ProtoOpts | DecorateRequestOptions,
rOpts?: DecorateRequestOptions | BodyResponseCallback,
callback?: ServiceRequestCallback
): Abortable | void | Promise<Response> {
const protoOpts = pOpts as ProtoOpts;
let reqOpts = rOpts as DecorateRequestOptions;
if (global['GCLOUD_SANDBOX_ENV']) {
return global['GCLOUD_SANDBOX_ENV'];
}
if (!this.grpcCredentials) {
this.getGrpcCredentials_((err, credentials) => {
if (err) {
callback!(err);
return;
}
this.grpcCredentials = credentials;
this.request(protoOpts, reqOpts, callback!);
});
return;
}
const service = this.getService_(protoOpts);
const metadata = this.grpcMetadata;
const grpcOpts: GrpcOptions = {};
if (typeof protoOpts.timeout === 'number') {
grpcOpts.deadline = GrpcService.createDeadline_(protoOpts.timeout);
}
try {
reqOpts = this.decorateRequest_(reqOpts);
} catch (e) {
callback!(e);
return;
}
let respError;
const retryOpts = Object.assign(
{
retries: this.maxRetries,
currentRetryAttempt: 0,
shouldRetryFn: GrpcService.shouldRetryRequest_,
request(_, onResponse) {
respError = null;
return service[protoOpts.method](
reqOpts,
metadata,
grpcOpts,
(err, resp) => {
if (err) {
respError = GrpcService.decorateError_(err);
if (respError) {
onResponse(null, respError);
return;
}
onResponse(err, resp);
return;
}
onResponse(null, resp);
}
);
},
},
protoOpts.retryOpts
);
return retryRequest(null!, retryOpts, (err, resp: object) => {
if (!err && resp === respError) {
err = respError;
resp = null!;
}
callback!(err, resp as Response);
});
}
requestStream(reqOpts: DecorateRequestOptions): Request;
requestStream(protoOpts: ProtoOpts, reqOpts: DecorateRequestOptions): Duplex;
requestStream(
pOpts: ProtoOpts | DecorateRequestOptions,
rOpts?: DecorateRequestOptions
): Duplex | Request {
if (global['GCLOUD_SANDBOX_ENV']) {
return new PassThrough({objectMode: true});
}
const protoOpts = pOpts as ProtoOpts;
let reqOpts = rOpts as DecorateRequestOptions;
if (!protoOpts.stream) {
protoOpts.stream = new PassThrough({objectMode: true});
}
const stream = protoOpts.stream;
if (!this.grpcCredentials) {
this.getGrpcCredentials_((err, credentials) => {
if (err) {
stream.destroy(err);
return;
}
this.grpcCredentials = credentials;
this.requestStream(protoOpts, reqOpts);
});
return stream;
}
const objectMode = !!reqOpts.objectMode;
const service = this.getService_(protoOpts);
const grpcMetadata = this.grpcMetadata;
const grpcOpts: GrpcOptions = {};
if (typeof protoOpts.timeout === 'number') {
grpcOpts.deadline = GrpcService.createDeadline_(protoOpts.timeout);
}
try {
reqOpts = this.decorateRequest_(reqOpts);
} catch (e) {
setImmediate(() => {
stream.destroy(e);
});
return stream;
}
const retryOpts = Object.assign(
{
retries: this.maxRetries,
currentRetryAttempt: 0,
objectMode,
shouldRetryFn: GrpcService.shouldRetryRequest_,
request() {
const ee: EventEmitter = service[protoOpts.method](
reqOpts,
grpcMetadata,
grpcOpts
).on('metadata', () => {
const grcpStatus = GrpcService.decorateStatus_({code: 0});
ee.emit('response', grcpStatus);
});
return ee;
},
},
protoOpts.retryOpts
);
return (retryRequest(null!, retryOpts) as any)
.on('error', err => {
const grpcError = GrpcService.decorateError_(err);
stream.destroy(grpcError || err);
})
.on('request', stream.emit.bind(stream, 'request'))
.pipe(stream);
}
requestWritableStream(protoOpts, reqOpts) {
const stream =
(protoOpts.stream = protoOpts.stream || (duplexify as any).obj());
if (global['GCLOUD_SANDBOX_ENV']) {
return stream;
}
const self = this;
if (!this.grpcCredentials) {
this.getGrpcCredentials_((err, credentials) => {
if (err) {
stream.destroy(err);
return;
}
self.grpcCredentials = credentials;
self.requestWritableStream(protoOpts, reqOpts);
});
return stream;
}
const service = this.getService_(protoOpts);
const grpcMetadata = this.grpcMetadata;
const grpcOpts: GrpcOptions = {};
if (is.number(protoOpts.timeout)) {
grpcOpts.deadline = GrpcService.createDeadline_(protoOpts.timeout);
}
try {
reqOpts = this.decorateRequest_(reqOpts);
} catch (e) {
setImmediate(() => {
stream.destroy(e);
});
return stream;
}
const grpcStream = service[protoOpts.method](
reqOpts,
grpcMetadata,
grpcOpts
)
.on('status', status => {
const grcpStatus = GrpcService.decorateStatus_(status);
stream.emit('response', grcpStatus || status);
})
.on('error', err => {
const grpcError = GrpcService.decorateError_(err);
stream.destroy(grpcError || err);
});
stream.setReadable(grpcStream);
stream.setWritable(grpcStream);
return stream;
}
static decodeValue_(value) {
switch (value.kind) {
case 'structValue': {
return GrpcService.structToObj_(value.structValue);
}
case 'nullValue': {
return null;
}
case 'listValue': {
return value.listValue.values.map(GrpcService.decodeValue_);
}
default: {
return value[value.kind];
}
}
}
static encodeValue_(value) {
return new GrpcService.ObjectToStructConverter().encodeValue_(value);
}
private static createDeadline_(timeout: number) {
return new Date(Date.now() + timeout);
}
static decorateError_(err: Error): Error | null {
const errorObj = is.error(err) ? err : {};
return GrpcService.decorateGrpcResponse_(errorObj, err);
}
private static decorateGrpcResponse_(obj, response) {
if (response && GRPC_ERROR_CODE_TO_HTTP[response.code]) {
const defaultResponseDetails = GRPC_ERROR_CODE_TO_HTTP[response.code];
let message = defaultResponseDetails.message;
if (response.message) {
try {
message = JSON.parse(response.message).description;
} catch (e) {
message = response.message;
}
}
return extend(true, obj, response, {
code: defaultResponseDetails.code,
message,
});
}
return null;
}
private static decorateStatus_(status) {
return GrpcService.decorateGrpcResponse_({}, status);
}
private static shouldRetryRequest_(response) {
return [429, 500, 502, 503].indexOf(response.code) > -1;
}
private static objToStruct_(obj, options) {
return new GrpcService.ObjectToStructConverter(options).convert(obj);
}
private static structToObj_(struct) {
const convertedObject = {};
for (const prop in struct.fields) {
if (struct.fields.hasOwnProperty(prop)) {
const value = struct.fields[prop];
convertedObject[prop] = GrpcService.decodeValue_(value);
}
}
return convertedObject;
}
decorateRequest_(reqOpts) {
reqOpts = Object.assign({}, reqOpts);
delete reqOpts.autoPaginate;
delete reqOpts.autoPaginateVal;
delete reqOpts.objectMode;
return replaceProjectIdToken(reqOpts, this.projectId);
}
private getGrpcCredentials_(callback) {
this.authClient.getClient().then(client => {
const credentials = this.grpc!.credentials.combineChannelCredentials(
this.grpc!.credentials.createSsl(),
this.grpc!.credentials.createFromGoogleCredential(client)
);
if (!this.projectId || this.projectId === '{{projectId}}') {
this.projectId = client.projectId!;
}
callback(null, credentials);
}, callback);
}
private loadProtoFile(
protoPath: string,
config: GrpcServiceConfig
): PackageDefinition {
const protoObjectCacheKey = [config.protosDir, protoPath].join('$');
if (!GrpcService.protoObjectCache[protoObjectCacheKey]) {
const services = loadSync(protoPath, {
keepCase: false,
defaults: true,
bytes: String,
longs: String,
enums: String,
oneofs: true,
includeDirs: [config.protosDir],
});
GrpcService.protoObjectCache[protoObjectCacheKey] = services;
}
return GrpcService.protoObjectCache[protoObjectCacheKey];
}
private getService_(protoOpts) {
const proto = this.protos[protoOpts.service];
let service = this.activeServiceMap_.get(protoOpts.service);
if (!service) {
service = new proto[protoOpts.service](
proto.baseUrl || this.baseUrl,
this.grpcCredentials,
Object.assign(
{
'grpc.primary_user_agent': this.userAgent,
},
GRPC_SERVICE_OPTIONS
)
);
this.activeServiceMap_.set(protoOpts.service, service);
}
return service;
}
}