Adds synchronization to request operations which may buffer payloads from different threads.
Attributes
| headers_sent | [RW] |
Public Class methods
new(*)
[show source]
# File lib/httpx/plugins/stream_bidi.rb 253 def initialize(*) 254 super 255 @headers_sent = false 256 @closed = false 257 @mutex = Thread::Mutex.new 258 end
Public Instance methods
<<(chunk)
[show source]
# File lib/httpx/plugins/stream_bidi.rb 299 def <<(chunk) 300 @mutex.synchronize do 301 if @drainer 302 @body.clear if @body.respond_to?(:clear) 303 @drainer = nil 304 end 305 @body << chunk 306 307 transition(:body) 308 end 309 end
can_buffer?()
[show source]
# File lib/httpx/plugins/stream_bidi.rb 266 def can_buffer? 267 return super unless @options.stream 268 269 super && @state != :waiting_for_chunk 270 end
close()
[show source]
# File lib/httpx/plugins/stream_bidi.rb 311 def close 312 return super unless @options.stream 313 314 @mutex.synchronize do 315 return if @closed 316 317 @closed = true 318 end 319 320 # last chunk to send which ends the stream 321 self << "" 322 end
closed?()
[show source]
# File lib/httpx/plugins/stream_bidi.rb 260 def closed? 261 return super unless @options.stream 262 263 @closed 264 end
transition(nextstate)
overrides state management transitions to introduce an intermediate :waiting_for_chunk state, which the request transitions to once payload is buffered.
[show source]
# File lib/httpx/plugins/stream_bidi.rb 275 def transition(nextstate) 276 return super unless @options.stream 277 278 headers_sent = @headers_sent 279 280 case nextstate 281 when :waiting_for_chunk 282 return unless @state == :body 283 when :body 284 case @state 285 when :headers 286 headers_sent = true 287 when :waiting_for_chunk 288 # HACK: to allow super to pass through 289 @state = :headers 290 end 291 end 292 293 super.tap do 294 # delay setting this up until after the first transition to :body 295 @headers_sent = headers_sent 296 end 297 end