-
# frozen_string_literal: true
-
-
30
require "resolv"
-
30
require "forwardable"
-
30
require "httpx/io"
-
30
require "httpx/buffer"
-
-
30
module HTTPX
-
# The Connection can be watched for IO events.
-
#
-
# It contains the +io+ object to read/write from, and knows what to do when it can.
-
#
-
# It defers connecting until absolutely necessary. Connection should be triggered from
-
# the IO selector (until then, any request will be queued).
-
#
-
# A connection boots up its parser after connection is established. All pending requests
-
# will be redirected there after connection.
-
#
-
# A connection can be prevented from closing by the parser, that is, if there are pending
-
# requests. This will signal that the connection was prematurely closed, due to a possible
-
# number of conditions:
-
#
-
# * Remote peer closed the connection ("Connection: close");
-
# * Remote peer doesn't support pipelining;
-
#
-
# A connection may also route requests for a host different from the one the +io+ was
# connected to, provided that the IP, the port and the scheme are the same. This allows
# sharing the same socket to send HTTP/2 requests to different hosts.
-
#
-
30
class Connection
-
30
extend Forwardable
include Registry
include Loggable
include Callbacks

using URIExtensions
using NumericExtensions

require "httpx/connection/http2"
require "httpx/connection/http1"

# capacity handed to both the read and the write buffer (1 << 14).
BUFFER_SIZE = 16_384

# the connection is closed when its +io+ is, and empty when its write buffer is.
def_delegators :@io, :closed?
def_delegators :@write_buffer, :empty?

attr_reader :type
attr_reader :io
attr_reader :origin
attr_reader :origins
attr_reader :state
attr_reader :pending
attr_reader :options

attr_writer :timers

attr_accessor :family
-
-
30
# Sets up a connection of the given +type+ for the origin of +uri+.
#
# +options+ is wrapped in an Options object. When it carries an +io+, the connection
# goes through the +:already_open+ transition, wraps that io and boots the parser right
# away; otherwise it starts out +:idle+. Addresses present in the options are applied
# at the end via #addresses=.
def initialize(type, uri, options)
  @type = type
  @origins = [uri.origin]
  @origin = Utils.to_uri(uri.origin)
  @options = Options.new(options)
  @window_size = @options.window_size
  @read_buffer = Buffer.new(BUFFER_SIZE)
  @write_buffer = Buffer.new(BUFFER_SIZE)
  @pending = []
  @inflight = 0

  timeouts = @options.timeout
  @keep_alive_timeout = timeouts[:keep_alive_timeout]
  @total_timeout = timeouts[:total_timeout]

  on(:error, &method(:on_error))

  if @options.io
    # there's an already open IO: wrap it (no addresses to resolve) and
    # force-initiate the parser.
    transition(:already_open)
    @io = IO.registry(@type).new(@origin, nil, @options)
    parser
  else
    transition(:idle)
  end

  addrs = @options.addresses
  self.addresses = addrs if addrs
end
-
-
# Semi-private writer, to be used by the resolver to initiate the io object.
#
# When the +io+ already exists, +addrs+ are appended to it; otherwise the +io+ is
# built from the registry entry for the connection type.
def addresses=(addrs)
  return @io.add_addresses(addrs) if @io

  @io = IO.registry(@type).new(@origin, addrs, @options)
end
-
-
30
# Returns the addresses known to the +io+, or nil while there is no +io+ yet.
def addresses
  return unless @io

  @io.addresses
end
-
-
30
# Tells whether this connection can be used for a request to +uri+ issued with +options+.
#
# Closing, closed or exhausted connections never match. Otherwise the connection matches
# when +uri+ belongs to one of its origins and the options are equal, or when it is an
# alternative service for +uri+ with compatible options.
def match?(uri, options)
  return false if %i[closing closed].include?(@state) || exhausted?

  # if there is more than one origin to match, this connection was the result of
  # coalescing. To prevent blind trust in the case where the origin came from an
  # ORIGIN frame, the hostname is verified against the SSL certificate.
  origin_matches = @origins.include?(uri.origin) &&
                   (@origins.size == 1 || @origin == uri.origin || (@io && @io.verify_hostname(uri.host)))

  direct_match = origin_matches && @options == options

  direct_match || (match_altsvcs?(uri) && match_altsvc_options?(uri, options))
