add ability to run experiments in parallel with different hyper params #1141

Draft · wants to merge 13 commits into base: main
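A minimal usage sketch of the feature proposed here, inferred from the diff below: hyper_params maps each parameter name to a list of candidate values, the target function receives one combination per experiment as keyword arguments, and a CombinedAsyncExperimentResults is returned in place of a single AsyncExperimentResults. The dataset name, model identifiers, and evaluator are illustrative placeholders, not part of this PR.

import asyncio

from langsmith.evaluation import aevaluate

async def target(inputs: dict, *, model: str = "model-a", temperature: float = 0.0) -> dict:
    # Hyperparameters arrive as keyword arguments, matching the
    # target(inputs, **fixed_params) call in the wrapped target below.
    return {"output": f"{model}@{temperature}: {inputs['question']}"}

def exact_match(run, example) -> dict:
    # Placeholder evaluator comparing the target output to the reference answer.
    return {"key": "exact_match", "score": run.outputs["output"] == example.outputs.get("answer")}

async def main() -> None:
    combined = await aevaluate(
        target,
        data="my-dataset",  # placeholder dataset name
        evaluators=[exact_match],
        experiment_prefix="hp-sweep",
        # 2 models x 2 temperatures -> 4 experiments, one per combination.
        hyper_params={
            "model": ["model-a", "model-b"],
            "temperature": [0.0, 0.7],
        },
    )
    await combined.wait()

asyncio.run(main())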
Changes from all commits
199 changes: 153 additions & 46 deletions python/langsmith/evaluation/_arunner.py
@@ -1,20 +1,23 @@
"""V2 Evaluation Interface."""

from __future__ import annotations

import asyncio
import concurrent.futures as cf
import datetime
import itertools
import logging
import pathlib
import uuid
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Awaitable,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Sequence,
@@ -67,7 +70,8 @@
client: Optional[langsmith.Client] = None,
blocking: bool = True,
experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
) -> AsyncExperimentResults:
hyper_params: Optional[Dict[str, List[Any]]] = None,
) -> AsyncExperimentResults | CombinedAsyncExperimentResults:
r"""Evaluate an async target system or function on a given dataset.

Args:
@@ -95,6 +99,8 @@
experiment (Optional[schemas.TracerSession]): An existing experiment to
extend. If provided, experiment_prefix is ignored. For advanced
usage only.
hyper_params (Optional[Dict[str, List[Any]]]): Hyperparameter values to sweep over,
mapping each parameter name to a list of candidate values. The target is evaluated
once per combination in the Cartesian product of the values, and each combination
runs as its own experiment.

Returns:
AsyncExperimentResults: An async iterator over the experiment results, or a
CombinedAsyncExperimentResults (one entry per hyperparameter combination) when
hyper_params is provided.
@@ -244,6 +250,7 @@
client=client,
blocking=blocking,
experiment=experiment,
hyper_params=hyper_params,
)


@@ -336,16 +343,19 @@
)
data_map = await aitertools.aio_to_thread(_load_examples_map, client, project)
data = [data_map[run.reference_example_id] for run in runs]
return await _aevaluate(
runs,
data=data,
evaluators=evaluators,
summary_evaluators=summary_evaluators,
metadata=metadata,
max_concurrency=max_concurrency,
client=client,
blocking=blocking,
experiment=project,
return cast(
AsyncExperimentResults,
await _aevaluate(
runs,
data=data,
evaluators=evaluators,
summary_evaluators=summary_evaluators,
metadata=metadata,
max_concurrency=max_concurrency,
client=client,
blocking=blocking,
experiment=project,
),
)


@@ -363,48 +373,145 @@
client: Optional[langsmith.Client] = None,
blocking: bool = True,
experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
) -> AsyncExperimentResults:
hyper_params: Optional[Dict[str, List[Any]]] = None,
) -> AsyncExperimentResults | CombinedAsyncExperimentResults:
is_async_target = asyncio.iscoroutinefunction(target) or (
hasattr(target, "__aiter__") and asyncio.iscoroutine(target.__aiter__())
)
client = client or rt.get_cached_client()
runs = None if is_async_target else cast(Iterable[schemas.Run], target)
experiment_, runs = await aitertools.aio_to_thread(
_resolve_experiment,
experiment,
runs,
client,
)
manager = await _AsyncExperimentManager(
data,
client=client,
metadata=metadata,
experiment=experiment_ or experiment_prefix,
description=description,
num_repetitions=num_repetitions,
runs=runs,
).astart()
cache_dir = ls_utils.get_cache_dir(None)
if cache_dir is not None:
dsid = await manager.get_dataset_id()
cache_path = pathlib.Path(cache_dir) / f"{dsid}.yaml"
if hyper_params is None:
runs = None if is_async_target else cast(Iterable[schemas.Run], target)
experiment_, runs = await aitertools.aio_to_thread(
_resolve_experiment,
experiment,
runs,
client,
)
manager = await _AsyncExperimentManager(
data,
client=client,
metadata=metadata,
experiment=experiment_ or experiment_prefix,
description=description,
num_repetitions=num_repetitions,
runs=runs,
).astart()
cache_dir = ls_utils.get_cache_dir(None)
if cache_dir is not None:
dsid = await manager.get_dataset_id()
cache_path = pathlib.Path(cache_dir) / f"{dsid}.yaml"
else:
cache_path = None
with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
if is_async_target:
manager = await manager.awith_predictions(
cast(ATARGET_T, target), max_concurrency=max_concurrency
)
if evaluators:
manager = await manager.awith_evaluators(
evaluators, max_concurrency=max_concurrency
)
if summary_evaluators:
manager = await manager.awith_summary_evaluators(summary_evaluators)
results = AsyncExperimentResults(manager)
if blocking:
await results.wait()
return results
else:
    param_names = list(hyper_params.keys())
    param_values = list(hyper_params.values())
    # One experiment per combination in the Cartesian product of the values.
    param_combinations = [
        dict(zip(param_names, combo)) for combo in itertools.product(*param_values)
    ]

    managers = []
    cache_dir = ls_utils.get_cache_dir(None)

    for params in param_combinations:
        param_str = "-".join(f"{k}={v}" for k, v in sorted(params.items()))
        current_prefix = (
            f"{experiment_prefix}-{param_str}"
            if experiment_prefix
            else f"experiment-{param_str}"
        )

        # Create a unique cache path for each parameter combination
        cache_path = (
            pathlib.Path(cache_dir) / f"{current_prefix}.yaml"
            if cache_dir
            else None
        )

        # Record the combination in the experiment metadata.
        current_metadata = {**(metadata or {}), "hyper_params": params}

        with ls_utils.with_optional_cache(
            cache_path, ignore_hosts=[client.api_url]
        ):
            manager = await _AsyncExperimentManager(
                data,
                client=client,
                metadata=current_metadata,
                experiment=current_prefix,
                description=description,
                num_repetitions=num_repetitions,
            ).astart()

            if asyncio.iscoroutinefunction(target):
                # Bind the current parameter combination to the target as
                # keyword arguments so each experiment runs with fixed params.
                def make_wrapped_target(fixed_params):
                    async def wrapped_target(inputs: dict) -> dict:
                        return await target(inputs, **fixed_params)

                    return wrapped_target

                wrapped_target = make_wrapped_target(params.copy())

                manager = await manager.awith_predictions(
                    wrapped_target, max_concurrency=max_concurrency
                )
            if evaluators:
                manager = await manager.awith_evaluators(
                    evaluators, max_concurrency=max_concurrency
                )
            if summary_evaluators:
                manager = await manager.awith_summary_evaluators(summary_evaluators)

        managers.append(manager)

    combined = CombinedAsyncExperimentResults(managers)
    if blocking:
        # Honor `blocking` for parity with the single-experiment path above.
        await combined.wait()
    return combined


class CombinedAsyncExperimentResults:
"""Container for multiple async experiment results from hyperparameter search."""

def __init__(
self,
managers: List[_AsyncExperimentManager],
):
"""Initialize the combined results."""
self.managers = managers
self._results = [AsyncExperimentResults(manager) for manager in managers]

async def wait(self) -> None:
"""Wait for all experiments to complete."""
await asyncio.gather(*(result.wait() for result in self._results))

def __getitem__(self, idx: int) -> AsyncExperimentResults:
"""Get results for a specific parameter combination."""
return self._results[idx]

def __len__(self) -> int:
"""Get the number of parameter combinations."""
return len(self._results)

def __iter__(self) -> Iterator[AsyncExperimentResults]:
"""Iterate over results for each parameter combination."""
return iter(self._results)

def __repr__(self) -> str:
"""Get string representation."""
return f"<CombinedAsyncExperimentResults with {len(self)} experiments>"


class _AsyncExperimentManager(_ExperimentManagerMixin):
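For completeness, a sketch of consuming the combined results returned when hyper_params is supplied. Field access below follows the ExperimentResultRow structure used elsewhere in the SDK (run, example, evaluation_results); treat it as illustrative rather than part of this diff.

async def summarize(combined: CombinedAsyncExperimentResults) -> None:
    await combined.wait()  # block until every parameter combination has finished
    print(f"ran {len(combined)} experiments")
    for experiment in combined:  # one AsyncExperimentResults per combination
        async for row in experiment:  # rows are ExperimentResultRow mappings
            scores = [r.score for r in row["evaluation_results"]["results"]]
            print(row["run"].id, scores)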