diff --git a/lib/que/active_record/connection.rb b/lib/que/active_record/connection.rb index c6be8a18..55ef4965 100644 --- a/lib/que/active_record/connection.rb +++ b/lib/que/active_record/connection.rb @@ -3,6 +3,10 @@ module Que module ActiveRecord class << self + def active_rails_executor? + defined?(::Rails.application.executor) && ::Rails.application.executor.active? + end + def wrap_in_rails_executor(&block) if defined?(::Rails.application.executor) ::Rails.application.executor.wrap(&block) @@ -39,17 +43,32 @@ def call(job) yield end - # ActiveRecord will check out connections to the current thread when - # queries are executed and not return them to the pool until - # explicitly requested to. I'm not wild about this API design, and - # it doesn't pose a problem for the typical case of workers using a - # single PG connection (since we ensure that connection is checked - # in and checked out responsibly), but since ActiveRecord supports - # connections to multiple databases, it's easy for people using that - # feature to unknowingly leak connections to other databases. So, - # take the additional step of telling ActiveRecord to check in all - # of the current thread's connections after each job is run. - ::ActiveRecord::Base.clear_active_connections! unless job.class.resolve_que_setting(:run_synchronously) + clear_active_connections_if_needed!(job) + end + + private + + # ActiveRecord will check out connections to the current thread when + # queries are executed and not return them to the pool until + # explicitly requested to. I'm not wild about this API design, and + # it doesn't pose a problem for the typical case of workers using a + # single PG connection (since we ensure that connection is checked + # in and checked out responsibly), but since ActiveRecord supports + # connections to multiple databases, it's easy for people using that + # feature to unknowingly leak connections to other databases. So, + # take the additional step of telling ActiveRecord to check in all + # of the current thread's connections after each job is run. + def clear_active_connections_if_needed!(job) + # don't clean in synchronous mode + # see https://github.com/que-rb/que/pull/393 + return if job.class.resolve_que_setting(:run_synchronously) + + # don't clear connections in nested jobs, + # i.e. clear only if this is the outermost instance of + # Que::ActiveRecord::Connection::JobMiddleware + return if Que::ActiveRecord.active_rails_executor? + + ::ActiveRecord::Base.clear_active_connections! end end end diff --git a/spec/que/active_record/connection_spec.rb b/spec/que/active_record/connection_spec.rb index 21458d92..ed541632 100644 --- a/spec/que/active_record/connection_spec.rb +++ b/spec/que/active_record/connection_spec.rb @@ -62,5 +62,24 @@ def run(*args) assert SecondDatabaseModel.connection_handler.active_connections? end + + it "shouldn't clear connections to secondary DBs if within an active rails executor" do + # This is a hacky spec, but it's better than requiring Rails. + rails, application, executor = 3.times.map { Object.new } + application.define_singleton_method(:executor) { executor } + rails.define_singleton_method(:application) { application } + executor.define_singleton_method(:wrap) { |&block| block.call } + executor.define_singleton_method(:active?) { true } + + refute defined?(::Rails) + ::Rails = rails + + SecondDatabaseModelJob.run + + assert SecondDatabaseModel.connection_handler.active_connections? + + Object.send :remove_const, :Rails + refute defined?(::Rails) + end end end