end
-
-
30
# Tells whether +connection+ could be merged into this one.
#
# This connection must have an +io+ and be neither closing, closed nor exhausted, and
# +connection+ must know its addresses. The two are mergeable when the options are equal
# and either this connection is open for the same origin, or both share at least one
# address.
def mergeable?(connection)
  return false if @state == :closing || @state == :closed || !@io

  return false if exhausted?

  peer_addresses = connection.addresses
  return false unless peer_addresses

  # the io may have been built without addresses (i.e. wrapping an already open socket),
  # in which case there is nothing to intersect with.
  own_addresses = @io.addresses.to_a

  (
    (open? && @origin == connection.origin) ||
    !(own_addresses & peer_addresses).empty?
  ) && @options == connection.options
end
-
-
# Tells whether +connection+ can be coalesced into this one.
#
# coalescable connections need to be mergeable! Internally, though, #mergeable? is
# called before #coalescable?.
#
# When both origins are "https", the io negotiated "h2" and the peer can be verified,
# the answer is whether the io verifies the hostname of the other connection; in any
# other case, both origins must be equal.
def coalescable?(connection)
  verifiable_h2 = @io.protocol == "h2" &&
                  @origin.scheme == "https" &&
                  connection.origin.scheme == "https" &&
                  @io.can_verify_peer?

  return @origin == connection.origin unless verifiable_h2

  @io.verify_hostname(connection.origin.host)
end
-
-
30
# Builds a fresh connection of the same class, type and origin, using this connection's
# options merged with +options+.
def create_idle(options = {})
  merged_options = @options.merge(options)
  self.class.new(@type, @origin, merged_options)
end
-
-
30
# Absorbs +connection+ into this one: its origins are added to this connection's
# origins, and each of its pending requests is handed to #send on this connection.
def merge(connection)
  # origins are exposed through a public reader, no need to reach into the ivar.
  @origins |= connection.origins
  connection.purge_pending { |request| send(request) }
end
-
-
30
# Runs +block+ against every request pending in the parser (if any) and in the
# connection itself, dropping those for which it returns a truthy value.
#
# The inflight counter is decremented by the number of requests pending in the parser
# before they are filtered. Returns the list of filtered queues.
def purge_pending(&block)
  queues = []
  if @parser
    parser_queue = @parser.pending
    @inflight -= parser_queue.size
    queues << parser_queue
  end
  queues << @pending
  queues.each { |queue| queue.reject!(&block) }
end
-
-
# checks if this connection is an alternative service of +uri+: either one of its
# origins is an altsvc match for +uri+, or one of the cached alternative services for
# its origin matches the origin of +uri+.
def match_altsvcs?(uri)
  return true if @origins.any? { |origin| uri.altsvc_match?(origin) }

  AltSvc.cached_altsvc(@origin).any? { |altsvc| altsvc["origin"].altsvc_match?(uri.origin) }
end
-
-
30
# Compares +options+ with the connection options in the context of an alternative
# service match for +uri+.
#
# When the configured ssl hostname is the host of +uri+, the comparison is made against
# a copy of the connection options without the ssl hostname; otherwise the options are
# compared as they are.
def match_altsvc_options?(uri, options)
  if @options.ssl[:hostname] == uri.host
    without_hostname = @options.merge(ssl: { hostname: nil })
    without_hostname.ssl.delete(:hostname)
    without_hostname == options
  else
    @options == options
  end
end
-
-
30
# A connection is considered to be connecting for as long as it is in the +:idle+ state.
def connecting?
  :idle == @state
end
-
-
30
# Tells whether the connection still has work in progress.
#
# That is the case when there is a parser and either the parser is not empty, or there
# are still bytes in the write buffer. Both conditions used to be required at once,
# which reported a connection waiting on a response (write buffer already flushed) as
# not being inflight.
def inflight?
  @parser && (!@parser.empty? || !@write_buffer.empty?)
end
-
-
30
# Returns the IO event the connection is interested in, or nil when there is none.
#
# While connecting, a connect attempt is made first, and the interests of the +io+ are
# returned if the connection is still not established. Afterwards, +:w+ is returned for
# as long as the write buffer has bytes; otherwise the parser (if any) decides.
def interests
  if connecting?
    connect

    # still connecting after the attempt: the io knows what it is waiting for.
    return @io.interests if connecting?
  end

  # bytes in the write buffer have to be drained first.
  return :w unless @write_buffer.empty?

  @parser.interests if @parser
