-
Notifications
You must be signed in to change notification settings - Fork 23
TimeoutError: Deadline exceeded on activities taking greating than 120 seconds #15
Comments
Hi @CGreenburg, I'm not able to reproduce this with the code below. Could you share some sample code that produces that error on your side. import asyncio
import logging
from datetime import timedelta
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Activities Interface
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=500))
async def compose_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
# Activities Implementation
class GreetingActivitiesImpl:
async def compose_greeting(self, greeting: str, name: str) -> str:
await asyncio.sleep(200)
return greeting + " " + name
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)
async def get_greeting(self, name):
return await self.greeting_activities.compose_greeting("Hello!", name)
async def worker_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
print("Worker started")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main())
loop.run_forever() |
We migrated a pyspark/pandas application to a workflow activity and it takes longer than 2 minutes. That's where we've seen the deadline exceeded exception. I've been replicating it using |
You might want to look into https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor if your activity is CPU bound or if the APIs that you're using aren't async friendly. |
There was a thread about this in the past where I proposed creating an event loop per worker (running in its own thread) so that the activity code is free to block as it pleases but they feedback I got previously was that the library should do the bare minimum and allow the user to decide how to handle blocking calls. https://community.temporal.io/t/timeline-for-python-client-support/223/26 |
If you're able to, I would run my workflow and activities in separate workers. |
You can also try something like this: import asyncio
import logging
from datetime import timedelta
from threading import Thread
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
import time
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Activities Interface
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=500))
async def compose_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
# Activities Implementation
class GreetingActivitiesImpl:
async def compose_greeting(self, greeting: str, name: str) -> str:
time.sleep(150)
return greeting + " " + name
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)
async def get_greeting(self, name):
return await self.greeting_activities.compose_greeting("Hello!", name)
async def worker_main(activities=False, workflows=False):
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
if activities:
worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
if workflows:
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
print("Worker started")
def thread1():
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main(activities=True, workflows=False))
loop.run_forever()
def thread2():
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main(activities=False, workflows=True))
loop.run_forever()
if __name__ == '__main__':
t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.start()
t2.start()
t1.join()
t2.join() |
Thanks for the suggestions @firdaus! In your thread example, it's okay that the two workers use the same task queue? They don't need to be different task queues? |
It's fine because there are separate GRPC calls behind the scenes for polling for workflow tasks and activity tasks so a workflow worker will never get the work of an activity worker. |
Here's the stack trace.
Here's what I've uncovered:
The python-sdk starts up two parallel threads:
Both threads use the same grpc channel to communicate with the temporal server with a timeout set to 120s. Concurrent RPC calls are supported according to the grpclib docs: https://grpclib.readthedocs.io/en/latest/client.html
The workflow thread polls the workflow task queue and the activity thread polls the activity task queue. Both take 60 seconds before continuing the while loop to poll again if nothing is returned. When the activity thread receives something on the activity task queue, it starts running the activity code. Meanwhile, the workflow thread is in the middle of polling the workflow task queue.
What I'm noticing is that the workflow poll request is "blocked" and doesn't return like it usually would after 60 seconds. The workflow poll request doesn't complete until the activity in the other thread is finished. If an activity takes long enough to complete, the workflow poll request can take more than 120 seconds (note the timeout mentioned earlier) causing a deadline exceeded error.
Solution:
A couple of temporary workaround:
OR
The text was updated successfully, but these errors were encountered: