class HTTPX::Connection

  1. lib/httpx/connection.rb
  2. lib/httpx/connection/http1.rb
  3. lib/httpx/connection/http2.rb
  4. lib/httpx/plugins/h2c.rb
  5. lib/httpx/plugins/upgrade/h2.rb
  6. show all
Superclass: Object

The Connection can be watched for IO events.

It contains the io object to read/write from, and knows what to do when it can.

It defers connecting until absolutely necessary. Connection should be triggered from the IO selector (until then, any request will be queued).

A connection boots up its parser after connection is established. All pending requests will be redirected there after connection.

A connection can be prevented from closing by the parser, that is, if there are pending requests. This will signal that the connection was prematurely closed, due to a possible number of conditions:

  • Remote peer closed the connection (“Connection: close”);

  • Remote peer doesn’t support pipelining;

A connection may also route requests for a different host for which the io was connected to, provided that the IP is the same and the port and scheme as well. This will allow to share the same socket to send HTTP/2 requests to different hosts.

Included modules

  1. Loggable
  2. Callbacks

Attributes

current_selector [W]
current_session [RW]
family [RW]
io [R]
options [R]
origin [R]
origins [R]
pending [R]
sibling [R]
ssl_session [R]
state [R]
type [R]

Public Class methods

new(uri, options)
[show source]
    # File lib/httpx/connection.rb
 52 def initialize(uri, options)
 53   @current_session = @current_selector = @sibling = @coalesced_connection = nil
 54   @exhausted = @cloned = @main_sibling = false
 55 
 56   @options = Options.new(options)
 57   @type = initialize_type(uri, @options)
 58   @origins = [uri.origin]
 59   @origin = Utils.to_uri(uri.origin)
 60   @window_size = @options.window_size
 61   @read_buffer = Buffer.new(@options.buffer_size)
 62   @write_buffer = Buffer.new(@options.buffer_size)
 63   @pending = []
 64 
 65   on(:error, &method(:on_error))
 66   if @options.io
 67     # if there's an already open IO, get its
 68     # peer address, and force-initiate the parser
 69     transition(:already_open)
 70     @io = build_socket
 71     parser
 72   else
 73     transition(:idle)
 74   end
 75   on(:close) do
 76     next if @exhausted # it'll reset
 77 
 78     # may be called after ":close" above, so after the connection has been checked back in.
 79     # next unless @current_session
 80 
 81     next unless @current_session
 82 
 83     @current_session.deselect_connection(self, @current_selector, @cloned)
 84   end
 85   on(:terminate) do
 86     next if @exhausted # it'll reset
 87 
 88     current_session = @current_session
 89     current_selector = @current_selector
 90 
 91     # may be called after ":close" above, so after the connection has been checked back in.
 92     next unless current_session && current_selector
 93 
 94     current_session.deselect_connection(self, current_selector)
 95   end
 96 
 97   on(:altsvc) do |alt_origin, origin, alt_params|
 98     build_altsvc_connection(alt_origin, origin, alt_params)
 99   end
100 
101   @inflight = 0
102   @keep_alive_timeout = @options.timeout[:keep_alive_timeout]
103 
104   self.addresses = @options.addresses if @options.addresses
105 end

Public Instance methods

addresses()
[show source]
    # File lib/httpx/connection.rb
121 def addresses
122   @io && @io.addresses
123 end
addresses=(addrs)

this is a semi-private method, to be used by the resolver to initiate the io object.

[show source]
    # File lib/httpx/connection.rb
113 def addresses=(addrs)
114   if @io
115     @io.add_addresses(addrs)
116   else
117     @io = build_socket(addrs)
118   end
119 end
call()
[show source]
    # File lib/httpx/connection.rb
247 def call
248   case @state
249   when :idle
250     connect
251     consume
252   when :closed
253     return
254   when :closing
255     consume
256     transition(:closed)
257   when :open
258     consume
259   end
260   nil
261 rescue StandardError => e
262   @write_buffer.clear
263   emit(:error, e)
264   raise e
265 end
close()
[show source]
    # File lib/httpx/connection.rb