end
-
-
30
# Exposes the result of +to_io+ on the underlying +io+ object.
def to_io
  io = @io
  io.to_io
end
-
-
30
# Drives the connection according to its state:
#
# * +:closed+ : nothing to do;
# * +:closing+ : consumes, transitions to +:closed+ and emits +:close+;
# * +:open+ : consumes.
#
# Always returns nil.
def call
  return if @state == :closed

  if @state == :closing
    consume
    transition(:closed)
    emit(:close)
  elsif @state == :open
    consume
  end
  nil
end
-
-
30
# Asks the parser (if there is one) to close. An inactive connection is activated first.
def close
  transition(:active) if @state == :inactive

  return unless @parser

  @parser.close
end
-
-
# bypasses the state machine to force closing of connections still connecting:
# the state is set to +:closing+ directly, so that the transition to +:closed+ applies,
# and +:close+ is emitted.
#
# **only** used for Happy Eyeballs v2.
def force_reset
  @state = :closing
  transition(:closed)
  emit(:close)
end
-
-
30
# Takes the connection through the +:closing+ and +:closed+ transitions, and emits
# +:close+ afterwards.
def reset
  %i[closing closed].each { |state| transition(state) }
  emit(:close)
end
-
-
30
# Sends +request+ through this connection.
#
# The request is buffered in the pending list when there is no parser yet, or when the
# write buffer is full. Otherwise it goes to the parser, unless the keep alive timeout
# has elapsed since the last response: in that case the request is buffered and the
# parser is asked to ping the peer first.
def send(request)
  return @pending << request unless @parser && !@write_buffer.full?

  request.headers["alt-used"] = @origin.authority if match_altsvcs?(request.uri)

  keep_alive_expired = @response_received_at && @keep_alive_timeout &&
                       Utils.elapsed_time(@response_received_at) > @keep_alive_timeout

  return send_request_to_parser(request) unless keep_alive_expired

  # when pushing a request into an existing connection, we have to check whether there
  # is the possibility that the connection might have extended the keep alive timeout.
  # for such cases, we want to ping for availability before deciding to shovel requests.
  log(level: 3) { "keep alive timeout expired, pinging connection..." }
  @pending << request
  parser.ping
  transition(:active) if @state == :inactive
  nil
end
-
-
30
# Returns the interval the connection is willing to wait for.
#
# With a total timeout configured, that is the total timeout until the connection is
# established, and what is left of it afterwards; once nothing is left, a
# TotalTimeoutError is routed to #on_error and nil is returned.
#
# Without a total timeout, the current +@timeout+ is returned when defined, falling
# back to the connect timeout (when idle) or the operation timeout from the options.
def timeout
  unless @total_timeout
    return @timeout if defined?(@timeout)

    timeout_key = @state == :idle ? :connect_timeout : :operation_timeout
    return @options.timeout[timeout_key]
  end

  return @total_timeout unless @connected_at

  remaining = @total_timeout - Utils.elapsed_time(@connected_at)
  return remaining unless remaining.negative?

  ex = TotalTimeoutError.new(@total_timeout, "Timed out after #{@total_timeout} seconds")
  ex.set_backtrace(caller)
  on_error(ex)
  nil
end
-
-
30
# Requests the transition to the +:inactive+ state.
def deactivate
  next_state = :inactive
  transition(next_state)
end
-
-
30
# A connection is open when its state is either +:open+ or +:inactive+.
def open?
  %i[open inactive].include?(@state)
end
-
-
30
# Builds a TimeoutError for +interval+, carrying the backtrace of the caller, and
# routes it through #on_error.
def raise_timeout_error(interval)
  timeout_error = HTTPX::TimeoutError.new(interval, "timed out while waiting on select")
  timeout_error.set_backtrace(caller)
  on_error(timeout_error)
end
-
-
30
private
-
-
30
# Attempts to connect, by requesting the transition to the +:open+ state.
def connect
  next_state = :open
  transition(next_state)
end
-
-
30
# Tells whether the parser, if one was already built, reports being exhausted.
def exhausted?
  return @parser unless @parser

  parser.exhausted?
