Adds synchronization to request operations which may buffer payloads from different threads.
Attributes
headers_sent | [RW] |
Public Class methods
new(*)
[show source]
# File lib/httpx/plugins/stream_bidi.rb, line 205
#
# Forwards every argument untouched to the parent initializer, then sets up
# the extra per-request state used by this module: the headers have not been
# sent, the request has not been closed, and a dedicated mutex is created to
# serialize the operations which wrap themselves in it.
def initialize(*)
  super

  # both flags start out false; they are flipped later by #transition and #close.
  @headers_sent = @closed = false
  @mutex = Thread::Mutex.new
end
Public Instance methods
<<(chunk)
[show source]
# File lib/httpx/plugins/stream_bidi.rb, line 245
#
# Appends +chunk+ to the request body and requests a transition to +:body+,
# all while holding the request mutex.
#
# When a drainer is set, the body is emptied first (only if it responds to
# +clear+) and the drainer is reset before the new chunk is appended.
#
# Returns whatever <tt>transition(:body)</tt> returns.
def <<(chunk)
  @mutex.synchronize do
    if @drainer
      # drop what is currently in the body before buffering the new chunk.
      @body.respond_to?(:clear) && @body.clear
      @drainer = nil
    end

    @body << chunk
    transition(:body)
  end
end
can_buffer?()
[show source]
# File lib/httpx/plugins/stream_bidi.rb, line 216
#
# Buffering is allowed only when the parent implementation allows it AND the
# request is not in the +:waiting_for_chunk+ state.
#
# A falsy answer from the parent is returned as-is.
def can_buffer?
  parent_answer = super
  return parent_answer unless parent_answer

  @state != :waiting_for_chunk
end
close()
[show source]
# File lib/httpx/plugins/stream_bidi.rb, line 257
#
# Marks the request as closed and pushes a final empty chunk.
#
# The closed flag is checked and set while holding the request mutex; the
# empty chunk is pushed after the mutex has been released. Calling this on an
# already closed request does nothing and returns +nil+.
def close
  first_close = @mutex.synchronize do
    next false if @closed

    @closed = true
  end
  return unless first_close

  # last chunk to send which ends the stream
  self << ""
end
transition(nextstate)
Overrides state management transitions to introduce an intermediate :waiting_for_chunk
state, which the request transitions to once the payload is buffered.
[show source]
# File lib/httpx/plugins/stream_bidi.rb, line 223
#
# Wraps the parent state transition in order to support the intermediate
# +:waiting_for_chunk+ state and to keep track of +@headers_sent+.
#
# * +:waiting_for_chunk+ is only attempted when the current state is +:body+;
#   otherwise the method returns +nil+ without calling the parent.
# * +:body+ coming from +:headers+ records that headers were sent.
# * +:body+ coming from +:waiting_for_chunk+ rewrites the current state to
#   +:headers+ before delegating.
#
# Returns the result of the parent transition.
def transition(nextstate)
  pending_headers_sent = @headers_sent

  case nextstate
  when :waiting_for_chunk
    return unless @state == :body
  when :body
    if @state == :headers
      pending_headers_sent = true
    elsif @state == :waiting_for_chunk
      # HACK: to allow super to pass through
      @state = :headers
    end
  end

  outcome = super
  # delay setting this up until after the first transition to :body
  @headers_sent = pending_headers_sent
  outcome
end