Mutations Batching¶
User-friendly container for Google Cloud Bigtable MutationBatcher.
- exception google.cloud.bigtable.batcher.MutationsBatchError(message, exc)[source]¶
Bases:
Exception
Error in the batch request
- class google.cloud.bigtable.batcher.MutationsBatcher(table, flush_count=100, max_row_bytes=20971520, flush_interval=1, batch_completed_callback=None)[source]¶
Bases:
object
A MutationsBatcher is used in batch cases where the number of mutations is large or unknown. It will store DirectRow objects in memory until one of the size limits is reached or an explicit call to flush() is performed. When a flush event occurs, the DirectRow objects in memory will be sent to Cloud Bigtable. Batching mutations is more efficient than sending individual requests.
This class is not suited for usage in systems where each mutation must be guaranteed to be sent, since calling mutate may only result in an in-memory change. In the case of a system crash, any DirectRow objects remaining in memory will not necessarily have been sent to the service, even after the completion of the mutate() method.
Note on thread safety: the same MutationsBatcher cannot be shared by multiple end-user threads.
- Parameters
table (google.cloud.bigtable.table.Table) – the table to which the batched mutations will be applied.
flush_count (int) – (Optional) Max number of rows to buffer. When the batch reaches this many rows, the current batch is flushed automatically. Default is FLUSH_COUNT (100 rows).
max_row_bytes (int) – (Optional) Max total size in bytes of buffered row mutations. When the batch reaches this size, the current batch is flushed automatically. Default is MAX_ROW_BYTES (20 MB).
flush_interval (float) – (Optional) The interval (in seconds) between asynchronous flushes. Default is 1 second.
batch_completed_callback (Callable[[list[google.rpc.status_pb2.Status]], None]) – (Optional) A callable for handling responses after the current batch is sent. The callable receives a list of grpc Status objects.
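A possible shape for such a callback is sketched below. The name `on_batch_complete` and the failed-row handling are illustrative, not part of the library; the sketch assumes each grpc Status corresponds to one row in the batch just sent, with code 0 (OK) meaning that row's mutation succeeded.

```python
# Hypothetical callback sketch: receives a list of
# google.rpc.status_pb2.Status objects after a batch is sent.
# Status code 0 (OK) is assumed to mean the mutation succeeded.
def on_batch_complete(statuses):
    # Collect indexes of rows whose mutation did not succeed.
    failed = [i for i, status in enumerate(statuses) if status.code != 0]
    if failed:
        print("Rows that failed in this batch:", failed)
    return failed

# With a real Table object, pass it when creating the batcher:
# batcher = table.mutations_batcher(batch_completed_callback=on_batch_complete)
```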
- __exit__(exc_type, exc_value, exc_traceback)[source]¶
Clean up resources. Flush and shutdown the ThreadPoolExecutor.
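Because __exit__ flushes and shuts down the executor, the batcher can be used as a context manager. The toy class below is not the real MutationsBatcher; it only models the documented guarantee that buffered rows are flushed when the with block exits, even if the body raises.

```python
# Toy stand-in (NOT the real MutationsBatcher) modeling the documented
# __exit__ behavior: flush buffered rows when the `with` block exits.
class ToyBatcher:
    def __init__(self):
        self.buffer = []  # rows held in memory, like the real batcher
        self.sent = []    # stands in for rows sent to Cloud Bigtable

    def mutate(self, row):
        self.buffer.append(row)  # in-memory only until a flush occurs

    def flush(self):
        self.sent.extend(self.buffer)  # stands in for the Bigtable RPC
        self.buffer = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.flush()  # the real class also shuts down its ThreadPoolExecutor
        return False  # do not suppress exceptions raised in the body

# With the real API the pattern would be:
#     with table.mutations_batcher() as batcher:
#         batcher.mutate(row)
#     # remaining rows are flushed and resources released on exit
```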
- close()[source]¶
Clean up resources. Flush and shutdown the ThreadPoolExecutor. Any errors will be raised.
- Raises
MutationsBatchError
if there’s any error in the mutations.
- flush()[source]¶
Sends the current batch to Cloud Bigtable synchronously. For example:
from google.cloud.bigtable import Client

client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
table = instance.table(TABLE_ID)

# Batcher for max row bytes, max_row_bytes=1024 is optional.
batcher = table.mutations_batcher(max_row_bytes=1024)

# Add a single row
row_key = b"row_key"
row = table.row(row_key)
row.set_cell(COLUMN_FAMILY_ID, COL_NAME1, "value-0")

# In batcher, mutate will flush current batch if it
# reaches the max_row_bytes
batcher.mutate(row)
batcher.flush()
- Raises
MutationsBatchError
if there’s any error in the mutations.
- mutate(row)[source]¶
Add a row to the batch. If the current batch meets one of the size limits, the batch is sent asynchronously.
For example:
import datetime

from google.cloud.bigtable import Client

client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
table = instance.table(TABLE_ID)

# Batcher for max row bytes, max_row_bytes=1024 is optional.
batcher = table.mutations_batcher(max_row_bytes=1024)

# Add a single row
row_key = b"row_key_1"
row = table.row(row_key)
row.set_cell(
    COLUMN_FAMILY_ID,
    COL_NAME1,
    "value-0",
    timestamp=datetime.datetime.utcnow(),
)

# In batcher, mutate will flush current batch if it
# reaches the max_row_bytes
batcher.mutate(row)
batcher.flush()
- Parameters
row (DirectRow) – the DirectRow to add to the batch.
- Raises
One of the following:
_BigtableRetryableError
if any row returned a transient error.
RuntimeError
if the number of responses doesn’t match the number of rows that were retried.
- mutate_rows(rows)[source]¶
Add multiple rows to the batch. If the current batch meets one of the size limits, the batch is sent asynchronously.
For example:
from google.cloud.bigtable import Client

client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
table = instance.table(TABLE_ID)

batcher = table.mutations_batcher()

row1 = table.row(b"row_key_1")
row2 = table.row(b"row_key_2")
row3 = table.row(b"row_key_3")
row4 = table.row(b"row_key_4")

row1.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val1")
row2.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val2")
row3.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val3")
row4.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val4")

batcher.mutate_rows([row1, row2, row3, row4])
# batcher will flush current batch if it
# reaches the max flush_count

# Manually send the current batch to Cloud Bigtable
batcher.flush()
- Parameters
rows (list[DirectRow]) – the list of DirectRow objects to add to the batch.
- Raises
One of the following:
_BigtableRetryableError
if any row returned a transient error.
RuntimeError
if the number of responses doesn’t match the number of rows that were retried.