module HTTPX::Plugins::StreamBidi::RequestMethods

  1. lib/httpx/plugins/stream_bidi.rb

Adds synchronization to request operations which may buffer payloads from different threads.

Attributes

Public Class methods

new(*)
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
253 def initialize(*)
254   super
255   @headers_sent = false
256   @closed = false
257   @flush_buffer_on_body_cb = nil
258   @mutex = Thread::Mutex.new
259 end

Public Instance methods

<<(chunk)
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
311 def <<(chunk)
312   @mutex.synchronize do
313     if @drainer
314       @body.clear if @body.respond_to?(:clear)
315       @drainer = nil
316     end
317     @body << chunk
318 
319     transition(:body)
320   end
321 end
can_buffer?()
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
271 def can_buffer?
272   return super unless @options.stream
273 
274   super && @state != :waiting_for_chunk
275 end
close()
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
323 def close
324   return super unless @options.stream
325 
326   @mutex.synchronize do
327     return if @closed
328 
329     @closed = true
330   end
331 
332   # last chunk to send which ends the stream
333   self << ""
334 end
closed?()
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
265 def closed?
266   return super unless @options.stream
267 
268   @closed
269 end
flush_buffer_on_body(&cb)
[show source]
    # File lib/httpx/plugins/stream_bidi.rb
261 def flush_buffer_on_body(&cb)
262   @flush_buffer_on_body_cb = on(:body, &cb)
263 end
transition(nextstate)

overrides state management transitions to introduce an intermediate :waiting_for_chunk state, which the request transitions to once payload is buffered.

[show source]
    # File lib/httpx/plugins/stream_bidi.rb
280 def transition(nextstate)
281   return super unless @options.stream
282 
283   headers_sent = @headers_sent
284 
285   case nextstate
286   when :idle
287     headers_sent = false
288 
289     if @flush_buffer_on_body_cb
290       callbacks(:body).delete(@flush_buffer_on_body_cb)
291       @flush_buffer_on_body_cb = nil
292     end
293   when :waiting_for_chunk
294     return unless @state == :body
295   when :body
296     case @state
297     when :headers
298       headers_sent = true
299     when :waiting_for_chunk
300       # HACK: to allow super to pass through
301       @state = :headers
302     end
303   end
304 
305   super.tap do
306     # delay setting this up until after the first transition to :body
307     @headers_sent = headers_sent
308   end
309 end