-
# frozen_string_literal: true
-
-
26
require "resolv"
-
26
require "forwardable"
-
26
require "httpx/io"
-
26
require "httpx/buffer"
-
-
26
module HTTPX
-
# The Connection can be watched for IO events.
-
#
-
# It contains the +io+ object to read/write from, and knows what to do when it can.
-
#
-
# It defers connecting until absolutely necessary. Connection should be triggered from
-
# the IO selector (until then, any request will be queued).
-
#
-
# A connection boots up its parser after connection is established. All pending requests
-
# will be redirected there after connection.
-
#
-
# A connection can be prevented from closing by the parser, that is, if there are pending
-
# requests. This will signal that the connection was prematurely closed, due to a possible
-
# number of conditions:
-
#
-
# * Remote peer closed the connection ("Connection: close");
-
# * Remote peer doesn't support pipelining;
-
#
-
# A connection may also route requests for a different host for which the +io+ was connected
-
# to, provided that the IP is the same and the port and scheme as well. This will allow to
-
# share the same socket to send HTTP/2 requests to different hosts.
-
#
-
26
class Connection
-
26
extend Forwardable
-
26
include Loggable
-
26
include Callbacks
-
-
26
using URIExtensions
-
26
using NumericExtensions
-
-
26
require "httpx/connection/http2"
-
26
require "httpx/connection/http1"
-
-
26
def_delegator :@io, :closed?
-
-
26
def_delegator :@write_buffer, :empty?
-
-
26
attr_reader :type, :io, :origin, :origins, :state, :pending, :options
-
-
26
attr_writer :timers
-
-
26
attr_accessor :family
-
-
26
def initialize(type, uri, options)
-
6568
@type = type
-
6568
@origins = [uri.origin]
-
6568
@origin = Utils.to_uri(uri.origin)
-
6568
@options = Options.new(options)
-
6568
@window_size = @options.window_size
-
6568
@read_buffer = Buffer.new(@options.buffer_size)
-
6568
@write_buffer = Buffer.new(@options.buffer_size)
-
6568
@pending = []
-
6568
on(:error, &method(:on_error))
-
6568
if @options.io
-
# if there's an already open IO, get its
-
# peer address, and force-initiate the parser
-
68
transition(:already_open)
-
68
@io = build_socket
-
68
parser
-
else
-
6500
transition(:idle)
-
end
-
-
6568
@inflight = 0
-
6568
@keep_alive_timeout = @options.timeout[:keep_alive_timeout]
-
6568
@total_timeout = @options.timeout[:total_timeout]
-
-
6568
self.addresses = @options.addresses if @options.addresses
-
end
-
-
# this is a semi-private method, to be used by the resolver
-
# to initiate the io object.
-
26
def addresses=(addrs)
-
6227
if @io
-
@io.add_addresses(addrs)
-
else
-
6227
@io = build_socket(addrs)
-
end
-
end
-
-
26
def addresses
-
20714
@io && @io.addresses
-
end
-
-
26
def match?(uri, options)
-
5015
return false if @state == :closing || @state == :closed
-
-
4857
return false if exhausted?
-
-
(
-
(
-
3974
@origins.include?(uri.origin) &&
-
# if there is more than one origin to match, it means that this connection
-
# was the result of coalescing. To prevent blind trust in the case where the
-
# origin came from an ORIGIN frame, we're going to verify the hostname with the
-
# SSL certificate
-
2779
(@origins.size == 1 || @origin == uri.origin || (@io.is_a?(SSL) && @io.verify_hostname(uri.host)))
-
146
) && @options == options
-
5021
) || (match_altsvcs?(uri) && match_altsvc_options?(uri, options))
-
end
-
-
26
def mergeable?(connection)
-
3230
return false if @state == :closing || @state == :closed || !@io
-
-
935
return false if exhausted?
-
-
926
return false unless connection.addresses
-
-
(
-
926
(open? && @origin == connection.origin) ||
-
878
!(@io.addresses & (connection.addresses || [])).empty?
-
338
) && @options == connection.options
-
end
-
-
# coalescable connections need to be mergeable!
-
# but internally, #mergeable? is called before #coalescable?
-
26
def coalescable?(connection)
-
10
if @io.protocol == "h2" &&
-
2
@origin.scheme == "https" &&
-
connection.origin.scheme == "https" &&
-
@io.can_verify_peer?
-
9
@io.verify_hostname(connection.origin.host)
-
else
-
3
@origin == connection.origin
-
end
-
end
-
-
26
def create_idle(options = {})
-
27
self.class.new(@type, @origin, @options.merge(options))
-
end
-
-
26
def merge(connection)
-
45
@origins |= connection.instance_variable_get(:@origins)
-
45
connection.purge_pending do |req|
-
27
send(req)
-
end
-
end
-
-
26
def purge_pending(&block)
-
54
pendings = []
-
54
if @parser
-
45
@inflight -= @parser.pending.size
-
45
pendings << @parser.pending
-
end
-
54
pendings << @pending
-
54
pendings.each do |pending|
-
99
pending.reject!(&block)
-
end
-
end
-
-
# checks if this is connection is an alternative service of
-
# +uri+
-
26
def match_altsvcs?(uri)
-
7285
@origins.any? { |origin| uri.altsvc_match?(origin) } ||
-
58
AltSvc.cached_altsvc(@origin).any? do |altsvc|
-
origin = altsvc["origin"]
-
origin.altsvc_match?(uri.origin)
-
838
end
-
end
-
-
26
def match_altsvc_options?(uri, options)
-
946
return @options == options unless @options.ssl[:hostname] == uri.host
-
-
9
dup_options = @options.merge(ssl: { hostname: nil })
-
9
dup_options.ssl.delete(:hostname)
-
9
dup_options == options
-
end
-
-
26
def connecting?
-
3240930
@state == :idle
-
end
-
-
26
def inflight?
-
6015
@parser && !@parser.empty? && !@write_buffer.empty?
-
end
-
-
26
def interests
-
# connecting
-
3227155
if connecting?
-
13220
connect
-
-
13220
return @io.interests if connecting?
-
end
-
-
# if the write buffer is full, we drain it
-
3220271
return :w unless @write_buffer.empty?
-
-
3176656
return @parser.interests if @parser
-
-
5
nil
-
end
-
-
26
def to_io
-
24769
@io.to_io
-
end
-
-
26
def call
-
21347
case @state
-
when :closed
-
return
-
when :closing
-
2538
consume
-
2538
transition(:closed)
-
2538
emit(:close)
-
when :open
-
11952
consume
-
end
-
1858
nil
-
end
-
-
26
def close
-
6050
transition(:active) if @state == :inactive
-
-
6050
@parser.close if @parser
-
end
-
-
# bypasses the state machine to force closing of connections still connecting.
-
# **only** used for Happy Eyeballs v2.
-
26
def force_reset
-
@state = :closing
-
transition(:closed)
-
emit(:close)
-
end
-
-
26
def reset
-
3509
transition(:closing)
-
3509
transition(:closed)
-
3509
emit(:close)
-
end
-
-
26
def send(request)
-
7359
if @parser && !@write_buffer.full?
-
476
request.headers["alt-used"] = @origin.authority if match_altsvcs?(request.uri)
-
-
476
if @response_received_at && @keep_alive_timeout &&
-
36
Utils.elapsed_time(@response_received_at) > @keep_alive_timeout
-
# when pushing a request into an existing connection, we have to check whether there
-
# is the possibility that the connection might have extended the keep alive timeout.
-
# for such cases, we want to ping for availability before deciding to shovel requests.
-
9
log(level: 3) { "keep alive timeout expired, pinging connection..." }
-
9
@pending << request
-
9
parser.ping
-
9
transition(:active) if @state == :inactive
-
9
return
-
end
-
-
467
send_request_to_parser(request)
-
else
-
6883
@pending << request
-
end
-
end
-
-
26
def timeout
-
4181482
if @total_timeout
-
1235
return @total_timeout unless @connected_at
-
-
473
elapsed_time = @total_timeout - Utils.elapsed_time(@connected_at)
-
-
473
if elapsed_time.negative?
-
1
ex = TotalTimeoutError.new(@total_timeout, "Timed out after #{@total_timeout} seconds")
-
1
ex.set_backtrace(caller)
-
1
on_error(ex)
-
1
return
-
end
-
-
472
return elapsed_time
-
end
-
-
4180247
return @timeout if defined?(@timeout)
-
-
68
return @options.timeout[:connect_timeout] if @state == :idle
-
-
68
@options.timeout[:operation_timeout]
-
end
-
-
26
def deactivate
-
1033
transition(:inactive)
-
end
-
-
26
def open?
-
7293
@state == :open || @state == :inactive
-
end
-
-
26
def raise_timeout_error(interval)
-
362
error = HTTPX::TimeoutError.new(interval, "timed out while waiting on select")
-
362
error.set_backtrace(caller)
-
362
on_error(error)
-
end
-
-
26
private
-
-
26
def connect
-
12399
transition(:open)
-
end
-
-
26
def exhausted?
-
5792
@parser && parser.exhausted?
-
end
-
-
26
def consume
-
14760
return unless @io
-
-
14760
catch(:called) do
-
14760
epiped = false
-
14760
loop do
-
33654
parser.consume
-
-
# we exit if there's no more requests to process
-
#
-
# this condition takes into account:
-
#
-
# * the number of inflight requests
-
# * the number of pending requests
-
# * whether the write buffer has bytes (i.e. for close handshake)
-
33654
if @pending.empty? && @inflight.zero? && @write_buffer.empty?
-
2850
log(level: 3) { "NO MORE REQUESTS..." }
-
2841
return
-
end
-
-
30813
@timeout = @current_timeout
-
-
30813
read_drained = false
-
30813
write_drained = nil
-
-
#
-
# tight read loop.
-
#
-
# read as much of the socket as possible.
-
#
-
# this tight loop reads all the data it can from the socket and pipes it to
-
# its parser.
-
#
-
1573
loop do
-
50057
siz = @io.read(@window_size, @read_buffer)
-
50139
log(level: 3, color: :cyan) { "IO READ: #{siz} bytes..." }
-
50057
unless siz
-
9
ex = EOFError.new("descriptor closed")
-
9
ex.set_backtrace(caller)
-
9
on_error(ex)
-
9
return
-
end
-
-
# socket has been drained. mark and exit the read loop.
-
50048
if siz.zero?
-
5127
read_drained = @read_buffer.empty?
-
5127
epiped = false
-
5127
break
-
end
-
-
44921
parser << @read_buffer.to_s
-
-
# continue reading if possible.
-
41352
break if interests == :w && !epiped
-
-
# exit the read loop if connection is preparing to be closed
-
34268
break if @state == :closing || @state == :closed
-
-
# exit #consume altogether if all outstanding requests have been dealt with
-
34260
return if @pending.empty? && @inflight.zero?
-
30813
end unless ((ints = interests).nil? || ints == :w || @state == :closing) && !epiped
-
-
#
-
# tight write loop.
-
#
-
# flush as many bytes as the sockets allow.
-
#
-
1906
loop do
-
# buffer has been drainned, mark and exit the write loop.
-
23081
if @write_buffer.empty?
-
# we only mark as drained on the first loop
-
3286
write_drained = write_drained.nil? && @inflight.positive?
-
-
3286
break
-
end
-
-
8254
begin
-
19795
siz = @io.write(@write_buffer)
-
rescue Errno::EPIPE
-
# this can happen if we still have bytes in the buffer to send to the server, but
-
# the server wants to respond immediately with some message, or an error. An example is
-
# when one's uploading a big file to an unintended endpoint, and the server stops the
-
# consumption, and responds immediately with an authorization of even method not allowed error.
-
# at this point, we have to let the connection switch to read-mode.
-
19
log(level: 2) { "pipe broken, could not flush buffer..." }
-
19
epiped = true
-
19
read_drained = false
-
19
break
-
end
-
19839
log(level: 3, color: :cyan) { "IO WRITE: #{siz} bytes..." }
-
19776
unless siz
-
ex = EOFError.new("descriptor closed")
-
ex.set_backtrace(caller)
-
on_error(ex)
-
return
-
end
-
-
# socket closed for writing. mark and exit the write loop.
-
19776
if siz.zero?
-
12
write_drained = !@write_buffer.empty?
-
12
break
-
end
-
-
# exit write loop if marked to consume from peer, or is closing.
-
19764
break if interests == :r || @state == :closing || @state == :closed
-
-
3617
write_drained = false
-
24582
end unless (ints = interests) == :r
-
-
24582
send_pending if @state == :open
-
-
# return if socket is drained
-
24582
next unless (ints != :r || read_drained) && (ints != :w || write_drained)
-
-
# gotta go back to the event loop. It happens when:
-
#
-
# * the socket is drained of bytes or it's not the interest of the conn to read;
-
# * theres nothing more to write, or it's not in the interest of the conn to write;
-
5696
log(level: 3) { "(#{ints}): WAITING FOR EVENTS..." }
-
5688
return
-
end
-
end
-
end
-
-
26
def send_pending
-
59672
while !@write_buffer.full? && (request = @pending.shift)
-
6868
send_request_to_parser(request)
-
end
-
end
-
-
26
def parser
-
95306
@parser ||= build_parser
-
end
-
-
26
def send_request_to_parser(request)
-
7333
@inflight += 1
-
7333
parser.send(request)
-
-
7333
set_request_timeouts(request)
-
-
7333
return unless @state == :inactive
-
-
184
transition(:active)
-
end
-
-
647
def build_parser(protocol = @io.protocol)
-
6022
parser = self.class.parser_type(protocol).new(@write_buffer, @options)
-
6022
set_parser_callbacks(parser)
-
6022
parser
-
end
-
-
26
def set_parser_callbacks(parser)
-
6147
parser.on(:response) do |request, response|
-
6946
AltSvc.emit(request, response) do |alt_origin, origin, alt_params|
-
9
emit(:altsvc, alt_origin, origin, alt_params)
-
end
-
6946
@response_received_at = Utils.now
-
6946
@inflight -= 1
-
6946
request.emit(:response, response)
-
end
-
6147
parser.on(:altsvc) do |alt_origin, origin, alt_params|
-
emit(:altsvc, alt_origin, origin, alt_params)
-
end
-
-
6147
parser.on(:pong, &method(:send_pending))
-
-
6147
parser.on(:promise) do |request, stream|
-
27
request.emit(:promise, parser, stream)
-
end
-
6147
parser.on(:exhausted) do
-
18
emit(:exhausted)
-
end
-
6147
parser.on(:origin) do |origin|
-
@origins |= [origin]
-
end
-
6147
parser.on(:close) do |force|
-
5871
transition(:closing)
-
5871
if force || @state == :idle
-
3131
transition(:closed)
-
3131
emit(:close)
-
end
-
end
-
6147
parser.on(:close_handshake) do
-
9
consume
-
end
-
6147
parser.on(:reset) do
-
3324
if parser.empty?
-
3134
reset
-
else
-
190
transition(:closing)
-
190
transition(:closed)
-
190
emit(:reset)
-
-
190
@parser.reset if @parser
-
190
transition(:idle)
-
190
transition(:open)
-
end
-
end
-
6147
parser.on(:current_timeout) do
-
2745
@current_timeout = @timeout = parser.timeout
-
end
-
6147
parser.on(:timeout) do |tout|
-
@timeout = tout
-
end
-
6147
parser.on(:error) do |request, ex|
-
369
case ex
-
when MisdirectedRequestError
-
9
emit(:misdirected, request)
-
else
-
360
response = ErrorResponse.new(request, ex, @options)
-
360
request.response = response
-
360
request.emit(:response, response)
-
end
-
end
-
end
-
-
26
def transition(nextstate)
-
41429
handle_transition(nextstate)
-
rescue Errno::ECONNABORTED,
-
Errno::ECONNREFUSED,
-
Errno::ECONNRESET,
-
Errno::EADDRNOTAVAIL,
-
Errno::EHOSTUNREACH,
-
Errno::EINVAL,
-
Errno::ENETUNREACH,
-
Errno::EPIPE,
-
Errno::ENOENT,
-
SocketError => e
-
# connect errors, exit gracefully
-
66
error = ConnectionError.new(e.message)
-
66
error.set_backtrace(e.backtrace)
-
66
connecting? && callbacks_for?(:connect_error) ? emit(:connect_error, error) : handle_error(error)
-
66
@state = :closed
-
66
emit(:close)
-
rescue TLSError => e
-
# connect errors, exit gracefully
-
27
handle_error(e)
-
27
connecting? && callbacks_for?(:connect_error) ? emit(:connect_error, e) : handle_error(e)
-
27
@state = :closed
-
27
emit(:close)
-
end
-
-
26
def handle_transition(nextstate)
-
41057
case nextstate
-
when :idle
-
6707
@timeout = @current_timeout = @options.timeout[:connect_timeout]
-
-
when :open
-
12819
return if @state == :closed
-
-
12819
@io.connect
-
12726
emit(:tcp_open, self) if @io.state == :connected
-
-
12726
return unless @io.connected?
-
-
6251
@connected_at = Utils.now
-
-
6251
send_pending
-
-
6251
@timeout = @current_timeout = parser.timeout
-
6251
emit(:open)
-
when :inactive
-
1033
return unless @state == :open
-
when :closing
-
9834
return unless @state == :open
-
-
when :closed
-
9584
return unless @state == :closing
-
6384
return unless @write_buffer.empty?
-
-
6339
purge_after_closed
-
when :already_open
-
68
nextstate = :open
-
68
send_pending
-
when :active
-
475
return unless @state == :inactive
-
-
475
nextstate = :open
-
475
emit(:activate)
-
end
-
27290
@state = nextstate
-
end
-
-
26
def purge_after_closed
-
6347
@io.close if @io
-
6347
@read_buffer.clear
-
6347
remove_instance_variable(:@timeout) if defined?(@timeout)
-
end
-
-
26
def build_socket(addrs = nil)
-
6295
transport_type = case @type
-
3597
when "tcp" then TCP
-
2666
when "ssl" then SSL
-
32
when "unix" then UNIX
-
else
-
raise Error, "unsupported transport (#{@type})"
-
end
-
6295
transport_type.new(@origin, addrs, @options)
-
end
-
-
26
def on_error(error)
-
628
if error.instance_of?(TimeoutError)
-
-
362
if @total_timeout && @connected_at &&
-
30
Utils.elapsed_time(@connected_at) > @total_timeout
-
280
ex = TotalTimeoutError.new(@total_timeout, "Timed out after #{@total_timeout} seconds")
-
280
ex.set_backtrace(error.backtrace)
-
280
error = ex
-
else
-
# inactive connections do not contribute to the select loop, therefore
-
# they should not fail due to such errors.
-
82
return if @state == :inactive
-
-
82
if @timeout
-
82
@timeout -= error.timeout
-
82
return unless @timeout <= 0
-
end
-
-
18
error = error.to_connection_error if connecting?
-
end
-
end
-
564
handle_error(error)
-
564
reset
-
end
-
-
26
def handle_error(error)
-
684
parser.handle_error(error) if @parser && parser.respond_to?(:handle_error)
-
1585
while (request = @pending.shift)
-
298
response = ErrorResponse.new(request, error, request.options)
-
298
request.response = response
-
298
request.emit(:response, response)
-
end
-
end
-
-
26
def set_request_timeouts(request)
-
7333
write_timeout = request.write_timeout
-
1
request.once(:headers) do
-
36
@timers.after(write_timeout) { write_timeout_callback(request, write_timeout) }
-
7333
end unless write_timeout.nil? || write_timeout.infinite?
-
-
7333
read_timeout = request.read_timeout
-
1
request.once(:done) do
-
27
@timers.after(read_timeout) { read_timeout_callback(request, read_timeout) }
-
7333
end unless read_timeout.nil? || read_timeout.infinite?
-
-
7333
request_timeout = request.request_timeout
-
17
request.once(:headers) do
-
27
@timers.after(request_timeout) { read_timeout_callback(request, request_timeout, RequestTimeoutError) }
-
7333
end unless request_timeout.nil? || request_timeout.infinite?
-
end
-
-
26
def write_timeout_callback(request, write_timeout)
-
18
return if request.state == :done
-
-
9
@write_buffer.clear
-
9
error = WriteTimeoutError.new(request, nil, write_timeout)
-
9
on_error(error)
-
end
-
-
26
def read_timeout_callback(request, read_timeout, error_type = ReadTimeoutError)
-
18
response = request.response
-
-
18
return if response && response.finished?
-
-
18
@write_buffer.clear
-
18
error = error_type.new(request, request.response, read_timeout)
-
18
on_error(error)
-
end
-
-
26
class << self
-
26
def parser_type(protocol)
-
6169
case protocol
-
2752
when "h2" then HTTP2
-
3417
when "http/1.1" then HTTP1
-
else
-
raise Error, "unsupported protocol (##{protocol})"
-
end
-
end
-
end
-
end
-
end