Methods
Public Class
Public Instance
Constants
| POOL_TIMEOUT | = | 5 |
Public Class methods
new(options)
Sets up the connection pool with the given options, which can be the following:
| :max_connections |
the maximum number of connections held in the pool. |
| :max_connections_per_origin |
the maximum number of connections held in the pool pointing to a given origin. |
| :pool_timeout |
the number of seconds to wait for a connection to a given origin (before raising PoolTimeoutError). |
[show source]
# File lib/httpx/pool.rb, line 21
#
# Builds an empty pool configured from +options+ (anything responding to +fetch+).
#
# Recognized keys:
# :max_connections            :: cap on connections, defaults to Float::INFINITY.
# :max_connections_per_origin :: cap on connections per origin, defaults to Float::INFINITY.
# :pool_timeout               :: seconds to wait for a connection, defaults to POOL_TIMEOUT.
def initialize(options)
  # configured limits
  @max_connections = options.fetch(:max_connections, Float::INFINITY)
  @max_connections_per_origin = options.fetch(:max_connections_per_origin, Float::INFINITY)
  @pool_timeout = options.fetch(:pool_timeout, POOL_TIMEOUT)

  # pooled resolvers, bucketed per resolver type; each bucket is created on first access
  @resolvers = Hash.new do |buckets, type|
    buckets[type] = []
  end
  @resolver_mtx = Thread::Mutex.new

  # pooled connections, guarded by their own mutex
  @connections = []
  @connection_mtx = Thread::Mutex.new

  # bookkeeping used to enforce the global limit
  @connections_counter = 0
  @max_connections_cond = ConditionVariable.new

  # bookkeeping used to enforce the per-origin limit; one condition variable
  # per origin, created on first access
  @origin_counters = Hash.new(0)
  @origin_conds = Hash.new do |conds, origin|
    conds[origin] = ConditionVariable.new
  end
end
Public Instance methods
checkin_connection(connection)
[show source]
# File lib/httpx/pool.rb, line 110
#
# Gives +connection+ back to the pool.
#
# Does nothing when +connection.options.io+ is set. A connection which is
# coalesced, or whose state is +:idle+, is passed to drop_connection instead
# of being stored. Any other connection is appended to the pool, and both the
# global and the per-origin condition variables are signalled.
#
# Always returns +nil+.
def checkin_connection(connection)
  return if connection.options.io

  @connection_mtx.synchronize do
    if connection.coalesced? || connection.state == :idle
      # coalesced (or idle) connections are not kept around
      drop_connection(connection)

      next
    end

    @connections.push(connection)

    # wake up one waiter on the global limit and one on this origin's limit
    @max_connections_cond.signal
    @origin_conds[connection.origin.to_s].signal

    # A session looping over requests across several threads was observed to keep
    # checking the same connection in and out, starving another thread waiting on
    # that very connection, because ruby's thread scheduler never switched to it.
    # Yield explicitly to give it a chance.
    Thread.pass
  end
end
checkin_resolver(resolver)
[show source]
# File lib/httpx/pool.rb, line 163
#
# Gives a resolver back to the pool.
#
# The object which gets stored is +resolver.multi+, filed under the class of
# the +resolver+ that was passed in. Nothing is stored (and +nil+ is returned)
# while that multi is not closed, and a multi which is already pooled is not
# added a second time.
def checkin_resolver(resolver)
  bucket_key = resolver.class
  multi = resolver.multi

  # a multi can only be checked back in once all of its sub-resolvers are closed.
  return unless multi.closed?

  @resolver_mtx.synchronize do
    bucket = @resolvers[bucket_key]

    bucket << multi unless bucket.include?(multi)
  end
end
checkout_connection(uri, options)
opens a connection to the IP reachable through uri. Many hostnames are reachable through the same IP, so we try to maximize pipelining by opening as few connections as possible.
[show source]
# File lib/httpx/pool.rb, line 46
#
# Checks out a connection for +uri+, reusing a pooled one when possible.
#
# When +options.io+ is set the pool is bypassed and the result of
# checkout_new_connection is returned directly. Otherwise, while holding the
# connection mutex:
#
# 1. whatever acquire_connection returns is used, if truthy;
# 2. if the global counter equals +@max_connections+, the caller waits on the
#    global condition variable until a connection can be acquired, the counter
#    no longer equals the limit, or a +:closed+ pooled connection can be
#    dropped to make room;
# 3. if the counter for +uri.origin+ equals +@max_connections_per_origin+, the
#    caller waits on that origin's condition variable until a connection can
#    be acquired;
# 4. otherwise both counters are incremented and a new connection is created.
#
# Each of the two waiting stages has its own deadline of +@pool_timeout+
# seconds, and every individual wait is limited to the time left until that
# deadline, so a single stage never blocks for longer than +@pool_timeout+.
#
# Raises PoolTimeoutError when a stage's deadline passes without a usable
# connection.
def checkout_connection(uri, options)
  return checkout_new_connection(uri, options) if options.io

  @connection_mtx.synchronize do
    acquire_connection(uri, options) || begin
      if @connections_counter == @max_connections
        # this takes precedence over per-origin

        expires_at = Utils.now + @pool_timeout

        loop do
          # only wait for what is left until the deadline; waiting the full
          # timeout again after a lost race would overshoot it. Clamped at 0
          # because a negative timeout is rejected.
          remaining = [expires_at - Utils.now, 0].max
          @max_connections_cond.wait(@connection_mtx, remaining)

          if (conn = acquire_connection(uri, options))
            return conn
          end

          # if one can afford to create a new connection, do it
          break unless @connections_counter == @max_connections

          # if no matching usable connection was found, the pool will make room and drop a closed connection.
          if (conn = @connections.find { |c| c.state == :closed })
            drop_connection(conn)
            break
          end

          # happens when a condition was signalled, but another thread snatched the available connection before
          # context was passed back here.
          next if Utils.now < expires_at

          raise PoolTimeoutError.new(@pool_timeout,
                                     "Timed out after #{@pool_timeout} seconds while waiting for a connection")
        end

      end

      if @origin_counters[uri.origin] == @max_connections_per_origin

        expires_at = Utils.now + @pool_timeout

        loop do
          # same as above: never wait past this stage's deadline.
          remaining = [expires_at - Utils.now, 0].max
          @origin_conds[uri.origin].wait(@connection_mtx, remaining)

          if (conn = acquire_connection(uri, options))
            return conn
          end

          # happens when a condition was signalled, but another thread snatched the available connection before
          # context was passed back here.
          next if Utils.now < expires_at

          raise PoolTimeoutError.new(@pool_timeout,
                                     "Timed out after #{@pool_timeout} seconds while waiting for a connection to #{uri.origin}")
        end
      end

      @connections_counter += 1
      @origin_counters[uri.origin] += 1

      checkout_new_connection(uri, options)
    end
  end
end
checkout_mergeable_connection(connection)
[show source]
# File lib/httpx/pool.rb, line 134
#
# Removes from the pool, and returns, the first pooled connection other than
# +connection+ which reports being mergeable with it.
#
# Returns +nil+ when +connection.options.io+ is set, or when no pooled
# connection qualifies.
def checkout_mergeable_connection(connection)
  return if connection.options.io

  @connection_mtx.synchronize do
    position = @connections.index do |candidate|
      candidate != connection && candidate.mergeable?(connection)
    end

    position && @connections.delete_at(position)
  end
end
checkout_resolver(options)
[show source]
# File lib/httpx/pool.rb, line 149
#
# Checks out a resolver for +options+.
#
# The resolver type is whatever Resolver.resolver_for returns for
# +options.resolver_class+. The first pooled resolver of that type whose
# options equal +options+ is removed from the pool and returned; when there is
# none, the result of checkout_new_resolver is returned instead.
def checkout_resolver(options)
  resolver_type = Resolver.resolver_for(options.resolver_class, options)

  pooled = @resolver_mtx.synchronize do
    candidates = @resolvers[resolver_type]

    position = candidates.index { |candidate| candidate.options == options }

    candidates.delete_at(position) if position
  end

  # the new resolver is built outside of the mutex
  pooled || checkout_new_resolver(resolver_type, options)
end
inspect()
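Returns a short description of the pool: its class and object id, the configured limits, the pool timeout and the number of connections currently held.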
[show source]
# File lib/httpx/pool.rb, line 180
#
# Returns a one-line description of the pool: class and object id, followed by
# the configured limits, the pool timeout and the number of pooled connections.
def inspect
  details = [
    "@max_connections=#{@max_connections}",
    "@max_connections_per_origin=#{@max_connections_per_origin}",
    "@pool_timeout=#{@pool_timeout}",
    "@connections=#{@connections.size}",
  ].join(" ")

  "#<#{self.class}:#{object_id} #{details}>"
end
pop_connection()
connections returned by this function are not expected to return to the connection pool.
[show source]
# File lib/httpx/pool.rb, line 36
#
# Calls drop_connection (without arguments) while holding the connection
# mutex, and returns its result.
#
# Connections obtained this way are not expected to be checked back into the pool.
def pop_connection
  @connection_mtx.synchronize { drop_connection }
end
reset_resolvers()
[show source]
# File lib/httpx/pool.rb, line 145
#
# Empties the resolver pool while holding the resolver mutex.
#
# Returns the (now empty) resolver registry.
def reset_resolvers
  @resolver_mtx.synchronize do
    @resolvers.clear
  end
end