Schedules no longer working #15919

Open
rmnvncnt opened this issue Nov 4, 2024 · 5 comments
Labels
bug Something isn't working

Comments

@rmnvncnt

rmnvncnt commented Nov 4, 2024

Bug summary

Since I upgraded Prefect from v2 to v3, schedules on existing deployments no longer seem to work. I can create or activate a schedule, either from the UI or from my prefect.yaml file, but no upcoming flow runs get scheduled.

In the following example, I created a new hourly schedule for this deployment, yet no upcoming runs are scheduled:

[Screenshot 2024-11-04 at 16 38 11]

I plan on wiping my internal database and upgrading to Prefect 3.1, but is there anything I can do before that?

Version info

Version:             3.0.1
API version:         0.8.4
Python version:      3.11.7
Git commit:          c6b2ffe1
Built:               Fri, Sep 6, 2024 10:05 AM
OS/Arch:             darwin/arm64
Server type:         server
Pydantic version:    2.9.1
Integrations:
  prefect-slack:     0.3.0
  prefect-aws:       0.5.0
  prefect-docker:    0.6.1
  prefect-dask:      0.3.1

Additional context

No response

@rmnvncnt rmnvncnt added the bug Something isn't working label Nov 4, 2024
@desertaxle
Member

Thanks for the bug report @rmnvncnt! Seems like there could be an issue with the scheduler. Do you see any error logs on your server?

@rmnvncnt
Author

rmnvncnt commented Nov 4, 2024

Lots of them, actually. Here is a sample:

| 2024-11-04 16:15:42.835 | 16:15:42.831 | ERROR   | prefect.server.services.scheduler - Unexpected error in: TimeoutError() |
| 2024-11-04 16:15:42.835 | Traceback (most recent call last): |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/loop_service.py", line 83, in start |
| 2024-11-04 16:15:42.835 | await self.run_once() |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper |
| 2024-11-04 16:15:42.835 | return await fn(*args, **kwargs) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 104, in run_once |
| 2024-11-04 16:15:42.835 | inserted_runs = await self._insert_scheduled_flow_runs( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper |
| 2024-11-04 16:15:42.835 | return await fn(*args, **kwargs) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 291, in _insert_scheduled_flow_runs |
| 2024-11-04 16:15:42.835 | return await models.deployments._insert_scheduled_flow_runs( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 168, in async_wrapper |
| 2024-11-04 16:15:42.835 | return await func(db, *args, **kwargs)  # type: ignore |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/models/deployments.py", line 693, in _insert_scheduled_flow_runs |
| 2024-11-04 16:15:42.835 | inserted_flow_run_ids = (await session.execute(inserted_rows)).scalars().all() |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute |
| 2024-11-04 16:15:42.835 | result = await greenlet_spawn( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 203, in greenlet_spawn |
| 2024-11-04 16:15:42.835 | result = context.switch(value) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2362, in execute |
| 2024-11-04 16:15:42.835 | return self._execute_internal( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal |
| 2024-11-04 16:15:42.835 | result: Result[Any] = compile_state_cls.orm_execute_statement( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement |
| 2024-11-04 16:15:42.835 | result = conn.execute( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1418, in execute |
| 2024-11-04 16:15:42.835 | return meth( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection |
| 2024-11-04 16:15:42.835 | return connection._execute_clauseelement( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement |
| 2024-11-04 16:15:42.835 | ret = self._execute_context( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context |
| 2024-11-04 16:15:42.835 | return self._exec_single_context( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context |
| 2024-11-04 16:15:42.835 | self._handle_dbapi_exception( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2358, in _handle_dbapi_exception |
| 2024-11-04 16:15:42.835 | raise exc_info[1].with_traceback(exc_info[2]) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context |
| 2024-11-04 16:15:42.835 | self.dialect.do_execute( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute |
| 2024-11-04 16:15:42.835 | cursor.execute(statement, parameters) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute |
| 2024-11-04 16:15:42.835 | self._adapt_connection.await_( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only |
| 2024-11-04 16:15:42.835 | return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501 |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn |
| 2024-11-04 16:15:42.835 | value = await result |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute |
| 2024-11-04 16:15:42.835 | self._handle_exception(error) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception |
| 2024-11-04 16:15:42.835 | self._adapt_connection._handle_exception(error) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 786, in _handle_exception |
| 2024-11-04 16:15:42.835 | raise error |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 538, in _prepare_and_execute |
| 2024-11-04 16:15:42.835 | self._rows = deque(await prepared_stmt.fetch(*parameters)) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch |
| 2024-11-04 16:15:42.835 | data = await self.__bind_execute(args, 0, timeout) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute |
| 2024-11-04 16:15:42.835 | data, status, _ = await self.__do_execute( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute |
| 2024-11-04 16:15:42.835 | return await executor(protocol) |
| 2024-11-04 16:15:42.835 | File "asyncpg/protocol/protocol.pyx", line 207, in bind_execute |
| 2024-11-04 16:15:42.835 | asyncio.exceptions.TimeoutError |
| 2024-11-04 16:15:12.799 | 16:15:12.795 | ERROR   | prefect.server.services.recentdeploymentsscheduler - Unexpected error in: TimeoutError() |
| 2024-11-04 16:15:12.799 | Traceback (most recent call last): |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/loop_service.py", line 83, in start |
| 2024-11-04 16:15:12.799 | await self.run_once() |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper |
| 2024-11-04 16:15:12.799 | return await fn(*args, **kwargs) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 104, in run_once |
| 2024-11-04 16:15:12.799 | inserted_runs = await self._insert_scheduled_flow_runs( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper |
| 2024-11-04 16:15:12.799 | return await fn(*args, **kwargs) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 291, in _insert_scheduled_flow_runs |
| 2024-11-04 16:15:12.799 | return await models.deployments._insert_scheduled_flow_runs( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 168, in async_wrapper |
| 2024-11-04 16:15:12.799 | return await func(db, *args, **kwargs)  # type: ignore |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/models/deployments.py", line 693, in _insert_scheduled_flow_runs |
| 2024-11-04 16:15:12.799 | inserted_flow_run_ids = (await session.execute(inserted_rows)).scalars().all() |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute |
| 2024-11-04 16:15:12.799 | result = await greenlet_spawn( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 203, in greenlet_spawn |
| 2024-11-04 16:15:12.799 | result = context.switch(value) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2362, in execute |
| 2024-11-04 16:15:12.799 | return self._execute_internal( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal |
| 2024-11-04 16:15:12.799 | result: Result[Any] = compile_state_cls.orm_execute_statement( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement |
| 2024-11-04 16:15:12.799 | result = conn.execute( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1418, in execute |
| 2024-11-04 16:15:12.799 | return meth( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection |
| 2024-11-04 16:15:12.799 | return connection._execute_clauseelement( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement |
| 2024-11-04 16:15:12.799 | ret = self._execute_context( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context |
| 2024-11-04 16:15:12.799 | return self._exec_single_context( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context |
| 2024-11-04 16:15:12.799 | self._handle_dbapi_exception( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2358, in _handle_dbapi_exception |
| 2024-11-04 16:15:12.799 | raise exc_info[1].with_traceback(exc_info[2]) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context |
| 2024-11-04 16:15:12.799 | self.dialect.do_execute( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute |
| 2024-11-04 16:15:12.799 | cursor.execute(statement, parameters) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute |
| 2024-11-04 16:15:12.799 | self._adapt_connection.await_( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only |
| 2024-11-04 16:15:12.799 | return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501 |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn |
| 2024-11-04 16:15:12.799 | value = await result |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute |
| 2024-11-04 16:15:12.799 | self._handle_exception(error) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception |
| 2024-11-04 16:15:12.799 | self._adapt_connection._handle_exception(error) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 786, in _handle_exception |
| 2024-11-04 16:15:12.799 | raise error |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 538, in _prepare_and_execute |
| 2024-11-04 16:15:12.799 | self._rows = deque(await prepared_stmt.fetch(*parameters)) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch |
| 2024-11-04 16:15:12.799 | data = await self.__bind_execute(args, 0, timeout) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute |
| 2024-11-04 16:15:12.799 | data, status, _ = await self.__do_execute( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute |
| 2024-11-04 16:15:12.799 | return await executor(protocol) |
| 2024-11-04 16:15:12.799 | File "asyncpg/protocol/protocol.pyx", line 207, in bind_execute |
| 2024-11-04 16:15:12.799 | asyncio.exceptions.TimeoutError |
| 2024-11-04 16:15:12.799 | 16:15:12.798 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 60.063469 seconds to run, which is longer than its loop interval of 5 seconds. |

@Arthurhussey

I'm also seeing this in version 2.20.2:

11:15:37.423 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 19.288165 seconds to run, which is longer than its loop interval of 5 seconds.
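The overrun warnings quoted in this thread all follow the same wording, so they can be pulled out of a server log to see how far the scheduler services are falling behind. The following is only a minimal sketch: the function name `find_overruns` is hypothetical, and the pattern is inferred from the two warning lines shown above rather than from any documented Prefect log format.

```python
import re

# Pattern inferred from the warning lines quoted in this thread
# (an assumption, not a documented log format).
OVERRUN = re.compile(
    r"(?P<service>\w+) took (?P<took>\d+(?:\.\d+)?) seconds to run, "
    r"which is longer than its loop interval of "
    r"(?P<interval>\d+(?:\.\d+)?) seconds"
)


def find_overruns(lines):
    """Return (service, seconds_taken, loop_interval) for each overrun warning."""
    hits = []
    for line in lines:
        match = OVERRUN.search(line)
        if match:
            hits.append(
                (match["service"], float(match["took"]), float(match["interval"]))
            )
    return hits


if __name__ == "__main__":
    sample = [
        "11:15:37.423 | WARNING | prefect.server.services.recentdeploymentsscheduler"
        " - RecentDeploymentsScheduler took 19.288165 seconds to run, which is"
        " longer than its loop interval of 5 seconds.",
    ]
    for service, took, interval in find_overruns(sample):
        print(f"{service}: {took:.1f}s per loop (interval {interval:.0f}s)")
```

A run time several times larger than the loop interval, as in both reports here, suggests the service is spending its time waiting on the database rather than doing scheduling work.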

@Arthurhussey

@rmnvncnt FYI, this was happening to me because my PostgreSQL database had grown to around 200 GB. It seems to keep the state information of every run forever. I wrote this script to clear the database, which solved this (and other) issues:

"""Flow to delete old entries from the database."""

import os
from datetime import datetime, timedelta

from prefect import flow, task, get_run_logger
from sqlalchemy import create_engine, text


@task
def dbconnect():
    """Connect to the database."""
    log = get_run_logger()
    log.info("Connecting to database")
    connection_url = os.getenv("PREFECT_API_DATABASE_CONNECTION_URL")
    if not connection_url:
        raise ValueError(
            "Variable PREFECT_API_DATABASE_CONNECTION_URL is not set."
        )
    engine = create_engine(connection_url)
    return engine


def delete_old_entries(engine, table_name, date_column):
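    """Delete entries older than 7 days from the given table.

    table_name and date_column are interpolated into the SQL text,
    so only pass trusted, hard-coded identifiers.
    """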
    log = get_run_logger()
    log.info(f"Deleting entries older than 7 days from {table_name}")
    seven_days_ago = datetime.now() - timedelta(days=7)
    query = text(
        f"""
        DELETE FROM {table_name}
        WHERE {date_column} < :seven_days_ago
        """
    )
    with engine.connect() as connection:
        with connection.begin():  # Begin a transaction
            result = connection.execute(
                query, {"seven_days_ago": seven_days_ago.isoformat()}
            )
            deleted_count = result.rowcount
            log.info(f"Deleted {deleted_count} entries from {table_name}")


@task
def delete_old_flow_entries(engine):
    """Delete old flow entries from the database."""
    delete_old_entries(engine, "flow_run_state", "timestamp")
    delete_old_entries(engine, "flow_run", "created")


@task
def delete_old_task_entries(engine):
    """Delete old task entries from the database."""
    delete_old_entries(engine, "task_run_state", "timestamp")
    delete_old_entries(engine, "task_run", "created")


@flow(name="DBFlow")
def run():
    """Run the flow to delete old entries from the database."""
    log = get_run_logger()

    engine = dbconnect()  # Retrieve the engine from the dbconnect task
    log.info("Deleting old task entries")
    delete_old_task_entries(engine)  # Remove old task runs and their states
    log.info("Deleting old flow entries")
    delete_old_flow_entries(engine)  # Remove old flow runs and their states


if __name__ == "__main__":
    run()
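On a database that has already grown very large, a single `DELETE` per table can itself run for a long time inside one transaction. One possible variation is to delete in small batches, committing after each one. The sketch below only illustrates that pattern: it uses the standard-library `sqlite3` module as a stand-in so it can run anywhere, the helper name `delete_in_batches` is hypothetical, and on PostgreSQL the inner `SELECT` would need to use the table's primary key instead of SQLite's `rowid`.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def delete_in_batches(conn, table, date_column, cutoff, batch_size=1000):
    """Delete rows older than `cutoff` in small batches; return total deleted."""
    # Identifiers cannot be bound as parameters, so `table` and `date_column`
    # must be trusted, hard-coded names (as in the script above).
    # `rowid` is SQLite-specific; this is an assumption of the demo only.
    sql = (
        f"DELETE FROM {table} WHERE rowid IN ("
        f"SELECT rowid FROM {table} WHERE {date_column} < ? LIMIT ?)"
    )
    total = 0
    while True:
        with conn:  # one short transaction per batch, committed on exit
            deleted = conn.execute(sql, (cutoff, batch_size)).rowcount
        total += deleted
        if deleted < batch_size:  # last (possibly empty) batch
            return total


if __name__ == "__main__":
    # Throwaway in-memory table standing in for flow_run_state.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE flow_run_state (id INTEGER PRIMARY KEY, timestamp TEXT)"
    )
    now = datetime.now(timezone.utc)
    rows = [((now - timedelta(days=d)).isoformat(),) for d in range(20)]
    conn.executemany("INSERT INTO flow_run_state (timestamp) VALUES (?)", rows)

    cutoff = (now - timedelta(days=7)).isoformat()
    removed = delete_in_batches(
        conn, "flow_run_state", "timestamp", cutoff, batch_size=5
    )
    print(f"Deleted {removed} rows")
```

Smaller batches keep each transaction short, at the cost of more round trips; the batch size of 1000 is an arbitrary default, not a tuned value.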

@rmnvncnt
Author

rmnvncnt commented Nov 8, 2024

Indeed, these tables are getting huge:

prefect=> \d+
                                                 List of relations
 Schema |              Name              | Type  |  Owner  | Persistence | Access method |    Size    | Description 
--------+--------------------------------+-------+---------+-------------+---------------+------------+-------------
 public | agent                          | table | prefect | permanent   | heap          | 8192 bytes | 
 public | alembic_version                | table | prefect | permanent   | heap          | 40 kB      | 
 public | artifact                       | table | prefect | permanent   | heap          | 71 MB      | 
 public | artifact_collection            | table | prefect | permanent   | heap          | 8192 bytes | 
 public | automation                     | table | prefect | permanent   | heap          | 8192 bytes | 
 public | automation_bucket              | table | prefect | permanent   | heap          | 8192 bytes | 
 public | automation_event_follower      | table | prefect | permanent   | heap          | 24 kB      | 
 public | automation_related_resource    | table | prefect | permanent   | heap          | 8192 bytes | 
 public | block_document                 | table | prefect | permanent   | heap          | 304 kB     | 
 public | block_document_reference       | table | prefect | permanent   | heap          | 16 kB      | 
 public | block_schema                   | table | prefect | permanent   | heap          | 576 kB     | 
 public | block_schema_reference         | table | prefect | permanent   | heap          | 16 kB      | 
 public | block_type                     | table | prefect | permanent   | heap          | 192 kB     | 
 public | composite_trigger_child_firing | table | prefect | permanent   | heap          | 8192 bytes | 
 public | concurrency_limit              | table | prefect | permanent   | heap          | 56 kB      | 
 public | concurrency_limit_v2           | table | prefect | permanent   | heap          | 48 kB      | 
 public | configuration                  | table | prefect | permanent   | heap          | 16 kB      | 
 public | csrf_token                     | table | prefect | permanent   | heap          | 48 kB      | 
 public | deployment                     | table | prefect | permanent   | heap          | 31 MB      | 
 public | deployment_schedule            | table | prefect | permanent   | heap          | 48 kB      | 
 public | event_resources                | table | prefect | permanent   | heap          | 3046 MB    | 
 public | events                         | table | prefect | permanent   | heap          | 807 MB     | 
 public | flow                           | table | prefect | permanent   | heap          | 48 kB      | 
 public | flow_run                       | table | prefect | permanent   | heap          | 22 MB      | 
 public | flow_run_input                 | table | prefect | permanent   | heap          | 8192 bytes | 
 public | flow_run_notification_policy   | table | prefect | permanent   | heap          | 8192 bytes | 
 public | flow_run_notification_queue    | table | prefect | permanent   | heap          | 0 bytes    | 
 public | flow_run_state                 | table | prefect | permanent   | heap          | 5348 MB    | 
 public | log                            | table | prefect | permanent   | heap          | 651 MB     | 
 public | saved_search                   | table | prefect | permanent   | heap          | 8192 bytes | 
 public | task_run                       | table | prefect | permanent   | heap          | 373 MB     | 
 public | task_run_state                 | table | prefect | permanent   | heap          | 6308 MB    | 
 public | task_run_state_cache           | table | prefect | permanent   | heap          | 16 kB      | 
 public | variable                       | table | prefect | permanent   | heap          | 8192 bytes | 
 public | work_pool                      | table | prefect | permanent   | heap          | 264 kB     | 
 public | work_queue                     | table | prefect | permanent   | heap          | 752 kB     | 
 public | worker                         | table | prefect | permanent   | heap          | 1456 kB    | 
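To make the listing above easier to read at a glance, the size strings can be converted to bytes and ranked. This is a small sketch using sizes copied from the `\d+` output; the helper names `size_to_bytes` and `largest_tables` are hypothetical, and the unit table assumes the 1024-based units that psql prints.

```python
# Units as printed in the psql listing, assumed to be 1024-based.
UNITS = {"bytes": 1, "kB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}

# Sizes copied from the \d+ output above (tables of 22 MB and larger).
SIZES = {
    "task_run_state": "6308 MB",
    "flow_run_state": "5348 MB",
    "event_resources": "3046 MB",
    "events": "807 MB",
    "log": "651 MB",
    "task_run": "373 MB",
    "artifact": "71 MB",
    "deployment": "31 MB",
    "flow_run": "22 MB",
}


def size_to_bytes(text):
    """Convert a size string such as '5348 MB' to a number of bytes."""
    value, unit = text.split()
    return int(value) * UNITS[unit]


def largest_tables(sizes, top=5):
    """Return the `top` (name, size) pairs, largest first."""
    ranked = sorted(
        sizes.items(), key=lambda item: size_to_bytes(item[1]), reverse=True
    )
    return ranked[:top]


if __name__ == "__main__":
    for name, size in largest_tables(SIZES, top=4):
        print(f"{name:<16} {size}")
```

Ranked this way, the two state tables and the two event tables stand out as the bulk of the database, which matches the tables named in the question that follows.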

@desertaxle Is there a way to keep events, event_resources, task_run_state and flow_run_state from growing that big over time?

@Arthurhussey Thanks for your solution. It seems to have worked, and scheduled runs now appear in the queues!
