src/fallbackServiceStub.ts
Properties |
Signature :
[url: RequestInfo, init: RequestInit]
|
Returns : Promise<Response>
|
import nodeFetch from 'node-fetch';
import {Response as NodeFetchResponse} from 'node-fetch';
import {AbortController as NodeAbortController} from 'abort-controller';
import {hasWindowFetch, hasAbortController} from './featureDetection';
import {AuthClient} from './fallback';
import {StreamArrayParser} from './streamArrayParser';
import {pipeline, PipelineSource} from 'stream';
interface NodeFetchType {
(url: RequestInfo, init?: RequestInit): Promise<Response>;
}
export interface FallbackServiceStub {
// Compatible with gRPC service stub
[method: string]: (
request: {},
options: {},
metadata: {},
callback: (err?: Error, response?: {} | undefined) => void
) => StreamArrayParser | {cancel: () => void};
}
export interface FetchParameters {
headers: {[key: string]: string};
body: Buffer | Uint8Array | string;
method: 'get' | 'post' | 'put' | 'patch' | 'delete';
url: string;
}
export function generateServiceStub(
rpcs: {[name: string]: protobuf.Method},
protocol: string,
servicePath: string,
servicePort: number,
authClient: AuthClient,
requestEncoder: (
rpc: protobuf.Method,
protocol: string,
servicePath: string,
servicePort: number,
request: {}
) => FetchParameters,
responseDecoder: (
rpc: protobuf.Method,
ok: boolean,
response: Buffer | ArrayBuffer
) => {}
) {
const fetch = hasWindowFetch()
? window.fetch
: (nodeFetch as unknown as NodeFetchType);
const serviceStub: FallbackServiceStub = {};
for (const [rpcName, rpc] of Object.entries(rpcs)) {
serviceStub[rpcName] = (
request: {},
options: {[name: string]: string},
_metadata: {},
callback: Function
) => {
// We cannot use async-await in this function because we need to return the canceller object as soon as possible.
// Using plain old promises instead.
const cancelController = hasAbortController()
? new AbortController()
: new NodeAbortController();
const cancelSignal = cancelController.signal;
let cancelRequested = false;
const fetchParameters = requestEncoder(
rpc,
protocol,
servicePath,
servicePort,
request
);
const url = fetchParameters.url;
const headers = fetchParameters.headers;
for (const key of Object.keys(options)) {
headers[key] = options[key][0];
}
const streamArrayParser = new StreamArrayParser(rpc);
authClient
.getRequestHeaders()
.then(authHeader => {
const fetchRequest = {
headers: {
...authHeader,
...headers,
},
body: fetchParameters.body as
| string
| Buffer
| Uint8Array
| undefined,
method: fetchParameters.method,
signal: cancelSignal,
};
if (
fetchParameters.method === 'get' ||
fetchParameters.method === 'delete'
) {
delete fetchRequest['body'];
}
return fetch(url, fetchRequest);
})
.then((response: Response | NodeFetchResponse) => {
if (response.ok && rpc.responseStream) {
pipeline(
response.body as PipelineSource<unknown>,
streamArrayParser,
(err: unknown) => {
if (
err &&
(!cancelRequested ||
(err instanceof Error && err.name !== 'AbortError'))
) {
if (callback) {
callback(err);
}
streamArrayParser.emit('error', err);
}
}
);
return;
} else {
return Promise.all([
Promise.resolve(response.ok),
response.arrayBuffer(),
])
.then(([ok, buffer]: [boolean, Buffer | ArrayBuffer]) => {
const response = responseDecoder(rpc, ok, buffer);
callback(null, response);
})
.catch((err: Error) => {
if (!cancelRequested || err.name !== 'AbortError') {
callback(err);
}
});
}
});
if (rpc.responseStream) {
return streamArrayParser;
}
return {
cancel: () => {
cancelRequested = true;
cancelController.abort();
},
};
};
}
return serviceStub;
}