
RESOURCE_EXHAUSTED - Use algorithm to retry and increase back-off time #140

kbakk opened this issue Mar 12, 2021 · 8 comments
Status: Open
Labels: enhancement (New feature or request)

@kbakk (Collaborator) commented Mar 12, 2021

Is your feature request related to a problem? Please describe.
With the current behaviour of the watcher, there's no backoff before restarting a task.

According to the Zeebe docs, in case a RESOURCE_EXHAUSTED status is returned (which translates to pyzeebe.exceptions.zeebe_exceptions.ZeebeBackPressure), we should "retry with an appropriate retry policy (e.g. a combination of exponential backoff or jitter wrapped in a circuit breaker)".

Describe the solution you'd like
E.g. wait attempt * 5 + randint(1, 3) seconds between attempts.
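A minimal sketch of that idea (a hypothetical helper, not part of pyzeebe):

import time
from random import randint

def wait_before_retry(attempt: int) -> None:
    # Linear backoff with a little jitter: attempt * 5 + randint(1, 3) seconds
    time.sleep(attempt * 5 + randint(1, 3))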

Describe alternatives you've considered
If the process is being managed by e.g. Kubernetes or similar, there may be some way to configure Kubernetes to wait n seconds before restarting the process; then we could have Kubernetes handle this. I am, however, not familiar with such possibilities.

@JonatanMartens (Collaborator):

Sounds good!

There appears to be a library that implements exponential backoff called retrying; however, it was last released in 2014. I would prefer to use something similar to this instead of implementing it ourselves.

> If the process is being managed by e.g. Kubernetes or similar, there may be some way to configure Kubernetes to wait n seconds before restarting the process; then we could have Kubernetes handle this. I am, however, not familiar with such possibilities.

We should refrain from depending on Kubernetes. pyzeebe should run on all platforms, whether it's a VM, container, or bare metal.

kbakk added the enhancement (New feature or request) label on Mar 19, 2021
@Chadys (Contributor) commented Apr 20, 2021

Hello, and first of all, thanks for creating this project. I'm starting out with Zeebe, and having a Python client already available is really practical :)
I looked around a bit, and I don't think you'll find a serious Python library providing retry functionality; from what I see, any project that needs it just implements a simple decorator.
Useful links:

@Chadys (Contributor) commented Apr 21, 2021

Just had another thought about it: if Pyzeebe becomes async as per #143, the backoff wait function should of course be awaitable.
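For illustration, such a decorator could look roughly like this (a hypothetical sketch with an awaitable backoff, not taken from any existing project or from pyzeebe):

import asyncio
import functools
import random


def retry_async(exceptions, max_attempts: int = 5, base_delay: float = 1.0, max_delay: float = 30.0):
    # Retry an async callable on the given exceptions, with exponential backoff and jitter.
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    await asyncio.sleep(delay + random.uniform(0, 1))
        return wrapper
    return decorator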

@kbakk (Collaborator, Author) commented Apr 21, 2021

https://github.com/jd/tenacity is an alternative (a fork of https://github.com/invl/retry which I've been happy with in the past) with support for both synchronous Python and async.

It's a single-dependency package, rather popular and seemingly well-maintained. I think it could be a good fit for our use case: if we design it correctly, we could allow the user to pass a custom retry strategy consisting of tenacity objects.
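For example, an activate-jobs call wrapped with tenacity could look roughly like this (a sketch only; the exception import path, the poll callable, and the wait/stop parameters are placeholders and would vary by pyzeebe version):

from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_exponential, wait_random

from pyzeebe.errors import ZeebeBackPressureError  # assumed import path, depends on the pyzeebe version


async def activate_jobs_with_retry(poll):
    # Retry the poll coroutine on back pressure, with exponential backoff plus jitter
    async for attempt in AsyncRetrying(
        retry=retry_if_exception_type(ZeebeBackPressureError),
        wait=wait_exponential(multiplier=1, max=30) + wait_random(0, 2),
        stop=stop_after_attempt(5),
        reraise=True,
    ):
        with attempt:
            return await poll()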

kbakk changed the title from "Watcher - Use algorithm to increase back-off time" to "RESOURCE_EXHAUSTED - Use algorithm to retry and increase back-off time" on May 22, 2021
@kbakk (Collaborator, Author) commented May 22, 2021

Updating this issue, because I think we need to rethink the approach here, and also because #172 will remove the watcher.

I've been seeing some issues with RESOURCE_EXHAUSTED being thrown quite often. I posted the following on the Zeebe/Camunda Slack:

What I've seen debugging this on a local docker-compose Zeebe cluster (with Operate) is that, on a cluster running the exporters, I can run into RESOURCE_EXHAUSTED by triggering e.g. 128 jobs and then starting the worker (so it's in a situation where it can't process jobs as they're created, but has to catch up with a backlog of tasks).

The worker will poll 32 jobs (by default), process those, and submit the results (synchronously I think, since the grpc channel is shared between the threads that process those jobs).

When it goes to pull jobs again (ActivateJobs RPC), it will sometimes get the RESOURCE_EXHAUSTED back.

Is this expected, and should we see if we can put better backoff handling around this in Pyzeebe? Right now, the process will eventually die if it continuously tries to pull 32 jobs and gets the error.

I have tried to find the cause of the error being thrown, but I don't see any logging on this, neither in the gateway nor on the broker.

zell pointed me to how it's being done in c#, node, java:

Yes the client should implement something like exponential backoff, this is already done by other clients.
In java: https://github.com/camunda-cloud/zeebe/blob/develop/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobWorkerImpl.java#L194
In c#: https://github.com/camunda-community-hub/zeebe-client-csharp/blob/master/Client/Impl/Worker/JobWorker.cs#L148
In node: https://github.com/camunda-community-hub/zeebe-client-node-js#client-side-grpc-retry-in-zbclient

Studying the C# example,

https://github.com/camunda-community-hub/zeebe-client-csharp/blob/e4d77965cbfc678baa11abc3a29a8c82fa403798/Client/Impl/Misc/TransientGrpcErrorRetryStrategy.cs#L20-L44

I notice that the Pyzeebe approach is a bit different, e.g.

def complete_job(self, job_key: int, variables: Dict) -> CompleteJobResponse:
    try:
        return self._gateway_stub.CompleteJob(
            CompleteJobRequest(jobKey=job_key, variables=json.dumps(variables))
        )
    except grpc.RpcError as rpc_error:
        if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND):
            raise JobNotFound(job_key=job_key)
        elif self.is_error_status(rpc_error, grpc.StatusCode.FAILED_PRECONDITION):
            raise JobAlreadyDeactivated(job_key=job_key)
        else:
            self._common_zeebe_grpc_errors(rpc_error)

That is, if the worker receives a RESOURCE_EXHAUSTED at this point, there's no way to catch and retry it. I think we should look into adding a retry strategy similar to the one in TransientGrpcErrorRetryStrategy.cs linked above.
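A rough Python equivalent of that kind of strategy could look like this (a hypothetical sketch, not the actual pyzeebe implementation; the operation is any callable that performs the gRPC request):

import time

import grpc

RETRYABLE_CODES = {grpc.StatusCode.RESOURCE_EXHAUSTED, grpc.StatusCode.UNAVAILABLE}


def run_with_retry(operation, max_attempts: int = 5, base_delay: float = 0.5):
    # Run a gRPC call, retrying transient errors with exponential backoff
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except grpc.RpcError as rpc_error:
            if rpc_error.code() not in RETRYABLE_CODES or attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))

It could then wrap the calls shown above, e.g. run_with_retry(lambda: self._gateway_stub.CompleteJob(request)).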

@EyalCMX (Contributor) commented Jul 17, 2022

@Chadys @JonatanMartens @kbakk
Hi, we have this issue with Camunda Cloud. Is there any solution for this?

We are working with:

channel = create_camunda_cloud_channel(
    client_id=WORKFLOW_ZEEBE_AUTH_CLIENT_ID,
    client_secret=WORKFLOW_ZEEBE_AUTH_CLIENT_SECRET,
    cluster_id=cluster_id,
)

and we get lots of exceptions like:
"Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."
Then after a while it stops, but the Zeebe client doesn't get any tasks.
It looks like it's not really connected, but it's not trying to reconnect any more.

When we use:

channel = create_insecure_channel(hostname=ZEEBE_GATEWAY_HOSTNAME, port=ZEEBE_GATEWAY_PORT)

we don't have this problem.

We also have a Java client, and in Java we rarely see this exception, and it always manages to connect in the end.

Can anyone point us to the problem?

Example of the logs we get (we get many more of these):
{"time": "2022-07-12T11:40:47Z", "level": "WARNING", "pid": 1, "thread": 140004807083840, "loggerName": "pyzeebe.worker.job_poller", "message": "Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."}
{"time": "2022-07-12T11:50:48Z", "level": "WARNING", "pid": 1, "thread": 140004807083840, "loggerName": "pyzeebe.worker.job_poller", "message": "Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."}
{"time": "2022-07-12T11:51:38Z", "level": "WARNING", "pid": 1, "thread": 140004807083840, "loggerName": "pyzeebe.worker.job_poller", "message": "Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."}
{"time": "2022-07-12T12:00:57Z", "level": "WARNING", "pid": 1, "thread": 140004807083840, "loggerName": "pyzeebe.worker.job_poller", "message": "Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."}
{"time": "2022-07-12T12:01:07Z", "level": "WARNING", "pid": 1, "thread": 140004807083840, "loggerName": "pyzeebe.worker.job_poller", "message": "Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."}
{"time": "2022-07-12T12:08:10Z", "level": "WARNING", "pid": 1, "thread": 140004807083840, "loggerName": "pyzeebe.worker.job_poller", "message": "Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."}
{"time": "2022-07-12T12:14:42Z", "level": "WARNING", "pid": 1, "thread": 140004807083840, "loggerName": "pyzeebe.worker.job_poller", "message": "Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."}
{"time": "2022-07-12T12:15:51Z", "level": "WARNING", "pid": 1, "thread": 140004807083840, "loggerName": "pyzeebe.worker.job_poller", "message": "Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."}
{"time": "2022-07-12T12:19:21Z", "level": "WARNING", "pid": 1, "thread": 140004807083840, "loggerName": "pyzeebe.worker.job_poller", "message": "Failed to activate jobs from the gateway. Exception: ZeebeBackPressureError(). Retrying in 5 seconds..."}

@npepinpe commented Apr 19, 2023

I have some good news :)

You can use the built-in gRPC client retry mechanism, which is described in more detail here.

Here's a quick example:

import json

from pyzeebe import create_insecure_channel

# Build a gRPC service config with a retry policy for all Gateway methods
retryPolicy = json.dumps(
    {
        "methodConfig": [
            {
                "name": [{"service": "gateway_protocol.Gateway"}],
                "retryPolicy": {
                    "maxAttempts": 5,
                    "initialBackoff": "0.1s",
                    "maxBackoff": "10s",
                    "backoffMultiplier": 2,
                    "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED"],
                },
            }
        ]
    }
)

grpc_channel = create_insecure_channel(hostname="localhost", port=26500, channel_options={"grpc.service_config": retryPolicy})

This will retry all requests returning "UNAVAILABLE" or "RESOURCE_EXHAUSTED". You can assign specific policies on a per-method basis as well, e.g.:

{
    "methodConfig": [
        {
            "name": [{"service": "gateway_protocol.Gateway"}],
            "retryPolicy": {
                "maxAttempts": 5,
                "initialBackoff": "1s",
                "maxBackoff": "10s",
                "backoffMultiplier": 2,
                "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED"],
            },
        },
        {
            "name": [{"service": "gateway_protocol.Gateway", "method": "Topology"}],
            "retryPolicy": {
                "maxAttempts": 2,
                "initialBackoff": "0.1s",
                "maxBackoff": "5s",
                "backoffMultiplier": 2,
                "retryableStatusCodes": ["UNAVAILABLE"],
            },
        }
    ]
}

I've tested it with request timeouts as well. If you assign a request timeout directly on the client RPC call, then that timeout spans all the retries, meaning that if I set a request timeout of 5 seconds, it will return DEADLINE_EXCEEDED before the maximum number of retry attempts is reached.

When all your retries are used up, you will get the most recent error, so this doesn't fully solve the issues with the job poller. We would likely want to retry forever there: just keep polling, and only log (perhaps throttled) warnings when things are not working (ideally configurable). For the job poller, maybe catching the errors and retrying them is best; there we can do something like the Java job worker does.
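A rough sketch of that kind of retry-forever poll loop, with a capped backoff and throttled logging (a hypothetical example, not pyzeebe's actual job poller):

import asyncio
import logging
import random

logger = logging.getLogger(__name__)


async def poll_forever(activate_jobs, max_backoff: float = 30.0):
    # Keep polling; back off on errors instead of giving up
    failures = 0
    while True:
        try:
            await activate_jobs()  # hypothetical coroutine running one ActivateJobs request
            failures = 0  # reset the backoff after a successful poll
        except Exception as error:
            failures += 1
            delay = min(max_backoff, 2 ** failures) + random.uniform(0, 1)
            if failures % 10 == 1:  # throttle the warnings
                logger.warning("Polling failed (%s), retrying in %.1fs", error, delay)
            await asyncio.sleep(delay)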

Hope this helps!

@npepinpe commented Apr 19, 2023

As this may be useful for others, I can expand a bit on what's safe to retry or not.

UNAVAILABLE and RESOURCE_EXHAUSTED errors are always safe to retry. These have definitely not been processed by the server.

The following errors should never be retried:

  • INVALID_ARGUMENT: this means your request is wrong, so there's no point retrying. The message should have details on what to do there.
  • NOT_FOUND: this means the entity on which the command was to operate does not exist. There's no point in retrying the request. This is technically an error, but it can also be used as a sentinel value to detect when a previous command has succeeded. I'll get into that in a bit.
  • ALREADY_EXISTS: this means whatever the command wanted to do is already done, so no point in retrying.
  • PERMISSION_DENIED: your client is unauthorized; the request shouldn't be retried until the credentials have been refreshed.
  • FAILED_PRECONDITION: the command cannot be retried because it depends on something that is missing, i.e. the state of the system must be changed by the user (e.g. setting a new variable in the PI) before this should be retried.
  • OUT_OF_RANGE: we don't use this one.
  • UNIMPLEMENTED: self-explanatory, but should not be retried either, as the server does not know how to deal with this command.
  • INTERNAL: should not be retried 99% of the time. This is reserved for fatal server errors, so chances are someone should tell the user their system is operational again before the command is retried.
  • DATA_LOSS: we don't use it, but it typically denotes unrecoverable errors (e.g. your data is corrupted), so no retries there. But again, we don't use it.
  • UNAUTHENTICATED: same as PERMISSION_DENIED, no point in retrying until credentials are refreshed.

DEADLINE_EXCEEDED is a special case. It is typically thrown by the client itself after a timeout expires. As such, we cannot tell whether the command has been received by the server, and consequently whether it will be processed by the server. We will never get a response for it, since the client has closed the connection (conceptually anyway). If this happens, only certain commands are safe to retry. These are the so-called idempotent commands (quick reminder: you can find the list of commands here):

Here, when I say "safe", I mean it in terms of consistency.

Not safe to retry on DEADLINE_EXCEEDED:

  • BroadcastSignal: You may end up sending too many signals, triggering more events than expected.
  • CreateProcessInstance: may not be safe to retry depending on your use case, as you may end up creating multiple process instances.
  • CreateProcessInstanceWithResults: same as CreateProcessInstance.
  • ModifyProcessInstance: retrying a modification with activate instructions would activate additional tokens.

Sometimes safe to retry on DEADLINE_EXCEEDED:

  • PublishMessage: this is a special case. Publishing the same message without any ID will publish two messages, meaning there may be more correlations than desired. You can always safely retry if you use unique message IDs.
  • SetVariables: another special case. Sort of safe to retry, but keep in mind that if you retry much later, the overall state may have changed due to concurrent executing branches, other workers, etc. So it depends a bit on your process itself.

Generally safe to retry on DEADLINE_EXCEEDED:

  • ActivateJobs: activate jobs is stateless, so it's always safe to retry.
  • CancelProcessInstance: canceling a process is idempotent. If you try to cancel an already canceled or terminated process, you will get a NOT_FOUND error, at which point you know you've achieved what you wanted anyway.
  • CompleteJob: completing a job is generally idempotent. Either it succeeds (you completed it), it returns NOT_FOUND (indicating it was either completed or terminated; regardless, the job is done, so no need to retry), or it returns FAILED_PRECONDITION. The last one occurs if the job has been failed in the meantime, so you would need to resolve the associated incident or give it more retries manually before retrying the request.
  • EvaluateDecision: this will always return the same thing, so go nuts and retry it as often as you'd like.
  • DeployResource: deploying the same resource twice will simply do nothing, so safe to always retry.
  • FailJob: either this succeeds and the job is failed, or this returns NOT_FOUND, meaning the job has been canceled or completed - either way, no need to retry. So safe to retry if you get DEADLINE_EXCEEDED.
  • ResolveIncident: totally safe to retry. Either it eventually succeeds, or you get a NOT_FOUND eventually, meaning the incident does not exist anymore (e.g. has been resolved, or the PI cancelled, etc.)
  • ThrowError: perfectly safe to retry. Either it eventually succeeds, eventually returns NOT_FOUND (meaning the job has been completed/cancelled, so no need to send an error), or you get FAILED_PRECONDITION (at which point you should not retry, as mentioned above).
  • Topology: always safe to retry, all day every day.
  • UpdateJobRetries: also safe to retry - either it eventually succeeds, or it eventually returns a NOT_FOUND if the job has already been completed/cancelled.

So now we can talk about NOT_FOUND. There are a few commands where we can treat NOT_FOUND as "success". For example, when retrying a job complete command: if I send the same job complete command twice without waiting for the result, one of them will succeed and one will return NOT_FOUND (since once completed, the job is deleted).
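In client code, that could look like treating NOT_FOUND on a retried completion as success, e.g. (a hypothetical sketch against a raw gRPC Gateway stub, not pyzeebe's API):

import grpc


def complete_job_idempotent(stub, request):
    # Complete a job, treating NOT_FOUND on a retry as "already completed"
    try:
        return stub.CompleteJob(request)
    except grpc.RpcError as rpc_error:
        if rpc_error.code() == grpc.StatusCode.NOT_FOUND:
            return None  # the job is already gone, so the completion effectively succeeded
        raise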

So one could use the following retry policy:

{
    "methodConfig": [
        {
            "name": [],
            "retryPolicy": {
                "maxAttempts": 5,
                "initialBackoff": "0.1s",
                "maxBackoff": "5s",
                "backoffMultiplier": 3,
                "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED"]
            }
        },
        {
            "name": [
              {"service": "gateway_protocol.Gateway", "method": "ActivateJobs"},
              {"service": "gateway_protocol.Gateway", "method": "CancelProcessInstance"},
              {"service": "gateway_protocol.Gateway", "method": "EvaluateDecision"},
              {"service": "gateway_protocol.Gateway", "method": "DeployResource"},
              {"service": "gateway_protocol.Gateway", "method": "FailJob"},
              {"service": "gateway_protocol.Gateway", "method": "ResolveIncident"},
              {"service": "gateway_protocol.Gateway", "method": "ThrowError"},
              {"service": "gateway_protocol.Gateway", "method": "Topology"},
              {"service": "gateway_protocol.Gateway", "method": "UpdateJobRetries"},
            ],
            "retryPolicy": {
                "maxAttempts": 5,
                "initialBackoff": "0.1s",
                "maxBackoff": "5s",
                "backoffMultiplier": 3,
                "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED", "DEADLINE_EXCEEDED"]
            }
        }
    ]
}

This will always retry UNAVAILABLE and RESOURCE_EXHAUSTED for all requests, up to 5 times, starting with a backoff of 0.1s up to a max of 5s. For the requests which are also safe to retry on DEADLINE_EXCEEDED (as I listed above), it will do the same for UNAVAILABLE, RESOURCE_EXHAUSTED, and DEADLINE_EXCEEDED.

Note here that the DEADLINE_EXCEEDED is retried only if the server returned that. If the client is the one timing out the request, then obviously it won't be retried.

So you would create your channel as:

import json

from pyzeebe import create_insecure_channel

# create retry policy
retryPolicy = json.dumps(
    {
        "methodConfig": [
            {
                "name": [{"service": "gateway_protocol.Gateway"}],
                "retryPolicy": {
                    "maxAttempts": 5,
                    "initialBackoff": "0.1s",
                    "maxBackoff": "10s",
                    "backoffMultiplier": 4,
                    "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED"]
                }
            },
            {
                "name": [
                    {"service": "gateway_protocol.Gateway", "method": "ActivateJobs"},
                    {"service": "gateway_protocol.Gateway", "method": "CancelProcessInstance"},
                    {"service": "gateway_protocol.Gateway", "method": "EvaluateDecision"},
                    {"service": "gateway_protocol.Gateway", "method": "DeployResource"},
                    {"service": "gateway_protocol.Gateway", "method": "FailJob"},
                    {"service": "gateway_protocol.Gateway", "method": "ResolveIncident"},
                    {"service": "gateway_protocol.Gateway", "method": "ThrowError"},
                    {"service": "gateway_protocol.Gateway", "method": "Topology"},
                    {"service": "gateway_protocol.Gateway", "method": "UpdateJobRetries"},
                ],
                "retryPolicy": {
                    "maxAttempts": 5,
                    "initialBackoff": "0.1s",
                    "maxBackoff": "10s",
                    "backoffMultiplier": 5,
                    "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED", "DEADLINE_EXCEEDED"]
                }
            }
        ]
    }
)

# Create a channel without credentials, with the retry policy applied
grpc_channel = create_insecure_channel(hostname="localhost", port=26500, channel_options={"grpc.service_config": retryPolicy})
