Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fal_client): introduce priority for subscribe and submit #343

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions projects/fal_client/src/fal_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import base64
from dataclasses import dataclass, field
from functools import cached_property
from typing import Any, AsyncIterator, Iterator, TYPE_CHECKING, Optional
from typing import Any, AsyncIterator, Iterator, TYPE_CHECKING, Optional, Literal

import httpx
from httpx_sse import aconnect_sse, connect_sse
Expand All @@ -18,6 +18,7 @@
from PIL import Image

AnyJSON = dict[str, Any]
Priority = Literal["normal", "low"]

RUN_URL_FORMAT = f"https://{FAL_RUN_HOST}/"
QUEUE_URL_FORMAT = f"https://queue.{FAL_RUN_HOST}/"
Expand Down Expand Up @@ -317,6 +318,7 @@ async def submit(
path: str = "",
hint: str | None = None,
webhook_url: str | None = None,
priority: Optional[Priority] = None,
) -> AsyncRequestHandle:
"""Submit an application with the given arguments (which will be JSON serialized). The path parameter can be used to
specify a subpath when applicable. This method will return a handle to the request that can be used to check the status
Expand All @@ -333,6 +335,9 @@ async def submit(
if hint is not None:
headers["X-Fal-Runner-Hint"] = hint

if priority is not None:
headers["X-Fal-Queue-Priority"] = priority

response = await self._client.post(
url,
json=arguments,
Expand All @@ -359,8 +364,15 @@ async def subscribe(
with_logs: bool = False,
on_enqueue: Optional[callable[[Queued], None]] = None,
on_queue_update: Optional[callable[[Status], None]] = None,
priority: Optional[Priority] = None,
) -> AnyJSON:
handle = await self.submit(application, arguments, path=path, hint=hint)
handle = await self.submit(
application,
arguments,
path=path,
hint=hint,
priority=priority,
)

if on_enqueue is not None:
on_enqueue(handle.request_id)
Expand Down Expand Up @@ -501,6 +513,7 @@ def submit(
path: str = "",
hint: str | None = None,
webhook_url: str | None = None,
priority: Optional[Priority] = None,
) -> SyncRequestHandle:
"""Submit an application with the given arguments (which will be JSON serialized). The path parameter can be used to
specify a subpath when applicable. This method will return a handle to the request that can be used to check the status
Expand All @@ -517,6 +530,9 @@ def submit(
if hint is not None:
headers["X-Fal-Runner-Hint"] = hint

if priority is not None:
headers["X-Fal-Queue-Priority"] = priority

response = self._client.post(
url,
json=arguments,
Expand Down Expand Up @@ -544,8 +560,15 @@ def subscribe(
with_logs: bool = False,
on_enqueue: Optional[callable[[Queued], None]] = None,
on_queue_update: Optional[callable[[Status], None]] = None,
priority: Optional[Priority] = None,
) -> AnyJSON:
handle = self.submit(application, arguments, path=path, hint=hint)
handle = self.submit(
application,
arguments,
path=path,
hint=hint,
priority=priority,
)

if on_enqueue is not None:
on_enqueue(handle.request_id)
Expand Down
Loading