Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server-side AsyncContext initialized in lifecycle observer is lost #3111

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
// this ExecutionStrategy to understand if we need to offload more than we already offloaded:
final HttpExecutionStrategy additionalOffloads = ctx.executionContext().executionStrategy().missing(strategy);

Executor useExecutor = null != executor ? executor : ctx.executionContext().executor();
final Executor useExecutor = executor != null ? executor : ctx.executionContext().executor();

// The service should see this ExecutionStrategy and Executor inside the ExecutionContext:
final HttpServiceContext wrappedCtx =
Expand Down Expand Up @@ -136,12 +136,12 @@ public static StreamingHttpService offloadService(final HttpExecutionStrategy st
new StreamingHttpService() {
@Override
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
StreamingHttpRequest request,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
Executor useExecutor = null != executor ? executor : ctx.executionContext().executor();
final Executor useExecutor = executor != null ? executor : ctx.executionContext().executor();

// The service should see this ExecutionStrategy and Executor inside the ExecutionContext:
HttpServiceContext wrappedCtx =
final HttpServiceContext wrappedCtx =
new ExecutionContextOverridingServiceContext(ctx, strategy, useExecutor);

return service.handle(wrappedCtx, request, responseFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
*/
final class ClearAsyncContextHttpServiceFilter implements StreamingHttpServiceFilterFactory {

static final ClearAsyncContextHttpServiceFilter CLEAR_ASYNC_CONTEXT_HTTP_SERVICE_FILTER =
new ClearAsyncContextHttpServiceFilter();
static final ClearAsyncContextHttpServiceFilter INSTANCE = new ClearAsyncContextHttpServiceFilter();

private ClearAsyncContextHttpServiceFilter() {
// singleton
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,23 @@
final class OffloadingFilter implements StreamingHttpServiceFilterFactory {

private final HttpExecutionStrategy strategy;
private final StreamingHttpServiceFilterFactory offloaded;
private final BooleanSupplier shouldOffload;

/**
* Creates a new instance.
*
* @param strategy Execution strategy for the offloaded filters
* @param offloaded Filters to be offloaded
* @param shouldOffload returns true if offloading is appropriate for the current execution context.
*/
OffloadingFilter(HttpExecutionStrategy strategy, StreamingHttpServiceFilterFactory offloaded,
BooleanSupplier shouldOffload) {
OffloadingFilter(final HttpExecutionStrategy strategy, final BooleanSupplier shouldOffload) {
this.strategy = strategy;
this.offloaded = offloaded;
this.shouldOffload = shouldOffload;
}

@Override
public StreamingHttpServiceFilter create(StreamingHttpService service) {
public StreamingHttpServiceFilter create(final StreamingHttpService service) {
StreamingHttpService offloadedService = StreamingHttpServiceToOffloadedStreamingHttpService.offloadService(
strategy, null, shouldOffload, offloaded.create(service));
strategy, null, shouldOffload, service);
return new StreamingHttpServiceFilter(offloadedService);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

abstract class AbstractAsyncHttpServiceAsyncContextTest extends AbstractHttpServiceAsyncContextTest {

@Test
void newRequestsGetFreshContextImmediate() throws Exception {
newRequestsGetFreshContext(true);
}

private static List<Arguments> params() {
List<Arguments> params = new ArrayList<>();
for (boolean useImmediate : Arrays.asList(false, true)) {
for (InitContextKeyPlace place : InitContextKeyPlace.values()) {
for (boolean asyncService : Arrays.asList(false, true)) {
if (!useImmediate && !asyncService) {
continue;
}
params.add(Arguments.of(useImmediate, place, asyncService));
}
}
}
return params;
}

@ParameterizedTest(name = "{displayName} [{index}]: useImmediate={0} initContextKeyPlace={1} asyncService={2}")
@MethodSource("params")
void contextPreservedOverFilterBoundariesAsync(boolean useImmediate, InitContextKeyPlace place,
boolean asyncService) throws Exception {
contextPreservedOverFilterBoundaries(useImmediate, place, asyncService);
}

@ParameterizedTest(name = "{displayName} [{index}]: connectionAcceptorType={0}")
@EnumSource(ConnectionAcceptorType.class)
void connectionAcceptorContextDoesNotLeakImmediate(ConnectionAcceptorType type) throws Exception {
connectionAcceptorContextDoesNotLeak(type, true);
}
}
Loading
Loading