diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index b7f4c629..a9400355 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -1,6 +1,7 @@
 require 'logstash/namespace'
 require 'logstash/outputs/base'
 require 'java'
+require 'concurrent/map'
 require 'logstash-integration-kafka_jars.rb'
 require 'logstash/plugin_mixins/kafka_support'
 
@@ -50,6 +51,9 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
 
   java_import org.apache.kafka.clients.producer.ProducerRecord
+  java_import org.apache.kafka.common.KafkaException
+  java_import org.apache.kafka.common.errors.RetriableException
+  java_import org.apache.kafka.common.errors.InterruptException
 
   include LogStash::PluginMixins::KafkaSupport
 
@@ -187,7 +191,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
 
   public
   def register
-    @thread_batch_map = Concurrent::Hash.new
+    @thread_batch_map = Concurrent::Map.new
 
     if !@retries.nil?
       if @retries < 0
@@ -201,33 +205,29 @@ def register
     @producer = create_producer
     if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data)
+        push_event_data(event, data)
       end
     elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data.to_java_bytes)
+        push_event_data(event, data.to_java_bytes)
       end
     else
       raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
     end
   end
 
-  def prepare(record)
+  def append_record(record)
     # This output is threadsafe, so we need to keep a batch per thread.
-    @thread_batch_map[Thread.current].add(record)
+    @thread_batch_map.get(Thread.current) << record
   end
 
   def multi_receive(events)
-    t = Thread.current
-    if !@thread_batch_map.include?(t)
-      @thread_batch_map[t] = java.util.ArrayList.new(events.size)
-    end
+    batch = @thread_batch_map.fetch_or_store(Thread.current) { Array.new(events.size).clear }
 
     events.each do |event|
       @codec.encode(event)
     end
 
-    batch = @thread_batch_map[t]
     if batch.any?
       retrying_send(batch)
       batch.clear
@@ -255,18 +255,9 @@ def retrying_send(batch)
 
       futures = batch.collect do |record|
         begin
-          # send() can throw an exception even before the future is created.
           @producer.send(record)
-        rescue org.apache.kafka.common.errors.InterruptException,
-               org.apache.kafka.common.errors.RetriableException => e
-          logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
-          failures << record
-          nil
-        rescue org.apache.kafka.common.KafkaException => e
-          # This error is not retriable, drop event
-          # TODO: add DLQ support
-          logger.warn("producer send failed, dropping record",:exception => e.class, :message => e.message,
-                      :record_value => record.value)
+        rescue => e # send() can throw an exception even before the future is created
+          failures << record if handle_kafka_error(e, record) == :retry
           nil
         end
       end
@@ -277,18 +268,8 @@ def retrying_send(batch)
         begin
           future.get
         rescue java.util.concurrent.ExecutionException => e
-          # TODO(sissel): Add metric to count failures, possibly by exception type.
-          if e.get_cause.is_a? org.apache.kafka.common.errors.RetriableException or
-              e.get_cause.is_a? org.apache.kafka.common.errors.InterruptException
-            logger.info("producer send failed, will retry sending", :exception => e.cause.class,
-                        :message => e.cause.message)
-            failures << batch[i]
-          elsif e.get_cause.is_a? org.apache.kafka.common.KafkaException
-            # This error is not retriable, drop event
-            # TODO: add DLQ support
-            logger.warn("producer send failed, dropping record", :exception => e.cause.class,
-                        :message => e.cause.message, :record_value => batch[i].value)
-          end
+          record = batch[i]
+          failures << record if handle_kafka_error(e.cause, record) == :retry
         end
       end
     end
@@ -314,13 +295,29 @@ def close
 
   private
 
-  def write_to_kafka(event, serialized_data)
+  def handle_kafka_error(e, record)
+    # TODO(sissel): Add metric to count failures, possibly by exception type.
+    case e
+    when RetriableException, InterruptException
+      logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
+      return :retry
+    when KafkaException
+      # This error is not retriable, drop event
+      # TODO: add DLQ support
+      logger.warn("producer send failed, dropping record", :exception => e.class, :message => e.message, :record_value => record.value)
+    else
+      logger.error("producer send failed, unexpected exception", :exception => e.class, :message => e.message, :backtrace => e.backtrace)
+      raise e
+    end
+  end
+
+  def push_event_data(event, serialized_data)
     if @message_key.nil?
       record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
     else
       record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
     end
-    prepare(record)
+    append_record(record)
   rescue LogStash::ShutdownSignal
     logger.debug('producer received shutdown signal')
   rescue => e
@@ -369,8 +366,7 @@ def create_producer
       org.apache.kafka.clients.producer.KafkaProducer.new(props)
     rescue => e
       logger.error("Unable to create Kafka producer from given configuration",
-                   :kafka_error_message => e,
-                   :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
+                   :message => e.message, :exception => e.class, :cause => e.cause)
       raise e
     end
   end
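Note on the per-thread batching introduced in `multi_receive`: `Concurrent::Map#fetch_or_store` runs its block at most once per missing key, so each pipeline worker thread gets exactly one long-lived batch array; `Array.new(events.size).clear` just returns an empty array (the pre-size is presumably a capacity hint, and the block only needs to yield an empty array). Below is a minimal sketch of that pattern, runnable on plain Ruby with the concurrent-ruby gem. It is not plugin code: `THREAD_BATCH_MAP` and `process_batch` are hypothetical stand-ins for `@thread_batch_map` and `retrying_send`, and the events are plain strings instead of encoded Logstash events.

```ruby
require 'concurrent/map'

# One batch per thread, as in the patch; the map itself is thread-safe.
THREAD_BATCH_MAP = Concurrent::Map.new

# Mirrors append_record: each thread only ever appends to its own batch,
# so no locking beyond the map lookup is needed.
def append_record(record)
  THREAD_BATCH_MAP[Thread.current] << record
end

def multi_receive(events)
  # fetch_or_store initializes the batch exactly once for this thread.
  batch = THREAD_BATCH_MAP.fetch_or_store(Thread.current) { Array.new(events.size).clear }
  events.each { |event| append_record(event) }
  if batch.any?
    process_batch(batch)
    batch.clear # the array is reused across calls, so empty it after sending
  end
end

# Hypothetical stand-in for retrying_send.
def process_batch(batch)
  puts "thread #{Thread.current.object_id}: sending #{batch.size} records"
end

threads = 4.times.map do |i|
  Thread.new { multi_receive(["event-#{i}-a", "event-#{i}-b"]) }
end
threads.each(&:join)
```

The `handle_kafka_error` helper then concentrates the retry decision that was previously duplicated across both rescue sites: it returns `:retry` for `RetriableException`/`InterruptException` (the caller re-queues the record in `failures`), logs and drops the record on any other `KafkaException`, and re-raises anything unexpected so genuine bugs are not silently swallowed.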