diff --git a/docs/concepts/flows.mdx b/docs/concepts/flows.mdx index e3f0e6b451..9ead26d70b 100644 --- a/docs/concepts/flows.mdx +++ b/docs/concepts/flows.mdx @@ -18,60 +18,63 @@ Flows allow you to create structured, event-driven workflows. They provide a sea 4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows. +5. **Input Flexibility**: Flows can accept inputs to initialize or update their state, with different handling for structured and unstructured state management. + ## Getting Started Let's create a simple Flow where you will use OpenAI to generate a random city in one task and then use that city to generate a fun fact in another task. -```python Code +### Passing Inputs to Flows -from crewai.flow.flow import Flow, listen, start -from dotenv import load_dotenv -from litellm import completion +Flows can accept inputs to initialize or update their state before execution. The way inputs are handled depends on whether the flow uses structured or unstructured state management. + +#### Structured State Management +In structured state management, the flow's state is defined using a Pydantic `BaseModel`. Inputs must match the model's schema, and any updates will overwrite the default values. + +```python +from crewai.flow.flow import Flow, listen, start +from pydantic import BaseModel -class ExampleFlow(Flow): - model = "gpt-4o-mini" +class ExampleState(BaseModel): + counter: int = 0 + message: str = "" +class StructuredExampleFlow(Flow[ExampleState]): @start() - def generate_city(self): - print("Starting flow") - - response = completion( - model=self.model, - messages=[ - { - "role": "user", - "content": "Return the name of a random city in the world.", - }, - ], - ) + def first_method(self): + # Implementation - random_city = response["choices"][0]["message"]["content"] - print(f"Random City: {random_city}") - - return random_city - - @listen(generate_city) - def generate_fun_fact(self, random_city): - response = completion( - model=self.model, - messages=[ - { - "role": "user", - "content": f"Tell me a fun fact about {random_city}", - }, - ], - ) +flow = StructuredExampleFlow() +flow.kickoff(inputs={"counter": 10}) +``` + +In this example, the `counter` is initialized to `10`, while `message` retains its default value. + +#### Unstructured State Management + +In unstructured state management, the flow's state is a dictionary. You can pass any dictionary to update the state. + +```python +from crewai.flow.flow import Flow, listen, start + +class UnstructuredExampleFlow(Flow): + @start() + def first_method(self): + # Implementation - fun_fact = response["choices"][0]["message"]["content"] - return fun_fact +flow = UnstructuredExampleFlow() +flow.kickoff(inputs={"counter": 5, "message": "Initial message"}) +``` +Here, both `counter` and `message` are updated based on the provided inputs. +**Note:** Ensure that inputs for structured state management adhere to the defined schema to avoid validation errors. -flow = ExampleFlow() -result = flow.kickoff() +### Example Flow -print(f"Generated fun fact: {result}") +```python +# Existing example code ``` In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task. @@ -94,14 +97,14 @@ The `@listen()` decorator can be used in several ways: 1. **Listening to a Method by Name**: You can pass the name of the method you want to listen to as a string. When that method completes, the listener method will be triggered. - ```python Code + ```python @listen("generate_city") def generate_fun_fact(self, random_city): # Implementation ``` 2. **Listening to a Method Directly**: You can pass the method itself. When that method completes, the listener method will be triggered. - ```python Code + ```python @listen(generate_city) def generate_fun_fact(self, random_city): # Implementation @@ -118,7 +121,7 @@ When you run a Flow, the final output is determined by the last method that comp Here's how you can access the final output: -```python Code +```python from crewai.flow.flow import Flow, listen, start class OutputExampleFlow(Flow): @@ -130,18 +133,17 @@ class OutputExampleFlow(Flow): def second_method(self, first_output): return f"Second method received: {first_output}" - flow = OutputExampleFlow() final_output = flow.kickoff() print("---- Final Output ----") print(final_output) -```` +``` -``` text Output +```text ---- Final Output ---- Second method received: Output from first_method -```` +``` @@ -156,7 +158,7 @@ Here's an example of how to update and access the state: -```python Code +```python from crewai.flow.flow import Flow, listen, start from pydantic import BaseModel @@ -184,7 +186,7 @@ print("Final State:") print(flow.state) ``` -```text Output +```text Final Output: Hello from first_method - updated by second_method Final State: counter=2 message='Hello from first_method - updated by second_method' @@ -208,10 +210,10 @@ allowing developers to choose the approach that best fits their application's ne In unstructured state management, all state is stored in the `state` attribute of the `Flow` class. This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema. -```python Code +```python from crewai.flow.flow import Flow, listen, start -class UntructuredExampleFlow(Flow): +class UnstructuredExampleFlow(Flow): @start() def first_method(self): @@ -230,8 +232,7 @@ class UntructuredExampleFlow(Flow): print(f"State after third_method: {self.state}") - -flow = UntructuredExampleFlow() +flow = UnstructuredExampleFlow() flow.kickoff() ``` @@ -245,16 +246,14 @@ flow.kickoff() Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow. By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments. -```python Code +```python from crewai.flow.flow import Flow, listen, start from pydantic import BaseModel - class ExampleState(BaseModel): counter: int = 0 message: str = "" - class StructuredExampleFlow(Flow[ExampleState]): @start() @@ -273,7 +272,6 @@ class StructuredExampleFlow(Flow[ExampleState]): print(f"State after third_method: {self.state}") - flow = StructuredExampleFlow() flow.kickoff() ``` @@ -307,7 +305,7 @@ The `or_` function in Flows allows you to listen to multiple methods and trigger -```python Code +```python from crewai.flow.flow import Flow, listen, or_, start class OrExampleFlow(Flow): @@ -324,13 +322,11 @@ class OrExampleFlow(Flow): def logger(self, result): print(f"Logger: {result}") - - flow = OrExampleFlow() flow.kickoff() ``` -```text Output +```text Logger: Hello from the start method Logger: Hello from the second method ``` @@ -346,7 +342,7 @@ The `and_` function in Flows allows you to listen to multiple methods and trigge -```python Code +```python from crewai.flow.flow import Flow, and_, listen, start class AndExampleFlow(Flow): @@ -368,7 +364,7 @@ flow = AndExampleFlow() flow.kickoff() ``` -```text Output +```text ---- Logger ---- {'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'} ``` @@ -385,7 +381,7 @@ You can specify different routes based on the output of the method, allowing you -```python Code +```python import random from crewai.flow.flow import Flow, listen, router, start from pydantic import BaseModel @@ -416,12 +412,11 @@ class RouterFlow(Flow[ExampleState]): def fourth_method(self): print("Fourth method running") - flow = RouterFlow() flow.kickoff() ``` -```text Output +```text Starting the structured flow Third method running Fourth method running @@ -484,7 +479,7 @@ The `main.py` file is where you create your flow and connect the crews together. Here's an example of how you can connect the `poem_crew` in the `main.py` file: -```python Code +```python #!/usr/bin/env python from random import randint @@ -612,7 +607,7 @@ CrewAI provides two convenient methods to generate plots of your flows: If you are working directly with a flow instance, you can generate a plot by calling the `plot()` method on your flow object. This method will create an HTML file containing the interactive plot of your flow. -```python Code +```python # Assuming you have a flow instance flow.plot("my_flow_plot") ``` diff --git a/src/crewai/agent.py b/src/crewai/agent.py index f2a0a5c310..d307468082 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -8,6 +8,7 @@ from crewai.agents import CacheHandler from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.crew_agent_executor import CrewAgentExecutor +from crewai.cli.constants import ENV_VARS from crewai.llm import LLM from crewai.memory.contextual.contextual_memory import ContextualMemory from crewai.tools.agent_tools.agent_tools import AgentTools @@ -131,8 +132,12 @@ def post_init_setup(self): # If it's already an LLM instance, keep it as is pass elif self.llm is None: - # If it's None, use environment variables or default - model_name = os.environ.get("OPENAI_MODEL_NAME", "gpt-4o-mini") + # Determine the model name from environment variables or use default + model_name = ( + os.environ.get("OPENAI_MODEL_NAME") + or os.environ.get("MODEL") + or "gpt-4o-mini" + ) llm_params = {"model": model_name} api_base = os.environ.get("OPENAI_API_BASE") or os.environ.get( @@ -141,9 +146,39 @@ def post_init_setup(self): if api_base: llm_params["base_url"] = api_base - api_key = os.environ.get("OPENAI_API_KEY") - if api_key: - llm_params["api_key"] = api_key + # Iterate over all environment variables to find matching API keys or use defaults + for provider, env_vars in ENV_VARS.items(): + for env_var in env_vars: + # Check if the environment variable is set + if "key_name" in env_var: + env_value = os.environ.get(env_var["key_name"]) + if env_value: + # Map key names containing "API_KEY" to "api_key" + key_name = ( + "api_key" + if "API_KEY" in env_var["key_name"] + else env_var["key_name"] + ) + # Map key names containing "API_BASE" to "api_base" + key_name = ( + "api_base" + if "API_BASE" in env_var["key_name"] + else key_name + ) + # Map key names containing "API_VERSION" to "api_version" + key_name = ( + "api_version" + if "API_VERSION" in env_var["key_name"] + else key_name + ) + llm_params[key_name] = env_value + # Check for default values if the environment variable is not set + elif env_var.get("default", False): + for key, value in env_var.items(): + if key not in ["prompt", "key_name", "default"]: + # Only add default if the key is already set in os.environ + if key in os.environ: + llm_params[key] = value self.llm = LLM(**llm_params) else: diff --git a/src/crewai/cli/constants.py b/src/crewai/cli/constants.py index 9a0b36c396..4be08fa2a3 100644 --- a/src/crewai/cli/constants.py +++ b/src/crewai/cli/constants.py @@ -1,19 +1,168 @@ ENV_VARS = { - 'openai': ['OPENAI_API_KEY'], - 'anthropic': ['ANTHROPIC_API_KEY'], - 'gemini': ['GEMINI_API_KEY'], - 'groq': ['GROQ_API_KEY'], - 'ollama': ['FAKE_KEY'], + "openai": [ + { + "prompt": "Enter your OPENAI API key (press Enter to skip)", + "key_name": "OPENAI_API_KEY", + } + ], + "anthropic": [ + { + "prompt": "Enter your ANTHROPIC API key (press Enter to skip)", + "key_name": "ANTHROPIC_API_KEY", + } + ], + "gemini": [ + { + "prompt": "Enter your GEMINI API key (press Enter to skip)", + "key_name": "GEMINI_API_KEY", + } + ], + "groq": [ + { + "prompt": "Enter your GROQ API key (press Enter to skip)", + "key_name": "GROQ_API_KEY", + } + ], + "watson": [ + { + "prompt": "Enter your WATSONX URL (press Enter to skip)", + "key_name": "WATSONX_URL", + }, + { + "prompt": "Enter your WATSONX API Key (press Enter to skip)", + "key_name": "WATSONX_APIKEY", + }, + { + "prompt": "Enter your WATSONX Project Id (press Enter to skip)", + "key_name": "WATSONX_PROJECT_ID", + }, + ], + "ollama": [ + { + "default": True, + "API_BASE": "http://localhost:11434", + } + ], + "bedrock": [ + { + "prompt": "Enter your AWS Access Key ID (press Enter to skip)", + "key_name": "AWS_ACCESS_KEY_ID", + }, + { + "prompt": "Enter your AWS Secret Access Key (press Enter to skip)", + "key_name": "AWS_SECRET_ACCESS_KEY", + }, + { + "prompt": "Enter your AWS Region Name (press Enter to skip)", + "key_name": "AWS_REGION_NAME", + }, + ], + "azure": [ + { + "prompt": "Enter your Azure deployment name (must start with 'azure/')", + "key_name": "model", + }, + { + "prompt": "Enter your AZURE API key (press Enter to skip)", + "key_name": "AZURE_API_KEY", + }, + { + "prompt": "Enter your AZURE API base URL (press Enter to skip)", + "key_name": "AZURE_API_BASE", + }, + { + "prompt": "Enter your AZURE API version (press Enter to skip)", + "key_name": "AZURE_API_VERSION", + }, + ], + "cerebras": [ + { + "prompt": "Enter your Cerebras model name (must start with 'cerebras/')", + "key_name": "model", + }, + { + "prompt": "Enter your Cerebras API version (press Enter to skip)", + "key_name": "CEREBRAS_API_KEY", + }, + ], } -PROVIDERS = ['openai', 'anthropic', 'gemini', 'groq', 'ollama'] + +PROVIDERS = [ + "openai", + "anthropic", + "gemini", + "groq", + "ollama", + "watson", + "bedrock", + "azure", + "cerebras", +] MODELS = { - 'openai': ['gpt-4', 'gpt-4o', 'gpt-4o-mini', 'o1-mini', 'o1-preview'], - 'anthropic': ['claude-3-5-sonnet-20240620', 'claude-3-sonnet-20240229', 'claude-3-opus-20240229', 'claude-3-haiku-20240307'], - 'gemini': ['gemini-1.5-flash', 'gemini-1.5-pro', 'gemini-gemma-2-9b-it', 'gemini-gemma-2-27b-it'], - 'groq': ['llama-3.1-8b-instant', 'llama-3.1-70b-versatile', 'llama-3.1-405b-reasoning', 'gemma2-9b-it', 'gemma-7b-it'], - 'ollama': ['llama3.1', 'mixtral'], + "openai": ["gpt-4", "gpt-4o", "gpt-4o-mini", "o1-mini", "o1-preview"], + "anthropic": [ + "claude-3-5-sonnet-20240620", + "claude-3-sonnet-20240229", + "claude-3-opus-20240229", + "claude-3-haiku-20240307", + ], + "gemini": [ + "gemini/gemini-1.5-flash", + "gemini/gemini-1.5-pro", + "gemini/gemini-gemma-2-9b-it", + "gemini/gemini-gemma-2-27b-it", + ], + "groq": [ + "groq/llama-3.1-8b-instant", + "groq/llama-3.1-70b-versatile", + "groq/llama-3.1-405b-reasoning", + "groq/gemma2-9b-it", + "groq/gemma-7b-it", + ], + "ollama": ["ollama/llama3.1", "ollama/mixtral"], + "watson": [ + "watsonx/google/flan-t5-xxl", + "watsonx/google/flan-ul2", + "watsonx/bigscience/mt0-xxl", + "watsonx/eleutherai/gpt-neox-20b", + "watsonx/ibm/mpt-7b-instruct2", + "watsonx/bigcode/starcoder", + "watsonx/meta-llama/llama-2-70b-chat", + "watsonx/meta-llama/llama-2-13b-chat", + "watsonx/ibm/granite-13b-instruct-v1", + "watsonx/ibm/granite-13b-chat-v1", + "watsonx/google/flan-t5-xl", + "watsonx/ibm/granite-13b-chat-v2", + "watsonx/ibm/granite-13b-instruct-v2", + "watsonx/elyza/elyza-japanese-llama-2-7b-instruct", + "watsonx/ibm-mistralai/mixtral-8x7b-instruct-v01-q", + ], + "bedrock": [ + "bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0", + "bedrock/anthropic.claude-3-sonnet-20240229-v1:0", + "bedrock/anthropic.claude-3-haiku-20240307-v1:0", + "bedrock/anthropic.claude-3-opus-20240229-v1:0", + "bedrock/anthropic.claude-v2:1", + "bedrock/anthropic.claude-v2", + "bedrock/anthropic.claude-instant-v1", + "bedrock/meta.llama3-1-405b-instruct-v1:0", + "bedrock/meta.llama3-1-70b-instruct-v1:0", + "bedrock/meta.llama3-1-8b-instruct-v1:0", + "bedrock/meta.llama3-70b-instruct-v1:0", + "bedrock/meta.llama3-8b-instruct-v1:0", + "bedrock/amazon.titan-text-lite-v1", + "bedrock/amazon.titan-text-express-v1", + "bedrock/cohere.command-text-v14", + "bedrock/ai21.j2-mid-v1", + "bedrock/ai21.j2-ultra-v1", + "bedrock/ai21.jamba-instruct-v1:0", + "bedrock/meta.llama2-13b-chat-v1", + "bedrock/meta.llama2-70b-chat-v1", + "bedrock/mistral.mistral-7b-instruct-v0:2", + "bedrock/mistral.mixtral-8x7b-instruct-v0:1", + ], } -JSON_URL = "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json" \ No newline at end of file +JSON_URL = "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json" diff --git a/src/crewai/cli/create_crew.py b/src/crewai/cli/create_crew.py index 5767b82a1f..06440d74e9 100644 --- a/src/crewai/cli/create_crew.py +++ b/src/crewai/cli/create_crew.py @@ -1,11 +1,11 @@ +import shutil import sys from pathlib import Path import click -from crewai.cli.constants import ENV_VARS +from crewai.cli.constants import ENV_VARS, MODELS from crewai.cli.provider import ( - PROVIDERS, get_provider_data, select_model, select_provider, @@ -29,20 +29,20 @@ def create_folder_structure(name, parent_folder=None): click.secho("Operation cancelled.", fg="yellow") sys.exit(0) click.secho(f"Overriding folder {folder_name}...", fg="green", bold=True) - else: - click.secho( - f"Creating {'crew' if parent_folder else 'folder'} {folder_name}...", - fg="green", - bold=True, - ) - - if not folder_path.exists(): - folder_path.mkdir(parents=True) - (folder_path / "tests").mkdir(exist_ok=True) - if not parent_folder: - (folder_path / "src" / folder_name).mkdir(parents=True) - (folder_path / "src" / folder_name / "tools").mkdir(parents=True) - (folder_path / "src" / folder_name / "config").mkdir(parents=True) + shutil.rmtree(folder_path) # Delete the existing folder and its contents + + click.secho( + f"Creating {'crew' if parent_folder else 'folder'} {folder_name}...", + fg="green", + bold=True, + ) + + folder_path.mkdir(parents=True) + (folder_path / "tests").mkdir(exist_ok=True) + if not parent_folder: + (folder_path / "src" / folder_name).mkdir(parents=True) + (folder_path / "src" / folder_name / "tools").mkdir(parents=True) + (folder_path / "src" / folder_name / "config").mkdir(parents=True) return folder_path, folder_name, class_name @@ -92,7 +92,10 @@ def create_crew(name, provider=None, skip_provider=False, parent_folder=None): existing_provider = None for provider, env_keys in ENV_VARS.items(): - if any(key in env_vars for key in env_keys): + if any( + "key_name" in details and details["key_name"] in env_vars + for details in env_keys + ): existing_provider = provider break @@ -118,47 +121,48 @@ def create_crew(name, provider=None, skip_provider=False, parent_folder=None): "No provider selected. Please try again or press 'q' to exit.", fg="red" ) - while True: - selected_model = select_model(selected_provider, provider_models) - if selected_model is None: # User typed 'q' - click.secho("Exiting...", fg="yellow") - sys.exit(0) - if selected_model: # Valid selection - break - click.secho( - "No model selected. Please try again or press 'q' to exit.", fg="red" - ) - - if selected_provider in PROVIDERS: - api_key_var = ENV_VARS[selected_provider][0] - else: - api_key_var = click.prompt( - f"Enter the environment variable name for your {selected_provider.capitalize()} API key", - type=str, - default="", - ) - - api_key_value = "" - click.echo( - f"Enter your {selected_provider.capitalize()} API key (press Enter to skip): ", - nl=False, - ) - try: - api_key_value = input() - except (KeyboardInterrupt, EOFError): - api_key_value = "" - - if api_key_value.strip(): - env_vars = {api_key_var: api_key_value} + # Check if the selected provider has predefined models + if selected_provider in MODELS and MODELS[selected_provider]: + while True: + selected_model = select_model(selected_provider, provider_models) + if selected_model is None: # User typed 'q' + click.secho("Exiting...", fg="yellow") + sys.exit(0) + if selected_model: # Valid selection + break + click.secho( + "No model selected. Please try again or press 'q' to exit.", + fg="red", + ) + env_vars["MODEL"] = selected_model + + # Check if the selected provider requires API keys + if selected_provider in ENV_VARS: + provider_env_vars = ENV_VARS[selected_provider] + for details in provider_env_vars: + if details.get("default", False): + # Automatically add default key-value pairs + for key, value in details.items(): + if key not in ["prompt", "key_name", "default"]: + env_vars[key] = value + elif "key_name" in details: + # Prompt for non-default key-value pairs + prompt = details["prompt"] + key_name = details["key_name"] + api_key_value = click.prompt(prompt, default="", show_default=False) + + if api_key_value.strip(): + env_vars[key_name] = api_key_value + + if env_vars: write_env_file(folder_path, env_vars) - click.secho("API key saved to .env file", fg="green") + click.secho("API keys and model saved to .env file", fg="green") else: click.secho( - "No API key provided. Skipping .env file creation.", fg="yellow" + "No API keys provided. Skipping .env file creation.", fg="yellow" ) - env_vars["MODEL"] = selected_model - click.secho(f"Selected model: {selected_model}", fg="green") + click.secho(f"Selected model: {env_vars.get('MODEL', 'N/A')}", fg="green") package_dir = Path(__file__).parent templates_dir = package_dir / "templates" / "crew" diff --git a/src/crewai/cli/templates/crew/crew.py b/src/crewai/cli/templates/crew/crew.py index f950d13d43..392e29edde 100644 --- a/src/crewai/cli/templates/crew/crew.py +++ b/src/crewai/cli/templates/crew/crew.py @@ -48,4 +48,4 @@ def crew(self) -> Crew: process=Process.sequential, verbose=True, # process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/ - ) \ No newline at end of file + ) diff --git a/src/crewai/cli/templates/crew/main.py b/src/crewai/cli/templates/crew/main.py index 88edfcbffc..d441fa0fa3 100644 --- a/src/crewai/cli/templates/crew/main.py +++ b/src/crewai/cli/templates/crew/main.py @@ -1,7 +1,11 @@ #!/usr/bin/env python import sys +import warnings + from {{folder_name}}.crew import {{crew_name}}Crew +warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd") + # This main file is intended to be a way for you to run your # crew locally, so refrain from adding unnecessary logic into this file. # Replace with inputs you want to test with, it will automatically diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index e7231e13f5..16f1cf9f0b 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -1,8 +1,20 @@ import asyncio import inspect -from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union - -from pydantic import BaseModel +from typing import ( + Any, + Callable, + Dict, + Generic, + List, + Optional, + Set, + Type, + TypeVar, + Union, + cast, +) + +from pydantic import BaseModel, ValidationError from crewai.flow.flow_visualizer import plot_flow from crewai.flow.utils import get_possible_return_constants @@ -191,10 +203,66 @@ def method_outputs(self) -> List[Any]: """Returns the list of all outputs from executed methods.""" return self._method_outputs - def kickoff(self) -> Any: + def _initialize_state(self, inputs: Dict[str, Any]) -> None: + """ + Initializes or updates the state with the provided inputs. + + Args: + inputs: Dictionary of inputs to initialize or update the state. + + Raises: + ValueError: If inputs do not match the structured state model. + TypeError: If state is neither a BaseModel instance nor a dictionary. + """ + if isinstance(self._state, BaseModel): + # Structured state management + try: + M = self._state.__class__ + + # Dynamically create a new model class with 'extra' set to 'forbid' + class ModelWithExtraForbid(M): + model_config = M.model_config.copy() + model_config["extra"] = "forbid" + + # Create a new instance using the combined state and inputs + self._state = cast( + T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs}) + ) + + except ValidationError as e: + raise ValueError(f"Invalid inputs for structured state: {e}") from e + elif isinstance(self._state, dict): + # Unstructured state management + self._state.update(inputs) + else: + raise TypeError("State must be a BaseModel instance or a dictionary.") + + def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any: + """ + Starts the execution of the flow synchronously. + + Args: + inputs: Optional dictionary of inputs to initialize or update the state. + + Returns: + The final output from the flow execution. + """ + if inputs is not None: + self._initialize_state(inputs) return asyncio.run(self.kickoff_async()) - async def kickoff_async(self) -> Any: + async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any: + """ + Starts the execution of the flow asynchronously. + + Args: + inputs: Optional dictionary of inputs to initialize or update the state. + + Returns: + The final output from the flow execution. + """ + if inputs is not None: + self._initialize_state(inputs) if not self._start_methods: raise ValueError("No start method defined")