Skip to content

Commit

Permalink
Merge pull request #18 from ami-iit/to_file
Browse files Browse the repository at this point in the history
  • Loading branch information
S-Dafarra authored Jul 31, 2024
2 parents f6ccb4f + d6a40bf commit 063c614
Show file tree
Hide file tree
Showing 12 changed files with 415 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
- name: Dependencies
shell: bash -l {0}
run: |
mamba install python=${{ matrix.python }} casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py git
mamba install python=${{ matrix.python }} casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py git hdf5storage
mamba list
- name: Install
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ hippopt is an open-source framework for generating whole-body trajectories for l
## Installation
It is suggested to use [``mamba``](https://github.com/conda-forge/miniforge).
```bash
conda install -c conda-forge -c robotology python=3.11 casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py
conda install -c conda-forge -c robotology python=3.11 casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py hdf5storage
pip install --no-deps -e .[all]
```

Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ robot_planning=
turnkey_planners=
idyntree
resolve-robotics-uri-py
hdf5storage
visualization=
ffmpeg-python
idyntree
Expand Down
119 changes: 97 additions & 22 deletions src/hippopt/base/optimization_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,44 @@ def _scan(
input_dict: dict | None = None,
output_filter: Callable[[str, Any, dict], bool] | None = None,
input_conversion: Callable[[str, Any], Any] | None = None,
) -> (dict, dict):
output_conversion: Callable[[str, Any], Any] | None = None,
output_flat: bool = True,
) -> tuple[dict, dict] | tuple[list, list]:
output_dict = {}
metadata_dict = {}
if isinstance(input_object, list):
assert all(
if not all(
isinstance(elem, OptimizationObject) or isinstance(elem, list)
for elem in input_object
)
):
raise ValueError(
"The input object is a list, but not all elements are"
" OptimizationObject instances."
)
output_list = []
output_metadata_list = []
if not output_flat and name_prefix != "":
output_dict[name_prefix] = output_list
metadata_dict[name_prefix] = output_metadata_list

for i, elem in enumerate(input_object):
inner_dict, inner_metadata = OptimizationObject._scan(
input_object=elem,
name_prefix=name_prefix + f"[{str(i)}].",
name_prefix=name_prefix + f"[{str(i)}]." if output_flat else "",
parent_metadata=parent_metadata,
input_dict=input_dict,
output_filter=output_filter,
input_conversion=input_conversion,
output_conversion=output_conversion,
output_flat=output_flat,
)
output_dict.update(inner_dict)
output_list.append(inner_dict)
metadata_dict.update(inner_metadata)
output_metadata_list.append(inner_metadata)

if not output_flat and name_prefix == "":
return output_list, output_metadata_list
return output_dict, metadata_dict

