class HTTPX::Pool

  1. lib/httpx/pool.rb
Superclass: Object

Constants

POOL_TIMEOUT = 5  

Public Class methods

new(options)

Sets up the connection pool with the given options, which can be the following:

:max_connections

the maximum number of connections held in the pool.

:max_connections_per_origin

the maximum number of connections held in the pool pointing to a given origin.

:pool_timeout

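the number of seconds to wait for a connection to become available, either because the pool has reached :max_connections or because the given origin has reached :max_connections_per_origin, before raising HTTPX::PoolTimeoutError.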

[show source]
   # File lib/httpx/pool.rb
   #
   # Builds an empty pool configured from +options+.
   #
   # +options+ must respond to +fetch(key, default)+. The keys read are:
   # * +:max_connections+ (default: Float::INFINITY)
   # * +:max_connections_per_origin+ (default: Float::INFINITY)
   # * +:pool_timeout+ (default: POOL_TIMEOUT)
   #
   # State set up here:
   # * @resolvers: hash whose missing keys are lazily filled with an empty array.
   # * @connections: array of pooled connections, initially empty.
   # * @connections_counter / @origin_counters: total and per-origin counts, starting at 0.
   # * @max_connections_cond / @origin_conds: condition variables for the total limit and,
   #   lazily created on first lookup, one per origin key.
   # * @resolver_mtx / @connection_mtx: separate mutexes for resolver and connection state.
21 def initialize(options)
22   @max_connections = options.fetch(:max_connections, Float::INFINITY)
23   @max_connections_per_origin = options.fetch(:max_connections_per_origin, Float::INFINITY)
24   @pool_timeout = options.fetch(:pool_timeout, POOL_TIMEOUT)
25   @resolvers = Hash.new { |hs, resolver_type| hs[resolver_type] = [] }
26   @resolver_mtx = Thread::Mutex.new
27   @connections = []
28   @connection_mtx = Thread::Mutex.new
29   @connections_counter = 0
30   @max_connections_cond = ConditionVariable.new
31   @origin_counters = Hash.new(0)
32   @origin_conds = Hash.new { |hs, orig| hs[orig] = ConditionVariable.new }
33 end

Public Instance methods

checkin_connection(connection)
[show source]
    # File lib/httpx/pool.rb
    #
    # Hands +connection+ back to the pool.
    #
    # Does nothing when +connection.options.io+ is set. Otherwise, while holding
    # @connection_mtx:
    # * a connection that is coalesced, or whose state is :idle, is passed to
    #   drop_connection and is not stored;
    # * any other connection is appended to @connections, after which one waiter on
    #   the total-limit condition and one waiter on the origin's condition are signalled.
    #
    # NOTE(review): the origin condition is looked up with +connection.origin.to_s+,
    # while checkout_connection waits on +@origin_conds[uri.origin]+; this assumes both
    # expressions yield the same key. Confirm against the connection and URI classes.
110 def checkin_connection(connection)
111   return if connection.options.io
112 
113   @connection_mtx.synchronize do
114     if connection.coalesced? || connection.state == :idle
115       # when connections coalesce
116       drop_connection(connection)
117 
118       return
119     end
120 
121     @connections << connection
122 
123     @max_connections_cond.signal
124     @origin_conds[connection.origin.to_s].signal
125 
126     # Observed situations where a session handling multiple requests in a loop
127     # across multiple threads checks the same connection in and out, while another
128     # thread which is waiting on the same connection never gets the chance to pick
129     # it up, because ruby's thread scheduler never switched on to it in the process.
130     Thread.pass
131   end
132 end
checkin_resolver(resolver)
[show source]
    # File lib/httpx/pool.rb
    #
    # Stores the multi-resolver of +resolver+ for later reuse.
    #
    # The class of the resolver passed in is captured first, because +resolver+ is then
    # reassigned to +resolver.multi+; the multi is filed under that original class.
    # Nothing is stored unless the multi reports +closed?+. While holding @resolver_mtx,
    # the multi is appended to the list for that class only if it is not already there.
163 def checkin_resolver(resolver)
164   resolver_class = resolver.class
165 
166   resolver = resolver.multi
167 
168   # a multi requires all sub-resolvers being closed in order to be
169   # correctly checked back in.
170   return unless resolver.closed?
171 
172   @resolver_mtx.synchronize do
173     resolvers = @resolvers[resolver_class]
174 
175     resolvers << resolver unless resolvers.include?(resolver)
176   end
177 end
checkout_connection(uri, options)

Opens a connection to the IP reachable through uri. Because many hostnames can be reachable through the same IP, the pool tries to maximize pipelining by opening as few connections as possible.

[show source]
    # File lib/httpx/pool.rb
    #
    # Returns a connection for +uri+ and +options+.
    #
    # When +options.io+ is set, a new connection is built with checkout_new_connection
    # right away, without taking the lock or touching the counters.
    #
    # Otherwise, while holding @connection_mtx:
    # 1. acquire_connection(uri, options) is tried first; a truthy result is returned.
    # 2. If @connections_counter equals @max_connections, the caller waits on
    #    @max_connections_cond. After each wake-up it retries acquire_connection
    #    (returning on success), stops waiting if the counter no longer equals the limit,
    #    or hands a pooled connection whose state is :closed to drop_connection and stops
    #    waiting. Once +expires_at+ has passed, PoolTimeoutError is raised.
    # 3. If @origin_counters[uri.origin] equals @max_connections_per_origin, the caller
    #    waits on the condition for that origin in the same way (retry, or raise
    #    PoolTimeoutError once +expires_at+ has passed).
    # 4. Both counters are incremented and checkout_new_connection(uri, options) is returned.
    #
    # NOTE(review): every +wait+ call is given the full @pool_timeout rather than the
    # time remaining until +expires_at+, so a wake-up that finds no usable connection
    # can lead to a total wait longer than @pool_timeout before the error is raised.
 46 def checkout_connection(uri, options)
 47   return checkout_new_connection(uri, options) if options.io
 48 
 49   @connection_mtx.synchronize do
 50     acquire_connection(uri, options) || begin
 51       if @connections_counter == @max_connections
 52         # this takes precedence over per-origin
 53 
 54         expires_at = Utils.now + @pool_timeout
 55 
 56         loop do
 57           @max_connections_cond.wait(@connection_mtx, @pool_timeout)
 58 
 59           if (conn = acquire_connection(uri, options))
 60             return conn
 61           end
 62 
 63           # if one can afford to create a new connection, do it
 64           break unless @connections_counter == @max_connections
 65 
 66           # if no matching usable connection was found, the pool will make room and drop a closed connection.
 67           if (conn = @connections.find { |c| c.state == :closed })
 68             drop_connection(conn)
 69             break
 70           end
 71 
 72           # happens when a condition was signalled, but another thread snatched the available connection before
 73           # context was passed back here.
 74           next if Utils.now < expires_at
 75 
 76           raise PoolTimeoutError.new(@pool_timeout,
 77                                      "Timed out after #{@pool_timeout} seconds while waiting for a connection")
 78         end
 79 
 80       end
 81 
 82       if @origin_counters[uri.origin] == @max_connections_per_origin
 83 
 84         expires_at = Utils.now + @pool_timeout
 85 
 86         loop do
 87           @origin_conds[uri.origin].wait(@connection_mtx, @pool_timeout)
 88 
 89           if (conn = acquire_connection(uri, options))
 90             return conn
 91           end
 92 
 93           # happens when a condition was signalled, but another thread snatched the available connection before
 94           # context was passed back here.
 95           next if Utils.now < expires_at
 96 
 97           raise(PoolTimeoutError.new(@pool_timeout,
 98                                      "Timed out after #{@pool_timeout} seconds while waiting for a connection to #{uri.origin}"))
 99         end
100       end
101 
102       @connections_counter += 1
103       @origin_counters[uri.origin] += 1
104 
105       checkout_new_connection(uri, options)
106     end
107   end
108 end
checkout_mergeable_connection(connection)
[show source]
    # File lib/httpx/pool.rb
    #
    # Removes from the pool, and returns, the first pooled connection other than
    # +connection+ itself for which +mergeable?(connection)+ is true.
    #
    # Returns nil when +connection.options.io+ is set or when no pooled connection
    # matches. The search and removal happen while holding @connection_mtx.
134 def checkout_mergeable_connection(connection)
135   return if connection.options.io
136 
137   @connection_mtx.synchronize do
138     idx = @connections.find_index do |ch|
139       ch != connection && ch.mergeable?(connection)
140     end
141     @connections.delete_at(idx) if idx
142   end
143 end
checkout_resolver(options)
[show source]
    # File lib/httpx/pool.rb
    #
    # Returns a resolver suitable for +options+.
    #
    # The resolver type is obtained from Resolver.resolver_for(options.resolver_class, options).
    # While holding @resolver_mtx, the first pooled resolver of that type whose +options+
    # equal the given +options+ is removed from the pool and returned. When none matches,
    # the synchronized block yields nil and checkout_new_resolver(resolver_type, options)
    # is returned instead (called after the lock has been released).
149 def checkout_resolver(options)
150   resolver_type = options.resolver_class
151   resolver_type = Resolver.resolver_for(resolver_type, options)
152 
153   @resolver_mtx.synchronize do
154     resolvers = @resolvers[resolver_type]
155 
156     idx = resolvers.find_index do |res|
157       res.options == options
158     end
159     resolvers.delete_at(idx) if idx
160   end || checkout_new_resolver(resolver_type, options)
161 end
inspect()

:nocov:

[show source]
    # File lib/httpx/pool.rb
    #
    # Returns a one-line summary of the pool: its class and object_id, the configured
    # @max_connections, @max_connections_per_origin and @pool_timeout, and the number
    # of connections currently held in @connections.
    #
    # @connections.size is read here without taking @connection_mtx.
180 def inspect
181   "#<#{self.class}:#{object_id} " \
182     "@max_connections=#{@max_connections} " \
183     "@max_connections_per_origin=#{@max_connections_per_origin} " \
184     "@pool_timeout=#{@pool_timeout} " \
185     "@connections=#{@connections.size}>"
186 end
pop_connection()

Connections returned by this method are not expected to be checked back into the connection pool.

[show source]
   # File lib/httpx/pool.rb
   #
   # Calls drop_connection with no arguments while holding @connection_mtx and
   # returns whatever it returns.
   #
   # NOTE(review): drop_connection is defined outside this listing; presumably it
   # removes one connection from the pool and returns it. Confirm against its definition.
36 def pop_connection
37   @connection_mtx.synchronize do
38     drop_connection
39   end
40 end
reset_resolvers()
[show source]
    # File lib/httpx/pool.rb
    #
    # Empties the resolver registry: clears @resolvers while holding @resolver_mtx,
    # so every pooled resolver of every type is forgotten.
145 def reset_resolvers
146   @resolver_mtx.synchronize { @resolvers.clear }
147 end