class Sequel::ThreadedConnectionPool
A connection pool allowing multi-threaded access to a pool of connections. This is the default connection pool used by Sequel.
Constants
- USE_WAITER
Attributes
allocated: A hash with thread keys and connection values for currently allocated connections. The calling code should already have the mutex before calling this.
available_connections: An array of connections that are available for use by the pool. The calling code should already have the mutex before calling this.
max_size: The maximum number of connections this pool will create (per shard/server if sharding).
Public Class Methods
The following additional options are respected:
- :max_connections: The maximum number of connections the connection pool will open (default 4)
- :pool_timeout: The number of seconds to wait to acquire a connection before raising a Sequel::PoolTimeout (default 5)
Calls superclass method Sequel::ConnectionPool::new
# File lib/sequel/connection_pool/threaded.rb, line 26
def initialize(db, opts = OPTS)
  super
  @max_size = Integer(opts[:max_connections] || 4)
  raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
  @mutex = Mutex.new
  @connection_handling = opts[:connection_handling]
  @available_connections = []
  @allocated = {}
  @timeout = Float(opts[:pool_timeout] || 5)
  @waiter = ConditionVariable.new
end
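The coercions in the initializer can be tried in isolation. The following is a minimal sketch, not Sequel code: `pool_settings` is a hypothetical helper, and it raises ArgumentError where the real initializer raises Sequel::Error, so that the example runs without the gem.

```ruby
# Hypothetical helper mirroring the option handling in the initializer above.
# It is not part of Sequel and uses ArgumentError instead of Sequel::Error.
def pool_settings(opts = {})
  # Integer() accepts integers and numeric strings, and raises on garbage.
  max_size = Integer(opts[:max_connections] || 4)
  raise ArgumentError, ':max_connections must be positive' if max_size < 1
  # Float() turns the timeout into a float, so 5 becomes 5.0.
  timeout = Float(opts[:pool_timeout] || 5)
  {max_size: max_size, timeout: timeout}
end

defaults = pool_settings
custom   = pool_settings(max_connections: '10', pool_timeout: 2)
```

Because of the `Integer()` and `Float()` calls, string values such as those read from an environment variable are accepted as long as they parse as numbers.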
Public Instance Methods
Yield all of the available connections, and the one currently allocated to this thread. This will not yield connections currently allocated to other threads, as it is not safe to operate on them. This holds the mutex while it is yielding all of the available connections, which means that until the method's block returns, the pool is locked.
# File lib/sequel/connection_pool/threaded.rb, line 43
def all_connections
  hold do |c|
    sync do
      yield c
      @available_connections.each{|conn| yield conn}
    end
  end
end
Removes all connections currently available. This method has the effect of disconnecting from the database, assuming that no connections are currently being used. If you want to be able to disconnect connections that are currently in use, use the ShardedThreadedConnectionPool, which can do that. This connection pool does not, for performance reasons. To use the sharded pool, pass the servers: {} option when connecting to the database.
Once a connection is requested using hold, the connection pool creates new connections to the database.
# File lib/sequel/connection_pool/threaded.rb, line 62
def disconnect(opts=OPTS)
  conns = nil
  sync do
    conns = @available_connections.dup
    @available_connections.clear
    @waiter.signal
  end
  conns.each{|conn| disconnect_connection(conn)}
end
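The listing above copies the available connections while holding the mutex and only closes them after the mutex is released. A minimal sketch of that pattern follows, with symbols standing in for connections and an array named `closed` (an assumption for illustration) standing in for the actual disconnect call.

```ruby
mutex = Mutex.new
available = [:conn_a, :conn_b]
closed = []

# Take a snapshot and empty the shared array while holding the lock.
conns = mutex.synchronize do
  snapshot = available.dup
  available.clear
  snapshot
end

# Potentially slow teardown runs outside the lock, so other threads
# are not blocked on the mutex while connections are being closed.
conns.each { |conn| closed << conn }
```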
Chooses the first available connection, or if none are available, creates a new connection. Passes the connection to the supplied block:
pool.hold {|conn| conn.execute('DROP TABLE posts')}
Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.
If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.
# File lib/sequel/connection_pool/threaded.rb, line 85
def hold(server=nil)
  t = Thread.current
  if conn = owned_connection(t)
    return yield(conn)
  end
  begin
    conn = acquire(t)
    yield conn
  rescue Sequel::DatabaseDisconnectError, *@error_classes => e
    if disconnect_error?(e)
      oconn = conn
      conn = nil
      disconnect_connection(oconn) if oconn
      sync do
        @allocated.delete(t)
        @waiter.signal
      end
    end
    raise
  ensure
    if conn
      sync{release(t)}
      if @connection_handling == :disconnect
        disconnect_connection(conn)
      end
    end
  end
end
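The re-entrancy described above comes from checking whether the current thread already owns a connection before trying to acquire one. The sketch below illustrates only that check. `TinyPool` is a hypothetical stand-in that holds a single fake connection and omits timeouts, waiting, and disconnect handling.

```ruby
# TinyPool is a hypothetical stand-in for the pool, not Sequel code.
class TinyPool
  def initialize(conn)
    @mutex = Mutex.new
    @available = [conn]
    @allocated = {}
  end

  def hold
    t = Thread.current
    # Re-entrant path: this thread already owns a connection, so reuse it.
    if (conn = @mutex.synchronize { @allocated[t] })
      return yield(conn)
    end

    # First call in this thread: check a connection out.
    conn = @mutex.synchronize do
      c = @available.shift
      @allocated[t] = c if c
      c
    end
    raise 'no connection available' unless conn

    begin
      yield conn
    ensure
      # Only the outermost call returns the connection to the pool.
      @mutex.synchronize { @available << @allocated.delete(t) }
    end
  end
end

pool = TinyPool.new(:conn_a)
seen = []
pool.hold do |outer|
  pool.hold { |inner| seen << [outer, inner] }
end
```

The nested call receives the same connection as the outer call, and the connection is checked back in once, when the outer block finishes.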
# File lib/sequel/connection_pool/threaded.rb, line 114
def pool_type
  :threaded
end
The total number of connections opened, either available or allocated. The calling code should not have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb, line 120
def size
  @mutex.synchronize{_size}
end
Private Instance Methods
The total number of connections opened, either available or allocated. The calling code should already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb, line 128
def _size
  @allocated.length + @available_connections.length
end
Assigns a connection to the supplied thread, if one is available. The calling code should NOT already have the mutex when calling this.
This returns a connection if one becomes available within the timeout. If a connection cannot be acquired within the timeout, a Sequel::PoolTimeout is raised.
# File lib/sequel/connection_pool/threaded.rb, line 138
def acquire(thread)
  if conn = assign_connection(thread)
    return conn
  end

  timeout = @timeout
  timer = Sequel.start_timer

  sync do
    @waiter.wait(@mutex, timeout)
    if conn = next_available
      return(@allocated[thread] = conn)
    end
  end

  until conn = assign_connection(thread)
    elapsed = Sequel.elapsed_seconds_since(timer)
    raise_pool_timeout(elapsed) if elapsed > timeout

    # :nocov:
    # It's difficult to get to this point, it can only happen if there is a race condition
    # where a connection cannot be acquired even after the thread is signalled by the condition variable
    sync do
      @waiter.wait(@mutex, timeout - elapsed)
      if conn = next_available
        return(@allocated[thread] = conn)
      end
    end
    # :nocov:
  end

  conn
end
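The waiting in the listing above relies on `ConditionVariable#wait` with a timeout, repeated with the remaining time if the thread wakes up without a connection. A minimal sketch of that mechanism using only the Ruby standard library follows. `wait_for_connection` is a hypothetical helper, and unlike the real method it returns nil on timeout rather than raising.

```ruby
mutex = Mutex.new
waiter = ConditionVariable.new
available = []
now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

# Hypothetical helper: wait up to `timeout` seconds for an entry in `available`.
# Returns the entry, or nil if the deadline passes first.
wait_for_connection = lambda do |timeout|
  deadline = now.call + timeout
  mutex.synchronize do
    while available.empty?
      remaining = deadline - now.call
      break if remaining <= 0
      # wait releases the mutex while sleeping and reacquires it on wakeup.
      waiter.wait(mutex, remaining)
    end
    available.shift
  end
end

consumer = Thread.new { wait_for_connection.call(2) }

# Check a connection in and wake one waiting thread.
mutex.synchronize do
  available << :conn
  waiter.signal
end

got = consumer.value                        # the connection handed over by the signal
timed_out = wait_for_connection.call(0.05)  # nothing is checked in this time
```

Rechecking the condition after every wakeup matters because a signalled thread may find that another thread took the connection first, which is the race the `:nocov:` comment in the listing refers to.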
Assign a connection to the thread, or return nil if one cannot be assigned. The caller should NOT have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb, line 174
def assign_connection(thread)
  # Thread safe as instance variable is only assigned to local variable
  # and not operated on outside mutex.
  allocated = @allocated
  do_make_new = false
  to_disconnect = nil

  sync do
    if conn = next_available
      return(allocated[thread] = conn)
    end

    if (n = _size) >= (max = @max_size)
      allocated.keys.each do |t|
        unless t.alive?
          (to_disconnect ||= []) << allocated.delete(t)
        end
      end
      n = nil
    end

    if (n || _size) < max
      do_make_new = allocated[thread] = true
    end
  end

  if to_disconnect
    to_disconnect.each{|dconn| disconnect_connection(dconn)}
  end

  # Connect to the database outside of the connection pool mutex,
  # as that can take a long time and the connection pool mutex
  # shouldn't be locked while the connection takes place.
  if do_make_new
    begin
      conn = make_new(:default)
      sync{allocated[thread] = conn}
    ensure
      unless conn
        sync{allocated.delete(thread)}
      end
    end
  end

  conn
end
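When the pool is full, the listing above reclaims connections whose owning thread has died, using `Thread#alive?`. The sketch below shows that sweep on a plain hash. The symbols are placeholders for connections, and the mutex that protects the hash in the real code is left out for brevity.

```ruby
allocated = {}

live = Thread.new { sleep }  # stays alive until killed below
dead = Thread.new { }        # finishes immediately
dead.join

allocated[live] = :conn_live
allocated[dead] = :conn_dead

# Collect connections owned by threads that are no longer running.
to_disconnect = []
allocated.keys.each do |t|
  to_disconnect << allocated.delete(t) unless t.alive?
end

# Clean up the helper thread used for the demonstration.
live.kill
live.join
```

Iterating over `allocated.keys` rather than the hash itself makes it safe to delete entries during the loop.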
Returns a connection to the pool of available connections, and returns the connection. The calling code should already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb, line 223
def checkin_connection(conn)
  @available_connections << conn
  conn
end
Return the next available connection in the pool, or nil if there is not currently an available connection. The calling code should already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb, line 231
def next_available
  case @connection_handling
  when :stack
    @available_connections.pop
  else
    @available_connections.shift
  end
end
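Since connections are appended to the array at checkin, the choice between `pop` and `shift` decides whether the most recently or the least recently returned connection is reused. A small sketch of the difference, where `next_from` is a hypothetical standalone version of the selection logic:

```ruby
# Hypothetical standalone version of the selection logic above.
def next_from(available, connection_handling)
  case connection_handling
  when :stack
    available.pop    # last checked in, first reused
  else
    available.shift  # first checked in, first reused
  end
end

checked_in = [:oldest, :middle, :newest]

queue_pick = next_from(checked_in.dup, nil)
stack_pick = next_from(checked_in.dup, :stack)
```

Here `queue_pick` is `:oldest` and `stack_pick` is `:newest`. Stack ordering tends to keep reusing the same few connections, while the default ordering rotates through all of them.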
Returns the connection owned by the supplied thread, if any. The calling code should NOT already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb, line 242
def owned_connection(thread)
  sync{@allocated[thread]}
end
Create the maximum number of connections immediately. The calling code should NOT have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb, line 248
def preconnect(concurrent = false)
  enum = (max_size - _size).times

  conns = if concurrent
    enum.map{Thread.new{make_new(:default)}}.map(&:value)
  else
    enum.map{make_new(:default)}
  end

  sync{conns.each{|conn| checkin_connection(conn)}}
end
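The concurrent branch in the listing above starts one thread per missing connection and collects the results with `Thread#value`, which joins each thread and returns its block's result. A minimal sketch follows, where `fake_connect` is a hypothetical stand-in for the real connection call.

```ruby
# fake_connect is a hypothetical stand-in for opening a real connection.
counter = 0
counter_lock = Mutex.new
fake_connect = lambda do
  sleep 0.01 # pretend the network handshake takes a while
  counter_lock.synchronize { counter += 1 }
end

needed = 3

# Sequential: each connection is opened after the previous one finishes.
sequential = needed.times.map { fake_connect.call }

# Concurrent: all threads start first, then value waits for each result.
concurrent = needed.times.map { Thread.new { fake_connect.call } }.map(&:value)
```

The two `map` calls in the concurrent form are deliberate: creating all threads before calling `value` on any of them lets the connections open in parallel.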
Raise a PoolTimeout error showing the current timeout, the elapsed time, and the database's name (if any).
# File lib/sequel/connection_pool/threaded.rb, line 262
def raise_pool_timeout(elapsed)
  name = db.opts[:name]
  raise ::Sequel::PoolTimeout, "timeout: #{@timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
end
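The message in the listing above uses a modifier `if` inside string interpolation, which evaluates to nil and therefore to an empty string when no name is set. A small sketch of the resulting messages, where `timeout_message` is a hypothetical helper that only builds the string:

```ruby
# Hypothetical helper that builds the same message without raising.
def timeout_message(timeout, elapsed, name = nil)
  "timeout: #{timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
end

unnamed = timeout_message(5.0, 5.01)
named   = timeout_message(5.0, 5.01, 'main')
```

Here `unnamed` is `"timeout: 5.0, elapsed: 5.01"` and `named` is `"timeout: 5.0, elapsed: 5.01, database name: main"`.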
Releases the connection assigned to the supplied thread back to the pool. The calling code should already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb, line 269
def release(thread)
  conn = @allocated.delete(thread)

  unless @connection_handling == :disconnect
    checkin_connection(conn)
  end

  @waiter.signal
  nil
end
Yield to the block while inside the mutex. The calling code should NOT already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb, line 282
def sync
  @mutex.synchronize{yield}
end
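The repeated notes about whether the caller should or should NOT already have the mutex exist because Ruby's Mutex is not re-entrant. A minimal sketch of what happens when a thread tries to lock a mutex it already holds:

```ruby
mutex = Mutex.new
error = nil

begin
  # The inner synchronize runs in the thread that already holds the lock.
  mutex.synchronize do
    mutex.synchronize { }
  end
rescue ThreadError => e
  error = e
end
```

Ruby raises a ThreadError for the inner call instead of deadlocking, and the outer `synchronize` still releases the lock on the way out. This is why methods that lock internally must not be called from code that is already inside `sync`.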