Methods
Public Class
Public Instance
Constants
| POOL_TIMEOUT | = | 5 |
Public Class methods
new(options)
Sets up the connection pool with the given options, which can be the following:
| :max_connections |
the maximum number of connections held in the pool. |
| :max_connections_per_origin |
the maximum number of connections held in the pool pointing to a given origin. |
| :pool_timeout |
the number of seconds to wait for a connection to a given origin (before raising PoolTimeoutError). |
[show source]
# File lib/httpx/pool.rb, line 21
#
# Prepares an empty pool configured from +options+ (anything responding to
# +fetch+ with a default argument).
#
# Recognized keys:
# :max_connections :: cap on pooled connections (defaults to Float::INFINITY).
# :max_connections_per_origin :: cap on connections per origin (defaults to Float::INFINITY).
# :pool_timeout :: seconds to wait for a connection (defaults to POOL_TIMEOUT).
def initialize(options)
  # limits
  @max_connections = options.fetch(:max_connections, Float::INFINITY)
  @max_connections_per_origin = options.fetch(:max_connections_per_origin, Float::INFINITY)
  @pool_timeout = options.fetch(:pool_timeout, POOL_TIMEOUT)

  # resolver bookkeeping: one list per resolver type, guarded by its own mutex
  @resolver_mtx = Thread::Mutex.new
  @resolvers = Hash.new { |by_type, resolver_type| by_type[resolver_type] = [] }

  # connection bookkeeping, guarded by @connection_mtx
  @connection_mtx = Thread::Mutex.new
  @connections = []
  @connections_counter = 0
  @origin_counters = Hash.new(0)

  # condition variables used to park callers until a connection is checked back in
  @max_connections_cond = ConditionVariable.new
  @origin_conds = Hash.new { |by_origin, origin| by_origin[origin] = ConditionVariable.new }
end
Public Instance methods
checkin_connection(connection)
[show source]
# File lib/httpx/pool.rb, line 110
#
# Hands +connection+ back to the pool.
#
# Does nothing when +connection.options.io+ is set. Otherwise, while holding
# the connection mutex, a coalesced or idle connection is passed to
# +drop_connection+ instead of being stored; any other connection is appended
# to the pool and one waiter on each of the global and per-origin condition
# variables is signalled.
def checkin_connection(connection)
  return if connection.options.io

  @connection_mtx.synchronize do
    keep = !connection.coalesced? && connection.state != :idle

    unless keep
      # when connections coalesce
      drop_connection(connection)

      return
    end

    @connections << connection

    # wake up callers parked in the checkout path
    @max_connections_cond.signal
    @origin_conds[connection.origin.to_s].signal
  end
end
checkin_resolver(resolver)
[show source]
# File lib/httpx/pool.rb, line 157
#
# Returns the multi-resolver wrapping +resolver+ to the pool, filed under the
# class of the resolver that was passed in.
#
# Nothing is stored unless the multi-resolver reports +closed?+, and the same
# multi-resolver is never stored twice.
def checkin_resolver(resolver)
  pool_key = resolver.class
  multi = resolver.multi

  # a multi requires all sub-resolvers being closed in order to be
  # correctly checked back in.
  return unless multi.closed?

  @resolver_mtx.synchronize do
    pooled = @resolvers[pool_key]

    pooled << multi unless pooled.include?(multi)
  end
