Skip to content

Commit

Permalink
Update external worker job unacquire with tenant logic
Browse files Browse the repository at this point in the history
  • Loading branch information
tijsrademakers committed Sep 4, 2023
1 parent 7e7bd00 commit 687d8a3
Show file tree
Hide file tree
Showing 15 changed files with 368 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ public interface CmmnManagementService {
*/
void unacquireAllExternalWorkerJobsForWorker(String workerId);

/**
* Unaquire all locked external worker jobs for worker and tenant.
*/
void unacquireAllExternalWorkerJobsForWorker(String workerId, String tenantId);

/**
* Create a {@link ChangeTenantIdBuilder} that can be used to change the tenant id of the case instances
* and all the related instances. See {@link CmmnChangeTenantIdEntityTypes} for related instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,12 @@ public void unacquireExternalWorkerJob(String jobId, String workerId) {

@Override
public void unacquireAllExternalWorkerJobsForWorker(String workerId) {
commandExecutor.execute(new UnacquireAllExternalWorkerJobsForWorkerCmd(workerId, configuration.getJobServiceConfiguration()));
commandExecutor.execute(new UnacquireAllExternalWorkerJobsForWorkerCmd(workerId, null, configuration.getJobServiceConfiguration()));
}

@Override
public void unacquireAllExternalWorkerJobsForWorker(String workerId, String tenantId) {
commandExecutor.execute(new UnacquireAllExternalWorkerJobsForWorkerCmd(workerId, tenantId, configuration.getJobServiceConfiguration()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
import org.flowable.common.engine.api.scope.ScopeTypes;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.identitylink.api.IdentityLink;
import org.flowable.identitylink.api.IdentityLinkType;
import org.flowable.identitylink.service.impl.persistence.entity.IdentityLinkEntity;
import org.flowable.job.api.AcquiredExternalWorkerJob;
import org.flowable.job.api.ExternalWorkerJob;
import org.flowable.job.api.ExternalWorkerJobQuery;
import org.flowable.job.api.Job;
import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntity;
import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntityManager;
import org.flowable.job.service.impl.persistence.entity.JobEntity;
import org.flowable.task.api.TaskInfo;
import org.flowable.variable.service.impl.persistence.entity.VariableInstanceEntity;
Expand Down Expand Up @@ -1270,6 +1274,64 @@ public void testUnaquireWithWorkerId() {
assertThat(job.getLockOwner()).isNull();
assertThat(job.getLockExpirationTime()).isNull();
}

@Test
@CmmnDeployment(resources = "org/flowable/cmmn/test/externalworker/ExternalWorkerServiceTaskTest.testSimple.cmmn")
public void testUnaquireWithWorkerIdAndTenantId() {
CaseInstance tenant1Instance = cmmnRuntimeService.createCaseInstanceBuilder()
.caseDefinitionKey("simpleExternalWorker")
.overrideCaseDefinitionTenantId("tenant1")
.start();

CaseInstance tenant2Instance = cmmnRuntimeService.createCaseInstanceBuilder()
.caseDefinitionKey("simpleExternalWorker")
.overrideCaseDefinitionTenantId("tenant2")
.start();

cmmnManagementService.createExternalWorkerJobAcquireBuilder()
.topic("simple", Duration.ofMinutes(10))
.acquireAndLock(2, "testWorker1");

ExternalWorkerJobQuery query = cmmnManagementService.createExternalWorkerJobQuery().lockOwner("testWorker1");
assertThat(query.count()).isEqualTo(2);
assertThat(query.list())
.extracting(ExternalWorkerJob::getElementId)
.containsExactlyInAnyOrder("externalWorkerTask", "externalWorkerTask");

assertThatThrownBy(() -> {
cmmnManagementService.unacquireAllExternalWorkerJobsForWorker("testWorker1", "tenant1");

}).isInstanceOf(FlowableException.class)
.hasMessageContaining("provided worker id has external worker jobs from different tenant");

ExternalWorkerJobEntity worker1Tenant2Job = (ExternalWorkerJobEntity) cmmnManagementService.createExternalWorkerJobQuery().lockOwner("testWorker1").jobTenantId("tenant2").singleResult();

cmmnEngineConfiguration.getCommandExecutor().execute(new Command<Void>() {

@Override
public Void execute(CommandContext commandContext) {
ExternalWorkerJobEntityManager externalWorkerJobEntityManager = cmmnEngineConfiguration.getJobServiceConfiguration().getExternalWorkerJobEntityManager();
worker1Tenant2Job.setTenantId("tenant1");
externalWorkerJobEntityManager.update(worker1Tenant2Job);

return null;
}
});

cmmnManagementService.unacquireAllExternalWorkerJobsForWorker("testWorker1");

assertThat(query.count()).isEqualTo(0);

query = cmmnManagementService.createExternalWorkerJobQuery().scopeId(tenant1Instance.getId());
ExternalWorkerJob job = query.singleResult();
assertThat(job.getLockOwner()).isNull();
assertThat(job.getLockExpirationTime()).isNull();

query = cmmnManagementService.createExternalWorkerJobQuery().scopeId(tenant2Instance.getId());
job = query.singleResult();
assertThat(job.getLockOwner()).isNull();
assertThat(job.getLockExpirationTime()).isNull();
}

protected void addUserIdentityLinkToJob(Job job, String userId) {
cmmnEngineConfiguration.getCommandExecutor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ public interface ManagementService {
*/
void unacquireAllExternalWorkerJobsForWorker(String workerId);

/**
* Unaquire all locked external worker jobs for worker and tenant.
*/
void unacquireAllExternalWorkerJobsForWorker(String workerId, String tenantId);

/**
* Create a {@link ChangeTenantIdBuilder} that can be used to change the tenant id of the process instances
* and all the related instances. See {@link BpmnChangeTenantIdEntityTypes} for related instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,12 @@ public void unacquireExternalWorkerJob(String jobId, String workerId) {

@Override
public void unacquireAllExternalWorkerJobsForWorker(String workerId) {
commandExecutor.execute(new UnacquireAllExternalWorkerJobsForWorkerCmd(workerId, configuration.getJobServiceConfiguration()));
commandExecutor.execute(new UnacquireAllExternalWorkerJobsForWorkerCmd(workerId, null, configuration.getJobServiceConfiguration()));
}

@Override
public void unacquireAllExternalWorkerJobsForWorker(String workerId, String tenantId) {
commandExecutor.execute(new UnacquireAllExternalWorkerJobsForWorkerCmd(workerId, tenantId, configuration.getJobServiceConfiguration()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.flowable.job.api.JobInfo;
import org.flowable.job.service.JobService;
import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntity;
import org.flowable.job.service.impl.persistence.entity.ExternalWorkerJobEntityManager;
import org.junit.jupiter.api.Test;

/**
Expand Down Expand Up @@ -537,6 +538,104 @@ public void testUnaquireWithWorkerId() {
assertThat(externalWorkerJob.getLockExpirationTime()).isNull();
}
}

@Test
@Deployment(resources = "org/flowable/engine/test/api/mgmt/ExternalWorkerJobQueryTest.bpmn20.xml")
public void testUnaquireWithWorkerIdAndTenantId() {
ProcessInstance tenant1Instance = runtimeService.createProcessInstanceBuilder()
.processDefinitionKey("externalWorkerJobQueryTest")
.overrideProcessDefinitionTenantId("tenant1")
.start();

ProcessInstance tenant2Instance = runtimeService.createProcessInstanceBuilder()
.processDefinitionKey("externalWorkerJobQueryTest")
.overrideProcessDefinitionTenantId("tenant2")
.start();

assertThat(managementService.createExternalWorkerJobQuery()
.jobTenantId("tenant1").count()).isEqualTo(2);

assertThat(managementService.createExternalWorkerJobQuery()
.jobTenantId("tenant2").count()).isEqualTo(2);

managementService.createExternalWorkerJobAcquireBuilder()
.topic("orderService", Duration.ofMinutes(10))
.acquireAndLock(2, "testWorker1");

managementService.createExternalWorkerJobAcquireBuilder()
.topic("customerService", Duration.ofMinutes(10))
.acquireAndLock(2, "testWorker2");

ExternalWorkerJobQuery query = managementService.createExternalWorkerJobQuery()
.lockOwner("testWorker1")
.jobTenantId("tenant1");
assertThat(query.count()).isEqualTo(1);

query = managementService.createExternalWorkerJobQuery()
.lockOwner("testWorker1")
.jobTenantId("tenant2");
assertThat(query.count()).isEqualTo(1);

assertThatThrownBy(() -> {
managementService.unacquireAllExternalWorkerJobsForWorker("testWorker2", "tenant1");

}).isInstanceOf(FlowableException.class)
.hasMessageContaining("provided worker id has external worker jobs from different tenant");

ExternalWorkerJobEntity worker1Tenant2Job = (ExternalWorkerJobEntity) managementService.createExternalWorkerJobQuery().lockOwner("testWorker1").jobTenantId("tenant2").singleResult();
ExternalWorkerJobEntity worker2Tenant1Job = (ExternalWorkerJobEntity) managementService.createExternalWorkerJobQuery().lockOwner("testWorker2").jobTenantId("tenant1").singleResult();

processEngineConfiguration.getCommandExecutor().execute(new Command<Void>() {

@Override
public Void execute(CommandContext commandContext) {
ExternalWorkerJobEntityManager externalWorkerJobEntityManager = processEngineConfiguration.getJobServiceConfiguration().getExternalWorkerJobEntityManager();
worker1Tenant2Job.setTenantId("tenant1");
externalWorkerJobEntityManager.update(worker1Tenant2Job);

worker2Tenant1Job.setTenantId("tenant2");
externalWorkerJobEntityManager.update(worker2Tenant1Job);

return null;
}
});

query = managementService.createExternalWorkerJobQuery()
.lockOwner("testWorker1")
.jobTenantId("tenant1");
assertThat(query.count()).isEqualTo(2);

query = managementService.createExternalWorkerJobQuery()
.lockOwner("testWorker2")
.jobTenantId("tenant2");
assertThat(query.count()).isEqualTo(2);

managementService.unacquireAllExternalWorkerJobsForWorker("testWorker2", "tenant2");
assertThat(managementService.createExternalWorkerJobQuery()
.lockOwner("testWorker2").count()).isEqualTo(0);

managementService.unacquireAllExternalWorkerJobsForWorker("testWorker1", "tenant1");

managementService.unacquireAllExternalWorkerJobsForWorker("testWorker1");
assertThat(managementService.createExternalWorkerJobQuery()
.lockOwner("testWorker1").count()).isEqualTo(0);

query = managementService.createExternalWorkerJobQuery().processInstanceId(tenant1Instance.getId());
assertThat(query.count()).isEqualTo(2);

for (ExternalWorkerJob externalWorkerJob : query.list()) {
assertThat(externalWorkerJob.getLockOwner()).isNull();
assertThat(externalWorkerJob.getLockExpirationTime()).isNull();
}

query = managementService.createExternalWorkerJobQuery().processInstanceId(tenant2Instance.getId());
assertThat(query.count()).isEqualTo(2);

for (ExternalWorkerJob externalWorkerJob : query.list()) {
assertThat(externalWorkerJob.getLockOwner()).isNull();
assertThat(externalWorkerJob.getLockExpirationTime()).isNull();
}
}

protected void addUserIdentityLinkToJob(Job job, String userId) {
managementService.executeCommand(commandContext -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public ResponseEntity<?> unacquireJobs(@RequestBody UnacquireExternalWorkerJobsR
throw new FlowableIllegalArgumentException("worker id is required");
}

unaquireExternalWorkerJobs(request.getWorkerId());
unaquireExternalWorkerJobs(request.getWorkerId(), request.getTenantId());

return ResponseEntity.noContent().build();
}
Expand Down Expand Up @@ -113,11 +113,21 @@ public ResponseEntity<?> unaquireJob(@PathVariable String jobId, @RequestBody Un
return ResponseEntity.noContent().build();
}

protected void unaquireExternalWorkerJobs(String workerId) {
protected void unaquireExternalWorkerJobs(String workerId, String tenantId) {
if (managementService != null) {
managementService.unacquireAllExternalWorkerJobsForWorker(workerId);
if (StringUtils.isNotEmpty(tenantId)) {
managementService.unacquireAllExternalWorkerJobsForWorker(workerId, tenantId);
} else {
managementService.unacquireAllExternalWorkerJobsForWorker(workerId);
}

} else if (cmmnManagementService != null) {
cmmnManagementService.unacquireAllExternalWorkerJobsForWorker(workerId);
if (StringUtils.isNotEmpty(tenantId)) {
cmmnManagementService.unacquireAllExternalWorkerJobsForWorker(workerId, tenantId);
} else {
cmmnManagementService.unacquireAllExternalWorkerJobsForWorker(workerId);
}

} else {
throw new FlowableException("Cannot unacquire external jobs. There is no BPMN or CMMN engine available");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
@ApiModel(description = "Request that is used for unacquiring external worker jobs")
public class UnacquireExternalWorkerJobsRequest {

@ApiModelProperty(value = "The id of the external worker", example = "orderWorker1", required = true)
@ApiModelProperty(value = "The worker id for the external worker jobs to unaquire", example = "orderWorker1", required = true)
protected String workerId;

@ApiModelProperty(value = "The tenant id for the external worker jobs to unaquire", example = "tenant1", required = false)
protected String tenantId;

public String getWorkerId() {
return workerId;
Expand All @@ -28,4 +31,12 @@ public String getWorkerId() {
public void setWorkerId(String workerId) {
this.workerId = workerId;
}

public String getTenantId() {
return tenantId;
}

public void setTenantId(String tenantId) {
this.tenantId = tenantId;
}
}
Loading

0 comments on commit 687d8a3

Please sign in to comment.