The Connection
can be watched for IO events.
It contains the io
object to read/write from, and knows what to do when it can.
It defers connecting until absolutely necessary. Connection
should be triggered from the IO selector (until then, any request will be queued).
A connection boots up its parser after connection is established. All pending requests will be redirected there after connection.
A connection can be prevented from closing by the parser, that is, if there are pending requests. This will signal that the connection was prematurely closed, due to a possible number of conditions:
-
Remote peer closed the connection (“Connection: close”);
-
Remote peer doesn’t support pipelining;
A connection may also route requests for a different host for which the io
was connected to, provided that the IP is the same and the port and scheme as well. This will allow to share the same socket to send HTTP/2 requests to different hosts.
Methods
Public Class
Public Instance
- addresses
- addresses=
- call
- close
- coalescable?
- coalesced_connection
- connecting?
- create_idle
- current_selector
- current_session
- deactivate
- expired?
- family
- force_reset
- handle_socket_timeout
- idling
- inflight?
- interests
- io
- match?
- merge
- mergeable?
- open?
- options
- origin
- origins
- peer
- pending
- purge_pending
- reset
- send
- ssl_session
- state
- terminate
- timeout
- to_io
- type
- used?
Classes and Modules
Attributes
coalesced_connection | [W] | |
current_selector | [W] | |
current_session | [RW] | |
family | [RW] | |
io | [R] | |
options | [R] | |
origin | [R] | |
origins | [R] | |
pending | [R] | |
ssl_session | [R] | |
state | [R] | |
type | [R] |
Public Class methods
# File lib/httpx/connection.rb 50 def initialize(uri, options) 51 @current_session = @current_selector = @coalesced_connection = nil 52 @exhausted = @cloned = false 53 54 @options = Options.new(options) 55 @type = initialize_type(uri, @options) 56 @origins = [uri.origin] 57 @origin = Utils.to_uri(uri.origin) 58 @window_size = @options.window_size 59 @read_buffer = Buffer.new(@options.buffer_size) 60 @write_buffer = Buffer.new(@options.buffer_size) 61 @pending = [] 62 63 on(:error, &method(:on_error)) 64 if @options.io 65 # if there's an already open IO, get its 66 # peer address, and force-initiate the parser 67 transition(:already_open) 68 @io = build_socket 69 parser 70 else 71 transition(:idle) 72 end 73 on(:activate) do 74 @current_session.select_connection(self, @current_selector) 75 end 76 on(:close) do 77 next if @exhausted # it'll reset 78 79 # may be called after ":close" above, so after the connection has been checked back in. 80 # next unless @current_session 81 82 next unless @current_session 83 84 @current_session.deselect_connection(self, @current_selector, @cloned) 85 end 86 on(:terminate) do 87 next if @exhausted # it'll reset 88 89 # may be called after ":close" above, so after the connection has been checked back in. 90 next unless @current_session 91 92 @current_session.deselect_connection(self, @current_selector) 93 end 94 95 on(:altsvc) do |alt_origin, origin, alt_params| 96 build_altsvc_connection(alt_origin, origin, alt_params) 97 end 98 99 @inflight = 0 100 @keep_alive_timeout = @options.timeout[:keep_alive_timeout] 101 102 @intervals = [] 103 104 self.addresses = @options.addresses if @options.addresses 105 end
# File lib/httpx/connection.rb 866 def parser_type(protocol) 867 case protocol 868 when "h2" then HTTP2 869 when "http/1.1" then HTTP1 870 else 871 raise Error, "unsupported protocol (##{protocol})" 872 end 873 end
Public Instance methods
# File lib/httpx/connection.rb 121 def addresses 122 @io && @io.addresses 123 end
this is a semi-private method, to be used by the resolver to initiate the io object.
# File lib/httpx/connection.rb 113 def addresses=(addrs) 114 if @io 115 @io.add_addresses(addrs) 116 else 117 @io = build_socket(addrs) 118 end 119 end
# File lib/httpx/connection.rb 233 def call 234 case @state 235 when :idle 236 connect 237 consume 238 when :closed 239 return 240 when :closing 241 consume 242 transition(:closed) 243 when :open 244 consume 245 end 246 nil 247 rescue StandardError => e 248 emit(:error, e) 249 raise e 250 end
# File lib/httpx/connection.rb 252 def close 253 transition(:active) if @state == :inactive 254 255 @parser.close if @parser 256 end
coalescable connections need to be mergeable! but internally, mergeable?
is called before coalescable?
# File lib/httpx/connection.rb 157 def coalescable?(connection) 158 if @io.protocol == "h2" && 159 @origin.scheme == "https" && 160 connection.origin.scheme == "https" && 161 @io.can_verify_peer? 162 @io.verify_hostname(connection.origin.host) 163 else 164 @origin == connection.origin 165 end 166 end
# File lib/httpx/connection.rb 197 def connecting? 198 @state == :idle 199 end
# File lib/httpx/connection.rb 168 def create_idle(options = {}) 169 self.class.new(@origin, @options.merge(options)) 170 end
# File lib/httpx/connection.rb 323 def deactivate 324 transition(:inactive) 325 end
# File lib/httpx/connection.rb 138 def expired? 139 return false unless @io 140 141 @io.expired? 142 end
bypasses the state machine to force closing of connections still connecting. only used for Happy Eyeballs v2.
# File lib/httpx/connection.rb 266 def force_reset(cloned = false) 267 @state = :closing 268 @cloned = cloned 269 transition(:closed) 270 end
# File lib/httpx/connection.rb 331 def handle_socket_timeout(interval) 332 @intervals.delete_if(&:elapsed?) 333 334 unless @intervals.empty? 335 # remove the intervals which will elapse 336 337 return 338 end 339 340 error = HTTPX::TimeoutError.new(interval, "timed out while waiting on select") 341 error.set_backtrace(caller) 342 on_error(error) 343 end
# File lib/httpx/connection.rb 312 def idling 313 purge_after_closed 314 @write_buffer.clear 315 transition(:idle) 316 @parser = nil if @parser 317 end
# File lib/httpx/connection.rb 201 def inflight? 202 @parser && ( 203 # parser may be dealing with other requests (possibly started from a different fiber) 204 !@parser.empty? || 205 # connection may be doing connection termination handshake 206 !@write_buffer.empty? 207 ) 208 end
# File lib/httpx/connection.rb 210 def interests 211 # connecting 212 if connecting? 213 connect 214 215 return @io.interests if connecting? 216 end 217 218 # if the write buffer is full, we drain it 219 return :w unless @write_buffer.empty? 220 221 return @parser.interests if @parser 222 223 nil 224 rescue StandardError => e 225 emit(:error, e) 226 nil 227 end
# File lib/httpx/connection.rb 125 def match?(uri, options) 126 return false if !used? && (@state == :closing || @state == :closed) 127 128 ( 129 @origins.include?(uri.origin) && 130 # if there is more than one origin to match, it means that this connection 131 # was the result of coalescing. To prevent blind trust in the case where the 132 # origin came from an ORIGIN frame, we're going to verify the hostname with the 133 # SSL certificate 134 (@origins.size == 1 || @origin == uri.origin || (@io.is_a?(SSL) && @io.verify_hostname(uri.host))) 135 ) && @options == options 136 end
# File lib/httpx/connection.rb 172 def merge(connection) 173 @origins |= connection.instance_variable_get(:@origins) 174 if connection.ssl_session 175 @ssl_session = connection.ssl_session 176 @io.session_new_cb do |sess| 177 @ssl_session = sess 178 end if @io 179 end 180 connection.purge_pending do |req| 181 send(req) 182 end 183 end
# File lib/httpx/connection.rb 144 def mergeable?(connection) 145 return false if @state == :closing || @state == :closed || !@io 146 147 return false unless connection.addresses 148 149 ( 150 (open? && @origin == connection.origin) || 151 !(@io.addresses & (connection.addresses || [])).empty? 152 ) && @options == connection.options 153 end
# File lib/httpx/connection.rb 327 def open? 328 @state == :open || @state == :inactive 329 end
# File lib/httpx/connection.rb 185 def purge_pending(&block) 186 pendings = [] 187 if @parser 188 @inflight -= @parser.pending.size 189 pendings << @parser.pending 190 end 191 pendings << @pending 192 pendings.each do |pending| 193 pending.reject!(&block) 194 end 195 end
# File lib/httpx/connection.rb 272 def reset 273 return if @state == :closing || @state == :closed 274 275 transition(:closing) 276 277 transition(:closed) 278 end
# File lib/httpx/connection.rb 280 def send(request) 281 return @coalesced_connection.send(request) if @coalesced_connection 282 283 if @parser && !@write_buffer.full? 284 if @response_received_at && @keep_alive_timeout && 285 Utils.elapsed_time(@response_received_at) > @keep_alive_timeout 286 # when pushing a request into an existing connection, we have to check whether there 287 # is the possibility that the connection might have extended the keep alive timeout. 288 # for such cases, we want to ping for availability before deciding to shovel requests. 289 log(level: 3) { "keep alive timeout expired, pinging connection..." } 290 @pending << request 291 transition(:active) if @state == :inactive 292 parser.ping 293 return 294 end 295 296 send_request_to_parser(request) 297 else 298 @pending << request 299 end 300 end
# File lib/httpx/connection.rb 258 def terminate 259 @connected_at = nil if @state == :closed 260 261 close 262 end
# File lib/httpx/connection.rb 302 def timeout 303 return if @state == :closed || @state == :inactive 304 305 return @timeout if @timeout 306 307 return @options.timeout[:connect_timeout] if @state == :idle 308 309 @options.timeout[:operation_timeout] 310 end