diff --git a/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java b/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java index 90f8c3d..c6c0160 100644 --- a/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java +++ b/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java @@ -17,11 +17,16 @@ */ package com.agorapulse.worker.redis; +import com.agorapulse.worker.Job; +import com.agorapulse.worker.JobConfiguration; +import com.agorapulse.worker.JobManager; import com.agorapulse.worker.executor.DistributedJobExecutor; import io.lettuce.core.ScriptOutputType; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; +import io.micronaut.context.BeanContext; import io.micronaut.context.annotation.Requires; +import io.micronaut.inject.qualifiers.Qualifiers; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,8 +34,10 @@ import jakarta.inject.Named; import jakarta.inject.Singleton; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; @Singleton @Requires(beans = {StatefulRedisConnection.class}, property = "redis.uri") @@ -64,10 +71,14 @@ public class RedisJobExecutor implements DistributedJobExecutor { private final StatefulRedisConnection connection; private final String hostname; + private final BeanContext beanContext; + private final JobManager jobManager; - public RedisJobExecutor(StatefulRedisConnection connection, @Named(HOSTNAME_PARAMETER_NAME) String hostname) { + public RedisJobExecutor(StatefulRedisConnection connection, @Named(HOSTNAME_PARAMETER_NAME) String hostname, BeanContext beanContext, JobManager jobManager) { this.connection = connection; this.hostname = hostname; + this.beanContext = beanContext; + this.jobManager = jobManager; } @Override @@ -76,7 +87,7 @@ public Publisher executeOnlyOnLeader(String jobName, Callable supplier return readMasterHostname(jobName, commands).flatMap(h -> { if (hostname.equals(h)) { - return Mono.fromCallable(supplier); + return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName))); } return Mono.empty(); }).flux(); @@ -94,7 +105,7 @@ public Publisher executeConcurrently(String jobName, int maxConcurrency, return decreaseCurrentExecutionCount(jobName, commands).flatMap(decreased -> Mono.empty()); } - return Mono.fromCallable(supplier).doFinally(signal -> decreaseCurrentExecutionCount(jobName, commands).subscribe()); + return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName))).doFinally(signal -> decreaseCurrentExecutionCount(jobName, commands).subscribe()); }).flux(); } @@ -105,7 +116,7 @@ public Publisher executeOnlyOnFollower(String jobName, Callable suppli if (!"".equals(h) && h.equals(hostname)) { return Mono.empty(); } - return Mono.fromCallable(supplier); + return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName))); }).flux(); } @@ -133,4 +144,14 @@ private Mono readMasterHostname(String jobName, RedisAsyncCommands beanContext.findBean(ExecutorService.class, Qualifiers.byName(name))) + .or(() -> beanContext.findBean(ExecutorService.class)) + .orElseThrow(() -> new IllegalArgumentException("No executor service found for job " + jobName)); + } } diff --git a/libs/micronaut-worker-tck/micronaut-worker-tck.gradle b/libs/micronaut-worker-tck/micronaut-worker-tck.gradle index 9250638..32b3b60 100644 --- a/libs/micronaut-worker-tck/micronaut-worker-tck.gradle +++ b/libs/micronaut-worker-tck/micronaut-worker-tck.gradle @@ -23,4 +23,5 @@ dependencies { implementation 'io.micronaut.reactor:micronaut-reactor' implementation 'org.spockframework:spock-core' implementation 'io.micronaut:micronaut-management' + implementation 'io.netty:netty-common' } diff --git a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/LongRunningJob.groovy b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/LongRunningJob.groovy index fd630f2..0cc1836 100644 --- a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/LongRunningJob.groovy +++ b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/LongRunningJob.groovy @@ -25,6 +25,7 @@ import com.agorapulse.worker.annotation.Job import com.agorapulse.worker.annotation.LeaderOnly import groovy.transform.CompileStatic import io.micronaut.context.annotation.Requires +import io.netty.util.concurrent.FastThreadLocalThread import org.reactivestreams.Publisher import jakarta.inject.Singleton @@ -102,7 +103,11 @@ class LongRunningJob { return "LongRunningJob{producer=$producer, leader=$leader, follower=$follower, consecutive=$consecutive, unlimited=$unlimited, concurrent=$concurrent, fork=$fork}" } + @SuppressWarnings('Instanceof') private static void runLongTask() { + if (Thread.currentThread() instanceof FastThreadLocalThread) { + throw new IllegalStateException('Running on FastThreadLocalThread will fail execution of HTTP client requests') + } Thread.sleep(LONG_RUNNING_JOB_DURATION) }