module HTTPX::Plugins::StreamBidi::RequestMethods

  1. lib/httpx/plugins/stream_bidi.rb

Adds synchronization to request operations which may buffer payloads from different threads.

Attributes

Public Class methods

new(*)
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
259 def initialize(*)
260   super
261   @headers_sent = false
262   @closed = false
263   @flush_buffer_on_body_cb = nil
264   @mutex = Thread::Mutex.new
265 end

Public Instance methods

<<(chunk)
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
317 def <<(chunk)
318   @mutex.synchronize do
319     if @drainer
320       @body.clear if @body.respond_to?(:clear)
321       @drainer = nil
322     end
323     @body << chunk
324 
325     transition(:body)
326   end
327 end
can_buffer?()
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
277 def can_buffer?
278   return super unless @options.stream
279 
280   super && @state != :waiting_for_chunk
281 end
close()
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
329 def close
330   return super unless @options.stream
331 
332   @mutex.synchronize do
333     return if @closed
334 
335     @closed = true
336   end
337 
338   # last chunk to send which ends the stream
339   self << ""
340 end
closed?()
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
271 def closed?
272   return super unless @options.stream
273 
274   @closed
275 end
flush_buffer_on_body(&cb)
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
267 def flush_buffer_on_body(&cb)
268   @flush_buffer_on_body_cb = on(:body, &cb)
269 end
transition(nextstate)

overrides state management transitions to introduce an intermediate :waiting_for_chunk state, which the request transitions to once payload is buffered.

[show source]
    # File lib/httpx/plugins/stream_bidi.rb
286 def transition(nextstate)
287   return super unless @options.stream
288 
289   headers_sent = @headers_sent
290 
291   case nextstate
292   when :idle
293     headers_sent = false
294 
295     if @flush_buffer_on_body_cb
296       callbacks(:body).delete(@flush_buffer_on_body_cb)
297       @flush_buffer_on_body_cb = nil
298     end
299   when :waiting_for_chunk
300     return unless @state == :body
301   when :body
302     case @state
303     when :headers
304       headers_sent = true
305     when :waiting_for_chunk
306       # HACK: to allow super to pass through
307       @state = :headers
308     end
309   end
310 
311   super.tap do
312     # delay setting this up until after the first transition to :body
313     @headers_sent = headers_sent
314   end
315 end