end
-
-
30
# Main IO routine of the connection: alternates between reading from and writing to
# the +io+ until there is nothing left to process, or until the connection has to wait
# for IO events again.
#
# Does nothing without an +io+. A read or write reporting no size is treated as the
# descriptor having been closed, and is routed to #on_error as an EOFError.
def consume
  return unless @io

  catch(:called) do
    epiped = false
    loop do
      parser.consume

      # we exit if there's no more requests to process
      #
      # this condition takes into account:
      #
      # * the number of inflight requests
      # * the number of pending requests
      # * whether the write buffer has bytes (i.e. for close handshake)
      if @pending.empty? && @inflight.zero? && @write_buffer.empty?
        log(level: 3) { "NO MORE REQUESTS..." }
        return
      end

      @timeout = @current_timeout

      read_drained = false
      write_drained = nil

      #
      # tight read loop.
      #
      # read as much of the socket as possible.
      #
      # this tight loop reads all the data it can from the socket and pipes it to
      # its parser. It is skipped when there is no interest in reading (or the
      # connection is closing), unless the previous write hit a broken pipe.
      #
      ints = interests
      skip_read = (ints.nil? || ints == :w || @state == :closing) && !epiped
      unless skip_read
        loop do
          nread = @io.read(@window_size, @read_buffer)
          log(level: 3, color: :cyan) { "IO READ: #{nread} bytes..." }
          unless nread
            ex = EOFError.new("descriptor closed")
            ex.set_backtrace(caller)
            on_error(ex)
            return
          end

          # socket has been drained. mark and exit the read loop.
          if nread.zero?
            read_drained = @read_buffer.empty?
            epiped = false
            break
          end

          parser << @read_buffer.to_s

          # continue reading if possible.
          break if interests == :w && !epiped

          # exit the read loop if connection is preparing to be closed
          break if @state == :closing || @state == :closed

          # exit #consume altogether if all outstanding requests have been dealt with
          return if @pending.empty? && @inflight.zero?
        end
      end

      #
      # tight write loop.
      #
      # flush as many bytes as the sockets allow. It is skipped when the only
      # interest is in reading.
      #
      ints = interests
      unless ints == :r
        loop do
          # buffer has been drained, mark and exit the write loop.
          if @write_buffer.empty?
            # we only mark as drained on the first loop
            write_drained = write_drained.nil? && @inflight.positive?

            break
          end

          begin
            nwritten = @io.write(@write_buffer)
          rescue Errno::EPIPE
            # this can happen if we still have bytes in the buffer to send to the server, but
            # the server wants to respond immediately with some message, or an error. An example is
            # when one's uploading a big file to an unintended endpoint, and the server stops the
            # consumption, and responds immediately with an authorization or even method not allowed
            # error. at this point, we have to let the connection switch to read-mode.
            log(level: 2) { "pipe broken, could not flush buffer..." }
            epiped = true
            read_drained = false
            break
          end
          log(level: 3, color: :cyan) { "IO WRITE: #{nwritten} bytes..." }
          unless nwritten
            ex = EOFError.new("descriptor closed")
            ex.set_backtrace(caller)
            on_error(ex)
            return
          end

          # socket closed for writing. mark and exit the write loop.
          if nwritten.zero?
            write_drained = !@write_buffer.empty?
            break
          end

          # exit write loop if marked to consume from peer, or is closing.
          break if interests == :r || @state == :closing || @state == :closed

          write_drained = false
        end
      end

      send_pending if @state == :open

      # go for another round while the socket still has to be read from or written to.
      next if (ints == :r && !read_drained) || (ints == :w && !write_drained)

      # gotta go back to the event loop. It happens when:
      #
      # * the socket is drained of bytes or it's not the interest of the conn to read;
      # * theres nothing more to write, or it's not in the interest of the conn to write;
      log(level: 3) { "(#{ints}): WAITING FOR EVENTS..." }
      return
    end
  end
end
-
-
30
# Moves pending requests to the parser, in order, until either the write buffer is
# full or there are no more pending requests.
def send_pending
  until @write_buffer.full?
    request = @pending.shift
    break unless request

    send_request_to_parser(request)
  end
end
-
-
30
# Returns the parser of the connection, building (and memoizing) it on first use.
def parser
  return @parser if @parser

  @parser = build_parser
