
RabbitMQ output plugin does not engage backpressure when queue is full #58

Closed
IvanRibakov opened this issue Mar 20, 2024 · 9 comments
Labels: bug (Something isn't working)

IvanRibakov commented Mar 20, 2024

Logstash information:

  1. Logstash version:
    # bin/logstash --version
    Using bundled JDK: /usr/share/logstash/jdk
    logstash 8.12.2
    
  2. Logstash installation source: logstash:8.12.2 Docker image
  3. How is Logstash being run: as a service managed by Docker Compose
  4. How was the Logstash plugin installed: I did not install it explicitly, so I'm assuming the default version shipped with the Docker image is used

OS version

Docker host:

Linux pop-os 6.6.10-76060610-generic #202401051437~1709085277~22.04~31d73d8 SMP PREEMPT_DYNAMIC Wed F x86_64 x86_64 x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:

I have configured the queue with the following arguments:

"arguments": {
    "x-queue-type": "classic",
    "x-max-length": 1000,
    "x-overflow": "reject-publish"
},

Expected behaviour:

  1. Upon reaching the queue limit of 1000 enqueued messages, Logstash stops publishing
  2. Once space on the queue is freed up, Logstash resumes publishing

Observed behaviour:

  1. Logstash ignores the queue being full and publishes all messages anyway, leading to data loss
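
For context: with x-overflow set to reject-publish, a full queue makes the broker nack further publishes, but only publishers that enable publisher confirms can observe those nacks; without confirms, the overflow is silent on the client side. Below is a minimal sketch of that broker-side behaviour, in Python with the pika client purely for illustration (this is not how the plugin is implemented):

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))
ch = conn.channel()
ch.confirm_delivery()  # without confirms, rejected publishes are dropped silently

for i in range(1100):
    try:
        # BlockingChannel.basic_publish raises NackError when the broker
        # rejects a confirmed publish (queue full under reject-publish)
        ch.basic_publish(exchange="i3logs", routing_key="logstash",
                         body=("message %d" % i).encode())
    except pika.exceptions.NackError:
        print("publish %d was nacked: queue is full" % i)
        break

conn.close()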

Steps to reproduce:

  1. docker compose up
  2. Wait for services to start up
  3. Navigate to the RabbitMQ management console (http://<container_ip>:15672/#/queues/%2F/i3logs), browse the i3logs queue stats, and confirm that the queue has 1000 messages in the "Ready" state (a programmatic alternative is sketched after this list)
  4. Use the "Get messages" section at the bottom of the queue page to manually remove some messages (Ack mode: Reject requeue false, Messages: 100)
  5. Observe that the number of "Ready" messages drops to 900 and does NOT go back up to 1000 again
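
A hypothetical programmatic alternative to the management-UI checks in steps 3 and 5, run from a container on the compose network (so the broker resolves as rabbitmq), using Python with pika: a passive declare returns the queue's ready-message count without modifying the queue.

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))
ch = conn.channel()

# passive=True only inspects an existing queue; it raises if the queue is missing
frame = ch.queue_declare(queue="i3logs", passive=True)
print("ready messages: %d" % frame.method.message_count)

conn.close()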

Docker compose service definition:

version: '3'
services:
  rabbitmq:
    image: rabbitmq:3-management
    hostname: rabbitmq
    volumes:
      - ./rabbitmq_conf/definitions.json:/etc/rabbitmq/definitions.json
      - ./rabbitmq_conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 10s
      timeout: 10s
      retries: 10

  logstash:
    depends_on:
      rabbitmq:
        condition: service_healthy
    image: logstash:8.12.2
    user: root
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./logs:/var/log
    environment:
      - xpack.monitoring.enabled=false
      - LOG_LEVEL=info

rabbitmq_conf/definitions.json
{
    "rabbit_version": "3.6.15",
    "users": [
        {
            "name": "guest",
            "password_hash": "roeR8CMxbpWbDUzBwN7eQ+rdnnG6UfwICGG1smu1GdssyyQ/",
            "hashing_algorithm": "rabbit_password_hashing_sha256",
            "tags": "administrator"
        }
    ],
    "vhosts": [
        {
            "name": "/"
        }
    ],
    "permissions": [
        {
            "user": "guest",
            "vhost": "/",
            "configure": ".*",
            "write": ".*",
            "read": ".*"
        }
    ],
    "parameters": [],
    "global_parameters": [],
    "policies": [],
    "queues": [
        {
            "arguments": {
                "x-queue-type": "classic",
                "x-max-length": 1000,
                "x-overflow": "reject-publish"
            },
            "auto_delete": false,
            "durable": true,
            "name": "i3logs",
            "type": "classic",
            "vhost": "/"
        }
    ],
    "exchanges": [
        {
            "arguments": {},
            "auto_delete": false,
            "durable": true,
            "name": "i3logs",
            "type": "fanout",
            "vhost": "/"
        }
    ],
    "bindings": [
        {
            "arguments": {},
            "destination": "i3logs",
            "destination_type": "queue",
            "routing_key": "logstash",
            "source": "i3logs",
            "vhost": "/"
        }
    ]
}

rabbitmq_conf/rabbitmq.conf
loopback_users.guest = false
listeners.tcp.default = 5672
management.listener.port = 15672
management.listener.ssl = false
management.load_definitions = /etc/rabbitmq/definitions.json
log.console.level = info

logstash.conf
input {
  file {
    path => "/var/log/myproduct.stdout.log"
    start_position => "beginning"
  }
}

output {
  rabbitmq {
    exchange => "i3logs"
    exchange_type => "fanout"
    host => "rabbitmq"
    port => 5672
    persistent => true
    user => "guest"
    password => "guest"
    vhost => "/"
    codec => "json"
  }
}

Provide logs (if relevant):

INFO logs
logstash-1  | 2024/03/20 06:51:51 Setting 'xpack.monitoring.enabled' from environment.
logstash-1  | 2024/03/20 06:51:51 Setting 'log.level' from environment.
logstash-1  | Using bundled JDK: /usr/share/logstash/jdk
logstash-1  | /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_int
logstash-1  | /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_f
logstash-1  | Sending Logstash logs to /usr/share/logstash/logs which is now configured via log4j2.properties
logstash-1  | [2024-03-20T06:52:03,576][WARN ][deprecation.logstash.runner] NOTICE: Running Logstash as superuser is not recommended and won't be allowed in the future. Set 'allow_superuser' to 'false' to avoid startup errors in future releases.
logstash-1  | [2024-03-20T06:52:03,583][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/share/logstash/config/log4j2.properties
logstash-1  | [2024-03-20T06:52:03,584][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.12.2", "jruby.version"=>"jruby 9.4.5.0 (3.1.4) 2023-11-02 1abae2700f OpenJDK 64-Bit Server VM 17.0.10+7 on 17.0.10+7 +indy +jit [x86_64-linux]"}
logstash-1  | [2024-03-20T06:52:03,585][INFO ][logstash.runner          ] JVM bootstrap flags: [-XX:+HeapDumpOnOutOfMemoryError, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, -Djruby.regexp.interruptible=true, --add-opens=java.base/java.security=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11, -Dlog4j2.isThreadContextMapInheritable=true, -Xms1g, -Dlogstash.jackson.stream-read-constraints.max-string-length=200000000, -Djdk.io.File.enableADS=true, -Dfile.encoding=UTF-8, --add-opens=java.base/java.io=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, -Djruby.compile.invokedynamic=true, -Xmx1g, -Djava.security.egd=file:/dev/urandom, -Djava.awt.headless=true, -Dls.cgroup.cpuacct.path.override=/, -Dls.cgroup.cpu.path.override=/, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED]
logstash-1  | [2024-03-20T06:52:03,587][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-string-length` configured to `200000000`
logstash-1  | [2024-03-20T06:52:03,587][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-number-length` configured to `10000`
logstash-1  | [2024-03-20T06:52:03,593][INFO ][logstash.settings        ] Creating directory {:setting=>"path.queue", :path=>"/usr/share/logstash/data/queue"}
logstash-1  | [2024-03-20T06:52:03,595][INFO ][logstash.settings        ] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/share/logstash/data/dead_letter_queue"}
logstash-1  | [2024-03-20T06:52:03,769][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"0dbd11ae-c44d-4c81-b50c-796472497f17", :path=>"/usr/share/logstash/data/uuid"}
logstash-1  | [2024-03-20T06:52:04,282][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
rabbitmq-1  | 2024-03-20 06:52:04.394571+00:00 [info] <0.886.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
rabbitmq-1  | 2024-03-20 06:52:04.394839+00:00 [info] <0.886.0> Successfully synced tables from a peer
logstash-1  | [2024-03-20T06:52:04,620][INFO ][org.reflections.Reflections] Reflections took 91 ms to scan 1 urls, producing 132 keys and 468 values
logstash-1  | [2024-03-20T06:52:04,807][INFO ][logstash.codecs.json     ] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
logstash-1  | [2024-03-20T06:52:04,831][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
rabbitmq-1  | 2024-03-20 06:52:04.877477+00:00 [info] <0.894.0> accepting AMQP connection <0.894.0> (10.22.99.3:52328 -> 10.22.99.2:5672)
rabbitmq-1  | 2024-03-20 06:52:04.898726+00:00 [info] <0.894.0> connection <0.894.0> (10.22.99.3:52328 -> 10.22.99.2:5672): user 'guest' authenticated and granted access to vhost '/'
logstash-1  | [2024-03-20T06:52:04,913][INFO ][logstash.outputs.rabbitmq][main] Connected to RabbitMQ {:url=>"amqp://guest:XXXXXX@localhost:5672/"}
logstash-1  | [2024-03-20T06:52:04,945][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2000, "pipeline.sources"=>["/usr/share/logstash/pipeline/logstash.conf"], :thread=>"#<Thread:0x61765f71 /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}
logstash-1  | [2024-03-20T06:52:05,472][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.53}
logstash-1  | [2024-03-20T06:52:05,480][INFO ][logstash.inputs.file     ][main] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/usr/share/logstash/data/plugins/inputs/file/.sincedb_08cfe5e821a4884a8b77971020dcc599", :path=>["/var/log/myproduct.log"]}
logstash-1  | [2024-03-20T06:52:05,481][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
logstash-1  | [2024-03-20T06:52:05,486][INFO ][filewatch.observingtail  ][main][7ad47ad9b8977afed9528ba0b335f1a77be695b9c7380d30afa97c0b7c37656b] START, creating Discoverer, Watch with file and sincedb collections
logstash-1  | [2024-03-20T06:52:05,489][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
IvanRibakov added the bug (Something isn't working) label Mar 20, 2024

IvanRibakov (Author) commented Apr 1, 2024

Any chance any of the maintainers (@edmocosta, @mashhurs perhaps?) could confirm the bug report or at least acknowledge it? Thanks in advance!

IvanRibakov (Author) commented:

Just realised that there is an open PR that possibly addresses this very same issue: #57

Any reason why this is not getting any traction from the Logstash/Logstash Plugins teams?

mashhurs (Contributor) commented:

@IvanRibakov can you share debug- or trace-level logs? The attached info-level logs only show activity up to pipeline start; the rest is dark.

IvanRibakov (Author) commented:

Hi, @mashhurs. I'm not sure if you wanted more details from the logstash.outputs.rabbitmq plugin only or from Logstash as a whole. Attached is a slightly trimmed log file from running Logstash with LOG_LEVEL=trace. I've removed the actual contents of the input file ingested by logstash.inputs.file, as well as the intermediate lines emitted by filewatch.tailmode.processor, since I'm not appending to the input file used for testing.

(attached: logstash.log)

P.S. I recently learned about RabbitMQ's Blocked Connections feature. I want to highlight specifically that it is NOT the kind of back-pressure I'm talking about. Blocked Connection events are published based on the RabbitMQ resource alarms (disk space, RAM, CPU), while I'm trying to engage back-pressure based on the RabbitMQ queue depth.
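
To make the distinction concrete: blocked-connection notifications arrive as a connection-level callback driven by the broker's resource alarms, with no per-queue signal involved. A minimal pika sketch (the callback bodies are illustrative):

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))

def on_blocked(connection, frame):
    # frame.method is pika.spec.Connection.Blocked; reason names the resource alarm
    print("connection blocked: %s" % frame.method.reason)

def on_unblocked(connection, frame):
    print("connection unblocked")

conn.add_on_connection_blocked_callback(on_blocked)
conn.add_on_connection_unblocked_callback(on_unblocked)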

IvanRibakov (Author) commented:

Hi @mashhurs , thanks for reaching out the other day. Did you get a chance to look at the logs? Do they provide the information you were looking for?

IvanRibakov (Author) commented:

Hi @mashhurs, do you happen to have any news? Am I even barking up the right tree? Can you confirm if the behaviour I'm expecting is even meant to be supported by Logstash?

IvanRibakov (Author) commented:

Hi @mashhurs, is there anything I can do to help produce at least some answer on this issue?

As I stated before, from the current documentation it's not very clear whether the backpressure I'm talking about is even meant to be supported. So getting some clarity on that would already be valuable for me (and for anyone else wondering the same).

I thought that the "Steps to reproduce" section provides a fairly comprehensive way to reproduce the issue, but let me know if I can somehow improve it.

flexitrev commented:

Hi @IvanRibakov, I'm not sure the behavior you are expecting is what the configuration would indicate. I think the expected behavior for reject-publish is this:

  1. Upon reaching the queue limit of 1000 enqueued messages, Logstash stops publishing
  2. Additional messages are discarded rather than published; data loss is expected
     (https://www.rabbitmq.com/docs/maxlength#overflow-behaviour)

I believe the observed behavior stated is consistent with this, based on the example provided. If I understand this correctly, the reject-publish argument doesn't actually apply backpressure; it merely prioritizes the older messages in the queue by rejecting new ones.

My recommendation is to test this with reject-publish-dlx, dead-lettering the rejected messages so you can determine how the queue is handled.
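
For anyone following that suggestion, the queue arguments would change roughly as follows (the i3logs-dlx exchange name is hypothetical and would need its own exchange, queue, and binding entries in definitions.json):

"arguments": {
    "x-queue-type": "classic",
    "x-max-length": 1000,
    "x-overflow": "reject-publish-dlx",
    "x-dead-letter-exchange": "i3logs-dlx"
},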

IvanRibakov (Author) commented:

Hi @flexitrev, thanks for joining the conversation.

Upon reaching queue limit of 1000 enqueued messages, Logstash stops publishing

  1. How does Logstash know that the queue limit has been reached? Does that mean that Logstash supports publisher confirms?
  2. If Logstash stops publishing, how does it know when it is safe to resume?

Additional messages are discarded rather than published. Data loss is expected
https://www.rabbitmq.com/docs/maxlength#overflow-behaviour

I'm assuming the above is from the RabbitMQ perspective? If so, RabbitMQ's behaviour was clear to me. What I haven't seen (missed?) is any Logstash documentation explaining what happens to ingested events when they fail to be delivered.


If I understand this correctly, the reject-publish argument doesn't actually apply backpressure

I'm inclined to agree with the above based on everything I've seen and read since I created this issue. The only thing that still puzzles me a bit is why someone needed to apply backpressure based on RabbitMQ resource availability, but not in other scenarios that can lead to data loss (like queue overflow).
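
For the record, this is roughly the publisher-side behaviour I was hoping the plugin could opt into: with confirms enabled, a queue-full nack stalls the publisher, which retries with backoff instead of dropping the event. A hypothetical sketch in Python with pika; not how the plugin currently behaves:

import time
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))
ch = conn.channel()
ch.confirm_delivery()

def publish_with_backpressure(body, delay=1.0, max_delay=30.0):
    # retry until the broker confirms; a nack (queue full under
    # reject-publish) blocks the caller instead of losing the event
    while True:
        try:
            ch.basic_publish(exchange="i3logs", routing_key="logstash", body=body)
            return
        except pika.exceptions.NackError:
            time.sleep(delay)
            delay = min(delay * 2, max_delay)  # exponential backoff, capped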