end
checkout_connection(uri, options)
opens a connection to the IP reachable through uri. Many hostnames are reachable through the same IP, so we try to maximize pipelining by opening as few connections as possible.
[show source]
# File lib/httpx/pool.rb, line 46
#
# Returns a connection usable for +uri+ with the given +options+.
#
# When +options.io+ is set, a new connection is built right away via
# +checkout_new_connection+ and no pool state is touched. Otherwise, while
# holding the connection mutex:
#
# 1. a connection returned by +acquire_connection+ is used when available;
# 2. if the global connection counter equals +@max_connections+, the caller
#    waits on the global condition variable until a connection can be
#    acquired, the counter is no longer at the limit, or a closed pooled
#    connection can be dropped to make room;
# 3. if the per-origin counter equals +@max_connections_per_origin+, the caller
#    waits on that origin's condition variable until a connection can be acquired;
# 4. otherwise both counters are incremented and a new connection is created.
#
# Each wait phase is bounded by +@pool_timeout+ seconds, measured against a
# deadline computed when the phase starts.
#
# Raises PoolTimeoutError when a wait phase reaches its deadline without a
# connection becoming available.
def checkout_connection(uri, options)
  return checkout_new_connection(uri, options) if options.io

  @connection_mtx.synchronize do
    acquire_connection(uri, options) || begin
      if @connections_counter == @max_connections
        # this takes precedence over per-origin

        expires_at = Utils.now + @pool_timeout

        loop do
          # wait only for what is left until the deadline: a wake-up which does not yield a
          # connection must not restart a full @pool_timeout wait. Clamped at 0, as the
          # deadline may have just elapsed and a negative timeout is not a valid wait interval.
          @max_connections_cond.wait(@connection_mtx, [expires_at - Utils.now, 0].max)

          if (conn = acquire_connection(uri, options))
            return conn
          end

          # if one can afford to create a new connection, do it
          break unless @connections_counter == @max_connections

          # if no matching usable connection was found, the pool will make room and drop a closed connection.
          if (conn = @connections.find { |c| c.state == :closed })
            drop_connection(conn)
            break
          end

          # happens when a condition was signalled, but another thread snatched the available connection before
          # context was passed back here.
          next if Utils.now < expires_at

          raise PoolTimeoutError.new(@pool_timeout,
                                     "Timed out after #{@pool_timeout} seconds while waiting for a connection")
        end
      end

      if @origin_counters[uri.origin] == @max_connections_per_origin

        expires_at = Utils.now + @pool_timeout

        loop do
          # same as above: bound the wait by the time remaining until the deadline.
          @origin_conds[uri.origin].wait(@connection_mtx, [expires_at - Utils.now, 0].max)

          if (conn = acquire_connection(uri, options))
            return conn
          end

          # happens when a condition was signalled, but another thread snatched the available connection before
          # context was passed back here.
          next if Utils.now < expires_at

          raise PoolTimeoutError.new(@pool_timeout,
                                     "Timed out after #{@pool_timeout} seconds while waiting for a connection to #{uri.origin}")
        end
      end

      # room is available: account for the new connection before creating it
      @connections_counter += 1
      @origin_counters[uri.origin] += 1

      checkout_new_connection(uri, options)
    end
  end
end
checkout_mergeable_connection(connection)
[show source]
# File lib/httpx/pool.rb, line 128
#
# Removes from the pool, and returns, the first pooled connection other than
# +connection+ which answers true to <tt>mergeable?(connection)</tt>.
#
# Returns nil when +connection.options.io+ is set or when no pooled connection
# qualifies.
def checkout_mergeable_connection(connection)
  return if connection.options.io

  @connection_mtx.synchronize do
    position = @connections.index do |candidate|
      candidate != connection && candidate.mergeable?(connection)
    end

    position && @connections.delete_at(position)
  end
end
checkout_resolver(options)
[show source]
# File lib/httpx/pool.rb, line 143
#
# Returns a resolver for +options+.
#
# The resolver type is derived from +options.resolver_class+ through
# +Resolver.resolver_for+. A pooled resolver of that type whose options equal
# +options+ is removed from the pool and returned; when there is none, a new
# one is built via +checkout_new_resolver+.
def checkout_resolver(options)
  resolver_type = Resolver.resolver_for(options.resolver_class, options)

  pooled = @resolver_mtx.synchronize do
    candidates = @resolvers[resolver_type]

    position = candidates.index { |candidate| candidate.options == options }

    candidates.delete_at(position) if position
  end

  pooled || checkout_new_resolver(resolver_type, options)
end
inspect()
:nocov:
[show source]
# File lib/httpx/pool.rb 174 def inspect 175 "#<#{self.class}:#{object_id} " \ 176 "@max_connections=#{@max_connections} " \ 177 "@max_connections_per_origin=#{@max_connections_per_origin} " \ 178 "@pool_timeout=#{@pool_timeout} " \ 179 "@connections=#{@connections.size}>" 180 end
pop_connection()
connections returned by this function are not expected to return to the connection pool.
[show source]
# File lib/httpx/pool.rb, line 36
#
# Calls +drop_connection+ (with no argument) while holding the connection
# mutex, and returns its result.
def pop_connection
  @connection_mtx.synchronize { drop_connection }
end
reset_resolvers()
[show source]
# File lib/httpx/pool.rb, line 139
#
# Empties the resolver pool while holding the resolver mutex.
def reset_resolvers
  @resolver_mtx.synchronize do
    @resolvers.clear
  end
end