end
-
-
30
# Hands +request+ over to the parser: accounts for it as inflight, sets up its
# timeouts, and reactivates the connection in case it was inactive.
def send_request_to_parser(request)
  @inflight += 1
  parser.send(request)

  set_request_timeouts(request)

  transition(:active) if @state == :inactive
end
-
-
628
# Instantiates the parser registered for +protocol+ (by default, the protocol reported
# by the +io+), writing to the connection's write buffer, and wires its callbacks.
def build_parser(protocol = @io.protocol)
  registry(protocol).new(@write_buffer, @options).tap do |new_parser|
    set_parser_callbacks(new_parser)
  end
end
-
-
30
# Subscribes the connection to the events emitted by +parser+.
def set_parser_callbacks(parser)
  # a response arrived: propagate alternative services, do the bookkeeping and
  # forward the response to the request.
  parser.on(:response) do |request, response|
    AltSvc.emit(request, response) { |alt_origin, origin, alt_params| emit(:altsvc, alt_origin, origin, alt_params) }
    @response_received_at = Utils.now
    @inflight -= 1
    request.emit(:response, response)
  end

  parser.on(:altsvc) { |alt_origin, origin, alt_params| emit(:altsvc, alt_origin, origin, alt_params) }

  # the peer answered the ping: buffered requests can go out.
  parser.on(:pong, &method(:send_pending))

  parser.on(:promise) { |request, stream| request.emit(:promise, parser, stream) }

  parser.on(:exhausted) { emit(:exhausted) }

  parser.on(:origin) { |origin| @origins |= [origin] }

  parser.on(:close) do |force|
    transition(:closing)
    next unless force || @state == :idle

    transition(:closed)
    emit(:close)
  end

  parser.on(:close_handshake) { consume }

  parser.on(:reset) do
    next reset if parser.empty?

    # there are still requests in the parser: close, then reconnect.
    transition(:closing)
    transition(:closed)
    emit(:reset)

    @parser.reset if @parser
    transition(:idle)
    transition(:open)
  end

  parser.on(:current_timeout) do
    @current_timeout = @timeout = parser.timeout
  end

  parser.on(:timeout) { |tout| @timeout = tout }

  parser.on(:error) do |request, ex|
    if ex.is_a?(MisdirectedRequestError)
      emit(:misdirected, request)
    else
      response = ErrorResponse.new(request, ex, @options)
      request.response = response
      request.emit(:response, response)
    end
  end
end
-
-
30
# Moves the connection to +nextstate+, turning socket and TLS failures into a closed
# connection instead of letting them propagate.
#
# Socket level errors are wrapped in a ConnectionError, which goes to the
# +:connect_error+ subscribers when the connection was still connecting and there are
# any, and to #handle_error otherwise. TLS errors always go to #handle_error. In both
# cases the state is set to +:closed+ and +:close+ is emitted.
def transition(nextstate)
  handle_transition(nextstate)
rescue Errno::ECONNABORTED, Errno::ECONNREFUSED, Errno::ECONNRESET,
       Errno::EADDRNOTAVAIL, Errno::EHOSTUNREACH, Errno::EINVAL,
       Errno::ENETUNREACH, Errno::EPIPE, Errno::ENOENT,
       SocketError => socket_error
  # connect errors, exit gracefully
  error = ConnectionError.new(socket_error.message)
  error.set_backtrace(socket_error.backtrace)
  if connecting? && callbacks(:connect_error).any?
    emit(:connect_error, error)
  else
    handle_error(error)
  end
  @state = :closed
  emit(:close)
rescue TLSError => tls_error
  # connect errors, exit gracefully
  handle_error(tls_error)
  @state = :closed
  emit(:close)
end
-
-
30
# State machine of the connection. Runs the work attached to entering +nextstate+ and
# records the new state, or returns early (leaving the state untouched) when the
# transition does not apply to the current state.
#
# +:already_open+ and +:active+ are pseudo-states which both end up as +:open+.
def handle_transition(nextstate)
  case nextstate
  when :idle
    connect_timeout = @options.timeout[:connect_timeout]
    @current_timeout = connect_timeout
    @timeout = connect_timeout

  when :open
    return if @state == :closed

    @io.connect
    emit(:tcp_open, self) if @io.state == :connected

    # the io may need more rounds until it is fully connected.
    return unless @io.connected?

    @connected_at = Utils.now

    send_pending

    parser_timeout = parser.timeout
    @current_timeout = parser_timeout
    @timeout = parser_timeout
    emit(:open)
  when :inactive, :closing
    # both are only reachable from :open
    return unless @state == :open

  when :closed
    # only reachable from :closing, and once the write buffer has been flushed
    return unless @state == :closing && @write_buffer.empty?

    purge_after_closed
  when :already_open
    nextstate = :open
    send_pending
  when :active
    return unless @state == :inactive

    nextstate = :open
    emit(:activate)
  end
  @state = nextstate
