add ability to run experiments in parallel with different hyper params #1141

Draft · wants to merge 13 commits into base: main

Changes from 11 commits
2 changes: 1 addition & 1 deletion python/Makefile
@@ -1,4 +1,4 @@
.PHONY: tests lint format build publish doctest integration_tests integration_tests_fast evals benchmark benchmark-fast

GitHub Actions / benchmark (check notice on line 1 in python/Makefile):

Benchmark results:

    create_5_000_run_trees:                        Mean +- std dev: 564 ms +- 42 ms
    create_10_000_run_trees:                       Mean +- std dev: 1.11 sec +- 0.05 sec
    create_20_000_run_trees:                       Mean +- std dev: 1.11 sec +- 0.06 sec
    dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 763 us +- 11 us
    dumps_class_nested_py_leaf_50x100:             Mean +- std dev: 26.9 ms +- 0.3 ms
    dumps_class_nested_py_leaf_100x200:            Mean +- std dev: 111 ms +- 3 ms
    dumps_dataclass_nested_50x100:                 Mean +- std dev: 27.2 ms +- 0.3 ms
    dumps_pydantic_nested_50x100:                  Mean +- std dev: 58.1 ms +- 6.9 ms (warning: result may be unstable, std dev is 12% of mean)
    dumps_pydanticv1_nested_50x100:                Mean +- std dev: 216 ms +- 31 ms (warning: result may be unstable, std dev is 14% of mean)

Comparison against main:

    +-----------------------------------+----------+------------------------+
    | Benchmark                         | main     | changes                |
    +===================================+==========+========================+
    | create_10_000_run_trees           | 1.13 sec | 1.11 sec: 1.01x faster |
    +-----------------------------------+----------+------------------------+
    | dumps_class_nested_py_leaf_50x100 | 26.8 ms  | 26.9 ms: 1.00x slower  |
    +-----------------------------------+----------+------------------------+
    | Geometric mean                    | (ref)    | 1.00x slower           |
    +-----------------------------------+----------+------------------------+

    Benchmarks hidden because not significant (7): create_5_000_run_trees, create_20_000_run_trees,
    dumps_class_nested_py_branch_and_leaf_200x400, dumps_dataclass_nested_50x100,
    dumps_class_nested_py_leaf_100x200, dumps_pydantic_nested_50x100, dumps_pydanticv1_nested_50x100

OUTPUT ?= out/benchmark.json
@@ -36,7 +36,7 @@
poetry run python -m pytest -n auto --durations=10 --doctest-modules langsmith

evals:
poetry run python -m pytest tests/evaluation
poetry run python -m pytest tests/evaluation/test_evaluation.py::test_aevaluate_with_hyper_params
isahers1 marked this conversation as resolved.

lint:
poetry run ruff check .
174 changes: 139 additions & 35 deletions python/langsmith/evaluation/_arunner.py
@@ -5,16 +5,19 @@
import asyncio
import concurrent.futures as cf
import datetime
import itertools
import logging
import pathlib
import uuid
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Awaitable,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Sequence,
@@ -67,6 +70,7 @@ async def aevaluate(
client: Optional[langsmith.Client] = None,
blocking: bool = True,
experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
hyper_params: Optional[Dict[str, List[Any]]] = None,
) -> AsyncExperimentResults:
r"""Evaluate an async target system or function on a given dataset.

@@ -95,6 +99,8 @@ async def aevaluate(
experiment (Optional[schemas.TracerSession]): An existing experiment to
extend. If provided, experiment_prefix is ignored. For advanced
usage only.
hyper_params (Optional[Dict[str, List[Any]]]): A mapping from hyperparameter
    name to a list of candidate values. The target is run over every
    combination (Cartesian product) of the provided values, one experiment
    per combination; see the usage sketch after this docstring excerpt.

Returns:
AsyncIterator[ExperimentResultRow]: An async iterator over the experiment results.
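A minimal usage sketch of the new argument (the dataset name, target signature, and
model names below are illustrative assumptions, not part of this diff):

    import asyncio
    from langsmith import aevaluate

    async def target(inputs: dict, model: str = "stub-model", temperature: float = 0.0) -> dict:
        # A real target would call the model named by `model`; this stub just echoes.
        return {"answer": f"{model}@{temperature}: {inputs['question']}"}

    async def main() -> None:
        results = await aevaluate(
            target,
            data="my-dataset",                      # hypothetical dataset name
            experiment_prefix="hyper-param-sweep",  # hypothetical prefix
            hyper_params={
                "model": ["stub-model-a", "stub-model-b"],
                "temperature": [0.0, 0.7],
            },
        )
        # 2 models x 2 temperatures -> 4 experiments, one per combination.

    asyncio.run(main())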
@@ -244,6 +250,7 @@ async def aevaluate(
client=client,
blocking=blocking,
experiment=experiment,
hyper_params=hyper_params,
)


@@ -363,48 +370,145 @@ async def _aevaluate(
client: Optional[langsmith.Client] = None,
blocking: bool = True,
experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
) -> AsyncExperimentResults:
hyper_params: Optional[Dict[str, List[Any]]] = None,
) -> Union[AsyncExperimentResults, "CombinedAsyncExperimentResults"]:
is_async_target = asyncio.iscoroutinefunction(target) or (
hasattr(target, "__aiter__") and asyncio.iscoroutine(target.__aiter__())
)
client = client or rt.get_cached_client()
runs = None if is_async_target else cast(Iterable[schemas.Run], target)
experiment_, runs = await aitertools.aio_to_thread(
_resolve_experiment,
experiment,
runs,
client,
)
manager = await _AsyncExperimentManager(
data,
client=client,
metadata=metadata,
experiment=experiment_ or experiment_prefix,
description=description,
num_repetitions=num_repetitions,
runs=runs,
).astart()
cache_dir = ls_utils.get_cache_dir(None)
if cache_dir is not None:
dsid = await manager.get_dataset_id()
cache_path = pathlib.Path(cache_dir) / f"{dsid}.yaml"
if hyper_params is None:
runs = None if is_async_target else cast(Iterable[schemas.Run], target)
experiment_, runs = await aitertools.aio_to_thread(
_resolve_experiment,
experiment,
runs,
client,
)
manager = await _AsyncExperimentManager(
data,
client=client,
metadata=metadata,
experiment=experiment_ or experiment_prefix,
description=description,
num_repetitions=num_repetitions,
runs=runs,
).astart()
cache_dir = ls_utils.get_cache_dir(None)
if cache_dir is not None:
dsid = await manager.get_dataset_id()
cache_path = pathlib.Path(cache_dir) / f"{dsid}.yaml"
else:
cache_path = None
with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
if is_async_target:
manager = await manager.awith_predictions(
cast(ATARGET_T, target), max_concurrency=max_concurrency
)
if evaluators:
manager = await manager.awith_evaluators(
evaluators, max_concurrency=max_concurrency
)
if summary_evaluators:
manager = await manager.awith_summary_evaluators(summary_evaluators)
results = AsyncExperimentResults(manager)
if blocking:
await results.wait()
return results
    else:
        param_names = list(hyper_params.keys())
        param_values = list(hyper_params.values())
        param_combinations = [
            dict(zip(param_names, combo))
            for combo in itertools.product(*param_values)
        ]

        managers = []
        cache_dir = ls_utils.get_cache_dir(None)

        for params in param_combinations:
            param_str = "-".join(f"{k}={v}" for k, v in sorted(params.items()))
            current_prefix = (
                f"{experiment_prefix}-{param_str}"
                if experiment_prefix
                else f"experiment-{param_str}"
            )

            # Create a unique cache path for each parameter combination
            cache_path = (
                pathlib.Path(cache_dir) / f"{current_prefix}.yaml"
                if cache_dir
                else None
            )

current_metadata = {**(metadata or {}), "hyper_params": params}

with ls_utils.with_optional_cache(
cache_path, ignore_hosts=[client.api_url]
):
manager = await _AsyncExperimentManager(
data,
client=client,
metadata=current_metadata,
experiment=current_prefix,
description=description,
num_repetitions=num_repetitions,
).astart()

            if asyncio.iscoroutinefunction(target) or hasattr(target, "__aiter__"):
                # Bind this combination's params to the target via a factory so
                # each experiment gets its own closure (avoids late binding).
                def make_wrapped_target(
                    fixed_params: Dict[str, Any],
                ) -> Callable[[dict], Awaitable[dict]]:
                    async def wrapped_target(inputs: dict) -> dict:
                        return await target(inputs, **fixed_params)

                    return wrapped_target

                wrapped_target = make_wrapped_target(params.copy())

                manager = await manager.awith_predictions(
                    wrapped_target, max_concurrency=max_concurrency
                )
if evaluators:
manager = await manager.awith_evaluators(
evaluators, max_concurrency=max_concurrency
)
if summary_evaluators:
manager = await manager.awith_summary_evaluators(summary_evaluators)

managers.append(manager)

return CombinedAsyncExperimentResults(managers)
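For clarity, a standalone sketch of the grid expansion performed above (the
hyper_params values are illustrative only):

    import itertools

    hyper_params = {"model": ["a", "b"], "temperature": [0.0, 0.7]}
    param_names = list(hyper_params.keys())
    param_values = list(hyper_params.values())
    param_combinations = [
        dict(zip(param_names, combo)) for combo in itertools.product(*param_values)
    ]
    # -> [{'model': 'a', 'temperature': 0.0}, {'model': 'a', 'temperature': 0.7},
    #     {'model': 'b', 'temperature': 0.0}, {'model': 'b', 'temperature': 0.7}]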


class CombinedAsyncExperimentResults:
"""Container for multiple async experiment results from hyperparameter search."""

def __init__(
self,
managers: List[_AsyncExperimentManager],
):
"""Initialize the combined results."""
self.managers = managers
self._results = [AsyncExperimentResults(manager) for manager in managers]

async def wait(self) -> None:
"""Wait for all experiments to complete."""
await asyncio.gather(*(result.wait() for result in self._results))

def __getitem__(self, idx: int) -> AsyncExperimentResults:
"""Get results for a specific parameter combination."""
return self._results[idx]

def __len__(self) -> int:
"""Get the number of parameter combinations."""
return len(self._results)

def __iter__(self) -> Iterator[AsyncExperimentResults]:
"""Iterate over results for each parameter combination."""
return iter(self._results)

def __repr__(self) -> str:
"""Get string representation."""
return f"<CombinedAsyncExperimentResults with {len(self)} experiments>"


class _AsyncExperimentManager(_ExperimentManagerMixin):