Class: Google::Cloud::Bigquery::Table::AsyncInserter
- Inherits: Object
- Includes: MonitorMixin
- Defined in: lib/google/cloud/bigquery/table/async_inserter.rb
Overview
AsyncInserter
Used to insert multiple rows in batches to a table. See Google::Cloud::Bigquery::Table#insert_async.
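A typical lifecycle: create the inserter with Google::Cloud::Bigquery::Table#insert_async, add rows with #insert, then drain it with #stop and #wait!. A minimal sketch; the dataset and table names and the row values are placeholders:

require "google/cloud/bigquery"

bigquery = Google::Cloud::Bigquery.new
dataset  = bigquery.dataset "my_dataset" # placeholder dataset
table    = dataset.table "my_table"      # placeholder table

# The block receives a Result for each batch that is inserted.
inserter = table.insert_async do |result|
  if result.error?
    puts "batch failed: #{result.error}"
  else
    puts "inserted #{result.insert_count} rows"
  end
end

inserter.insert [{ "name" => "Alice" }, { "name" => "Bob" }]

# #stop returns the inserter, so #wait! can be chained.
inserter.stop.wait!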
Defined Under Namespace
Classes: Result
Instance Attribute Summary
- #interval ⇒ Numeric (readonly): The number of seconds to collect rows before the batch is inserted.
- #max_bytes ⇒ Integer (readonly): The maximum size of rows to be collected before the batch is inserted.
- #max_rows ⇒ Integer (readonly): The maximum number of rows to be collected before the batch is inserted.
- #threads ⇒ Integer (readonly): The number of threads used to insert rows.
Instance Method Summary
- #flush ⇒ AsyncInserter: Forces all rows in the current batch to be inserted immediately.
- #insert(rows, insert_ids: nil) ⇒ Object: Adds rows to the async inserter to be inserted.
- #started? ⇒ Boolean: Whether the inserter has been started.
- #stop ⇒ AsyncInserter: Begins the process of stopping the inserter.
- #stopped? ⇒ Boolean: Whether the inserter has been stopped.
- #wait!(timeout = nil) ⇒ AsyncInserter: Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed.
Instance Attribute Details
#interval ⇒ Numeric (readonly)
The number of seconds to collect rows before the batch is inserted. Default is 10.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def interval
  @interval
end
#max_bytes ⇒ Integer (readonly)
The maximum size of rows to be collected before the batch is inserted. Default is 10,000,000 (10MB).
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def max_bytes
  @max_bytes
end
#max_rows ⇒ Integer (readonly)
The maximum number of rows to be collected before the batch is inserted. Default is 500.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def max_rows
  @max_rows
end
#threads ⇒ Integer (readonly)
The number of threads used to insert rows. Default is 4.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def threads
  @threads
end
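All four attributes are read-only and are fixed when the inserter is created. A sketch of tuning them, assuming Google::Cloud::Bigquery::Table#insert_async accepts matching max_bytes:, max_rows:, interval:, and threads: keyword arguments:

# Batch up to 1,000 rows or 5 MB per request, flush at least every
# 5 seconds, and insert with 8 worker threads.
inserter = table.insert_async max_bytes: 5_000_000,
                              max_rows:  1_000,
                              interval:  5,
                              threads:   8 do |result|
  puts "inserted #{result.insert_count} rows" unless result.error?
end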
Instance Method Details
#flush ⇒ AsyncInserter
Forces all rows in the current batch to be inserted immediately.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 197

def flush
  synchronize do
    push_batch_request!
    @cond.signal
  end

  self
end
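For example, to push a partially filled batch to the worker pool immediately rather than waiting for max_rows, max_bytes, or the interval to trigger (a sketch, reusing the inserter from the Overview):

inserter.insert [{ "name" => "Carol" }]

# Force the pending batch out now, e.g. before a checkpoint.
inserter.flush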
#insert(rows, insert_ids: nil) ⇒ Object
Adds rows to the async inserter to be inserted. Rows will be collected in batches and inserted together. See Google::Cloud::Bigquery::Table#insert_async.
Because BigQuery's streaming API is designed for high insertion rates, modifications to the underlying table metadata are eventually consistent when interacting with the streaming system. In most cases metadata changes are propagated within minutes, but during this period API responses may reflect the inconsistent state of the table.
For insert_ids, the value :skip can be provided to skip the generation of IDs for all rows, or to skip the generation of an ID for a specific row in the array.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 124

def insert rows, insert_ids: nil
  return nil if rows.nil?
  return nil if rows.is_a?(Array) && rows.empty?
  rows, insert_ids = validate_insert_args rows, insert_ids

  synchronize do
    rows.zip(Array(insert_ids)).each do |row, insert_id|
      if @batch.nil?
        @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
        @batch.insert row, insert_id
      else
        unless @batch.try_insert row, insert_id
          push_batch_request!

          @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
          @batch.insert row, insert_id
        end
      end

      @batch_created_at ||= ::Time.now
      @background_thread ||= Thread.new { run_background }

      push_batch_request! if @batch.ready?
    end

    @cond.signal
  end

  true
end
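For example, insert IDs (which BigQuery uses for best-effort deduplication of streaming inserts) can be supplied one per row or skipped; the ID values below are illustrative placeholders:

rows = [
  { "name" => "Dave" },
  { "name" => "Erin" }
]

# One insert ID per row, in the same order as rows; :skip omits the
# ID for that row only.
inserter.insert rows, insert_ids: ["row-id-1", :skip]

# Or skip ID generation for every row in the call.
inserter.insert rows, insert_ids: :skip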
#started? ⇒ Boolean
Whether the inserter has been started.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 211

def started?
  !stopped?
end
#stop ⇒ AsyncInserter
Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use #wait! to block until the inserter is fully stopped and all pending rows have been inserted.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 163

def stop
  synchronize do
    break if @stopped

    @stopped = true
    push_batch_request!
    @cond.signal
  end

  self
end
#stopped? ⇒ Boolean
Whether the inserter has been stopped.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 220

def stopped?
  synchronize { @stopped }
end
#wait!(timeout = nil) ⇒ AsyncInserter
Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed.
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 183

def wait! timeout = nil
  synchronize do
    @thread_pool.shutdown
    @thread_pool.wait_for_termination timeout
  end

  self
end
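A typical shutdown chains #stop with #wait!; an optional timeout in seconds bounds how long the call blocks (a sketch):

inserter.stop

# Block up to 60 seconds for pending batches and callbacks to finish.
inserter.wait! 60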