assert isinstance(input_object, OptimizationObject)
Expand Down Expand Up @@ -131,14 +150,24 @@ def _scan(
separator = "" if list_of_optimization_objects else "."
inner_dict, inner_metadata = OptimizationObject._scan(
input_object=composite_value,
name_prefix=name_prefix + field.name + separator,
name_prefix=(
name_prefix + field.name + separator if output_flat else ""
),
parent_metadata=new_parent_metadata,
input_dict=input_dict,
output_filter=output_filter,
input_conversion=input_conversion,
output_conversion=output_conversion,
output_flat=output_flat,
)
output_dict.update(inner_dict)
metadata_dict.update(inner_metadata)

if output_flat:
output_dict.update(inner_dict)
metadata_dict.update(inner_metadata)
else:
output_dict[field.name] = inner_dict
metadata_dict[field.name] = inner_metadata

continue

if OptimizationObject.StorageTypeField in field.metadata:
Expand All @@ -157,15 +186,20 @@ def _scan(
parent_metadata[OptimizationObject.StorageTypeField]
)

composite_value = OptimizationObject._convert_to_np_array(
composite_value_edited = OptimizationObject._convert_to_np_array(
composite_value
)
value_is_list = isinstance(composite_value, list)
value_is_list = isinstance(composite_value_edited, list)
value_list = composite_value if value_is_list else [composite_value]
name_radix = name_prefix + field.name
name_radix = name_prefix + field.name if output_flat else field.name
value_from_dict = []

if not output_flat and value_is_list:
output_dict[field.name] = []
metadata_dict[field.name] = []

for i, val in enumerate(value_list):
postfix = f"[{i}]" if value_is_list else ""
postfix = f"[{i}]" if value_is_list and output_flat else ""
full_name = name_radix + postfix

if input_dict is not None and full_name in input_dict:
Expand All @@ -177,17 +211,27 @@ def _scan(
value_from_dict.append(converted_input)

output_value = (
OptimizationObject._convert_to_np_array(composite_value[i])
if value_is_list
else composite_value
composite_value[i] if value_is_list else composite_value
)

output_value = (
output_conversion(full_name, output_value)
if output_conversion is not None
else output_value
)

output_value = OptimizationObject._convert_to_np_array(output_value)

if output_filter is not None:
if not output_filter(full_name, output_value, value_metadata):
continue

metadata_dict[full_name] = value_metadata
output_dict[full_name] = output_value
if not output_flat and value_is_list:
output_dict[full_name].append(output_value)
metadata_dict[full_name].append(value_metadata)
else:
output_dict[full_name] = output_value
metadata_dict[full_name] = value_metadata

if len(value_from_dict) > 0:
input_object.__setattr__(
Expand All @@ -197,25 +241,42 @@ def _scan(

continue

if not output_flat and name_prefix != "":
nested_output = {name_prefix: output_dict}
nested_metadata = {name_prefix: metadata_dict}
return nested_output, nested_metadata

return output_dict, metadata_dict

def to_dict(
self,
prefix: str = "",
output_filter: Callable[[str, Any, dict], bool] | None = None,
output_conversion: Callable[[str, Any], Any] | None = None,
flatten: bool = True,
) -> dict:
output_dict, _ = OptimizationObject._scan(
input_object=self, name_prefix=prefix, output_filter=output_filter
input_object=self,
name_prefix=prefix,
output_filter=output_filter,
output_conversion=output_conversion,
output_flat=flatten,
)
return output_dict

def to_dicts(
self,
prefix: str = "",
output_filter: Callable[[str, Any, dict], bool] | None = None,
output_conversion: Callable[[str, Any], Any] | None = None,
flatten: bool = True,
) -> (dict, dict):
output_dict, metadata_dict = OptimizationObject._scan(
input_object=self, name_prefix=prefix, output_filter=output_filter
input_object=self,
name_prefix=prefix,
output_filter=output_filter,
output_conversion=output_conversion,
output_flat=flatten,
)
return output_dict, metadata_dict

Expand All @@ -232,16 +293,30 @@ def from_dict(
input_conversion=input_conversion,
)

def to_list(self) -> list:
def to_list(
self,
output_filter: Callable[[str, Any, dict], bool] | None = None,
output_conversion: Callable[[str, Any], Any] | None = None,
) -> list:
output_list = []
as_dict = self.to_dict()
as_dict = self.to_dict(
output_filter=output_filter, output_conversion=output_conversion
)
for key in sorted(as_dict.keys()):
output_list.append(as_dict[key])

return output_list

def to_mx(self) -> cs.MX:
return cs.vertcat(*self.to_list())
def to_mx(
self,
output_filter: Callable[[str, Any, dict], bool] | None = None,
output_conversion: Callable[[str, Any], Any] | None = None,
) -> cs.MX:
return cs.vertcat(
*self.to_list(
output_filter=output_filter, output_conversion=output_conversion
)
)

@classmethod
def default_storage_metadata(cls, **kwargs) -> dict:
Expand Down
23 changes: 23 additions & 0 deletions src/hippopt/base/problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@ def __post_init__(
self.cost_values = _cost_values
self.constraint_multipliers = _constraint_multipliers

def to_dict(self) -> dict:
def set_nested_value(d, input_key, value):
keys = input_key.split(".")
assert all(isinstance(k, str) and len(k) > 0 for k in keys)
for key in keys[:-1]:
d = d.setdefault(key, {})
d[keys[-1]] = value

def flatten_to_nested_dict(flat_dict):
nested_dict = {}
for key, value in flat_dict.items():
set_nested_value(nested_dict, key, value)
return nested_dict

return {
"values": self.values.to_dict(flatten=False),
"cost_value": self.cost_value,
"cost_values": flatten_to_nested_dict(self.cost_values),
"constraint_multipliers": flatten_to_nested_dict(
self.constraint_multipliers
),
}


@dataclasses.dataclass
class Problem(abc.ABC, Generic[TGenericSolver, TInputObjects]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
frames/*
*.png
*.mp4
*.mat
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

import casadi as cs
import hdf5storage
import idyntree.bindings as idyntree
import liecasadi
import numpy as np
Expand Down Expand Up @@ -496,3 +497,17 @@ def get_references(
save=True,
file_name_stem="humanoid_walking_periodic",
)

print("Saving data to humanoid_walking_periodic.mat")

humanoid_walking_periodic = {
"output": output.to_dict(),
"guess": planner_guess.to_dict(
flatten=False, output_conversion=hippopt.OptimizationObject.DMConversion
),
}
hdf5storage.savemat(
file_name="humanoid_walking_periodic.mat",
mdict=humanoid_walking_periodic,
truncate_existing=True,
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import math

import casadi as cs
import hdf5storage
import idyntree.bindings as idyntree
import liecasadi
import numpy as np
Expand Down Expand Up @@ -603,6 +604,20 @@ def get_references(
file_name_stem="humanoid_single_jump_flat",
)

print("Saving data to humanoid_single_jump_flat.mat")

humanoid_single_jump_flat = {
"output": output.to_dict(),
"guess": planner_guess.to_dict(
flatten=False, output_conversion=hippopt.OptimizationObject.DMConversion
),
}
hdf5storage.savemat(
file_name="humanoid_single_jump_flat.mat",
mdict=humanoid_single_jump_flat,
truncate_existing=True,
)

plotter_settings = hp_rp.FootContactStatePlotterSettings()
plotter_settings.terrain = planner_settings.terrain
left_foot_plotter = hp_rp.FootContactStatePlotter(plotter_settings)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

import casadi as cs
import hdf5storage
import idyntree.bindings as idyntree
import liecasadi
import numpy as np
Expand Down Expand Up @@ -592,6 +593,20 @@ def get_references(
file_name_stem="humanoid_walking_ramp",
)

print("Saving data to humanoid_walking_ramp.mat")

humanoid_walking_ramp = {
"output": output.to_dict(),
"guess": planner_guess.to_dict(
flatten=False, output_conversion=hippopt.OptimizationObject.DMConversion
),
}
hdf5storage.savemat(
file_name="humanoid_walking_ramp.mat",
mdict=humanoid_walking_ramp,
truncate_existing=True,
)

plotter_settings = hp_rp.FootContactStatePlotterSettings()
plotter_settings.terrain = planner_settings.terrain
left_foot_plotter = hp_rp.FootContactStatePlotter(plotter_settings)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

import casadi as cs
import hdf5storage
import idyntree.bindings as idyntree
import liecasadi
import numpy as np
Expand Down Expand Up @@ -584,6 +585,21 @@ def get_references(
file_name_stem="humanoid_walking_step",
)

print("Saving data to humanoid_walking_step.mat")

humanoid_walking_step = {
"output": output.to_dict(),
"guess": planner_guess.to_dict(
flatten=False,
output_conversion=hippopt.OptimizationObject.DMConversion,
),
}
hdf5storage.savemat(
file_name="humanoid_walking_step.mat",
mdict=humanoid_walking_step,
truncate_existing=True,
)

plotter_settings = hp_rp.FootContactStatePlotterSettings()
plotter_settings.terrain = planner_settings.terrain
left_foot_plotter = hp_rp.FootContactStatePlotter(plotter_settings)
Expand Down
Loading

0 comments on commit 063c614

Please sign in to comment.