Skip to content

Commit

Permalink
Merge branch '4.8.x' of github.com:micronaut-projects/micronaut-core …
Browse files Browse the repository at this point in the history
…into 4.8.x
  • Loading branch information
sdelamo committed Oct 29, 2024
2 parents 8336b2e + 0a78c94 commit 0d76771
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ protected Flux<ByteBuf> toByteBufPublisher() {
return NettyByteBufferFactory.DEFAULT.wrap(claim());
}

@Override
public @NonNull CloseableByteBody move() {
return new AvailableNettyByteBody(claim());
}

@Override
public @NonNull String toString(Charset charset) {
ByteBuf b = claim();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ public void error(Throwable e) {
return sharedBuffer.subscribeFull(upstream, forceDelaySubscribe).map(AvailableNettyByteBody::new);
}

@Override
public @NonNull CloseableByteBody move() {
BufferConsumer.Upstream upstream = this.upstream;
if (upstream == null) {
failClaim();
}
this.upstream = null;
return new StreamingNettyByteBody(sharedBuffer, forceDelaySubscribe, upstream);
}

@Override
public void close() {
BufferConsumer.Upstream upstream = this.upstream;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.micronaut.http.netty.body

import io.netty.buffer.Unpooled
import spock.lang.Specification

import java.nio.charset.StandardCharsets

class AvailableNettyByteBodySpec extends Specification {
def move() {
given:
def a = new AvailableNettyByteBody(Unpooled.copiedBuffer("foo", StandardCharsets.UTF_8))
def b = a.move()

when:
a.close()
then:
b.buffer().get().toString(StandardCharsets.UTF_8) == "foo"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.micronaut.http.netty.body

import io.netty.buffer.Unpooled
import io.netty.channel.embedded.EmbeddedChannel
import reactor.core.publisher.Flux
import spock.lang.Specification

import java.nio.charset.StandardCharsets

class StreamingNettyByteBodySpec extends Specification {
def move() {
given:
def a = NettyBodyAdapter.adapt(Flux.just(Unpooled.copiedBuffer("foo", StandardCharsets.UTF_8)), new EmbeddedChannel().eventLoop())
def b = a.move()

when:
a.close()
then:
b.buffer().get().toString(StandardCharsets.UTF_8) == "foo"
}
}
17 changes: 17 additions & 0 deletions http/src/main/java/io/micronaut/http/body/ByteBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,25 @@ default ByteBody allowDiscard() {
*
* @return A future that completes when all bytes are available
*/
@NonNull
CompletableFuture<? extends CloseableAvailableByteBody> buffer();

/**
* Create a new {@link CloseableByteBody} with the same content but an independent lifecycle,
* claiming this body in the process.
* <p>This is a primary operation. After this operation, no other primary operation or
* {@link #split()} may be done.
* <p>The purpose of this method is to <i>move</i> the data to a different component in an
* application, making clear that the receiving component claims ownership of the body. If the
* sending component then closes the original {@link ByteBody} for example, it will have no
* impact on the new {@link CloseableByteBody} that the receiver is working with.
*
* @return A new {@link CloseableByteBody} with the same content.
* @since 4.8.0
*/
@NonNull
CloseableByteBody move();

/**
* This enum controls how backpressure should be handled if one of the two bodies
* ("downstreams") is consuming data slower than the other.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.micronaut.core.io.buffer.ByteBufferFactory;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.http.body.CloseableAvailableByteBody;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.body.InternalByteBody;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -88,6 +89,11 @@ public long length() {
return bufferFactory.wrap(toByteArray());
}

@Override
public @NonNull CloseableByteBody move() {
return new AvailableByteArrayBody(bufferFactory, toByteArray());
}

@Override
public void close() {
array = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ public void close() {
});
}

@Override
public @NonNull CloseableByteBody move() {
return new InputStreamByteBody(context, toInputStream());
}

private record Context(
OptionalLong expectedLength,
Executor ioExecutor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.micronaut.http.body.stream

import io.micronaut.core.io.buffer.ByteArrayBufferFactory
import spock.lang.Specification

import java.nio.charset.StandardCharsets

class AvailableByteArrayBodySpec extends Specification {
def move() {
given:
def a = AvailableByteArrayBody.create(ByteArrayBufferFactory.INSTANCE, "foo".getBytes(StandardCharsets.UTF_8))
def b = a.move()

when:
a.close()
then:
b.buffer().get().toString(StandardCharsets.UTF_8) == "foo"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.micronaut.http.body.stream

import io.micronaut.core.io.buffer.ByteArrayBufferFactory
import spock.lang.Specification

import java.nio.charset.StandardCharsets
import java.util.concurrent.Executors

class InputStreamByteBodySpec extends Specification {
def move() {
given:
def pool = Executors.newCachedThreadPool()
def a = InputStreamByteBody.create(new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8)), OptionalLong.empty(), pool, ByteArrayBufferFactory.INSTANCE)
def b = a.move()

when:
a.close()
then:
b.buffer().get().toString(StandardCharsets.UTF_8) == "foo"

cleanup:
pool.shutdown()
}
}

0 comments on commit 0d76771

Please sign in to comment.