class HTTPX::Connection

  1. lib/httpx/connection.rb
  2. lib/httpx/connection/http1.rb
  3. lib/httpx/connection/http2.rb
  4. lib/httpx/plugins/h2c.rb
  5. lib/httpx/plugins/upgrade/h2.rb
  6. show all
Superclass: Object

The Connection can be watched for IO events.

It contains the io object to read/write from, and knows what to do when it can.

It defers connecting until absolutely necessary. Connection should be triggered from the IO selector (until then, any request will be queued).

A connection boots up its parser after connection is established. All pending requests will be redirected there after connection.

A connection can be prevented from closing by the parser, that is, if there are pending requests. This will signal that the connection was prematurely closed, due to a possible number of conditions:

  • Remote peer closed the connection (“Connection: close”);

  • Remote peer doesn’t support pipelining;

A connection may also route requests for a different host for which the io was connected to, provided that the IP is the same and the port and scheme as well. This will allow to share the same socket to send HTTP/2 requests to different hosts.

Included modules

  1. Loggable
  2. Callbacks

Attributes

current_selector [W]
current_session [RW]
family [RW]
io [R]
options [R]
origin [R]
origins [R]
pending [R]
sibling [R]
ssl_session [R]
state [R]
type [R]

Public Class methods

new(uri, options)
[show source]
    # File lib/httpx/connection.rb
 52 def initialize(uri, options)
 53   @current_session = @current_selector = @sibling = @coalesced_connection = nil
 54   @exhausted = @cloned = @main_sibling = false
 55 
 56   @options = Options.new(options)
 57   @type = initialize_type(uri, @options)
 58   @origins = [uri.origin]
 59   @origin = Utils.to_uri(uri.origin)
 60   @window_size = @options.window_size
 61   @read_buffer = Buffer.new(@options.buffer_size)
 62   @write_buffer = Buffer.new(@options.buffer_size)
 63   @pending = []
 64 
 65   on(:error, &method(:on_error))
 66   if @options.io
 67     # if there's an already open IO, get its
 68     # peer address, and force-initiate the parser
 69     transition(:already_open)
 70     @io = build_socket
 71     parser
 72   else
 73     transition(:idle)
 74   end
 75   on(:close) do
 76     next if @exhausted # it'll reset
 77 
 78     # may be called after ":close" above, so after the connection has been checked back in.
 79     # next unless @current_session
 80 
 81     next unless @current_session
 82 
 83     @current_session.deselect_connection(self, @current_selector, @cloned)
 84   end
 85   on(:terminate) do
 86     next if @exhausted # it'll reset
 87 
 88     current_session = @current_session
 89     current_selector = @current_selector
 90 
 91     # may be called after ":close" above, so after the connection has been checked back in.
 92     next unless current_session && current_selector
 93 
 94     current_session.deselect_connection(self, current_selector)
 95   end
 96 
 97   on(:altsvc) do |alt_origin, origin, alt_params|
 98     build_altsvc_connection(alt_origin, origin, alt_params)
 99   end
100 
101   @inflight = 0
102   @keep_alive_timeout = @options.timeout[:keep_alive_timeout]
103 
104   self.addresses = @options.addresses if @options.addresses
105 end
parser_type(protocol)
[show source]
    # File lib/httpx/connection.rb
923 def parser_type(protocol)
924   case protocol
925   when "h2" then HTTP2
926   when "http/1.1" then HTTP1
927   else
928     raise Error, "unsupported protocol (##{protocol})"
929   end
930 end

Public Instance methods

addresses()
[show source]
    # File lib/httpx/connection.rb
121 def addresses
122   @io && @io.addresses
123 end
addresses=(addrs)

this is a semi-private method, to be used by the resolver to initiate the io object.

[show source]
    # File lib/httpx/connection.rb
113 def addresses=(addrs)
114   if @io
115     @io.add_addresses(addrs)
116   else
117     @io = build_socket(addrs)
118   end
119 end
call()
[show source]
    # File lib/httpx/connection.rb
