Brandon/cre 402 add input into flow kickoff #1552

Closed
wants to merge 12 commits
129 changes: 62 additions & 67 deletions docs/concepts/flows.mdx
@@ -18,60 +18,63 @@ Flows allow you to create structured, event-driven workflows. They provide a sea

4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows.

5. **Input Flexibility**: Flows can accept inputs to initialize or update their state, with different handling for structured and unstructured state management.

## Getting Started

Let's create a simple Flow where you will use OpenAI to generate a random city in one task and then use that city to generate a fun fact in another task. Before building that example, let's look at how inputs can be passed to a Flow when it is kicked off.

### Passing Inputs to Flows

Flows can accept inputs to initialize or update their state before execution. The way inputs are handled depends on whether the flow uses structured or unstructured state management.

#### Structured State Management

In structured state management, the flow's state is defined using a Pydantic `BaseModel`. Inputs must match the model's schema, and any updates will overwrite the default values.

```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StructuredExampleFlow(Flow[ExampleState]):
    @start()
    def first_method(self):
        # Implementation
        pass

flow = StructuredExampleFlow()
flow.kickoff(inputs={"counter": 10})
```

In this example, the `counter` is initialized to `10`, while `message` retains its default value.
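
Once the flow is running, the merged state is available on `self.state` with normal attribute access. The following is a minimal sketch, not part of the original example, reusing the `ExampleState` model defined above:

```python
from crewai.flow.flow import Flow, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StructuredExampleFlow(Flow[ExampleState]):
    @start()
    def first_method(self):
        # With kickoff(inputs={"counter": 10}), the state starts as counter=10, message=""
        self.state.counter += 1
        self.state.message = f"Counter is now {self.state.counter}"

flow = StructuredExampleFlow()
flow.kickoff(inputs={"counter": 10})
```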

#### Unstructured State Management

In unstructured state management, the flow's state is a dictionary. You can pass any dictionary to update the state.

```python
from crewai.flow.flow import Flow, listen, start

class UnstructuredExampleFlow(Flow):
    @start()
    def first_method(self):
        # Implementation
        pass

flow = UnstructuredExampleFlow()
flow.kickoff(inputs={"counter": 5, "message": "Initial message"})
```

Here, both `counter` and `message` are updated based on the provided inputs.
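
Inside the flow's methods, these values can then be read and updated through `self.state`. A small sketch, not part of the original example and assuming dictionary-style access on `self.state`:

```python
from crewai.flow.flow import Flow, start

class UnstructuredExampleFlow(Flow):
    @start()
    def first_method(self):
        # With the inputs above, self.state starts roughly as
        # {"counter": 5, "message": "Initial message"}
        self.state["counter"] += 1
        print(self.state["message"])

flow = UnstructuredExampleFlow()
flow.kickoff(inputs={"counter": 5, "message": "Initial message"})
```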

**Note:** Ensure that inputs for structured state management adhere to the defined schema to avoid validation errors.
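
For instance, a type mismatch is the typical failure mode. A rough sketch of what that can look like, assuming the flow surfaces Pydantic's validation error when the inputs are applied (the exact exception type may vary):

```python
from pydantic import ValidationError

flow = StructuredExampleFlow()

try:
    # `counter` is declared as an int, so a non-numeric string violates the schema
    flow.kickoff(inputs={"counter": "not-a-number"})
except ValidationError as exc:
    print(f"Invalid inputs: {exc}")
```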

### Example Flow

```python
from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion

class ExampleFlow(Flow):
    model = "gpt-4o-mini"

    @start()
    def generate_city(self):
        print("Starting flow")

        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": "Return the name of a random city in the world.",
                },
            ],
        )

        random_city = response["choices"][0]["message"]["content"]
        print(f"Random City: {random_city}")

        return random_city

    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": f"Tell me a fun fact about {random_city}",
                },
            ],
        )

        fun_fact = response["choices"][0]["message"]["content"]
        return fun_fact

flow = ExampleFlow()
result = flow.kickoff()

print(f"Generated fun fact: {result}")
```

In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
@@ -94,14 +97,14 @@ The `@listen()` decorator can be used in several ways:

1. **Listening to a Method by Name**: You can pass the name of the method you want to listen to as a string. When that method completes, the listener method will be triggered.

```python
@listen("generate_city")
def generate_fun_fact(self, random_city):
    # Implementation
```

2. **Listening to a Method Directly**: You can pass the method itself. When that method completes, the listener method will be triggered.
```python
@listen(generate_city)
def generate_fun_fact(self, random_city):
    # Implementation
@@ -118,7 +121,7 @@ When you run a Flow, the final output is determined by the last method that comp
Here's how you can access the final output:

<CodeGroup>
```python
from crewai.flow.flow import Flow, listen, start

class OutputExampleFlow(Flow):
@@ -130,18 +133,17 @@ class OutputExampleFlow(Flow):
    def second_method(self, first_output):
        return f"Second method received: {first_output}"

flow = OutputExampleFlow()
final_output = flow.kickoff()

print("---- Final Output ----")
print(final_output)
```

```text
---- Final Output ----
Second method received: Output from first_method
```

</CodeGroup>

@@ -156,7 +158,7 @@ Here's an example of how to update and access the state:

<CodeGroup>

```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

@@ -184,7 +186,7 @@ print("Final State:")
print(flow.state)
```

```text
Final Output: Hello from first_method - updated by second_method
Final State:
counter=2 message='Hello from first_method - updated by second_method'
@@ -208,10 +210,10 @@ allowing developers to choose the approach that best fits their application's ne
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.

```python
from crewai.flow.flow import Flow, listen, start

class UnstructuredExampleFlow(Flow):

    @start()
    def first_method(self):
@@ -230,8 +232,7 @@ class UntructuredExampleFlow(Flow):

print(f"State after third_method: {self.state}")


flow = UntructuredExampleFlow()
flow = UnstructuredExampleFlow()
flow.kickoff()
```

@@ -245,16 +246,14 @@ flow.kickoff()
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.

```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    @start()
@@ -273,7 +272,6 @@ class StructuredExampleFlow(Flow[ExampleState]):

print(f"State after third_method: {self.state}")


flow = StructuredExampleFlow()
flow.kickoff()
```
@@ -307,7 +305,7 @@ The `or_` function in Flows allows you to listen to multiple methods and trigger

<CodeGroup>

```python
from crewai.flow.flow import Flow, listen, or_, start

class OrExampleFlow(Flow):
@@ -324,13 +322,11 @@ class OrExampleFlow(Flow):
    def logger(self, result):
        print(f"Logger: {result}")

flow = OrExampleFlow()
flow.kickoff()
```

```text
Logger: Hello from the start method
Logger: Hello from the second method
```
@@ -346,7 +342,7 @@ The `and_` function in Flows allows you to listen to multiple methods and trigge

<CodeGroup>

```python
from crewai.flow.flow import Flow, and_, listen, start

class AndExampleFlow(Flow):
@@ -368,7 +364,7 @@ flow = AndExampleFlow()
flow.kickoff()
```

```text
---- Logger ----
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
```
@@ -385,7 +381,7 @@ You can specify different routes based on the output of the method, allowing you

<CodeGroup>

```python
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
@@ -416,12 +412,11 @@ class RouterFlow(Flow[ExampleState]):
    def fourth_method(self):
        print("Fourth method running")

flow = RouterFlow()
flow.kickoff()
```

```text
Starting the structured flow
Third method running
Fourth method running
@@ -484,7 +479,7 @@ The `main.py` file is where you create your flow and connect the crews together.

Here's an example of how you can connect the `poem_crew` in the `main.py` file:

```python
#!/usr/bin/env python
from random import randint

@@ -612,7 +607,7 @@ CrewAI provides two convenient methods to generate plots of your flows:

If you are working directly with a flow instance, you can generate a plot by calling the `plot()` method on your flow object. This method will create an HTML file containing the interactive plot of your flow.

```python
# Assuming you have a flow instance
flow.plot("my_flow_plot")
```
45 changes: 40 additions & 5 deletions src/crewai/agent.py
@@ -8,6 +8,7 @@
from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.cli.constants import ENV_VARS
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.tools.agent_tools.agent_tools import AgentTools
@@ -131,8 +132,12 @@ def post_init_setup(self):
            # If it's already an LLM instance, keep it as is
            pass
        elif self.llm is None:
            # Determine the model name from environment variables or use default
            model_name = (
                os.environ.get("OPENAI_MODEL_NAME")
                or os.environ.get("MODEL")
                or "gpt-4o-mini"
            )
            llm_params = {"model": model_name}

            api_base = os.environ.get("OPENAI_API_BASE") or os.environ.get(
@@ -141,9 +146,39 @@
            if api_base:
                llm_params["base_url"] = api_base

            # Iterate over all environment variables to find matching API keys or use defaults
            for provider, env_vars in ENV_VARS.items():
                for env_var in env_vars:
                    # Check if the environment variable is set
                    if "key_name" in env_var:
                        env_value = os.environ.get(env_var["key_name"])
                        if env_value:
                            # Map key names containing "API_KEY" to "api_key"
                            key_name = (
                                "api_key"
                                if "API_KEY" in env_var["key_name"]
                                else env_var["key_name"]
                            )
                            # Map key names containing "API_BASE" to "api_base"
                            key_name = (
                                "api_base"
                                if "API_BASE" in env_var["key_name"]
                                else key_name
                            )
                            # Map key names containing "API_VERSION" to "api_version"
                            key_name = (
                                "api_version"
                                if "API_VERSION" in env_var["key_name"]
                                else key_name
                            )
                            llm_params[key_name] = env_value
                    # Check for default values if the environment variable is not set
                    elif env_var.get("default", False):
                        for key, value in env_var.items():
                            if key not in ["prompt", "key_name", "default"]:
                                # Only add default if the key is already set in os.environ
                                if key in os.environ:
                                    llm_params[key] = value

            self.llm = LLM(**llm_params)
        else:
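
To make the provider loop above easier to follow, here is an illustrative sketch of how entries in `ENV_VARS` might be shaped and what the loop would contribute to `llm_params`. The provider names and values below are assumptions for illustration only, not the actual contents of `crewai/cli/constants.py`:

```python
# Hypothetical ENV_VARS shape (illustrative only)
ENV_VARS = {
    "openai": [
        {"prompt": "Enter your OpenAI API key", "key_name": "OPENAI_API_KEY"},
    ],
    "ollama": [
        # A "default" entry carries extra settings instead of a key name
        {"default": True, "API_BASE": "http://localhost:11434"},
    ],
}

# With OPENAI_API_KEY set in the environment, the loop maps the key name to
# "api_key", so llm_params becomes roughly:
#   {"model": "gpt-4o-mini", "api_key": "<value of OPENAI_API_KEY>"}
# A "default" entry only contributes its extra keys (here "API_BASE") when that
# same key name is also present in os.environ.
```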