Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute statements atfer reading each record found by statement #221

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
# Path of file containing statement to execute
config :statement_filepath, :validate => :path

# Statement to be executed after reading each record found. You can use values
# from `parameters`, plus the values from the record
config :post_statement, :validate => :string, :default => nil

# Path of file containing statement to be executed after reading each record found
config :post_statement_filepath, :validate => :path

# Hash of query parameter, for example `{ "target_id" => "321" }`
config :parameters, :validate => :hash, :default => {}

Expand Down Expand Up @@ -227,6 +234,7 @@ def register
end

@statement = File.read(@statement_filepath) if @statement_filepath
@post_statement = File.read(@post_statement_filepath) if @post_statement_filepath

if (@jdbc_password_filepath and @jdbc_password)
raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
Expand Down Expand Up @@ -277,6 +285,13 @@ def execute_query(queue)
event = LogStash::Event.new(row)
decorate(event)
queue << event
run_post_statement(row)
end
end

def run_post_statement(row)
unless @post_statement.nil?
execute_update_statement(@post_statement, @parameters.merge(row))
end
end

Expand Down
22 changes: 22 additions & 0 deletions lib/logstash/plugin_mixins/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,28 @@ def execute_statement(statement, parameters)
return success
end


public
def execute_update_statement(statement, parameters)
success = false
begin
parameters = symbolized_params(parameters)
open_jdbc_connection if @database == nil
query = @database[statement, parameters]
@logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count)

query.update

success = true
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
@logger.warn("Exception when executing JDBC query", :exception => e)
@logger.warn("Attempt reconnection.")
close_jdbc_connection()
open_jdbc_connection()
end
return success
end

public
def get_column_value(row)
if !row.has_key?(@tracking_column.to_sym)
Expand Down
69 changes: 69 additions & 0 deletions spec/inputs/jdbc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,75 @@
end
end

context "when passing post_statement" do
let(:settings) do
{
"statement" => "SELECT * FROM test"
}
end

before do
db.create_table(:test) do
Integer(:id)
Integer(:flag)
end
db[:test].insert(:id => 42, :flag => 0)
plugin.register
end

after do
plugin.stop
db.drop_table(:test)
end

it "should update records from table" do
plugin.post_statement = "UPDATE test SET flag = 1 WHERE id = :id"
plugin.run(queue)
expect(queue.pop.get('id')).to eq(42)

resultset = db[:test].all
expect(resultset.count).to eq(1)
resultset.each do |row|
expect(row[:id]).to eq(42)
expect(row[:flag]).to eq(1)
end
end


it "should delete records from table" do
plugin.post_statement = "DELETE FROM test WHERE id = :id"
plugin.run(queue)
expect(queue.pop.get('id')).to eq(42)

resultset = db[:test].all
expect(resultset.count).to eq(0)
end
end

context "when post_statement is passed in from a file" do
let(:post_statement) { "UPDATE dummy_table SET flag = 1 WHERE num = :num" }
let(:post_statement_filepath) { Stud::Temporary.pathname }
let(:settings) do
{
"post_statement_filepath" => post_statement_filepath,
"statement" => "SELECT * from dummy_table"
}
end

before do
File.write(post_statement_filepath, post_statement)
plugin.register
end

after do
plugin.stop
end

it "should read in post_statement from file" do
expect(plugin.post_statement).to eq(post_statement)
end
end

context "when passing parameters" do
let(:settings) do
{
Expand Down