Refactor: block less while queue-ing output records #68

Draft · wants to merge 2 commits into base: main
Changes from all commits
70 changes: 33 additions & 37 deletions lib/logstash/outputs/kafka.rb
@@ -1,6 +1,7 @@
 require 'logstash/namespace'
 require 'logstash/outputs/base'
 require 'java'
+require 'concurrent/map'
 require 'logstash-integration-kafka_jars.rb'
 require 'logstash/plugin_mixins/kafka_support'

@@ -50,6 +51,9 @@
 class LogStash::Outputs::Kafka < LogStash::Outputs::Base

   java_import org.apache.kafka.clients.producer.ProducerRecord
+  java_import org.apache.kafka.common.KafkaException
+  java_import org.apache.kafka.common.errors.RetriableException
+  java_import org.apache.kafka.common.errors.InterruptException

   include LogStash::PluginMixins::KafkaSupport

@@ -187,7 +191,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base

   public
   def register
-    @thread_batch_map = Concurrent::Hash.new
+    @thread_batch_map = Concurrent::Map.new

     if !@retries.nil?
       if @retries < 0
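Worth a note on the container swap (side commentary, not part of the diff): concurrent-ruby documents `Concurrent::Hash` as a fully synchronized Hash, while `Concurrent::Map` is a lighter-weight map (backed by `ConcurrentHashMap` on JRuby) that exposes `fetch_or_store`, which folds the old membership-test-then-store sequence into a single call. A minimal sketch of the difference:

```ruby
require 'concurrent/hash'
require 'concurrent/map'

# Before: two separate calls into the map per lookup-or-create.
h = Concurrent::Hash.new
h[Thread.current] = [] unless h.include?(Thread.current)

# After: one call; the block runs only when the key is absent, and the
# computed default is stored under the map's own synchronization.
m = Concurrent::Map.new
batch = m.fetch_or_store(Thread.current) { [] }
batch << "record-1"
```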
@@ -201,33 +205,29 @@ def register
     @producer = create_producer
     if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data)
+        push_event_data(event, data)
       end
     elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data.to_java_bytes)
+        push_event_data(event, data.to_java_bytes)
       end
     else
       raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
     end
   end

-  def prepare(record)
+  def append_record(record)
     # This output is threadsafe, so we need to keep a batch per thread.
-    @thread_batch_map[Thread.current].add(record)
+    @thread_batch_map.get(Thread.current) << record
   end

   def multi_receive(events)
-    t = Thread.current
-    if !@thread_batch_map.include?(t)
-      @thread_batch_map[t] = java.util.ArrayList.new(events.size)
-    end
+    batch = @thread_batch_map.fetch_or_store(Thread.current) { Array.new(events.size).clear }

     events.each do |event|
       @codec.encode(event)
     end

-    batch = @thread_batch_map[t]
     if batch.any?
       retrying_send(batch)
       batch.clear
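The per-thread batch is now a plain Ruby Array instead of a `java.util.ArrayList`; `Array.new(events.size).clear` presumably pre-grows the backing storage before emptying it, mirroring `ArrayList`'s capacity argument (an implementation-dependent optimization). A self-contained sketch of the batching pattern, with hypothetical stand-ins for the codec and send path:

```ruby
require 'concurrent/map'

class BatchingSink
  def initialize
    @thread_batch_map = Concurrent::Map.new
  end

  def multi_receive(events)
    # One reusable Array per worker thread, created on first use.
    batch = @thread_batch_map.fetch_or_store(Thread.current) { Array.new(events.size).clear }
    events.each { |e| batch << e }  # stands in for @codec.encode -> append_record
    if batch.any?
      send_batch(batch)             # stands in for retrying_send
      batch.clear                   # empty the array but keep it for the next call
    end
  end

  def send_batch(batch)
    puts "#{Thread.current.object_id}: sending #{batch.size} records"
  end
end

sink = BatchingSink.new
2.times.map { Thread.new { sink.multi_receive(%w[a b c]) } }.each(&:join)
```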
@@ -255,18 +255,9 @@ def retrying_send(batch)

     futures = batch.collect do |record|
       begin
-        # send() can throw an exception even before the future is created.
         @producer.send(record)
-      rescue org.apache.kafka.common.errors.InterruptException,
-             org.apache.kafka.common.errors.RetriableException => e
-        logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
-        failures << record
-        nil
-      rescue org.apache.kafka.common.KafkaException => e
-        # This error is not retriable, drop event
-        # TODO: add DLQ support
-        logger.warn("producer send failed, dropping record", :exception => e.class, :message => e.message,
-                    :record_value => record.value)
+      rescue => e # send() can throw an exception even before the future is created
+        failures << record if handle_kafka_error(e, record) == :retry
         nil
       end
     end
@@ -277,18 +268,8 @@ def retrying_send(batch)
       begin
         future.get
       rescue java.util.concurrent.ExecutionException => e
-        # TODO(sissel): Add metric to count failures, possibly by exception type.
-        if e.get_cause.is_a? org.apache.kafka.common.errors.RetriableException or
-           e.get_cause.is_a? org.apache.kafka.common.errors.InterruptException
-          logger.info("producer send failed, will retry sending", :exception => e.cause.class,
-                      :message => e.cause.message)
-          failures << batch[i]
-        elsif e.get_cause.is_a? org.apache.kafka.common.KafkaException
-          # This error is not retriable, drop event
-          # TODO: add DLQ support
-          logger.warn("producer send failed, dropping record", :exception => e.cause.class,
-                      :message => e.cause.message, :record_value => batch[i].value)
-        end
+        record = batch[i]
+        failures << record if handle_kafka_error(e.cause, record) == :retry
       end
     end
   end
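One invariant worth calling out: `send()` normally returns a future, but it can also raise before one exists, and the `nil` stored by the rescue keeps `futures` index-aligned with `batch`, which is what makes `batch[i]` valid in this second loop. A toy illustration in plain Ruby (the string "futures" here are hypothetical stand-ins):

```ruby
batch   = %w[r1 r2 r3]
futures = batch.collect do |record|
  begin
    raise "sync failure" if record == "r2"  # send() raising before a future exists
    "future-for-#{record}"
  rescue
    nil  # placeholder keeps futures[i] lined up with batch[i]
  end
end

futures.each_with_index do |future, i|
  next if future.nil?  # r2 was already handled synchronously
  puts "#{batch[i]} -> #{future}"
end
```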
@@ -314,13 +295,29 @@ def close

   private

-  def write_to_kafka(event, serialized_data)
+  def handle_kafka_error(e, record)
+    # TODO(sissel): Add metric to count failures, possibly by exception type.
+    case e
+    when RetriableException, InterruptException
+      logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
+      return :retry
+    when KafkaException
+      # This error is not retriable, drop event
+      # TODO: add DLQ support
+      logger.warn("producer send failed, dropping record", :exception => e.class, :message => e.message, :record_value => record.value)
+    else
+      logger.error("producer send failed, unexpected exception", :exception => e.class, :message => e.message, :backtrace => e.backtrace)
+      raise e
+    end
+  end
+
+  def push_event_data(event, serialized_data)
     if @message_key.nil?
       record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
     else
       record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
     end
-    prepare(record)
+    append_record(record)
   rescue LogStash::ShutdownSignal
     logger.debug('producer received shutdown signal')
   rescue => e
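The `when` ordering in `handle_kafka_error` matters: `RetriableException` and `InterruptException` are both subclasses of `KafkaException`, so the specific branches must precede the general one, and JRuby lets `when` match against imported Java classes just as it does Ruby ones. A runnable sketch with pure-Ruby stand-ins for the Java hierarchy (so it runs without the Kafka jars):

```ruby
# Stand-ins mirroring the Kafka exception hierarchy (hypothetical classes;
# the real code uses the java_imported org.apache.kafka.common ones).
class KafkaException     < StandardError; end
class RetriableException < KafkaException; end

def classify(e)
  case e
  when RetriableException then :retry  # transient -- caller re-queues the record
  when KafkaException     then :drop   # permanent -- record dropped (DLQ is a TODO)
  else raise e                         # unexpected -- fail loudly instead of dropping
  end
end

p classify(RetriableException.new)  # => :retry (matched before KafkaException)
p classify(KafkaException.new)      # => :drop
```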
@@ -369,8 +366,7 @@ def create_producer
       org.apache.kafka.clients.producer.KafkaProducer.new(props)
     rescue => e
       logger.error("Unable to create Kafka producer from given configuration",
-                   :kafka_error_message => e,
-                   :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
+                   :message => e.message, :exception => e.class, :cause => e.cause)
       raise e
     end
   end
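The logging change in `create_producer` also drops the `respond_to?(:getCause)` dance: `Exception#cause` has been built into Ruby since 2.1, and JRuby exposes Java's `getCause` as `#cause` on wrapped Java exceptions, so plain `e.cause` should cover both cases. A plain-Ruby illustration of `#cause`:

```ruby
begin
  begin
    raise IOError, "socket closed"
  rescue IOError
    # Raising from inside a rescue records the original exception as #cause.
    raise "Unable to create Kafka producer"
  end
rescue => e
  p e.message      # => "Unable to create Kafka producer"
  p e.cause.class  # => IOError
end
```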