Skip to content

Commit

Permalink
Merge pull request #31 from agorapulse/fix/let-redis-executor-respect…
Browse files Browse the repository at this point in the history
…-the-selected-scheduler

Let Redis Job Executor respect the selected scheduler
  • Loading branch information
musketyr authored Oct 31, 2024
2 parents a8b61a7 + 90be42e commit 18a2019
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,27 @@
*/
package com.agorapulse.worker.redis;

import com.agorapulse.worker.Job;
import com.agorapulse.worker.JobConfiguration;
import com.agorapulse.worker.JobManager;
import com.agorapulse.worker.executor.DistributedJobExecutor;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.inject.qualifiers.Qualifiers;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

@Singleton
@Requires(beans = {StatefulRedisConnection.class}, property = "redis.uri")
Expand Down Expand Up @@ -64,10 +71,14 @@ public class RedisJobExecutor implements DistributedJobExecutor {

private final StatefulRedisConnection<String, String> connection;
private final String hostname;
private final BeanContext beanContext;
private final JobManager jobManager;

public RedisJobExecutor(StatefulRedisConnection<String, String> connection, @Named(HOSTNAME_PARAMETER_NAME) String hostname) {
public RedisJobExecutor(StatefulRedisConnection<String, String> connection, @Named(HOSTNAME_PARAMETER_NAME) String hostname, BeanContext beanContext, JobManager jobManager) {
this.connection = connection;
this.hostname = hostname;
this.beanContext = beanContext;
this.jobManager = jobManager;
}

@Override
Expand All @@ -76,7 +87,7 @@ public <R> Publisher<R> executeOnlyOnLeader(String jobName, Callable<R> supplier

return readMasterHostname(jobName, commands).flatMap(h -> {
if (hostname.equals(h)) {
return Mono.fromCallable(supplier);
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName)));
}
return Mono.empty();
}).flux();
Expand All @@ -94,7 +105,7 @@ public <R> Publisher<R> executeConcurrently(String jobName, int maxConcurrency,
return decreaseCurrentExecutionCount(jobName, commands).flatMap(decreased -> Mono.empty());
}

return Mono.fromCallable(supplier).doFinally(signal -> decreaseCurrentExecutionCount(jobName, commands).subscribe());
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName))).doFinally(signal -> decreaseCurrentExecutionCount(jobName, commands).subscribe());
}).flux();
}

Expand All @@ -105,7 +116,7 @@ public <R> Publisher<R> executeOnlyOnFollower(String jobName, Callable<R> suppli
if (!"".equals(h) && h.equals(hostname)) {
return Mono.empty();
}
return Mono.fromCallable(supplier);
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName)));
}).flux();
}

Expand Down Expand Up @@ -133,4 +144,14 @@ private Mono<Object> readMasterHostname(String jobName, RedisAsyncCommands<Strin
PREFIX_LEADER + jobName, hostname, String.valueOf(LEADER_INACTIVITY_TIMEOUT)
).toCompletableFuture()).defaultIfEmpty("");
}

private ExecutorService getExecutorService(String jobName) {
return jobManager
.getJob(jobName)
.map(Job::getConfiguration)
.map(JobConfiguration::getScheduler)
.flatMap(name -> beanContext.findBean(ExecutorService.class, Qualifiers.byName(name)))
.or(() -> beanContext.findBean(ExecutorService.class))
.orElseThrow(() -> new IllegalArgumentException("No executor service found for job " + jobName));
}
}
1 change: 1 addition & 0 deletions libs/micronaut-worker-tck/micronaut-worker-tck.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ dependencies {
implementation 'io.micronaut.reactor:micronaut-reactor'
implementation 'org.spockframework:spock-core'
implementation 'io.micronaut:micronaut-management'
implementation 'io.netty:netty-common'
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.agorapulse.worker.annotation.Job
import com.agorapulse.worker.annotation.LeaderOnly
import groovy.transform.CompileStatic
import io.micronaut.context.annotation.Requires
import io.netty.util.concurrent.FastThreadLocalThread
import org.reactivestreams.Publisher

import jakarta.inject.Singleton
Expand Down Expand Up @@ -102,7 +103,11 @@ class LongRunningJob {
return "LongRunningJob{producer=$producer, leader=$leader, follower=$follower, consecutive=$consecutive, unlimited=$unlimited, concurrent=$concurrent, fork=$fork}"
}

@SuppressWarnings('Instanceof')
private static void runLongTask() {
if (Thread.currentThread() instanceof FastThreadLocalThread) {
throw new IllegalStateException('Running on FastThreadLocalThread will fail execution of HTTP client requests')
}
Thread.sleep(LONG_RUNNING_JOB_DURATION)
}

Expand Down

0 comments on commit 18a2019

Please sign in to comment.