/*!
* Copyright 2017 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const common = require('@google-cloud/common');
const pumpify = require('pumpify');
const streamEvents = require('stream-events');
const {PassThrough} = require('stream');
/*!
* Return a dictionary-like object with helpers to augment the Speech
* GAPIC.
*/
module.exports = () => {
const methods = {};
/**
* Performs bidirectional streaming speech recognition: receive results while
* sending audio. This method is only available via the gRPC API (not REST).
*
* @method v1.SpeechClient#streamingRecognize
* @param {object} config The configuration for the stream. This is
* appropriately wrapped and sent as the first argument. It should be an
* object conforming to the [StreamingRecognitionConfig]{@link StreamingRecognitionConfig}
* structure.
* @param {object} [options] Optional parameters. You can override the default
* settings for this call, e.g, timeout, retries, paginations, etc. See
* [gax.CallOptions]{@link https://googleapis.github.io/gax-nodejs/global.html#CallOptions}
* for the details.
* @returns {stream} An object stream which is both readable and writable. It
* accepts raw audio for the `write()` method, and will emit objects
* representing [StreamingRecognizeResponse]{@link StreamingRecognizeResponse}
* on the 'data' event asynchronously.
*
* @example
* const speech = require('@google-cloud/speech');
* const client = new speech.SpeechClient();
*
* const stream = client.streamingRecognize({
* config: {
* encoding: 'LINEAR16',
* languageCode: 'en-us',
* sampleRateHertz: 44100,
* },
* }).on('data', function(response) {
* // doThingsWith(response);
* });
* const request = {};
* // Write request objects.
* stream.write(request);
*/
methods.streamingRecognize = function(streamingConfig, options) {
options = options || {};
streamingConfig = streamingConfig || {};
// Format the audio content as input request for pipeline
const recognizeStream = streamEvents(pumpify.obj());
const requestStream = this._innerApiCalls
.streamingRecognize(options)
.on('error', err => {
recognizeStream.destroy(err);
})
.on('response', response => {
recognizeStream.emit('response', response);
});
// Attach the events to the request stream, but only do so
// when the first write (of data) comes in.
//
// This also means that the sending of the initial request (with the
// config) is delayed until we get the first burst of data.
recognizeStream.once('writing', () => {
// The first message should contain the streaming config.
requestStream.write({streamingConfig});
// Set up appropriate piping between the stream returned by
// the underlying API method and the one that we return.
recognizeStream.setPipeline([
// Format the user's input.
// This entails that the user sends raw audio; it is wrapped in
// the appropriate request structure.
new PassThrough({
objectMode: true,
transform: (audioContent, _, next) => {
if (audioContent !== undefined) {
next(null, {audioContent});
return;
}
next();
},
}),
requestStream,
new PassThrough({
objectMode: true,
transform: (response, enc, next) => {
if (response.error) {
next(new common.util.ApiError(response.error));
return;
}
next(null, response);
},
}),
]);
});
return recognizeStream;
};
return methods;
};