Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #411 Wrap job invocation in Rails executor when using ActiveRecord #412

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 49 additions & 24 deletions lib/que/active_record/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,72 @@

module Que
module ActiveRecord
class << self
def active_rails_executor?
defined?(::Rails.application.executor) && ::Rails.application.executor.active?
end

def wrap_in_rails_executor(&block)
if defined?(::Rails.application.executor)
::Rails.application.executor.wrap(&block)
else
yield
end
end
end

module Connection
class << self
private

# Check out a PG::Connection object from ActiveRecord's pool.
def checkout
wrap_in_rails_executor do
# Use Rails' executor (if present) to make sure that the connection
# we're using isn't taken from us while the block runs. See
# https://github.com/que-rb/que/issues/166#issuecomment-274218910
Que::ActiveRecord.wrap_in_rails_executor do
::ActiveRecord::Base.connection_pool.with_connection do |conn|
yield conn.raw_connection
end
end
end

# Use Rails' executor (if present) to make sure that the connection
# we're using isn't taken from us while the block runs. See
# https://github.com/que-rb/que/issues/166#issuecomment-274218910
def wrap_in_rails_executor(&block)
if defined?(::Rails.application.executor)
::Rails.application.executor.wrap(&block)
else
yield
end
end
end

module JobMiddleware
class << self
def call(job)
yield

# ActiveRecord will check out connections to the current thread when
# queries are executed and not return them to the pool until
# explicitly requested to. I'm not wild about this API design, and
# it doesn't pose a problem for the typical case of workers using a
# single PG connection (since we ensure that connection is checked
# in and checked out responsibly), but since ActiveRecord supports
# connections to multiple databases, it's easy for people using that
# feature to unknowingly leak connections to other databases. So,
# take the additional step of telling ActiveRecord to check in all
# of the current thread's connections after each job is run.
# Use Rails' executor (if present) to make sure that the connection
# used by the job isn't returned to the pool prematurely. See
# https://github.com/que-rb/que/issues/411
Que::ActiveRecord.wrap_in_rails_executor do
yield
end

clear_active_connections_if_needed!(job)
end

private

# ActiveRecord will check out connections to the current thread when
# queries are executed and not return them to the pool until
# explicitly requested to. I'm not wild about this API design, and
# it doesn't pose a problem for the typical case of workers using a
# single PG connection (since we ensure that connection is checked
# in and checked out responsibly), but since ActiveRecord supports
# connections to multiple databases, it's easy for people using that
# feature to unknowingly leak connections to other databases. So,
# take the additional step of telling ActiveRecord to check in all
# of the current thread's connections after each job is run.
def clear_active_connections_if_needed!(job)
# don't clean in synchronous mode
# see https://github.com/que-rb/que/pull/393
return if job.class.resolve_que_setting(:run_synchronously)

# don't clear connections in nested jobs executed synchronously
# i.e. while we're still inside of the rails executor
# see https://github.com/que-rb/que/pull/412#issuecomment-2194412783
return if Que::ActiveRecord.active_rails_executor?

if ::ActiveRecord.version >= Gem::Version.new('7.1')
::ActiveRecord::Base.connection_handler.clear_active_connections!(:all)
else
Expand Down
19 changes: 19 additions & 0 deletions spec/que/active_record/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,24 @@ def run(*args)

assert SecondDatabaseModel.connection_handler.active_connections?
end

it "shouldn't clear connections to secondary DBs if within an active rails executor" do
# This is a hacky spec, but it's better than requiring Rails.
rails, application, executor = 3.times.map { Object.new }
application.define_singleton_method(:executor) { executor }
rails.define_singleton_method(:application) { application }
executor.define_singleton_method(:wrap) { |&block| block.call }
executor.define_singleton_method(:active?) { true }

refute defined?(::Rails)
::Rails = rails

SecondDatabaseModelJob.run

assert SecondDatabaseModel.connection_handler.active_connections?

Object.send :remove_const, :Rails
refute defined?(::Rails)
end
end
end
Loading