239 def call
240   case @state
241   when :idle
242     connect
243     consume
244   when :closed
245     return
246   when :closing
247     consume
248     transition(:closed)
249   when :open
250     consume
251   end
252   nil
253 rescue StandardError => e
254   emit(:error, e)
255   raise e
256 end
close()
[show source]
    # File lib/httpx/connection.rb
258 def close
259   transition(:active) if @state == :inactive
260 
261   @parser.close if @parser
262 end
coalescable?(connection)

coalescable connections need to be mergeable! but internally, mergeable? is called before coalescable?

[show source]
    # File lib/httpx/connection.rb
157 def coalescable?(connection)
158   if @io.protocol == "h2" &&
159      @origin.scheme == "https" &&
160      connection.origin.scheme == "https" &&
161      @io.can_verify_peer?
162     @io.verify_hostname(connection.origin.host)
163   else
164     @origin == connection.origin
165   end
166 end
coalesced_connection=(connection)
[show source]
    # File lib/httpx/connection.rb
343 def coalesced_connection=(connection)
344   @coalesced_connection = connection
345 
346   close_sibling
347   connection.merge(self)
348 end
connecting?()
[show source]
    # File lib/httpx/connection.rb
203 def connecting?
204   @state == :idle
205 end
create_idle(options = {})
[show source]
    # File lib/httpx/connection.rb
168 def create_idle(options = {})
169   self.class.new(@origin, @options.merge(options))
170 end
deactivate()
[show source]
    # File lib/httpx/connection.rb
329 def deactivate
330   transition(:inactive)
331 end
disconnect()
[show source]
    # File lib/httpx/connection.rb
372 def disconnect
373   return unless @current_session && @current_selector
374 
375   emit(:close)
376   @current_session = nil
377   @current_selector = nil
378 end
expired?()
[show source]
    # File lib/httpx/connection.rb
138 def expired?
139   return false unless @io
140 
141   @io.expired?
142 end
force_reset(cloned = false)

bypasses the state machine to force closing of connections still connecting. only used for Happy Eyeballs v2.

[show source]
    # File lib/httpx/connection.rb
272 def force_reset(cloned = false)
273   @state = :closing
274   @cloned = cloned
275   transition(:closed)
276 end
handle_connect_error(error)
[show source]
    # File lib/httpx/connection.rb
362 def handle_connect_error(error)
363   @connect_error = error
364 
365   return handle_error(error) unless @sibling && @sibling.connecting?
366 
367   @sibling.merge(self)
368 
369   force_reset(true)
370 end
handle_socket_timeout(interval)
[show source]
    # File lib/httpx/connection.rb
337 def handle_socket_timeout(interval)
338   error = OperationTimeoutError.new(interval, "timed out while waiting on select")
339   error.set_backtrace(caller)
340   on_error(error)
341 end
idling()
[show source]
    # File lib/httpx/connection.rb
318 def idling
319   purge_after_closed
320   @write_buffer.clear
321   transition(:idle)
322   @parser = nil if @parser
323 end
inflight?()
[show source]
    # File lib/httpx/connection.rb
207 def inflight?
208   @parser && (
209     # parser may be dealing with other requests (possibly started from a different fiber)
210     !@parser.empty? ||
211     # connection may be doing connection termination handshake
212     !@write_buffer.empty?
213   )
214 end
interests()
[show source]
    # File lib/httpx/connection.rb
216 def interests
217   # connecting
218   if connecting?
219     connect
220 
221     return @io.interests if connecting?
222   end
223 
224   # if the write buffer is full, we drain it
225   return :w unless @write_buffer.empty?
226 
227   return @parser.interests if @parser
228 
229   nil
230 rescue StandardError => e
231   emit(:error, e)
232   nil
233 end
io_connected?()
[show source]
    # File lib/httpx/connection.rb
197 def io_connected?
198   return @coalesced_connection.io_connected? if @coalesced_connection
199 
200   @io && @io.state == :connected
201 end
match?(uri, options)
[show source]
    # File lib/httpx/connection.rb