267 def close
268   transition(:active) if @state == :inactive
269 
270   @parser.close if @parser
271 end
coalescable?(connection)

coalescable connections need to be mergeable! but internally, mergeable? is called before coalescable?

[show source]
    # File lib/httpx/connection.rb
165 def coalescable?(connection)
166   if @io.protocol == "h2" &&
167      @origin.scheme == "https" &&
168      connection.origin.scheme == "https" &&
169      @io.can_verify_peer?
170     @io.verify_hostname(connection.origin.host)
171   else
172     @origin == connection.origin
173   end
174 end
coalesce!(connection)

coalesces self into connection.

[show source]
    # File lib/httpx/connection.rb
156 def coalesce!(connection)
157   @coalesced_connection = connection
158 
159   close_sibling
160   connection.merge(self)
161 end
connecting?()
[show source]
    # File lib/httpx/connection.rb
211 def connecting?
212   @state == :idle
213 end
create_idle(options = {})
[show source]
    # File lib/httpx/connection.rb
176 def create_idle(options = {})
177   self.class.new(@origin, @options.merge(options))
178 end
deactivate()
[show source]
    # File lib/httpx/connection.rb
345 def deactivate
346   transition(:inactive)
347 end
disconnect()
[show source]
    # File lib/httpx/connection.rb
381 def disconnect
382   return unless @current_session && @current_selector
383 
384   emit(:close)
385   @current_session = nil
386   @current_selector = nil
387 end
expired?()
[show source]
    # File lib/httpx/connection.rb
138 def expired?
139   return false unless @io
140 
141   @io.expired?
142 end
force_reset(cloned = false)

bypasses the state machine to force closing of connections still connecting. only used for Happy Eyeballs v2.

[show source]
    # File lib/httpx/connection.rb
287 def force_reset(cloned = false)
288   @state = :closing
289   @cloned = cloned
290   transition(:closed)
291 end
handle_connect_error(error)
[show source]
    # File lib/httpx/connection.rb
371 def handle_connect_error(error)
372   @connect_error = error
373 
374   return handle_error(error) unless @sibling && @sibling.connecting?
375 
376   @sibling.merge(self)
377 
378   force_reset(true)
379 end
handle_socket_timeout(interval)
[show source]
    # File lib/httpx/connection.rb
353 def handle_socket_timeout(interval)
354   error = OperationTimeoutError.new(interval, "timed out while waiting on select")
355   error.set_backtrace(caller)
356   on_error(error)
357 end
idling()
[show source]
    # File lib/httpx/connection.rb
334 def idling
335   purge_after_closed
336   @write_buffer.clear
337   transition(:idle)
338   @parser = nil if @parser
339 end
inflight?()
[show source]
    # File lib/httpx/connection.rb
215 def inflight?
216   @parser && (
217     # parser may be dealing with other requests (possibly started from a different fiber)
218     !@parser.empty? ||
219     # connection may be doing connection termination handshake
220     !@write_buffer.empty?
221   )
222 end
inspect()

:nocov:

[show source]
    # File lib/httpx/connection.rb
390 def inspect
391   "#<#{self.class}:#{object_id} " \
392     "@origin=#{@origin} " \
393     "@state=#{@state} " \
394     "@pending=#{@pending.size} " \
395     "@io=#{@io}>"
396 end
interests()
[show source]
    # File lib/httpx/connection.rb
224 def interests
225   # connecting
226   if connecting?
227     connect
228 
229     return @io.interests if connecting?
230   end
231 
232   # if the write buffer is full, we drain it
233   return :w unless @write_buffer.empty?
234 
235   return @parser.interests if @parser
236 
237   nil
238 rescue StandardError => e
239   emit(:error, e)
240   nil
241 end
io_connected?()
[show source]
    # File lib/httpx/connection.rb
205 def io_connected?
206   return @coalesced_connection.io_connected? if @coalesced_connection
207 
208   @io && @io.state == :connected
209 end
match?(uri, options)
[show source]
    # File lib/httpx/connection.rb
