Skip to content

Commit

Permalink
Re-use ContentStreamProvider
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Farr <[email protected]>
  • Loading branch information
Xtansia committed Nov 11, 2024
1 parent 58823eb commit 6e4e8c2
Showing 1 changed file with 17 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.SdkAutoCloseable;
Expand Down Expand Up @@ -210,10 +211,10 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
@Nullable TransportOptions options
) throws IOException {
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
SdkHttpRequest clientReq = prepareRequest(request, endpoint, options, requestBody);
SignedRequest clientReq = prepareRequest(request, endpoint, options, requestBody);

if (httpClient instanceof SdkHttpClient) {
return executeSync((SdkHttpClient) httpClient, clientReq, requestBody, endpoint, options);
return executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
} else if (httpClient instanceof SdkAsyncHttpClient) {
try {
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options).get();
Expand Down Expand Up @@ -242,11 +243,11 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
) {
try {
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
SdkHttpRequest clientReq = prepareRequest(request, endpoint, options, requestBody);
SignedRequest clientReq = prepareRequest(request, endpoint, options, requestBody);
if (httpClient instanceof SdkAsyncHttpClient) {
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options);
} else if (httpClient instanceof SdkHttpClient) {
ResponseT result = executeSync((SdkHttpClient) httpClient, clientReq, requestBody, endpoint, options);
ResponseT result = executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
return CompletableFuture.completedFuture(result);
} else {
throw new IOException("invalid httpClient: " + httpClient);
Expand Down Expand Up @@ -293,7 +294,7 @@ private <RequestT> OpenSearchRequestBodyBuffer prepareRequestBody(
return null;
}

private <RequestT> SdkHttpRequest prepareRequest(
private <RequestT> SignedRequest prepareRequest(
RequestT request,
Endpoint<RequestT, ?, ?> endpoint,
@CheckForNull TransportOptions options,
Expand Down Expand Up @@ -353,21 +354,21 @@ private <RequestT> SdkHttpRequest prepareRequest(

final Clock signingClock = getOption(options, AwsSdk2TransportOptions::signingClock).orElse(null);

SdkHttpRequest.Builder signedReq = AwsV4HttpSigner.create()
SignedRequest signedReq = AwsV4HttpSigner.create()
.sign(
b -> b.identity(credentials.resolveCredentials())
.request(req.build())
.payload(bodyProvider)
.putProperty(AwsV4HttpSigner.SERVICE_SIGNING_NAME, this.signingServiceName)
.putProperty(AwsV4HttpSigner.REGION_NAME, this.signingRegion.id())
.putProperty(AwsV4HttpSigner.SIGNING_CLOCK, signingClock)
)
.request()
.toBuilder();
);

SdkHttpRequest.Builder httpRequest = signedReq.request().toBuilder();

applyHeadersPostSigning(signedReq, body);
applyHeadersPostSigning(httpRequest, body);

return signedReq.build();
return signedReq.toBuilder().request(httpRequest.build()).build();
}

private void applyHeadersPreSigning(SdkHttpRequest.Builder req, TransportOptions options, OpenSearchRequestBodyBuffer body) {
Expand Down Expand Up @@ -424,15 +425,13 @@ private void applyOptionsHeaders(SdkHttpRequest.Builder builder, TransportOption

private <ResponseT> ResponseT executeSync(
SdkHttpClient syncHttpClient,
SdkHttpRequest httpRequest,
OpenSearchRequestBodyBuffer requestBody,
SignedRequest signedRequest,
Endpoint<?, ResponseT, ?> endpoint,
TransportOptions options
) throws IOException {
SdkHttpRequest httpRequest = signedRequest.request();
HttpExecuteRequest.Builder executeRequest = HttpExecuteRequest.builder().request(httpRequest);
if (requestBody != null) {
executeRequest.contentStreamProvider(ContentStreamProvider.fromByteArrayUnsafe(requestBody.getByteArray()));
}
signedRequest.payload().ifPresent(executeRequest::contentStreamProvider);
HttpExecuteResponse executeResponse = syncHttpClient.prepareRequest(executeRequest.build()).call();
AbortableInputStream bodyStream = null;
try {
Expand All @@ -456,11 +455,12 @@ private <ResponseT> ResponseT executeSync(

private <ResponseT> CompletableFuture<ResponseT> executeAsync(
SdkAsyncHttpClient asyncHttpClient,
SdkHttpRequest httpRequest,
SignedRequest signedRequest,
@CheckForNull OpenSearchRequestBodyBuffer requestBody,
Endpoint<?, ResponseT, ?> endpoint,
TransportOptions options
) {
SdkHttpRequest httpRequest = signedRequest.request();
byte[] requestBodyArray = requestBody == null ? NO_BYTES : requestBody.getByteArray();
final AsyncCapturingResponseHandler responseHandler = new AsyncCapturingResponseHandler();
AsyncExecuteRequest.Builder executeRequest = AsyncExecuteRequest.builder()
Expand Down

0 comments on commit 6e4e8c2

Please sign in to comment.