125 def match?(uri, options)
126   return false if !used? && (@state == :closing || @state == :closed)
127 
128   (
129     @origins.include?(uri.origin) &&
130     # if there is more than one origin to match, it means that this connection
131     # was the result of coalescing. To prevent blind trust in the case where the
132     # origin came from an ORIGIN frame, we're going to verify the hostname with the
133     # SSL certificate
134     (@origins.size == 1 || @origin == uri.origin || (@io.is_a?(SSL) && @io.verify_hostname(uri.host)))
135   ) && @options == options
136 end
merge(connection)
[show source]
    # File lib/httpx/connection.rb
172 def merge(connection)
173   @origins |= connection.instance_variable_get(:@origins)
174   if connection.ssl_session
175     @ssl_session = connection.ssl_session
176     @io.session_new_cb do |sess|
177       @ssl_session = sess
178     end if @io
179   end
180   connection.purge_pending do |req|
181     send(req)
182   end
183 end
mergeable?(connection)
[show source]
    # File lib/httpx/connection.rb
144 def mergeable?(connection)
145   return false if @state == :closing || @state == :closed || !@io
146 
147   return false unless connection.addresses
148 
149   (
150     (open? && @origin == connection.origin) ||
151     !(@io.addresses & (connection.addresses || [])).empty?
152   ) && @options == connection.options
153 end
open?()
[show source]
    # File lib/httpx/connection.rb
333 def open?
334   @state == :open || @state == :inactive
335 end
peer()
[show source]
    # File lib/httpx/connection.rb
107 def peer
108   @origin
109 end
purge_pending(&block)
[show source]
    # File lib/httpx/connection.rb
185 def purge_pending(&block)
186   pendings = []
187   if @parser
188     @inflight -= @parser.pending.size
189     pendings << @parser.pending
190   end
191   pendings << @pending
192   pendings.each do |pending|
193     pending.reject!(&block)
194   end
195 end
reset()
[show source]
    # File lib/httpx/connection.rb
278 def reset
279   return if @state == :closing || @state == :closed
280 
281   transition(:closing)
282 
283   transition(:closed)
284 end
send(request)
[show source]
    # File lib/httpx/connection.rb
286 def send(request)
287   return @coalesced_connection.send(request) if @coalesced_connection
288 
289   if @parser && !@write_buffer.full?
290     if @response_received_at && @keep_alive_timeout &&
291        Utils.elapsed_time(@response_received_at) > @keep_alive_timeout
292       # when pushing a request into an existing connection, we have to check whether there
293       # is the possibility that the connection might have extended the keep alive timeout.
294       # for such cases, we want to ping for availability before deciding to shovel requests.
295       log(level: 3) { "keep alive timeout expired, pinging connection..." }
296       @pending << request
297       transition(:active) if @state == :inactive
298       parser.ping
299       return
300     end
301 
302     send_request_to_parser(request)
303   else
304     @pending << request
305   end
306 end
sibling=(connection)
[show source]
    # File lib/httpx/connection.rb
350 def sibling=(connection)
351   @sibling = connection
352 
353   return unless connection
354 
355   @main_sibling = connection.sibling.nil?
356 
357   return unless @main_sibling
358 
359   connection.sibling = self
360 end
terminate()
[show source]
    # File lib/httpx/connection.rb
264 def terminate
265   @connected_at = nil if @state == :closed
266 
267   close
268 end
timeout()
[show source]
    # File lib/httpx/connection.rb
308 def timeout
309   return if @state == :closed || @state == :inactive
310 
311   return @timeout if @timeout
312 
313   return @options.timeout[:connect_timeout] if @state == :idle
314 
315   @options.timeout[:operation_timeout]
316 end
to_io()
[show source]
    # File lib/httpx/connection.rb
235 def to_io
236   @io.to_io
237 end
used?()
[show source]
    # File lib/httpx/connection.rb
325 def used?
326   @connected_at
327 end