class HTTPX::Connection

  1. lib/httpx/connection.rb
  2. lib/httpx/connection/http1.rb
  3. lib/httpx/connection/http2.rb
  4. lib/httpx/plugins/h2c.rb
  5. show all
Superclass: Object

The Connection can be watched for IO events.

It contains the io object to read/write from, and knows what to do when it can.

It defers connecting until absolutely necessary. Connection should be triggered from the IO selector (until then, any request will be queued).

A connection boots up its parser after connection is established. All pending requests will be redirected there after connection.

A connection can be prevented from closing by the parser, that is, if there are pending requests. This will signal that the connection was prematurely closed, due to a possible number of conditions:

  • Remote peer closed the connection (“Connection: close”);

  • Remote peer doesn’t support pipelining;

A connection may also route requests for a different host for which the io was connected to, provided that the IP is the same and the port and scheme as well. This will allow to share the same socket to send HTTP/2 requests to different hosts.

Included modules

  1. Loggable
  2. Callbacks

Attributes

current_selector [W]
current_session [RW]
family [RW]
io [R]
options [R]
origin [R]
origins [R]
pending [R]
sibling [R]
ssl_session [R]
state [R]
type [R]

Public Class methods

new(uri, options)
[show source]
   # File lib/httpx/connection.rb
47 def initialize(uri, options)
48   @current_session = @current_selector = @max_concurrent_requests =
49                        @parser = @sibling = @coalesced_connection = @altsvc_connection =
50                                               @family = @io = @ssl_session = @timeout =
51                                                                 @connected_at = @response_received_at = nil
52 
53   @exhausted = @cloned = @main_sibling = false
54 
55   @options = Options.new(options)
56   @type = initialize_type(uri, @options)
57   @origins = [uri.origin]
58   @origin = Utils.to_uri(uri.origin)
59   @window_size = @options.window_size
60   @read_buffer = Buffer.new(@options.buffer_size)
61   @write_buffer = Buffer.new(@options.buffer_size)
62   @pending = []
63   @inflight = 0
64   @keep_alive_timeout = @options.timeout[:keep_alive_timeout]
65   @no_more_requests_counter = 0
66 
67   if @options.io
68     # if there's an already open IO, get its
69     # peer address, and force-initiate the parser
70     transition(:already_open)
71     @io = build_socket
72     parser
73   else
74     transition(:idle)
75   end
76   self.addresses = @options.addresses if @options.addresses
77 end

Public Instance methods

addresses()
[show source]
   # File lib/httpx/connection.rb
93 def addresses
94   @io && @io.addresses
95 end
addresses=(addrs)

this is a semi-private method, to be used by the resolver to initiate the io object.

[show source]
   # File lib/httpx/connection.rb
85 def addresses=(addrs)
86   if @io
87     @io.add_addresses(addrs)
88   else
89     @io = build_socket(addrs)
90   end
91 end
addresses?()
[show source]
   # File lib/httpx/connection.rb
97 def addresses?
98   @io && @io.addresses?
99 end
call()
[show source]
    # File lib/httpx/connection.rb
215 def call
216   case @state
217   when :idle
218     connect
219 
220     # when opening the tcp or ssl socket fails
221     return if @state == :closed
222 
223     consume
224   when :closed
225     return
226   when :closing
227     consume
228     transition(:closed)
229   when :open
230     consume
231   end
232   nil
233 rescue IOError => e
234   @write_buffer.clear
235   on_io_error(e)
236 rescue StandardError => e
237   @write_buffer.clear
238   on_error(e)
239 rescue Exception => e # rubocop:disable Lint/RescueException
240   force_close(true)
241   raise e
242 end
close()
[show source]
    # File lib/httpx/connection.rb
244 def close
245   transition(:active) if @state == :inactive
246 
247   @parser.close if @parser
248 end
coalescable?(connection)

coalescable connections need to be mergeable! but internally, mergeable? is called before coalescable?

[show source]
    # File lib/httpx/connection.rb
138 def coalescable?(connection)
139   if @io.protocol == "h2" &&
140      @origin.scheme == "https" &&
141      connection.origin.scheme == "https" &&
142      @io.can_verify_peer?
143     @io.verify_hostname(connection.origin.host)
144   else
145     @origin == connection.origin
146   end
147 end
coalesce!(connection)

coalesces self into connection.

[show source]
    # File lib/httpx/connection.rb
125 def coalesce!(connection)
126   @coalesced_connection = connection
127 
128   close_sibling
129   connection.merge(self)
130 end
coalesced?()
[show source]
    # File lib/httpx/connection.rb
