Implements the selector loop, where it registers and monitors “Selectable” objects.
A Selectable object is an object which can calculate the interests (:r
, :w
or :rw
, respectively “read”, “write” or “read-write”) it wants to monitor for, and returns (via to_io
method) an IO object which can be passed to functions such as IO.select . More exhaustively, a Selectable must implement the following methods:
state |
returns the state as a Symbol, must return |
to_io |
returns the IO object. |
call |
gets called when the IO is ready. |
interests |
returns the current interests to monitor for, as described above. |
timeout |
returns nil or an integer, representing how long to wait for interests. |
handle_socket_timeout(Numeric) |
called when waiting for interest times out. |
Methods
Public Class
Public Instance
Public Class methods
# File lib/httpx/selector.rb 34 def initialize 35 @timers = Timers.new 36 @selectables = [] 37 @is_timer_interval = false 38 end
Public Instance methods
deregisters io
from selectables.
# File lib/httpx/selector.rb 127 def deregister(io) 128 @selectables.delete(io) 129 end
# File lib/httpx/selector.rb 40 def each(&blk) 41 @selectables.each(&blk) 42 end
# File lib/httpx/selector.rb 101 def each_connection(&block) 102 return enum_for(__method__) unless block 103 104 @selectables.each do |c| 105 case c 106 when Resolver::Resolver 107 c.each_connection(&block) 108 when Connection 109 yield c 110 end 111 end 112 end
# File lib/httpx/selector.rb 114 def find_connection(request_uri, options) 115 each_connection.find do |connection| 116 connection.match?(request_uri, options) 117 end 118 end
# File lib/httpx/selector.rb 120 def find_mergeable_connection(connection) 121 each_connection.find do |ch| 122 ch != connection && ch.mergeable?(connection) 123 end 124 end
# File lib/httpx/selector.rb 93 def find_resolver(options) 94 res = @selectables.find do |c| 95 c.is_a?(Resolver::Resolver) && options == c.options 96 end 97 98 res.multi if res 99 end
# File lib/httpx/selector.rb 44 def next_tick 45 catch(:jump_tick) do 46 timeout = next_timeout 47 if timeout && timeout.negative? 48 @timers.fire 49 throw(:jump_tick) 50 end 51 52 begin 53 select(timeout) do |c| 54 c.log(level: 2) { "[#{c.state}] selected#{" after #{timeout} secs" unless timeout.nil?}..." } 55 56 c.call 57 end 58 59 @timers.fire 60 rescue TimeoutError => e 61 @timers.fire(e) 62 end 63 end 64 rescue StandardError => e 65 each_connection do |c| 66 c.emit(:error, e) 67 end 68 rescue Exception # rubocop:disable Lint/RescueException 69 each_connection do |conn| 70 conn.force_reset 71 conn.disconnect 72 end 73 74 raise 75 end
register io
.
# File lib/httpx/selector.rb 132 def register(io) 133 return if @selectables.include?(io) 134 135 @selectables << io 136 end
# File lib/httpx/selector.rb 77 def terminate 78 # array may change during iteration 79 selectables = @selectables.reject(&:inflight?) 80 81 selectables.delete_if do |sel| 82 sel.terminate 83 sel.state == :closed 84 end 85 86 until selectables.empty? 87 next_tick 88 89 selectables &= @selectables 90 end 91 end