-
# frozen_string_literal: true
-
-
20
require "resolv"
-
20
require "forwardable"
-
20
require "httpx/io"
-
20
require "httpx/buffer"
-
-
20
module HTTPX
-
# The Connection can be watched for IO events.
-
#
-
# It contains the +io+ object to read/write from, and knows what to do when it can.
-
#
-
# It defers connecting until absolutely necessary. Connection should be triggered from
-
# the IO selector (until then, any request will be queued).
-
#
-
# A connection boots up its parser after connection is established. All pending requests
-
# will be redirected there after connection.
-
#
-
# A connection can be prevented from closing by the parser, that is, if there are pending
-
# requests. This will signal that the connection was prematurely closed, due to a possible
-
# number of conditions:
-
#
-
# * Remote peer closed the connection ("Connection: close");
-
# * Remote peer doesn't support pipelining;
-
#
-
# A connection may also route requests for a host other than the one the +io+ was connected
# to, provided that the IP, the port and the scheme are the same. This allows sharing
# the same socket to send HTTP/2 requests to different hosts.
-
#
-
20
class Connection
-
20
extend Forwardable
-
20
include Loggable
-
20
include Callbacks
-
-
20
using URIExtensions
-
-
20
require "httpx/connection/http2"
-
20
require "httpx/connection/http1"
-
-
20
def_delegator :@io, :closed?
-
-
20
def_delegator :@write_buffer, :empty?
-
-
20
attr_reader :type, :io, :origin, :origins, :state, :pending, :options, :ssl_session
-
-
20
attr_writer :timers
-
-
20
attr_accessor :family
-
-
20
def initialize(type, uri, options)
-
4200
@type = type
-
4200
@origins = [uri.origin]
-
4200
@origin = Utils.to_uri(uri.origin)
-
4200
@options = Options.new(options)
-
4200
@window_size = @options.window_size
-
4200
@read_buffer = Buffer.new(@options.buffer_size)
-
4200
@write_buffer = Buffer.new(@options.buffer_size)
-
4200
@pending = []
-
4200
on(:error, &method(:on_error))
-
4200
if @options.io
-
# if there's an already open IO, get its
-
# peer address, and force-initiate the parser
-
41
transition(:already_open)
-
41
@io = build_socket
-
41
parser
-
else
-
4159
transition(:idle)
-
end
-
-
4200
@inflight = 0
-
4200
@keep_alive_timeout = @options.timeout[:keep_alive_timeout]
-
-
4200
@intervals = []
-
-
4200
self.addresses = @options.addresses if @options.addresses
-
end
-
-
# this is a semi-private method, to be used by the resolver
-
# to initiate the io object.
-
20
def addresses=(addrs)
-
4082
if @io
-
138
@io.add_addresses(addrs)
-
else
-
3944
@io = build_socket(addrs)
-
end
-
end
-
-
20
def addresses
-
11053
@io && @io.addresses
-
end
-
-
20
def match?(uri, options)
-
4180
return false if !used? && (@state == :closing || @state == :closed)
-
-
185
(
-
3995
@origins.include?(uri.origin) &&
-
# if there is more than one origin to match, it means that this connection
-
# was the result of coalescing. To prevent blind trust in the case where the
-
# origin came from an ORIGIN frame, we're going to verify the hostname with the
-
# SSL certificate
-
1906
(@origins.size == 1 || @origin == uri.origin || (@io.is_a?(SSL) && @io.verify_hostname(uri.host)))
-
) && @options == options
-
end
-
-
20
def expired?
-
return false unless @io
-
-
@io.expired?
-
end
-
-
20
def mergeable?(connection)
-
2559
return false if @state == :closing || @state == :closed || !@io
-
-
1370
return false unless connection.addresses
-
-
(
-
1370
(open? && @origin == connection.origin) ||
-
1314
!(@io.addresses & (connection.addresses || [])).empty?
-
) && @options == connection.options
-
end
-
-
# coalescable connections need to be mergeable!
-
# but internally, #mergeable? is called before #coalescable?
-
20
def coalescable?(connection)
-
12
if @io.protocol == "h2" &&
-
@origin.scheme == "https" &&
-
connection.origin.scheme == "https" &&
-
@io.can_verify_peer?
-
5
@io.verify_hostname(connection.origin.host)
-
else
-
7
@origin == connection.origin
-
end
-
end
-
-
20
def create_idle(options = {})
-
5
self.class.new(@type, @origin, @options.merge(options))
-
end
-
-
20
def merge(connection)
-
19
@origins |= connection.instance_variable_get(:@origins)
-
19
if connection.ssl_session
-
4
@ssl_session = connection.ssl_session
-
@io.session_new_cb do |sess|
-
@ssl_session = sess
-
4
end if @io
-
end
-
19
connection.purge_pending do |req|
-
5
send(req)
-
end
-
end
-
-
20
def purge_pending(&block)
-
19
pendings = []
-
19
if @parser
-
10
@inflight -= @parser.pending.size
-
10
pendings << @parser.pending
-
end
-
19
pendings << @pending
-
19
pendings.each do |pending|
-
29
pending.reject!(&block)
-
end
-
end
-
-
20
def connecting?
-
1662074
@state == :idle
-
end
-
-
20
def inflight?
-
4039
@parser && !@parser.empty? && !@write_buffer.empty?
-
end
-
-
20
def interests
-
# connecting
-
1655558
if connecting?
-
6312
connect
-
-
6311
return @io.interests if connecting?
-
end
-
-
# if the write buffer is full, we drain it
-
1649841
return :w unless @write_buffer.empty?
-
-
1624674
return @parser.interests if @parser
-
-
nil
-
end
-
-
20
def to_io
-
14299
@io.to_io
-
end
-
-
20
def call
-
11936
case @state
-
when :idle
-
5610
connect
-
5601
consume
-
when :closed
-
return
-
when :closing
-
consume
-
transition(:closed)
-
when :open
-
6190
consume
-
end
-
1525
nil
-
end
-
-
20
def close
-
4062
transition(:active) if @state == :inactive
-
-
4062
@parser.close if @parser
-
end
-
-
20
def terminate
-
4062
@connected_at = nil if @state == :closed
-
-
4062
close
-
end
-
-
# bypasses the state machine to force closing of connections still connecting.
-
# **only** used for Happy Eyeballs v2.
-
20
def force_reset
-
97
@state = :closing
-
97
transition(:closed)
-
end
-
-
20
def reset
-
6212
return if @state == :closing || @state == :closed
-
-
4120
transition(:closing)
-
4120
unless @write_buffer.empty?
-
# handshakes, try sending
-
1565
consume
-
1565
@write_buffer.clear
-
end
-
4120
transition(:closed)
-
end
-
-
20
def send(request)
-
5270
if @parser && !@write_buffer.full?
-
232
if @response_received_at && @keep_alive_timeout &&
-
Utils.elapsed_time(@response_received_at) > @keep_alive_timeout
-
# when pushing a request into an existing connection, we have to check whether there
-
# is the possibility that the connection might have extended the keep alive timeout.
-
# for such cases, we want to ping for availability before deciding to shovel requests.
-
5
log(level: 3) { "keep alive timeout expired, pinging connection..." }
-
5
@pending << request
-
5
parser.ping
-
5
transition(:active) if @state == :inactive
-
5
return
-
end
-
-
227
send_request_to_parser(request)
-
else
-
5038
@pending << request
-
end
-
end
-
-
20
def timeout
-
1638999
return @timeout if @timeout
-
-
1622047
return @options.timeout[:connect_timeout] if @state == :idle
-
-
1622047
@options.timeout[:operation_timeout]
-
end
-
-
20
def idling
-
421
purge_after_closed
-
421
@write_buffer.clear
-
421
transition(:idle)
-
421
@parser = nil if @parser
-
end
-
-
20
def used?
-
8972
@connected_at
-
end
-
-
20
def deactivate
-
654
transition(:inactive)
-
end
-
-
20
def open?
-
5464
@state == :open || @state == :inactive
-
end
-
-
20
def handle_socket_timeout(interval)
-
284
@intervals.delete_if(&:elapsed?)
-
-
284
unless @intervals.empty?
-
# remove the intervals which will elapse
-
-
264
return
-
end
-
-
20
error = HTTPX::TimeoutError.new(interval, "timed out while waiting on select")
-
20
error.set_backtrace(caller)
-
20
on_error(error)
-
end
-
-
20
private
-
-
20
def connect
-
11251
transition(:open)
-
end
-
-
20
def consume
-
13593
return unless @io
-
-
13593
catch(:called) do
-
13593
epiped = false
-
13593
loop do
-
# connection may have
-
25786
return if @state == :idle
-
-
24069
parser.consume
-
-
# we exit if there's no more requests to process
-
#
-
# this condition takes into account:
-
#
-
# * the number of inflight requests
-
# * the number of pending requests
-
# * whether the write buffer has bytes (i.e. for close handshake)
-
24059
if @pending.empty? && @inflight.zero? && @write_buffer.empty?
-
1686
log(level: 3) { "NO MORE REQUESTS..." }
-
1668
return
-
end
-
-
22391
@timeout = @current_timeout
-
-
22391
read_drained = false
-
22391
write_drained = nil
-
-
#
-
# tight read loop.
-
#
-
# read as much of the socket as possible.
-
#
-
# this tight loop reads all the data it can from the socket and pipes it to
-
# its parser.
-
#
-
loop do
-
28586
siz = @io.read(@window_size, @read_buffer)
-
28659
log(level: 3, color: :cyan) { "IO READ: #{siz} bytes..." }
-
28586
unless siz
-
6
ex = EOFError.new("descriptor closed")
-
6
ex.set_backtrace(caller)
-
6
on_error(ex)
-
6
return
-
end
-
-
# socket has been drained. mark and exit the read loop.
-
28580
if siz.zero?
-
5370
read_drained = @read_buffer.empty?
-
5370
epiped = false
-
5370
break
-
end
-
-
23210
parser << @read_buffer.to_s
-
-
# continue reading if possible.
-
20755
break if interests == :w && !epiped
-
-
# exit the read loop if connection is preparing to be closed
-
16134
break if @state == :closing || @state == :closed
-
-
# exit #consume altogether if all outstanding requests have been dealt with
-
16130
return if @pending.empty? && @inflight.zero?
-
22391
end unless ((ints = interests).nil? || ints == :w || @state == :closing) && !epiped
-
-
#
-
# tight write loop.
-
#
-
# flush as many bytes as the sockets allow.
-
#
-
loop do
-
# buffer has been drainned, mark and exit the write loop.
-
14887
if @write_buffer.empty?
-
# we only mark as drained on the first loop
-
2166
write_drained = write_drained.nil? && @inflight.positive?
-
-
2166
break
-
end
-
-
begin
-
12721
siz = @io.write(@write_buffer)
-
rescue Errno::EPIPE
-
# this can happen if we still have bytes in the buffer to send to the server, but
-
# the server wants to respond immediately with some message, or an error. An example is
-
# when one's uploading a big file to an unintended endpoint, and the server stops the
-
# consumption, and responds immediately with an authorization of even method not allowed error.
-
# at this point, we have to let the connection switch to read-mode.
-
6
log(level: 2) { "pipe broken, could not flush buffer..." }
-
6
epiped = true
-
6
read_drained = false
-
6
break
-
end
-
12768
log(level: 3, color: :cyan) { "IO WRITE: #{siz} bytes..." }
-
12713
unless siz
-
ex = EOFError.new("descriptor closed")
-
ex.set_backtrace(caller)
-
on_error(ex)
-
return
-
end
-
-
# socket closed for writing. mark and exit the write loop.
-
12713
if siz.zero?
-
19
write_drained = !@write_buffer.empty?
-
19
break
-
end
-
-
# exit write loop if marked to consume from peer, or is closing.
-
12694
break if interests == :r || @state == :closing || @state == :closed
-
-
2059
write_drained = false
-
18187
end unless (ints = interests) == :r
-
-
18185
send_pending if @state == :open
-
-
# return if socket is drained
-
18185
next unless (ints != :r || read_drained) && (ints != :w || write_drained)
-
-
# gotta go back to the event loop. It happens when:
-
#
-
# * the socket is drained of bytes or it's not the interest of the conn to read;
-
# * theres nothing more to write, or it's not in the interest of the conn to write;
-
6013
log(level: 3) { "(#{ints}): WAITING FOR EVENTS..." }
-
5992
return
-
end
-
end
-
end
-
-
20
def send_pending
-
52139
while !@write_buffer.full? && (request = @pending.shift)
-
15313
send_request_to_parser(request)
-
end
-
end
-
-
20
def parser
-
67683
@parser ||= build_parser
-
end
-
-
20
def send_request_to_parser(request)
-
15538
@inflight += 1
-
15538
request.peer_address = @io.ip
-
15538
parser.send(request)
-
-
15538
set_request_timeouts(request)
-
-
15538
return unless @state == :inactive
-
-
43
transition(:active)
-
end
-
-
20
def build_parser(protocol = @io.protocol)
-
4220
parser = self.class.parser_type(protocol).new(@write_buffer, @options)
-
4220
set_parser_callbacks(parser)
-
4220
parser
-
end
-
-
20
def set_parser_callbacks(parser)
-
4300
parser.on(:response) do |request, response|
-
4661
AltSvc.emit(request, response) do |alt_origin, origin, alt_params|
-
5
emit(:altsvc, alt_origin, origin, alt_params)
-
end
-
4661
@response_received_at = Utils.now
-
4661
@inflight -= 1
-
4661
request.emit(:response, response)
-
end
-
4300
parser.on(:altsvc) do |alt_origin, origin, alt_params|
-
emit(:altsvc, alt_origin, origin, alt_params)
-
end
-
-
4300
parser.on(:pong, &method(:send_pending))
-
-
4300
parser.on(:promise) do |request, stream|
-
15
request.emit(:promise, parser, stream)
-
end
-
4300
parser.on(:exhausted) do
-
5
@pending.concat(parser.pending)
-
5
emit(:exhausted)
-
end
-
4300
parser.on(:origin) do |origin|
-
@origins |= [origin]
-
end
-
4300
parser.on(:close) do |force|
-
3802
if force
-
3802
reset
-
3797
emit(:terminate)
-
end
-
end
-
4300
parser.on(:close_handshake) do
-
5
consume
-
end
-
4300
parser.on(:reset) do
-
2219
@pending.concat(parser.pending) unless parser.empty?
-
2219
reset
-
2214
idling unless @pending.empty?
-
end
-
4300
parser.on(:current_timeout) do
-
1801
@current_timeout = @timeout = parser.timeout
-
end
-
4300
parser.on(:timeout) do |tout|
-
1715
@timeout = tout
-
end
-
4300
parser.on(:error) do |request, ex|
-
279
case ex
-
when MisdirectedRequestError
-
5
emit(:misdirected, request)
-
else
-
274
response = ErrorResponse.new(request, ex, @options)
-
274
request.response = response
-
274
request.emit(:response, response)
-
end
-
end
-
end
-
-
20
def transition(nextstate)
-
26822
handle_transition(nextstate)
-
rescue Errno::ECONNABORTED,
-
Errno::ECONNREFUSED,
-
Errno::ECONNRESET,
-
Errno::EADDRNOTAVAIL,
-
Errno::EHOSTUNREACH,
-
Errno::EINVAL,
-
Errno::ENETUNREACH,
-
Errno::EPIPE,
-
Errno::ENOENT,
-
SocketError => e
-
# connect errors, exit gracefully
-
48
error = ConnectionError.new(e.message)
-
48
error.set_backtrace(e.backtrace)
-
48
connecting? && callbacks_for?(:connect_error) ? emit(:connect_error, error) : handle_error(error)
-
48
@state = :closed
-
48
emit(:close)
-
rescue TLSError => e
-
# connect errors, exit gracefully
-
15
handle_error(e)
-
15
connecting? && callbacks_for?(:connect_error) ? emit(:connect_error, e) : handle_error(e)
-
15
@state = :closed
-
15
emit(:close)
-
end
-
-
20
def handle_transition(nextstate)
-
26495
case nextstate
-
when :idle
-
4590
@timeout = @current_timeout = @options.timeout[:connect_timeout]
-
-
4590
@connected_at = nil
-
when :open
-
11453
return if @state == :closed
-
-
11453
@io.connect
-
11390
emit(:tcp_open, self) if @io.state == :connected
-
-
11390
return unless @io.connected?
-
-
4198
@connected_at = Utils.now
-
-
4198
send_pending
-
-
4198
@timeout = @current_timeout = parser.timeout
-
4198
emit(:open)
-
when :inactive
-
654
return unless @state == :open
-
when :closing
-
4469
return unless @state == :idle || @state == :open
-
-
when :closed
-
4529
return unless @state == :closing
-
4526
return unless @write_buffer.empty?
-
-
4450
purge_after_closed
-
4450
emit(:close) if @pending.empty?
-
when :already_open
-
41
nextstate = :open
-
# the first check for given io readiness must still use a timeout.
-
# connect is the reasonable choice in such a case.
-
41
@timeout = @options.timeout[:connect_timeout]
-
41
send_pending
-
when :active
-
380
return unless @state == :inactive
-
-
380
nextstate = :open
-
380
emit(:activate)
-
end
-
18875
@state = nextstate
-
end
-
-
20
def purge_after_closed
-
4876
@io.close if @io
-
4876
@read_buffer.clear
-
4876
@timeout = nil
-
end
-
-
20
def build_socket(addrs = nil)
-
3985
case @type
-
when "tcp"
-
2246
TCP.new(@origin, addrs, @options)
-
when "ssl"
-
1723
SSL.new(@origin, addrs, @options) do |sock|
-
1709
sock.ssl_session = @ssl_session
-
1709
sock.session_new_cb do |sess|
-
2699
@ssl_session = sess
-
-
2699
sock.ssl_session = sess
-
end
-
end
-
when "unix"
-
16
UNIX.new(@origin, addrs, @options)
-
else
-
raise Error, "unsupported transport (#{@type})"
-
end
-
end
-
-
20
def on_error(error)
-
488
if error.instance_of?(TimeoutError)
-
-
# inactive connections do not contribute to the select loop, therefore
-
# they should not fail due to such errors.
-
20
return if @state == :inactive
-
-
20
if @timeout
-
20
@timeout -= error.timeout
-
20
return unless @timeout <= 0
-
end
-
-
20
error = error.to_connection_error if connecting?
-
end
-
488
handle_error(error)
-
488
reset
-
end
-
-
20
def handle_error(error)
-
566
parser.handle_error(error) if @parser && parser.respond_to?(:handle_error)
-
1275
while (request = @pending.shift)
-
268
response = ErrorResponse.new(request, error, request.options)
-
268
request.response = response
-
268
request.emit(:response, response)
-
end
-
end
-
-
20
def set_request_timeouts(request)
-
15538
write_timeout = request.write_timeout
-
15538
read_timeout = request.read_timeout
-
15538
request_timeout = request.request_timeout
-
-
15538
unless write_timeout.nil? || write_timeout.infinite?
-
15538
set_request_timeout(request, write_timeout, :headers, %i[done response]) do
-
15
write_timeout_callback(request, write_timeout)
-
end
-
end
-
-
15538
unless read_timeout.nil? || read_timeout.infinite?
-
15364
set_request_timeout(request, read_timeout, :done, :response) do
-
15
read_timeout_callback(request, read_timeout)
-
end
-
end
-
-
15538
return if request_timeout.nil? || request_timeout.infinite?
-
-
290
set_request_timeout(request, request_timeout, :headers, :response) do
-
217
read_timeout_callback(request, request_timeout, RequestTimeoutError)
-
end
-
end
-
-
20
def write_timeout_callback(request, write_timeout)
-
15
return if request.state == :done
-
-
15
@write_buffer.clear
-
15
error = WriteTimeoutError.new(request, nil, write_timeout)
-
15
on_error(error)
-
end
-
-
20
def read_timeout_callback(request, read_timeout, error_type = ReadTimeoutError)
-
232
response = request.response
-
-
232
return if response && response.finished?
-
-
232
@write_buffer.clear
-
232
error = error_type.new(request, request.response, read_timeout)
-
232
on_error(error)
-
end
-
-
20
def set_request_timeout(request, timeout, start_event, finish_events, &callback)
-
31242
request.once(start_event) do
-
30840
interval = @timers.after(timeout, callback)
-
-
30840
Array(finish_events).each do |event|
-
# clean up reques timeouts if the connection errors out
-
46237
request.once(event) do
-
46142
if @intervals.include?(interval)
-
45896
interval.delete(callback)
-
45896
@intervals.delete(interval) if interval.no_callbacks?
-
end
-
end
-
end
-
-
30840
@intervals << interval
-
end
-
end
-
-
20
class << self
-
20
def parser_type(protocol)
-
4322
case protocol
-
1803
when "h2" then HTTP2
-
2519
when "http/1.1" then HTTP1
-
else
-
raise Error, "unsupported protocol (##{protocol})"
-
end
-
end
-
end
-
end
-
end