end
-
-
30
# Cleanup performed once the connection is closed: closes the +io+ (if any), empties
# the read buffer and forgets the current timeout.
def purge_after_closed
  @io.close if @io
  @read_buffer.clear
  return unless instance_variable_defined?(:@timeout)

  remove_instance_variable(:@timeout)
end
-
-
30
# Error handler of the connection: fails the outstanding requests with +error+ and
# resets the connection.
#
# Plain TimeoutError instances (not subclasses) get special treatment:
#
# * when the total timeout was exceeded, they are replaced by a TotalTimeoutError;
# * otherwise they are ignored for inactive connections, and they only count once the
#   current timeout budget has been used up; while connecting, they are converted to
#   connection errors.
def on_error(error)
  if error.instance_of?(TimeoutError)
    total_timeout_exceeded = @total_timeout && @connected_at &&
                             Utils.elapsed_time(@connected_at) > @total_timeout

    if total_timeout_exceeded
      total_error = TotalTimeoutError.new(@total_timeout, "Timed out after #{@total_timeout} seconds")
      total_error.set_backtrace(error.backtrace)
      error = total_error
    else
      # inactive connections do not contribute to the select loop, therefore
      # they should not fail due to such errors.
      return if @state == :inactive

      if @timeout
        # discount the interval which just elapsed; bail out while there's budget left.
        @timeout -= error.timeout
        return unless @timeout <= 0
      end

      error = error.to_connection_error if connecting?
    end
  end

  handle_error(error)
  reset
end
-
-
30
# Propagates +error+: first to the parser, when there is one and it knows how to handle
# errors, and then to every pending request, each of which receives an ErrorResponse.
def handle_error(error)
  if @parser && parser.respond_to?(:handle_error)
    parser.handle_error(error)
  end

  while (request = @pending.shift)
    error_response = ErrorResponse.new(request, error, request.options)
    request.response = error_response
    request.emit(:response, error_response)
  end
end
-
-
30
# Arms the per-request timers for +request+. Timeouts which are nil or infinite are
# skipped.
#
# * write timeout: timer starts when the request emits +:headers+;
# * read timeout: timer starts when the request emits +:done+;
# * request timeout: timer starts when the request emits +:headers+, and fails with a
#   RequestTimeoutError.
def set_request_timeouts(request)
  write_timeout = request.write_timeout
  unless write_timeout.nil? || write_timeout.infinite?
    request.once(:headers) do
      @timers.after(write_timeout) { write_timeout_callback(request, write_timeout) }
    end
  end

  read_timeout = request.read_timeout
  unless read_timeout.nil? || read_timeout.infinite?
    request.once(:done) do
      @timers.after(read_timeout) { read_timeout_callback(request, read_timeout) }
    end
  end

  request_timeout = request.request_timeout
  unless request_timeout.nil? || request_timeout.infinite?
    request.once(:headers) do
      @timers.after(request_timeout) { read_timeout_callback(request, request_timeout, RequestTimeoutError) }
    end
  end
end
-
-
30
# Fired when the write timeout timer of +request+ goes off. Unless the request is
# already done, the write buffer is discarded and a WriteTimeoutError is raised on the
# connection.
def write_timeout_callback(request, write_timeout)
  unless request.state == :done
    @write_buffer.clear
    on_error(WriteTimeoutError.new(request, nil, write_timeout))
  end
end
-
-
30
# Fired when a read (or request) timeout timer of +request+ goes off. Unless the request
# already has a finished response, the write buffer is discarded and an error of
# +error_type+ is raised on the connection.
def read_timeout_callback(request, read_timeout, error_type = ReadTimeoutError)
  response = request.response
  return if response && response.finished?

  @write_buffer.clear
  on_error(error_type.new(request, response, read_timeout))
end
-
end
-
end