rate-limiting: propagate back-pressure from queue as HTTP 429's
Adds a proactive handler that rejects new requests with HTTP 429's when the
queue has been blocking for more than 10 consecutive seconds, allowing back-
pressure to propagate in advance of filling up the connection backlog queue.
yaauie committed Oct 1, 2024
1 parent f802fc3 commit 0413f83
Showing 10 changed files with 634 additions and 9 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -4,3 +4,9 @@ Gemfile.bak
.bundle
vendor
.idea
.ci
build/*
.ci/*
.gradle/*
lib/logstash-input-http_jars.rb
logstash-input-http.iml
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 3.9.0
- add improved proactive rate-limiting, rejecting new requests with HTTP 429 when the queue has been actively blocking for more than 10 seconds.

## 3.8.1
- bump netty to 4.1.109 [#173](https://github.com/logstash-plugins/logstash-input-http/pull/173)

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
3.8.1
3.9.0
6 changes: 5 additions & 1 deletion spec/inputs/helpers.rb
@@ -3,4 +3,8 @@

def certificate_path(filename)
File.join(CERTS_DIR, filename)
end
end

RSpec.configure do |config|
config.formatter = :documentation
end
74 changes: 68 additions & 6 deletions spec/inputs/http_spec.rb
@@ -57,23 +57,85 @@
let(:config) { { "port" => port, "threads" => threads, "max_pending_requests" => max_pending_requests } }

context "when sending more requests than queue slots" do
it "should block when the queue is full" do
it "rejects additional incoming requests with HTTP 429" do
# these will queue and return 200
logstash_queue_size.times.each do |i|
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
expect(response.code).to eq(200)
end

# these will block
(threads + max_pending_requests).times.each do |i|
expect {
client.post("http://127.0.0.1:#{port}", :body => '{}').call
}.to raise_error(Manticore::SocketTimeout)
blocked_calls = (threads + max_pending_requests).times.map do
Thread.new do
begin
{:result => client.post("http://127.0.0.1:#{port}", :body => '{}').call}
rescue Manticore::SocketException, Manticore::SocketTimeout => e
{:exception => e}
end
end
end

sleep 1 # let those requests go, but not so long that our block-detector starts emitting 429's

# by now we should be rejecting with 429 since the backlog is full
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
expect(response.code).to eq(429)

# ensure that our blocked connections did block
aggregate_failures do
blocked_calls.map(&:value).each do |blocked|
expect(blocked[:result]).to be_nil
expect(blocked[:exception]).to be_a_kind_of Manticore::SocketTimeout
end
end
end
end
end

describe "observing queue back-pressure" do
let(:logstash_queue_size) { rand(10) + 1 }
let(:max_pending_requests) { rand(5) + 1 }
let(:threads) { rand(4) + 1 }
let(:logstash_queue) { SizedQueue.new(logstash_queue_size) }
let(:client_options) { {
"request_timeout" => 0.1,
"connect_timeout" => 3,
"socket_timeout" => 0.1
} }

let(:config) { { "port" => port, "threads" => threads, "max_pending_requests" => max_pending_requests } }

context "when sending request to an input that has blocked connections" do
it "rejects incoming requests with HTTP 429" do
# these will queue and return 200
logstash_queue_size.times.each do |i|
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
expect(response.code).to eq(200)
end

# this will block
blocked_call = Thread.new do
begin
{:result => client.post("http://127.0.0.1:#{port}", :body => '{}').call}
rescue Manticore::SocketException, Manticore::SocketTimeout => e
{:exception => e}
end
end

sleep 12 # let that request go, and ensure it is blocking long enough to be problematic

# by now we should be rejecting with 429 since at least one existing request is blocked
# for more than 10s.
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
expect(response.code).to eq(429)

# ensure that our blocked connection did block
aggregate_failures do
blocked_call.value.tap do |blocked|
expect(blocked[:result]).to be_nil
expect(blocked[:exception]).to be_a_kind_of Manticore::SocketTimeout
end
end
end
end
end
HttpInitializer.java (package org.logstash.plugins.inputs.http)
@@ -8,8 +8,12 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslHandler;
import org.logstash.plugins.inputs.http.util.ExecutionObserver;
import org.logstash.plugins.inputs.http.util.ExecutionObservingMessageHandler;
import org.logstash.plugins.inputs.http.util.RejectWhenBlockedInboundHandler;
import org.logstash.plugins.inputs.http.util.SslHandlerProvider;

import java.time.Duration;
import java.util.concurrent.ThreadPoolExecutor;

/**
@@ -22,9 +26,11 @@ public class HttpInitializer extends ChannelInitializer<SocketChannel> {
private final HttpResponseStatus responseStatus;
private final ThreadPoolExecutor executorGroup;

private final ExecutionObserver executionObserver = new ExecutionObserver();

public HttpInitializer(IMessageHandler messageHandler, ThreadPoolExecutor executorGroup,
int maxContentLength, HttpResponseStatus responseStatus) {
this.messageHandler = messageHandler;
this.messageHandler = new ExecutionObservingMessageHandler(executionObserver, messageHandler);
this.executorGroup = executorGroup;
this.maxContentLength = maxContentLength;
this.responseStatus = responseStatus;
@@ -37,7 +43,9 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
SslHandler sslHandler = sslHandlerProvider.getSslHandler(socketChannel.alloc());
pipeline.addLast(sslHandler);
}

pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new RejectWhenBlockedInboundHandler(executionObserver, Duration.ofSeconds(10)));
pipeline.addLast(new HttpContentDecompressor());
pipeline.addLast(new HttpObjectAggregator(maxContentLength));
pipeline.addLast(new HttpServerHandler(messageHandler.copy(), executorGroup, responseStatus));
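
The RejectWhenBlockedInboundHandler wired into the pipeline above is one of the changed files not included in this excerpt. As a hedged sketch only: a Netty inbound handler with the constructor shape used above (ExecutionObserver, Duration) could short-circuit new requests with 429 roughly as follows. The 429 status, the constructor arguments, and the placement directly after HttpServerCodec are taken from the diff; the response construction, Content-Length handling, and connection-close behavior are illustrative assumptions, not the plugin's actual implementation.

package org.logstash.plugins.inputs.http.util;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;

import java.time.Duration;

// Sketch (not the plugin's actual source): reject new requests with
// 429 Too Many Requests while any observed execution has been running
// longer than the configured threshold (wired above as 10 seconds).
public class RejectWhenBlockedInboundHandler extends ChannelInboundHandlerAdapter {
    private final ExecutionObserver observer;
    private final Duration threshold;

    public RejectWhenBlockedInboundHandler(final ExecutionObserver observer, final Duration threshold) {
        this.observer = observer;
        this.threshold = threshold;
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        // Only the request line/headers emitted by HttpServerCodec are inspected;
        // content chunks of an already-accepted request flow through untouched.
        if (msg instanceof HttpRequest && observer.anyExecuting(threshold)) {
            ReferenceCountUtil.release(msg); // safe no-op for non-reference-counted header objects
            final FullHttpResponse tooManyRequests = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1, HttpResponseStatus.TOO_MANY_REQUESTS, Unpooled.EMPTY_BUFFER);
            HttpUtil.setContentLength(tooManyRequests, 0);
            // Closing the connection also discards any request body still in flight.
            ctx.writeAndFlush(tooManyRequests).addListener(ChannelFutureListener.CLOSE);
            return;
        }
        super.channelRead(ctx, msg);
    }
}

Because the handler sits before HttpObjectAggregator in the pipeline above, a rejection can be issued as soon as the request line and headers arrive, without waiting for or buffering the request body.
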
ExecutionObserver.java (package org.logstash.plugins.inputs.http.util, new file)
@@ -0,0 +1,201 @@
package org.logstash.plugins.inputs.http.util;

import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/**
* An {@code ExecutionObserver} observes possibly-concurrent execution, and provides information about the
* longest-running observed execution.
*
* <p>
* It is concurrency-safe and non-blocking, and uses plain memory access where practical.
* </p>
*/
public class ExecutionObserver {
private final AtomicReference<Execution> head; // newest execution
private final AtomicReference<Execution> tail; // oldest execution

private final LongSupplier nanosSupplier;

public ExecutionObserver() {
this(System::nanoTime);
}

ExecutionObserver(final LongSupplier nanosSupplier) {
this.nanosSupplier = nanosSupplier;
final Execution anchor = new Execution(nanosSupplier.getAsLong(), true);
this.head = new AtomicReference<>(anchor);
this.tail = new AtomicReference<>(anchor);
}

/**
* @see ExecutionObserver#anyExecuting(Duration)
* @return true if there are any active executions.
*/
public boolean anyExecuting() {
return this.anyExecuting(Duration.ZERO);
}

/**
* @param minimumDuration a threshold to exclude young executions
* @return true if any active execution has been running for at least the provided {@code Duration}
*/
public boolean anyExecuting(final Duration minimumDuration) {
final Execution tailExecution = compactTail();
if (tailExecution.isComplete) {
return false;
} else {
return nanosSupplier.getAsLong() - tailExecution.startNanos >= minimumDuration.toNanos();
}
}

// visible for test
Optional<Duration> longestExecuting() {
final Execution tailExecution = compactTail();
if (tailExecution.isComplete) {
return Optional.empty();
} else {
return Optional.of(Duration.ofNanos(nanosSupplier.getAsLong() - tailExecution.startNanos));
}
}

// test inspections
Stats stats() {
int nodes = 0;
int executing = 0;

Execution candidate = this.tail.get();
while (candidate != null) {
nodes += 1;
if (!candidate.isComplete) {
executing += 1;
}
candidate = candidate.getNextPlain();
}
return new Stats(nodes, executing);
}

static class Stats {
final int nodes;
final int executing;

Stats(int nodes, int executing) {
this.nodes = nodes;
this.executing = executing;
}
}

@FunctionalInterface
public interface ExceptionalSupplier<T, E extends Throwable> {
T get() throws E;
}

public <T,E extends Throwable> T observeExecution(final ExceptionalSupplier<T,E> supplier) throws E {
final Execution execution = startExecution();
try {
return supplier.get();
} finally {
final boolean isCompact = execution.markComplete();
if (!isCompact) {
this.compactTail();
}
}
}

@FunctionalInterface
public interface ExceptionalRunnable<E extends Throwable> {
void run() throws E;
}

public <E extends Throwable> void observeExecution(final ExceptionalRunnable<E> runnable) throws E {
observeExecution(() -> { runnable.run(); return null; });
}

// visible for test
Execution startExecution() {
final Execution newHead = new Execution(nanosSupplier.getAsLong());

// atomically attach the new execution as a new (detached) head
final Execution oldHead = this.head.getAndSet(newHead);
// attach our new head to the old one
oldHead.linkNext(newHead);

return newHead;
}

private Execution compactTail() {
return this.tail.updateAndGet(Execution::chaseTail);
}

static class Execution {
private static final java.lang.invoke.VarHandle NEXT;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(Execution.class, "next", Execution.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}

private final long startNanos;

private volatile boolean isComplete;
private volatile Execution next;

Execution(long startNanos) {
this(startNanos, false);
}

Execution(final long startNanos,
final boolean isComplete) {
this.startNanos = startNanos;
this.isComplete = isComplete;
}

/**
* marks this execution as complete
* @return true if the completion resulted in a compaction
*/
boolean markComplete() {
isComplete = true;

// concurrency: use plain memory for reads because we can tolerate
// completed nodes remaining as the result of a race
final Execution preCompletionNext = this.getNextPlain();
if (preCompletionNext != null) {
final Execution newNext = preCompletionNext.chaseTail();
return (newNext != preCompletionNext) && NEXT.compareAndSet(this, preCompletionNext, newNext);
}
return false;
}

private void linkNext(final Execution proposedNext) {
final Execution witness = (Execution)NEXT.compareAndExchange(this, null, proposedNext);
if (witness != null && witness != proposedNext) {
throw new IllegalStateException();
}
}

/**
* @return the next {@code Execution} that is either not yet complete
* or is the current head, using plain memory access.
*/
private Execution chaseTail() {
Execution compactedTail = this;
Execution candidate = this.getNextPlain();
while (candidate != null && compactedTail.isComplete) {
compactedTail = candidate;
candidate = candidate.getNextPlain();
}
return compactedTail;
}

private Execution getNextPlain() {
return (Execution) NEXT.get(this);
}
}
}
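
For orientation, here is a brief usage sketch of the ExecutionObserver API defined above. The demo class, thread, and timings below are hypothetical; in the plugin, the observer is fed by ExecutionObservingMessageHandler (one of the files not shown in this excerpt) wrapping the message handler's work, and queried by the pipeline handler with a 10-second threshold.

import org.logstash.plugins.inputs.http.util.ExecutionObserver;

import java.time.Duration;

// Hypothetical, self-contained demonstration of the ExecutionObserver API above.
// The plugin wires a 10-second threshold; shorter durations are used here only
// to keep the demo quick.
public class ExecutionObserverDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionObserver observer = new ExecutionObserver();

        // Stand-in for a message handler blocked on a full downstream queue.
        final ExecutionObserver.ExceptionalRunnable<InterruptedException> blockedWork =
                () -> Thread.sleep(3_000);

        final Thread handlerThread = new Thread(() -> {
            try {
                observer.observeExecution(blockedWork);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        handlerThread.start();

        Thread.sleep(500);
        System.out.println(observer.anyExecuting());                       // true: an execution is in flight
        System.out.println(observer.anyExecuting(Duration.ofSeconds(2)));  // false: not blocked long enough yet

        Thread.sleep(2_000);
        System.out.println(observer.anyExecuting(Duration.ofSeconds(2)));  // true: now old enough to trigger rejection

        handlerThread.join();
        System.out.println(observer.anyExecuting());                       // false: the execution completed
    }
}

Note that anyExecuting(Duration) only reports executions that have already been running for at least the given duration, which is what lets short-lived blocking pass through without triggering 429s.
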