132 def coalesced?
133   @coalesced_connection
134 end
connecting?()
[show source]
    # File lib/httpx/connection.rb
182 def connecting?
183   @state == :idle
184 end
deactivate()
[show source]
    # File lib/httpx/connection.rb
356 def deactivate
357   transition(:inactive)
358 end
disconnect()

disconnects from the current session it’s attached to

[show source]
    # File lib/httpx/connection.rb
391 def disconnect
392   return if @exhausted # it'll reset
393 
394   return unless (current_session = @current_session) && (current_selector = @current_selector)
395 
396   @current_session = @current_selector = nil
397 
398   current_session.deselect_connection(self, current_selector, @cloned)
399 end
force_close(delete_pending = false)

bypasses state machine rules while setting the connection in the :closed state.

[show source]
    # File lib/httpx/connection.rb
264 def force_close(delete_pending = false)
265   force_purge
266   return unless @state == :closed
267 
268   if delete_pending
269     @pending.clear
270   elsif (parser = @parser)
271     enqueue_pending_requests_from_parser(parser)
272   end
273 
274   return unless @pending.empty?
275 
276   disconnect
277   emit(:force_closed, delete_pending)
278 end
force_reset(cloned = false)

bypasses the state machine to force closing of connections still connecting. only used for Happy Eyeballs v2.

[show source]
    # File lib/httpx/connection.rb
282 def force_reset(cloned = false)
283   @state = :closing
284   @cloned = cloned
285   transition(:closed)
286 end
handle_connect_error(error)
[show source]
    # File lib/httpx/connection.rb
382 def handle_connect_error(error)
383   return on_error(error) unless @sibling && @sibling.connecting?
384 
385   @sibling.merge(self)
386 
387   force_reset(true)
388 end
handle_socket_timeout(interval)
[show source]
    # File lib/httpx/connection.rb
364 def handle_socket_timeout(interval)
365   error = OperationTimeoutError.new(interval, "timed out while waiting on select")
366   error.set_backtrace(caller)
367   on_error(error)
368 end
idling()
[show source]
    # File lib/httpx/connection.rb
342 def idling
343   purge_after_closed
344   @write_buffer.clear
345   transition(:idle)
346   return unless @parser
347 
348   enqueue_pending_requests_from_parser(parser)
349   @parser = nil
350 end
inflight?()
[show source]
    # File lib/httpx/connection.rb
186 def inflight?
187   @parser && (
188     # parser may be dealing with other requests (possibly started from a different fiber)
189     !@parser.empty? ||
190     # connection may be doing connection termination handshake
191     !@write_buffer.empty?
192   )
193 end
inspect()

:nocov:

[show source]
    # File lib/httpx/connection.rb
433 def inspect
434   "#<#{self.class}:#{object_id} " \
435     "@origin=#{@origin} " \
436     "@state=#{@state} " \
437     "@pending=#{@pending.size} " \
438     "@io=#{@io}>"
439 end
interests()
[show source]
    # File lib/httpx/connection.rb
195 def interests
196   # connecting
197   if connecting?
198     connect
199 
200     return @io.interests if connecting?
201   end
202 
203   return @parser.interests if @parser
204 
205   nil
206 rescue StandardError => e
207   on_error(e)
208   nil
209 end
io_connected?()
[show source]
    # File lib/httpx/connection.rb
176 def io_connected?
177   return @coalesced_connection.io_connected? if @coalesced_connection
178 
179   @io && @io.state == :connected
180 end
match?(uri, options)
[show source]
    # File lib/httpx/connection.rb
101 def match?(uri, options)
102   return false if !used? && (@state == :closing || @state == :closed)
103 
104   @origins.include?(uri.origin) &&
105     # if there is more than one origin to match, it means that this connection
106     # was the result of coalescing. To prevent blind trust in the case where the
107     # origin came from an ORIGIN frame, we're going to verify the hostname with the
108     # SSL certificate
109     (@origins.size == 1 || @origin == uri.origin || (@io.is_a?(SSL) && @io.verify_hostname(uri.host))) &&
110     @options.connection_options_match?(options)
111 end
merge(connection)
[show source]
    # File lib/httpx/connection.rb
149 def merge(connection)
150   @origins |= connection.instance_variable_get(:@origins)
151   if @ssl_session.nil? && connection.ssl_session
152     @ssl_session = connection.ssl_session
153     @io.session_new_cb do |sess|
154       @ssl_session = sess
155     end if @io
156   end
157   connection.purge_pending do |req|
158     req.transition(:idle)
159     send(req)
160   end
161 end
mergeable?(connection)
[show source]
    # File lib/httpx/connection.rb
