diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 1a93eb3..d3a4082 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -147,6 +147,13 @@ class LogStash::Inputs::Jdbc < LogStash::Inputs::Base # Path of file containing statement to execute config :statement_filepath, :validate => :path + # Statement to be executed after reading each record found. You can use values + # from `parameters`, plus the values from the record + config :post_statement, :validate => :string, :default => nil + + # Path of file containing statement to be executed after reading each record found + config :post_statement_filepath, :validate => :path + # Hash of query parameter, for example `{ "target_id" => "321" }` config :parameters, :validate => :hash, :default => {} @@ -227,6 +234,7 @@ def register end @statement = File.read(@statement_filepath) if @statement_filepath + @post_statement = File.read(@post_statement_filepath) if @post_statement_filepath if (@jdbc_password_filepath and @jdbc_password) raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.") @@ -277,6 +285,13 @@ def execute_query(queue) event = LogStash::Event.new(row) decorate(event) queue << event + run_post_statement(row) + end + end + + def run_post_statement(row) + unless @post_statement.nil? + execute_update_statement(@post_statement, @parameters.merge(row)) end end diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index 6ef403f..8c7ec2d 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -251,6 +251,28 @@ def execute_statement(statement, parameters) return success end + + public + def execute_update_statement(statement, parameters) + success = false + begin + parameters = symbolized_params(parameters) + open_jdbc_connection if @database == nil + query = @database[statement, parameters] + @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) + + query.update + + success = true + rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e + @logger.warn("Exception when executing JDBC query", :exception => e) + @logger.warn("Attempt reconnection.") + close_jdbc_connection() + open_jdbc_connection() + end + return success + end + public def get_column_value(row) if !row.has_key?(@tracking_column.to_sym) diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index c7784ee..c3c6324 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -157,6 +157,75 @@ end end + context "when passing post_statement" do + let(:settings) do + { + "statement" => "SELECT * FROM test" + } + end + + before do + db.create_table(:test) do + Integer(:id) + Integer(:flag) + end + db[:test].insert(:id => 42, :flag => 0) + plugin.register + end + + after do + plugin.stop + db.drop_table(:test) + end + + it "should update records from table" do + plugin.post_statement = "UPDATE test SET flag = 1 WHERE id = :id" + plugin.run(queue) + expect(queue.pop.get('id')).to eq(42) + + resultset = db[:test].all + expect(resultset.count).to eq(1) + resultset.each do |row| + expect(row[:id]).to eq(42) + expect(row[:flag]).to eq(1) + end + end + + + it "should delete records from table" do + plugin.post_statement = "DELETE FROM test WHERE id = :id" + plugin.run(queue) + expect(queue.pop.get('id')).to eq(42) + + resultset = db[:test].all + expect(resultset.count).to eq(0) + end + end + + context "when post_statement is passed in from a file" do + let(:post_statement) { "UPDATE dummy_table SET flag = 1 WHERE num = :num" } + let(:post_statement_filepath) { Stud::Temporary.pathname } + let(:settings) do + { + "post_statement_filepath" => post_statement_filepath, + "statement" => "SELECT * from dummy_table" + } + end + + before do + File.write(post_statement_filepath, post_statement) + plugin.register + end + + after do + plugin.stop + end + + it "should read in post_statement from file" do + expect(plugin.post_statement).to eq(post_statement) + end + end + context "when passing parameters" do let(:settings) do {