class HTTPX::Pool

  1. lib/httpx/pool.rb
Superclass: Object

Constants

POOL_TIMEOUT = 5  

Public Class methods

new(options)

Sets up the connection pool with the given options, which can be the following:

:max_connections

the maximum number of connections held in the pool.

:max_connections_per_origin

the maximum number of connections held in the pool pointing to a given origin.

:pool_timeout

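the number of seconds to wait for a connection to become available, either pool-wide (when :max_connections is reached) or for a given origin (when :max_connections_per_origin is reached), before raising HTTPX::PoolTimeoutError.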

[show source]
   # File lib/httpx/pool.rb
   #
   # Reads the pool limits from +options+ and creates the (empty) bookkeeping
   # state used by the checkout/checkin methods.
   #
   # +options+ is only queried through +fetch+, for :max_connections,
   # :max_connections_per_origin and :pool_timeout. Both connection limits
   # default to Float::INFINITY, the timeout defaults to POOL_TIMEOUT.
21 def initialize(options)
22   @max_connections = options.fetch(:max_connections, Float::INFINITY)
23   @max_connections_per_origin = options.fetch(:max_connections_per_origin, Float::INFINITY)
24   @pool_timeout = options.fetch(:pool_timeout, POOL_TIMEOUT)
     # pooled resolvers, bucketed by resolver type; a bucket (empty array) is
     # created the first time a type is looked up.
25   @resolvers = Hash.new { |hs, resolver_type| hs[resolver_type] = [] }
     # the resolver methods shown on this page touch @resolvers only while holding this mutex.
26   @resolver_mtx = Thread::Mutex.new
     # connections that have been checked in.
27   @connections = []
     # held by the connection methods while they read or modify the pool; also the
     # mutex handed to the condition variables below when waiting.
28   @connection_mtx = Thread::Mutex.new
     # incremented by checkout_connection each time it goes on to create a new connection.
29   @connections_counter = 0
     # signalled by checkin_connection; waited on when the pool-wide limit is reached.
30   @max_connections_cond = ConditionVariable.new
     # per-origin equivalent of @connections_counter; unknown origins read as 0.
31   @origin_counters = Hash.new(0)
     # per-origin condition variables, created lazily on first lookup of an origin.
32   @origin_conds = Hash.new { |hs, orig| hs[orig] = ConditionVariable.new }
33 end

Public Instance methods

checkin_connection(connection)
[show source]
    # File lib/httpx/pool.rb
    #
    # Hands +connection+ back to the pool.
    #
    # Does nothing when +connection.options.io+ is set. Otherwise, while holding
    # @connection_mtx: a connection that is coalesced or in the :idle state is
    # passed to +drop_connection+ and is not stored; any other connection is
    # appended to @connections, after which one waiter on the pool-wide
    # condition and one waiter on the connection's origin condition are woken up.
110 def checkin_connection(connection)
111   return if connection.options.io
112 
113   @connection_mtx.synchronize do
114     if connection.coalesced? || connection.state == :idle
115       # when connections coalesce
116       drop_connection(connection)
117 
        # leaves the whole method (not just the block); nothing is signalled on this path.
118       return
119     end
120 
121     @connections << connection
122 
123     @max_connections_cond.signal
        # NOTE(review): checkout_connection waits on @origin_conds[uri.origin]; this
        # presumably yields the same key for the same origin - confirm that
        # connection.origin.to_s and uri.origin always agree.
124     @origin_conds[connection.origin.to_s].signal
125   end
126 end
checkin_resolver(resolver)
[show source]
    # File lib/httpx/pool.rb
    #
    # Puts a resolver back into @resolvers.
    #
    # The object that gets stored is +resolver.multi+, filed under the class of
    # the +resolver+ argument. Nothing is stored unless that object reports
    # +closed?+, and an object already present in the bucket is not added again.
157 def checkin_resolver(resolver)
      # captured before +resolver+ is rebound on the next statement, so the bucket
      # key is the class of the original argument.
158   resolver_class = resolver.class
159 
      # NOTE(review): assumes +multi+ never returns nil here - confirm against the resolver classes.
160   resolver = resolver.multi
161 
162   # a multi requires all sub-resolvers being closed in order to be
163   # correctly checked back in.
164   return unless resolver.closed?
165 
166   @resolver_mtx.synchronize do
167     resolvers = @resolvers[resolver_class]
168 
169     resolvers << resolver unless resolvers.include?(resolver)
170   end
171 end
checkout_connection(uri, options)

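Opens a connection to the IP reachable through uri. Many hostnames are reachable through the same IP, so we try to maximize pipelining by opening as few connections as possible. When the pool is at its :max_connections or :max_connections_per_origin limit, the call waits for a connection to be checked in, and raises HTTPX::PoolTimeoutError if none becomes usable in time.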

[show source]
    # File lib/httpx/pool.rb
    #
    # Returns a connection for +uri+: the one found by +acquire_connection+ if
    # there is one, otherwise a new one from +checkout_new_connection+.
    #
    # When +options.io+ is set, a new connection is created right away, without
    # taking the lock or touching the counters. Otherwise, while holding
    # @connection_mtx:
    #
    # 1. if +acquire_connection+ returns a connection, that is the result.
    # 2. if @connections_counter equals @max_connections, wait on
    #    @max_connections_cond. After each wake-up: return a connection if one
    #    can now be acquired; stop waiting if the counter no longer equals the
    #    limit, or after dropping a pooled connection in the :closed state;
    #    otherwise wait again until the deadline, then raise.
    # 3. if the origin's counter equals @max_connections_per_origin, wait on
    #    that origin's condition in the same way (without the "make room" step).
    # 4. increment both counters and create a new connection.
    #
    # Raises PoolTimeoutError when a wait in step 2 or 3 wakes up past its
    # deadline without a usable connection.
 46 def checkout_connection(uri, options)
 47   return checkout_new_connection(uri, options) if options.io
 48 
 49   @connection_mtx.synchronize do
 50     acquire_connection(uri, options) || begin
 51       if @connections_counter == @max_connections
 52         # this takes precedence over per-origin
 53 
            # deadline is computed once, before the first wait.
 54         expires_at = Utils.now + @pool_timeout
 55 
 56         loop do
              # the mutex is released while waiting and held again when wait returns.
              # NOTE(review): every iteration waits up to the full @pool_timeout, not
              # the time left until expires_at; a wake-up shortly before the deadline
              # that falls through to `next` can therefore block for up to another
              # @pool_timeout. Consider waiting for (expires_at - Utils.now) instead.
 57           @max_connections_cond.wait(@connection_mtx, @pool_timeout)
 58 
 59           if (conn = acquire_connection(uri, options))
                # returns from the method itself, skipping the counter updates below.
 60             return conn
 61           end
 62 
 63           # if one can afford to create a new connection, do it
 64           break unless @connections_counter == @max_connections
 65 
 66           # if no matching usable connection was found, the pool will make room and drop a closed connection.
 67           if (conn = @connections.find { |c| c.state == :closed })
 68             drop_connection(conn)
 69             break
 70           end
 71 
 72           # happens when a condition was signalled, but another thread snatched the available connection before
 73           # context was passed back here.
 74           next if Utils.now < expires_at
 75 
 76           raise PoolTimeoutError.new(@pool_timeout,
 77                                      "Timed out after #{@pool_timeout} seconds while waiting for a connection")
 78         end
 79 
 80       end
 81 
          # reached both when the pool-wide limit was not hit and after breaking out
          # of the loop above; the per-origin limit is checked in either case.
 82       if @origin_counters[uri.origin] == @max_connections_per_origin
 83 
 84         expires_at = Utils.now + @pool_timeout
 85 
 86         loop do
              # NOTE(review): same remark as for the pool-wide wait - the full
              # @pool_timeout is used on every iteration.
 87           @origin_conds[uri.origin].wait(@connection_mtx, @pool_timeout)
 88 
 89           if (conn = acquire_connection(uri, options))
 90             return conn
 91           end
 92 
 93           # happens when a condition was signalled, but another thread snatched the available connection before
 94           # context was passed back here.
 95           next if Utils.now < expires_at
 96 
 97           raise(PoolTimeoutError.new(@pool_timeout,
 98                                      "Timed out after #{@pool_timeout} seconds while waiting for a connection to #{uri.origin}"))
 99         end
100       end
101 
          # account for the connection about to be created, pool-wide and per origin.
102       @connections_counter += 1
103       @origin_counters[uri.origin] += 1
104 
105       checkout_new_connection(uri, options)
106     end
107   end
108 end
checkout_mergeable_connection(connection)
[show source]
    # File lib/httpx/pool.rb
    #
    # Removes from @connections, and returns, the first pooled connection that
    # is not +connection+ itself and answers true to +mergeable?(connection)+.
    #
    # Returns nil when +connection.options.io+ is set or when no pooled
    # connection qualifies. The search and removal happen under @connection_mtx.
128 def checkout_mergeable_connection(connection)
129   return if connection.options.io
130 
131   @connection_mtx.synchronize do
132     idx = @connections.find_index do |ch|
133       ch != connection && ch.mergeable?(connection)
134     end
        # delete_at returns the removed connection, which becomes the method's result.
135     @connections.delete_at(idx) if idx
136   end
137 end
checkout_resolver(options)
[show source]
    # File lib/httpx/pool.rb
    #
    # Returns a resolver for +options+.
    #
    # The resolver type is +options.resolver_class+ passed through
    # +Resolver.resolver_for+. If the bucket for that type holds a resolver whose
    # +options+ equal the given ones, the first such resolver is removed from the
    # bucket and returned; otherwise a new one is obtained from
    # +checkout_new_resolver+.
143 def checkout_resolver(options)
144   resolver_type = options.resolver_class
145   resolver_type = Resolver.resolver_for(resolver_type, options)
146 
147   @resolver_mtx.synchronize do
148     resolvers = @resolvers[resolver_type]
149 
150     idx = resolvers.find_index do |res|
151       res.options == options
152     end
153     resolvers.delete_at(idx) if idx
      # the block yields nil when nothing matched, so the fallback runs after the
      # mutex has been released.
154   end || checkout_new_resolver(resolver_type, options)
155 end
inspect()

:nocov:

[show source]
    # File lib/httpx/pool.rb
    #
    # Returns a one-line summary of the pool: its class and object_id, the three
    # configured limits (@max_connections, @max_connections_per_origin,
    # @pool_timeout) and the number of connections currently in @connections.
    #
    # @connections.size is read without taking @connection_mtx.
174 def inspect
175   "#<#{self.class}:#{object_id} " \
176     "@max_connections=#{@max_connections} " \
177     "@max_connections_per_origin=#{@max_connections_per_origin} " \
178     "@pool_timeout=#{@pool_timeout} " \
179     "@connections=#{@connections.size}>"
180 end
pop_connection()

connections returned by this function are not expected to return to the connection pool.

[show source]
   # File lib/httpx/pool.rb
   #
   # Calls +drop_connection+ without an argument while holding @connection_mtx
   # and returns whatever it returns.
   #
   # NOTE(review): drop_connection is defined outside this listing; presumably,
   # called without an argument, it removes and returns one pooled connection
   # (or nil when the pool is empty) - confirm.
36 def pop_connection
37   @connection_mtx.synchronize do
38     drop_connection
39   end
40 end
reset_resolvers()
[show source]
    # File lib/httpx/pool.rb
    #
    # Empties @resolvers - every bucket of pooled resolvers, for all resolver
    # types - while holding @resolver_mtx.
139 def reset_resolvers
140   @resolver_mtx.synchronize { @resolvers.clear }
141 end