125 def match?(uri, options)
126   return false if !used? && (@state == :closing || @state == :closed)
127 
128   (
129     @origins.include?(uri.origin) &&
130     # if there is more than one origin to match, it means that this connection
131     # was the result of coalescing. To prevent blind trust in the case where the
132     # origin came from an ORIGIN frame, we're going to verify the hostname with the
133     # SSL certificate
134     (@origins.size == 1 || @origin == uri.origin || (@io.is_a?(SSL) && @io.verify_hostname(uri.host)))
135   ) && @options == options
136 end
merge(connection)
[show source]
    # File lib/httpx/connection.rb
180 def merge(connection)
181   @origins |= connection.instance_variable_get(:@origins)
182   if connection.ssl_session
183     @ssl_session = connection.ssl_session
184     @io.session_new_cb do |sess|
185       @ssl_session = sess
186     end if @io
187   end
188   connection.purge_pending do |req|
189     send(req)
190   end
191 end
mergeable?(connection)
[show source]
    # File lib/httpx/connection.rb
144 def mergeable?(connection)
145   return false if @state == :closing || @state == :closed || !@io
146 
147   return false unless connection.addresses
148 
149   (
150     (open? && @origin == connection.origin) ||
151     !(@io.addresses & (connection.addresses || [])).empty?
152   ) && @options == connection.options
153 end
open?()
[show source]
    # File lib/httpx/connection.rb
349 def open?
350   @state == :open || @state == :inactive
351 end
peer()
[show source]
    # File lib/httpx/connection.rb
107 def peer
108   @origin
109 end
purge_pending(&block)
[show source]
    # File lib/httpx/connection.rb
193 def purge_pending(&block)
194   pendings = []
195   if @parser
196     @inflight -= @parser.pending.size
197     pendings << @parser.pending
198   end
199   pendings << @pending
200   pendings.each do |pending|
201     pending.reject!(&block)
202   end
203 end
reset()
[show source]
    # File lib/httpx/connection.rb
293 def reset
294   return if @state == :closing || @state == :closed
295 
296   transition(:closing)
297 
298   transition(:closed)
299 end
send(request)
[show source]
    # File lib/httpx/connection.rb
301 def send(request)
302   return @coalesced_connection.send(request) if @coalesced_connection
303 
304   if @parser && !@write_buffer.full?
305     if @response_received_at && @keep_alive_timeout &&
306        Utils.elapsed_time(@response_received_at) > @keep_alive_timeout
307       # when pushing a request into an existing connection, we have to check whether there
308       # is the possibility that the connection might have extended the keep alive timeout.
309       # for such cases, we want to ping for availability before deciding to shovel requests.
310       log(level: 3) { "keep alive timeout expired, pinging connection..." }
311       @pending << request
312       transition(:active) if @state == :inactive
313       parser.ping
314       request.ping!
315       return
316     end
317 
318     send_request_to_parser(request)
319   else
320     @pending << request
321   end
322 end
sibling=(connection)
[show source]
    # File lib/httpx/connection.rb
359 def sibling=(connection)
360   @sibling = connection
361 
362   return unless connection
363 
364   @main_sibling = connection.sibling.nil?
365 
366   return unless @main_sibling
367 
368   connection.sibling = self
369 end
terminate()
[show source]
    # File lib/httpx/connection.rb
273 def terminate
274   case @state
275   when :idle
276     purge_after_closed
277     emit(:terminate)
278   when :closed
279     @connected_at = nil
280   end
281 
282   close
283 end
timeout()
[show source]
    # File lib/httpx/connection.rb
324 def timeout
325   return if @state == :closed || @state == :inactive
326 
327   return @timeout if @timeout
328 
329   return @options.timeout[:connect_timeout] if @state == :idle
330 
331   @options.timeout[:operation_timeout]
332 end
to_io()
[show source]
    # File lib/httpx/connection.rb
243 def to_io
244   @io.to_io
245 end
used?()
[show source]
    # File lib/httpx/connection.rb
341 def used?
342   @connected_at
343 end