Class: Google::Cloud::Logging::AsyncWriter
- Inherits:
- Object
- Object
- Google::Cloud::Logging::AsyncWriter
- Includes:
- MonitorMixin
- Defined in:
- lib/google/cloud/logging/async_writer.rb
Overview
AsyncWriter
AsyncWriter buffers, batches, and transmits log entries efficiently. Writing log entries is asynchronous and will not block.
Batches that cannot be delivered immediately are queued. When the queue is full, new batch requests raise errors that can be consumed through the #on_error callback. This provides back pressure in case the writer cannot keep up with requests.
This object is thread-safe; it may accept write requests from multiple threads simultaneously, and will serialize them when executing in the background thread.
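The back pressure described above can be illustrated with a small stand-in built only on Ruby's standard library. This is a sketch, not the gem's implementation: TinyAsyncWriter, enqueue, drain, and the max_queue default are hypothetical, and the queue is drained by hand rather than by a background thread so that the example stays deterministic.

```ruby
require "monitor"

# Hypothetical stand-in for illustration; NOT the gem's AsyncWriter.
class TinyAsyncWriter
  include MonitorMixin

  def initialize max_queue: 2
    super() # sets up the monitor provided by MonitorMixin
    @max_queue = max_queue
    @queue = []
    @error_callbacks = []
  end

  # Register a block to be called with each error.
  def on_error &block
    synchronize { @error_callbacks << block }
  end

  # Queue a batch; when the queue is full, report an error to the
  # registered callbacks instead of accepting the batch.
  def enqueue batch
    synchronize do
      if @queue.size >= @max_queue
        error = RuntimeError.new "queue full, dropped #{batch.size} entries"
        @error_callbacks.each { |callback| callback.call error }
      else
        @queue << batch
      end
    end
    self
  end

  # Remove and return everything queued so far (simulates delivery).
  def drain
    synchronize { @queue.shift @queue.size }
  end
end

writer = TinyAsyncWriter.new max_queue: 2
errors = []
writer.on_error { |error| errors << error.message }

# The third batch exceeds the queue limit and is reported, not queued.
writer.enqueue(["a"]).enqueue(["b"]).enqueue(["c", "d"])
delivered = writer.drain
```

Here the caller learns about the overflow through its callback while the first two batches remain queued for delivery.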
Instance Method Summary
- #flush ⇒ AsyncWriter
  Forces all entries in the current batch to be published immediately.
- #last_error ⇒ Exception? (also: #last_exception)
  The most recent unhandled error to occur while transmitting log entries.
- #logger(log_name, resource, labels = {}) ⇒ Google::Cloud::Logging::Logger
  Creates a logger instance that is API-compatible with Ruby's standard library Logger.
- #on_error {|callback| ... } ⇒ Object
  Register to be notified of errors when raised.
- #started? ⇒ boolean
  Whether the writer has been started.
- #stop ⇒ AsyncWriter
  Begins the process of stopping the writer.
- #stop!(timeout = nil, force: nil) ⇒ Symbol (also: #async_stop!)
  Stop this asynchronous writer and block until it has been stopped.
- #stopped? ⇒ boolean
  Whether the writer has been stopped.
- #wait!(timeout = nil) ⇒ AsyncWriter
  Blocks until the writer is fully stopped, all pending entries have been published, and all callbacks have completed.
- #write_entries(entries, log_name: nil, resource: nil, labels: nil) ⇒ Google::Cloud::Logging::AsyncWriter
  Asynchronously write one or more log entries to the Stackdriver Logging service.
Instance Method Details
#flush ⇒ AsyncWriter
Forces all entries in the current batch to be published immediately.
    # File 'lib/google/cloud/logging/async_writer.rb', line 280
    def flush
      synchronize do
        publish_batch!
        @cond.broadcast
      end

      self
    end
#last_error ⇒ Exception? Also known as: last_exception
The most recent unhandled error to occur while transmitting log entries.
If an unhandled error has occurred, the writer will attempt to recover from the error and resume buffering, batching, and transmitting log entries.
    # File 'lib/google/cloud/logging/async_writer.rb', line 371
    def last_error
      synchronize { @last_error }
    end
#logger(log_name, resource, labels = {}) ⇒ Google::Cloud::Logging::Logger
Creates a logger instance that is API-compatible with Ruby's standard library Logger.
The logger will use AsyncWriter to transmit log entries on a background thread.
    # File 'lib/google/cloud/logging/async_writer.rb', line 195
    def logger log_name, resource, labels = {}
      Logger.new self, log_name, resource, labels
    end
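As a rough illustration of how a Logger-style facade can sit on top of a writer, the sketch below forwards each severity method to a write_entries call. Both classes are hypothetical stand-ins (RecordingWriter and SketchLogger are not part of the gem), and the message format is an assumption made for the example.

```ruby
# Hypothetical writer that records entries in memory instead of sending them.
class RecordingWriter
  attr_reader :entries

  def initialize
    @entries = []
  end

  def write_entries entries, log_name: nil, resource: nil, labels: nil
    Array(entries).each do |payload|
      @entries << { payload: payload, log_name: log_name,
                    resource: resource, labels: labels }
    end
    self
  end
end

# Hypothetical facade exposing the familiar severity methods.
class SketchLogger
  SEVERITIES = %i[debug info warn error fatal].freeze

  def initialize writer, log_name, resource, labels = {}
    @writer = writer
    @log_name = log_name
    @resource = resource
    @labels = labels
  end

  # Define #debug, #info, #warn, #error, and #fatal; each forwards to the writer.
  SEVERITIES.each do |severity|
    define_method severity do |message|
      @writer.write_entries "#{severity.to_s.upcase}: #{message}",
                            log_name: @log_name,
                            resource: @resource,
                            labels: @labels
      true
    end
  end
end

writer = RecordingWriter.new
logger = SketchLogger.new(writer, "my_app_log",
                          { "type" => "gae_app" }, { "env" => "production" })
logger.info "Job started."
```

Because the facade holds only the writer and the per-logger defaults, several loggers can share one writer.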
#on_error {|callback| ... } ⇒ Object
Register to be notified of errors when raised.
If an unhandled error has occurred, the writer will attempt to recover from the error and resume buffering, batching, and transmitting log entries.
Multiple error handlers can be added.
    # File 'lib/google/cloud/logging/async_writer.rb', line 338
    def on_error &block
      synchronize do
        @error_callbacks << block
      end
    end
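The registration step above only appends a block to a list; how the list is later consumed is not shown in this section. The sketch below is one plausible arrangement, with loud assumptions: ErrorNotifier and notify are hypothetical names, and both the "record as last_error, then call every handler in registration order" behaviour and the choice to call handlers outside the lock are illustrative guesses rather than the gem's documented behaviour.

```ruby
require "monitor"

# Hypothetical helper for illustration; NOT part of google-cloud-logging.
class ErrorNotifier
  include MonitorMixin

  def initialize
    super() # sets up the monitor provided by MonitorMixin
    @error_callbacks = []
    @last_error = nil
  end

  # Same shape as the registration shown above: append the block to a list.
  def on_error &block
    synchronize { @error_callbacks << block }
  end

  def last_error
    synchronize { @last_error }
  end

  # Assumed dispatch step: remember the error under the lock, then call
  # every handler in registration order after releasing the lock.
  def notify error
    callbacks = synchronize do
      @last_error = error
      @error_callbacks.dup
    end
    callbacks.each { |callback| callback.call error }
  end
end

notifier = ErrorNotifier.new
seen = []
notifier.on_error { |error| seen << "log: #{error.message}" }
notifier.on_error { |error| seen << "metric: #{error.class}" }
notifier.notify IOError.new("connection reset")
```

Copying the handler list before calling it means a slow handler does not hold the monitor while it runs.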
#started? ⇒ boolean
Whether the writer has been started.
    # File 'lib/google/cloud/logging/async_writer.rb', line 293
    def started?
      !stopped?
    end
#stop ⇒ AsyncWriter
Begins the process of stopping the writer. Entries already in the queue will be published, but no new entries can be added. Use #wait! to block until the writer is fully stopped and all pending entries have been published.
    # File 'lib/google/cloud/logging/async_writer.rb', line 206
    def stop
      synchronize do
        break if @stopped

        @stopped = true
        publish_batch!
        @cond.broadcast
        @thread_pool&.shutdown
      end

      self
    end
#stop!(timeout = nil, force: nil) ⇒ Symbol Also known as: async_stop!
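Stop this asynchronous writer and block until it has been stopped. Returns :new if the writer has no thread pool yet (nothing was ever written), :stopped if it was already stopped, :waited if the thread pool reports shutdown after waiting and a timeout was given, :forced if the pool had not shut down and force was set (the pool is then killed), and :timeout otherwise.
    # File 'lib/google/cloud/logging/async_writer.rb', line 258
    def stop! timeout = nil, force: nil
      return :new unless @thread_pool
      return :stopped if stopped?

      stop
      wait! timeout

      if synchronize { @thread_pool.shutdown? }
        return :waited if timeout
      elsif force
        @thread_pool.kill
        return :forced
      end
      :timeout
    end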
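A caller will usually want to branch on the Symbol that stop! returns. The helper below is a minimal sketch of such a branch; describe_stop_result is a hypothetical name, and the wording of each message is only an interpretation of the listing above.

```ruby
# Hypothetical helper; maps the Symbol returned by stop! to a description.
def describe_stop_result result
  case result
  when :new     then "writer never created a thread pool; nothing to stop"
  when :stopped then "writer was already stopped"
  when :waited  then "thread pool shut down while waiting with a timeout"
  when :forced  then "thread pool had not shut down and was killed"
  when :timeout then "neither the :waited nor the :forced branch applied"
  else raise ArgumentError, "unexpected result: #{result.inspect}"
  end
end

# With a real writer this might be called as (not executed here):
#   puts describe_stop_result(async.stop!(10, force: true))
summary = describe_stop_result :forced
```

Raising on an unknown Symbol keeps the branch honest if the set of return values ever changes.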
#stopped? ⇒ boolean
Whether the writer has been stopped.
    # File 'lib/google/cloud/logging/async_writer.rb', line 301
    def stopped?
      synchronize { @stopped }
    end
#wait!(timeout = nil) ⇒ AsyncWriter
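Blocks until the writer is fully stopped, all pending entries have been published, and all callbacks have completed. When a timeout is given, it is passed to the thread pool's wait_for_termination call.
    # File 'lib/google/cloud/logging/async_writer.rb', line 230
    def wait! timeout = nil
      synchronize do
        if @thread_pool
          @thread_pool.shutdown
          @thread_pool.wait_for_termination timeout
        end
      end

      self
    end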
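The split between a non-blocking stop and a blocking wait! can be sketched with a plain Thread and Queue from Ruby's core library. TinyWorker is a hypothetical stand-in and does not use a thread pool as the gem does; it only shows the two-phase shutdown pattern.

```ruby
# Hypothetical stand-in for illustration; NOT the gem's AsyncWriter.
class TinyWorker
  attr_reader :published

  def initialize
    @queue = Queue.new
    @published = []
    @thread = Thread.new do
      # Queue#pop returns nil once the queue is closed and empty,
      # which ends the loop.
      while (entry = @queue.pop)
        @published << entry
      end
    end
  end

  def write entry
    @queue << entry # raises ClosedQueueError after stop
    self
  end

  # Non-blocking: refuse new entries but let queued ones finish.
  def stop
    @queue.close
    self
  end

  def stopped?
    @queue.closed?
  end

  # Blocking: wait up to `timeout` seconds (forever if nil) for the worker.
  def wait! timeout = nil
    @thread.join timeout
    self
  end
end

worker = TinyWorker.new
worker.write("one").write("two")
worker.stop.wait! 5
```

Entries written before stop are still published, while a write after stop is rejected.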
#write_entries(entries, log_name: nil, resource: nil, labels: nil) ⇒ Google::Cloud::Logging::AsyncWriter
Asynchronously write one or more log entries to the Stackdriver Logging service.
Unlike the main write_entries method, this method usually does not block. The actual write RPCs will happen in the background, and may be batched with related calls. However, if the queue is full, this method will block until enough space has cleared out.
    # File 'lib/google/cloud/logging/async_writer.rb', line 132
    def write_entries entries, log_name: nil, resource: nil, labels: nil
      synchronize do
        raise "AsyncWriter has been stopped" if @stopped

        Array(entries).each do |entry|
          # Update the entry to have all the data directly on it
          entry.log_name ||= log_name
          if entry.resource.nil? || entry.resource.empty?
            entry.resource = resource
          end
          entry.labels = labels if entry.labels.nil? || entry.labels.empty?

          # Add the entry to the batch
          @batch ||= Batch.new self
          next if @batch.try_add entry

          # If we can't add to the batch, publish and create a new batch
          publish_batch!
          @batch = Batch.new self
          @batch.add entry
        end

        @thread_pool ||= Concurrent::ThreadPoolExecutor.new \
          max_threads: @threads, max_queue: @max_queue
        @thread ||= Thread.new { run_background }

        publish_batch! if @batch&.ready?

        @cond.broadcast
      end
      self
    end
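Two steps in the listing above are easy to try in isolation: filling in call-level defaults on each entry, and starting a new batch when the current one is full. The sketch below reproduces both with plain Ruby objects. SketchEntry, apply_defaults, and batch_entries are hypothetical names, resources and labels are modelled as hashes, and the count-based batch limit is an assumption, since the real Batch limits are not shown in this section.

```ruby
# Hypothetical entry type standing in for the gem's entry class.
SketchEntry = Struct.new :payload, :log_name, :resource, :labels

# Fill in call-level defaults only where the entry has no value of its own.
def apply_defaults entry, log_name: nil, resource: nil, labels: nil
  entry.log_name ||= log_name
  entry.resource = resource if entry.resource.nil? || entry.resource.empty?
  entry.labels = labels if entry.labels.nil? || entry.labels.empty?
  entry
end

# Group items into batches of at most max_count, closing the current
# batch whenever it cannot accept another item (assumed limit).
def batch_entries items, max_count:
  batches = []
  batch = []
  items.each do |item|
    if batch.size >= max_count # the current batch is full: "publish" it
      batches << batch
      batch = []
    end
    batch << item
  end
  batches << batch unless batch.empty?
  batches
end

entry = SketchEntry.new("boot", nil, {}, { "env" => "test" })
apply_defaults(entry,
               log_name: "my_app_log",
               resource: { "type" => "gae_app" },
               labels: { "env" => "production" })

batches = batch_entries(%w[a b c d e], max_count: 2)
```

The entry keeps its own labels because they were not empty, while the missing log name and the empty resource are taken from the call.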