113 def mergeable?(connection)
114   return false if @state == :closing || @state == :closed || !@io
115 
116   return false unless connection.addresses
117 
118   (
119     (open? && @origin == connection.origin) ||
120     !(@io.addresses & (connection.addresses || [])).empty?
121   ) && @options.connection_options_match?(connection.options)
122 end
on_connect_error(e)
[show source]
    # File lib/httpx/connection.rb
401 def on_connect_error(e)
402   # connect errors, exit gracefully
403   error = ConnectionError.new(e.message)
404   error.set_backtrace(e.backtrace)
405   handle_connect_error(error) if connecting?
406   force_close
407 end
on_error(error, request = nil)
[show source]
    # File lib/httpx/connection.rb
414 def on_error(error, request = nil)
415   if error.is_a?(OperationTimeoutError)
416 
417     # inactive connections do not contribute to the select loop, therefore
418     # they should not fail due to such errors.
419     return if @state == :inactive
420 
421     if @timeout
422       @timeout -= error.timeout
423       return unless @timeout <= 0
424     end
425 
426     error = error.to_connection_error if connecting?
427   end
428   handle_error(error, request)
429   reset
430 end
on_io_error(e)
[show source]
    # File lib/httpx/connection.rb
409 def on_io_error(e)
410   on_error(e)
411   force_close(true)
412 end
open?()
[show source]
    # File lib/httpx/connection.rb
360 def open?
361   @state == :open || @state == :inactive
362 end
peer()
[show source]
   # File lib/httpx/connection.rb
79 def peer
80   @origin
81 end
purge_pending(&block)
[show source]
    # File lib/httpx/connection.rb
163 def purge_pending(&block)
164   pendings = []
165   if @parser
166     pending = @parser.pending
167     @inflight -= pending.size
168     pendings << pending
169   end
170   pendings << @pending
171   pendings.each do |pending|
172     pending.reject!(&block)
173   end
174 end
reset()
[show source]
    # File lib/httpx/connection.rb
288 def reset
289   return if @state == :closing || @state == :closed
290 
291   # do not reset a connection which may have restarted back to :idle, such when the parser resets
292   # (example: HTTP/1 parser disabling pipelining)
293   return if @state == :idle && @pending.any?
294 
295   parser = @parser
296 
297   if parser && parser.respond_to?(:max_concurrent_requests)
298     # if connection being reset has at some downgraded the number of concurrent
299     # requests, such as in the case where an attempt to use HTTP/1 pipelining failed,
300     # keep that information around.
301     @max_concurrent_requests = parser.max_concurrent_requests
302   end
303 
304   transition(:closing)
305 
306   transition(:closed)
307 end
send(request)
[show source]
    # File lib/httpx/connection.rb
309 def send(request)
310   return @coalesced_connection.send(request) if @coalesced_connection
311 
312   if @parser && !@write_buffer.full?
313     if @response_received_at && @keep_alive_timeout &&
314        Utils.elapsed_time(@response_received_at) > @keep_alive_timeout
315       # when pushing a request into an existing connection, we have to check whether there
316       # is the possibility that the connection might have extended the keep alive timeout.
317       # for such cases, we want to ping for availability before deciding to shovel requests.
318       log(level: 3) { "keep alive timeout expired, pinging connection..." }
319       @pending << request
320       transition(:active) if @state == :inactive
321       request.ping!
322       ping
323       return
324     end
325 
326     send_request_to_parser(request)
327   else
328     @pending << request
329   end
330 end
sibling=(connection)
[show source]
    # File lib/httpx/connection.rb
370 def sibling=(connection)
371   @sibling = connection
372 
373   return unless connection
374 
375   @main_sibling = connection.sibling.nil?
376 
377   return unless @main_sibling
378 
379   connection.sibling = self
380 end
terminate()
[show source]
    # File lib/httpx/connection.rb
250 def terminate
251   case @state
252   when :idle
253     purge_after_closed
254     disconnect
255   when :closed
256     @connected_at = nil
257   end
258 
259   close
260 end
timeout()
[show source]
    # File lib/httpx/connection.rb
332 def timeout
333   return if @state == :closed || @state == :inactive
334 
335   return @timeout if @timeout
336 
337   return @options.timeout[:connect_timeout] if @state == :idle
338 
339   @options.timeout[:operation_timeout]
340 end
to_io()
[show source]
    # File lib/httpx/connection.rb
211 def to_io
212   @io.to_io
213 end
used?()
[show source]
    # File lib/httpx/connection.rb
352 def used?
353   @connected_at
354 end