"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
/*!
* Copyright 2018 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const stream_1 = require("stream");
const mutation_1 = require("./mutation");
class TransformError extends Error {
constructor(props) {
super();
this.name = 'TransformError';
this.message = `${props.message}: ${JSON.stringify(props.chunk)}`;
}
}
/**
* Enum for chunk formatter Row state.
* NEW_ROW: inital state or state after commitRow or resetRow
* ROW_IN_PROGRESS: state after first valid chunk without commitRow or resetRow
* CELL_IN_PROGRESS: state when valueSize > 0(partial cell)
*/
exports.RowStateEnum = Object.freeze({
NEW_ROW: 1,
ROW_IN_PROGRESS: 2,
CELL_IN_PROGRESS: 3,
});
/**
* ChunkTransformer formats all incoming chunks in to row
* keeps all intermediate state until end of stream.
* Should use new instance for each request.
*/
class ChunkTransformer extends stream_1.Transform {
constructor(options = {}) {
options.objectMode = true; // forcing object mode
super(options);
this.options = options;
this._destroyed = false;
this.lastRowKey = undefined;
this.reset();
}
/**
* called at end of the stream.
* @public
* @param {callback} cb callback will be called with error if there is any uncommitted row
*/
_flush(cb) {
if (typeof this.row.key !== 'undefined') {
this.destroy(new TransformError({
message: 'Response ended with pending row without commit',
chunk: null,
}));
return;
}
cb();
}
/**
* transform the readrowsresponse chunks into friendly format. Chunks contain
* 3 properties:
*
* `rowContents` The row contents, this essentially is all data pertaining
* to a single family.
*
* `commitRow` This is a boolean telling us the all previous chunks for this
* row are ok to consume.
*
* `resetRow` This is a boolean telling us that all the previous chunks are to
* be discarded.
*
* @public
*
* @param {object} data readrows response containing array of chunks.
* @param {object} [enc] encoding options.
* @param {callback} next callback will be called once data is processed, with error if any error in processing
*/
_transform(data, enc, next) {
for (const chunk of data.chunks) {
switch (this.state) {
case exports.RowStateEnum.NEW_ROW:
this.processNewRow(chunk);
break;
case exports.RowStateEnum.ROW_IN_PROGRESS:
this.processRowInProgress(chunk);
break;
case exports.RowStateEnum.CELL_IN_PROGRESS:
this.processCellInProgress(chunk);
break;
}
if (this._destroyed) {
return;
}
}
if (data.lastScannedRowKey && data.lastScannedRowKey.length > 0) {
this.lastRowKey = mutation_1.Mutation.convertFromBytes(data.lastScannedRowKey, {
userOptions: this.options,
});
}
next();
}
/**
* called when stream is destroyed.
* @public
* @param {error} err error if any
*/
destroy(err) {
if (this._destroyed)
return;
this._destroyed = true;
if (err) {
this.emit('error', err);
}
this.emit('close');
}
/**
* Resets state of formatter
* @private
*/
reset() {
this.family = {};
this.qualifiers = [];
this.qualifier = {};
this.row = {};
this.state = exports.RowStateEnum.NEW_ROW;
}
/**
* sets lastRowkey and calls reset when row is committed.
* @private
*/
commit() {
const row = this.row;
this.reset();
this.lastRowKey = row.key;
}
/**
* Validates valuesize and commitrow in a chunk
* @private
* @param {chunk} chunk chunk to validate for valuesize and commitRow
*/
validateValueSizeAndCommitRow(chunk) {
if (chunk.valueSize > 0 && chunk.commitRow) {
this.destroy(new TransformError({
message: 'A row cannot be have a value size and be a commit row',
chunk,
}));
}
}
/**
* Validates resetRow condition for chunk
* @private
* @param {chunk} chunk chunk to validate for resetrow
*/
validateResetRow(chunk) {
const containsData = (chunk.rowKey && chunk.rowKey.length !== 0) ||
chunk.familyName ||
chunk.qualifier ||
(chunk.value && chunk.value.length !== 0) ||
chunk.timestampMicros > 0;
if (chunk.resetRow && containsData) {
this.destroy(new TransformError({
message: 'A reset should have no data',
chunk,
}));
}
}
/**
* Validates state for new row.
* @private
* @param {chunk} chunk chunk to validate
* @param {newRowKey} newRowKey newRowKey of the new row
*/
validateNewRow(chunk, newRowKey) {
const row = this.row;
const lastRowKey = this.lastRowKey;
let errorMessage;
if (typeof row.key !== 'undefined') {
errorMessage = 'A new row cannot have existing state';
}
else if (typeof chunk.rowKey === 'undefined' ||
chunk.rowKey.length === 0 ||
newRowKey.length === 0) {
errorMessage = 'A row key must be set';
}
else if (chunk.resetRow) {
errorMessage = 'A new row cannot be reset';
}
else if (lastRowKey === newRowKey) {
errorMessage = 'A commit happened but the same key followed';
}
else if (!chunk.familyName) {
errorMessage = 'A family must be set';
}
else if (chunk.qualifier === null || chunk.qualifier === undefined) {
errorMessage = 'A column qualifier must be set';
}
if (errorMessage) {
this.destroy(new TransformError({ message: errorMessage, chunk }));
return;
}
this.validateValueSizeAndCommitRow(chunk);
}
/**
* Validates state for rowInProgress
* @private
* @param {chunk} chunk chunk to validate
*/
validateRowInProgress(chunk) {
const row = this.row;
if (chunk.rowKey && chunk.rowKey.length) {
const newRowKey = mutation_1.Mutation.convertFromBytes(chunk.rowKey, {
userOptions: this.options,
});
const oldRowKey = row.key || '';
if (newRowKey &&
chunk.rowKey &&
newRowKey.length !== 0 &&
newRowKey.toString() !== oldRowKey.toString()) {
this.destroy(new TransformError({
message: 'A commit is required between row keys',
chunk,
}));
return;
}
}
if (chunk.familyName &&
(chunk.qualifier === null || chunk.qualifier === undefined)) {
this.destroy(new TransformError({
message: 'A qualifier must be specified',
chunk,
}));
return;
}
this.validateResetRow(chunk);
this.validateValueSizeAndCommitRow(chunk);
}
/**
* Validates chunk for cellInProgress state.
* @private
* @param {chunk} chunk chunk to validate
*/
validateCellInProgress(chunk) {
this.validateResetRow(chunk);
this.validateValueSizeAndCommitRow(chunk);
}
/**
* Moves to next state in processing.
* @private
* @param {chunk} chunk chunk in process
*/
moveToNextState(chunk) {
const row = this.row;
if (chunk.commitRow) {
this.push(row);
this.commit();
this.lastRowKey = row.key;
}
else {
if (chunk.valueSize > 0) {
this.state = exports.RowStateEnum.CELL_IN_PROGRESS;
}
else {
this.state = exports.RowStateEnum.ROW_IN_PROGRESS;
}
}
}
/**
* Process chunk when in NEW_ROW state.
* @private
* @param {chunks} chunk chunk to process
*/
processNewRow(chunk) {
const newRowKey = mutation_1.Mutation.convertFromBytes(chunk.rowKey, {
userOptions: this.options,
});
this.validateNewRow(chunk, newRowKey);
if (chunk.familyName && chunk.qualifier) {
const row = this.row;
row.key = newRowKey;
row.data = {};
this.family = row.data[chunk.familyName.value] = {};
const qualifierName = mutation_1.Mutation.convertFromBytes(chunk.qualifier.value, {
userOptions: this.options,
});
this.qualifiers = this.family[qualifierName] = [];
this.qualifier = {
value: mutation_1.Mutation.convertFromBytes(chunk.value, {
userOptions: this.options,
isPossibleNumber: true,
}),
labels: chunk.labels,
timestamp: chunk.timestampMicros,
};
this.qualifiers.push(this.qualifier);
this.moveToNextState(chunk);
}
}
/**
* Process chunk when in ROW_IN_PROGRESS state.
* @private
* @param {chunk} chunk chunk to process
*/
processRowInProgress(chunk) {
this.validateRowInProgress(chunk);
if (chunk.resetRow) {
return this.reset();
}
const row = this.row;
if (chunk.familyName) {
this.family = row.data[chunk.familyName.value] =
row.data[chunk.familyName.value] || {};
}
if (chunk.qualifier) {
const qualifierName = mutation_1.Mutation.convertFromBytes(chunk.qualifier.value, {
userOptions: this.options,
});
this.qualifiers = this.family[qualifierName] =
this.family[qualifierName] || [];
}
this.qualifier = {
value: mutation_1.Mutation.convertFromBytes(chunk.value, {
userOptions: this.options,
isPossibleNumber: true,
}),
labels: chunk.labels,
timestamp: chunk.timestampMicros,
};
this.qualifiers.push(this.qualifier);
this.moveToNextState(chunk);
}
/**
* Process chunk when in CELl_IN_PROGRESS state.
* @private
* @param {chunk} chunk chunk to process
*/
processCellInProgress(chunk) {
this.validateCellInProgress(chunk);
if (chunk.resetRow) {
return this.reset();
}
const chunkQualifierValue = mutation_1.Mutation.convertFromBytes(chunk.value, {
userOptions: this.options,
});
if (chunkQualifierValue instanceof Buffer &&
this.qualifier.value instanceof Buffer) {
this.qualifier.value = Buffer.concat([
this.qualifier.value,
chunkQualifierValue,
]);
}
else {
this.qualifier.value += chunkQualifierValue;
}
this.moveToNextState(chunk);
}
}
exports.ChunkTransformer = ChunkTransformer;
//# sourceMappingURL=chunktransformer.js.map