Class: Google::Cloud::Bigquery::Table::AsyncInserter
- Inherits: Object
- Includes: MonitorMixin
- Defined in: lib/google/cloud/bigquery/table/async_inserter.rb
Overview
AsyncInserter
Used to insert multiple rows in batches to a table. See #insert_async.
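A minimal usage sketch (the dataset and table IDs are placeholders): the inserter is created with Table#insert_async, collects rows in the background, and reports each batch to the block as an AsyncInserter::Result.

require "google/cloud/bigquery"

bigquery = Google::Cloud::Bigquery.new
dataset  = bigquery.dataset "my_dataset" # placeholder ID
table    = dataset.table "my_table"      # placeholder ID

# The block is called once per batch with an AsyncInserter::Result.
inserter = table.insert_async do |result|
  if result.error?
    puts "batch failed: #{result.error}"
  else
    puts "inserted #{result.insert_count} rows with #{result.error_count} errors"
  end
end

inserter.insert [{ "first_name" => "Alyssa", "age" => 21 }]

# Stop accepting new rows, then block until pending inserts and callbacks finish.
inserter.stop.wait!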
Defined Under Namespace
Classes: Result
Instance Attribute Summary collapse
-
#interval ⇒ Numeric
readonly
The number of seconds to collect rows before the batch is inserted.
-
#max_bytes ⇒ Integer
readonly
The maximum size of rows to be collected before the batch is inserted.
-
#max_rows ⇒ Integer
readonly
The maximum number of rows to be collected before the batch is inserted.
-
#threads ⇒ Integer
readonly
The number of threads used to insert rows.
Instance Method Summary collapse
-
#flush ⇒ AsyncInserter
Forces all rows in the current batch to be inserted immediately.
-
#insert(rows, insert_ids: nil) ⇒ Object
Adds rows to the async inserter to be inserted.
-
#started? ⇒ Boolean
Whether the inserter has been started.
-
#stop ⇒ AsyncInserter
Begins the process of stopping the inserter.
-
#stopped? ⇒ Boolean
Whether the inserter has been stopped.
-
#wait!(timeout = nil) ⇒ AsyncInserter
Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed.
Instance Attribute Details
#interval ⇒ Numeric (readonly)
The number of seconds to collect rows before the batch is inserted. Default is 10.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def interval
  @interval
end
#max_bytes ⇒ Integer (readonly)
The maximum size of rows to be collected before the batch is inserted. Default is 10,000,000 (10MB).
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def max_bytes
  @max_bytes
end
#max_rows ⇒ Integer (readonly)
The maximum number of rows to be collected before the batch is inserted. Default is 500.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def max_rows
  @max_rows
end
#threads ⇒ Integer (readonly)
The number of threads used to insert rows. Default is 4.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def threads
  @threads
end
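All four attributes are read-only; they are fixed when the inserter is created. A sketch, assuming the batching keywords documented for Table#insert_async:

inserter = table.insert_async max_bytes: 5_000_000, # batch at ~5 MB
                              max_rows:  200,       # or at 200 rows
                              interval:  5,         # or after 5 seconds
                              threads:   8          # insert on 8 threads

inserter.interval #=> 5
inserter.max_rows #=> 200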
Instance Method Details
#flush ⇒ AsyncInserter
Forces all rows in the current batch to be inserted immediately.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 220

def flush
  synchronize do
    push_batch_request!
    @cond.signal
  end

  self
end
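A short sketch, assuming an inserter created as in the overview example. Calling flush pushes the current batch without waiting for max_rows, max_bytes, or interval to trigger it:

inserter.insert [{ "first_name" => "Ben", "age" => 33 }]

# Send whatever has been collected so far, right away.
inserter.flush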
#insert(rows, insert_ids: nil) ⇒ Object
Adds rows to the async inserter to be inserted. Rows will be collected in batches and inserted together. See Google::Cloud::Bigquery::Table#insert_async.
Simple Ruby types are generally accepted per JSON rules, along with the following support for BigQuery's more complex types:
| BigQuery | Ruby | Notes |
|---|---|---|
| NUMERIC | BigDecimal | BigDecimal values will be rounded to scale 9. |
| BIGNUMERIC | String | Pass as String to avoid rounding to scale 9. |
| DATETIME | DateTime | DATETIME does not support time zone. |
| DATE | Date | |
| GEOGRAPHY | String | |
| JSON | String (stringified JSON) | String, as JSON does not have a schema to verify. |
| TIMESTAMP | Time | |
| TIME | Google::Cloud::BigQuery::Time | |
| BYTES | File, IO, StringIO, or similar | |
| ARRAY | Array | Nested arrays and nil values are not supported. |
| STRUCT | Hash | Hash keys may be strings or symbols. |
Because BigQuery's streaming API is designed for high insertion rates, modifications to the underlying table metadata are eventually consistent when interacting with the streaming system. In most cases metadata changes are propagated within minutes, but during this period API responses may reflect the inconsistent state of the table.
For insert_ids, the value :skip can be provided to skip the generation of IDs for all rows, or to skip the generation of an ID for a specific row in the array, as shown in the sketch below.
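A sketch of both ID modes (the row values are illustrative):

require "bigdecimal"

rows = [
  { "first_name" => "Alyssa", "score" => BigDecimal("98.7654321019") }, # NUMERIC, rounded to scale 9
  { "first_name" => "Ben",    "score" => BigDecimal("87.25") }
]

# One ID per row enables best-effort deduplication; :skip opts the
# second row out of ID generation. Passing insert_ids: :skip alone
# skips ID generation for every row.
inserter.insert rows, insert_ids: ["row-1", :skip]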
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 147

def insert rows, insert_ids: nil
  return nil if rows.nil?
  return nil if rows.is_a?(Array) && rows.empty?

  rows, insert_ids = validate_insert_args rows, insert_ids

  synchronize do
    rows.zip(Array(insert_ids)).each do |row, insert_id|
      if @batch.nil?
        @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
        @batch.insert row, insert_id
      else
        unless @batch.try_insert row, insert_id
          push_batch_request!

          @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
          @batch.insert row, insert_id
        end
      end

      @batch_created_at ||= ::Time.now
      @background_thread ||= Thread.new { run_background }

      push_batch_request! if @batch.ready?
    end

    @cond.signal
  end

  true
end
#started? ⇒ Boolean
Whether the inserter has been started.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 234

def started?
  !stopped?
end
#stop ⇒ AsyncInserter
Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use #wait! to block until the inserter is fully stopped and all pending rows have been inserted.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 186

def stop
  synchronize do
    break if @stopped

    @stopped = true
    push_batch_request!
    @cond.signal
  end

  self
end
#stopped? ⇒ Boolean
Whether the inserter has been stopped.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 243

def stopped?
  synchronize { @stopped }
end
#wait!(timeout = nil) ⇒ AsyncInserter
Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 206

def wait! timeout = nil
  synchronize do
    @thread_pool.shutdown
    @thread_pool.wait_for_termination timeout
  end

  self
end
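A typical shutdown sequence, following the stop and wait! descriptions above. Since both methods return the inserter, they can be chained:

inserter.stop      # refuse new rows; pending batches will still be inserted
inserter.wait! 60  # block up to 60 seconds for inserts and callbacks

# Equivalently, chained:
inserter.stop.wait!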