From c5dad57a671a6c1af7b5e561e2585579f383e6b4 Mon Sep 17 00:00:00 2001 From: Vivian Nguyen Date: Tue, 5 Nov 2024 12:36:16 -0800 Subject: [PATCH] Add testing for updating config; release GIL in ManagedQuery methods --- apis/python/src/tiledbsoma/_read_iters.py | 8 +++- apis/python/src/tiledbsoma/managed_query.cc | 13 +++++- apis/python/tests/test_dataframe.py | 47 +++++++++++++++++++++ apis/python/tests/test_dense_nd_array.py | 42 ++++++++++++++++++ apis/python/tests/test_sparse_nd_array.py | 44 +++++++++++++++++++ 5 files changed, 150 insertions(+), 4 deletions(-) diff --git a/apis/python/src/tiledbsoma/_read_iters.py b/apis/python/src/tiledbsoma/_read_iters.py index 534dab122c..90ead8e57a 100644 --- a/apis/python/src/tiledbsoma/_read_iters.py +++ b/apis/python/src/tiledbsoma/_read_iters.py @@ -482,7 +482,9 @@ def __init__( sr = array._handle._handle if platform_config is not None: - ctx = clib.SOMAContext(platform_config) + cfg = sr.context().config() + cfg.update(platform_config) + ctx = clib.SOMAContext(cfg) else: ctx = sr.context() @@ -553,7 +555,9 @@ def _arrow_table_reader( sr = array._handle._handle if platform_config is not None: - ctx = clib.SOMAContext(platform_config) + cfg = sr.context().config() + cfg.update(platform_config) + ctx = clib.SOMAContext(cfg) else: ctx = sr.context() diff --git a/apis/python/src/tiledbsoma/managed_query.cc b/apis/python/src/tiledbsoma/managed_query.cc index 7de08619bc..64e20d2123 100644 --- a/apis/python/src/tiledbsoma/managed_query.cc +++ b/apis/python/src/tiledbsoma/managed_query.cc @@ -117,7 +117,10 @@ void load_managed_query(py::module& m) { "names"_a, "if_not_empty"_a = false) - .def("submit_read", &ManagedQuery::submit_read) + .def( + "submit_read", + &ManagedQuery::submit_read, + py::call_guard()) .def( "results", [](ManagedQuery& mq) -> std::optional { @@ -143,6 +146,7 @@ void load_managed_query(py::module& m) { py_batch.attr("_export_to_c")( arrow_array_ptr, arrow_schema_ptr); + py::gil_scoped_release release; try 
{ mq.set_array_data( std::make_unique(arrow_schema), @@ -150,6 +154,7 @@ void load_managed_query(py::module& m) { } catch (const std::exception& e) { TPY_ERROR_LOC(e.what()); } + py::gil_scoped_acquire acquire; arrow_schema.release(&arrow_schema); arrow_array.release(&arrow_array); @@ -158,17 +163,21 @@ void load_managed_query(py::module& m) { "set_soma_data", [](ManagedQuery& mq, py::array data) { py::buffer_info data_info = data.request(); + + py::gil_scoped_release release; mq.setup_write_column( "soma_data", data.size(), (const void*)data_info.ptr, static_cast(nullptr), static_cast(nullptr)); + py::gil_scoped_acquire acquire; }) .def( "submit_write", &ManagedQuery::submit_write, - "sort_coords"_a = false) + "sort_coords"_a = false, + py::call_guard()) .def("reset", &ManagedQuery::reset) .def("close", &ManagedQuery::close) diff --git a/apis/python/tests/test_dataframe.py b/apis/python/tests/test_dataframe.py index b1f5103323..b40daa5b28 100644 --- a/apis/python/tests/test_dataframe.py +++ b/apis/python/tests/test_dataframe.py @@ -1899,3 +1899,50 @@ def test_bounds_on_somajoinid_domain(tmp_path): ) assert soma.DataFrame.exists(uri) + + +def test_pass_configs(tmp_path, arrow_schema): + uri = tmp_path.as_posix() + + with soma.DataFrame.create(uri, schema=arrow_schema()) as sdf: + pydict = {} + pydict["soma_joinid"] = [0, 1, 2, 3, 4] + pydict["foo"] = [10, 20, 30, 40, 50] + pydict["bar"] = [4.1, 5.2, 6.3, 7.4, 8.5] + pydict["baz"] = ["apple", "ball", "cat", "dog", "egg"] + pydict["quux"] = [True, False, False, True, False] + rb = pa.Table.from_pydict(pydict) + + if soma._flags.NEW_SHAPE_FEATURE_FLAG_ENABLED: + sdf.tiledbsoma_resize_soma_joinid_shape(len(rb)) + + sdf.write(rb) + + # Pass a custom config to open + with soma.DataFrame.open( + uri, + "r", + context=soma.SOMATileDBContext( + {"sm.mem.total_budget": "0", "sm.io_concurrency_level": "0"} + ), + ) as sdf: + + # This errors out because 0 is not a valid value for the total memory + # budget or the number of threads 
+ with pytest.raises(soma.SOMAError): + next(sdf.read()) + + # This still errors out because read still sees that the number of + # threads is 0 and therefore invalid + with pytest.raises(soma.SOMAError): + next(sdf.read(platform_config={"sm.mem.total_budget": "10000"})) + + # With correct values, this reads without issue + next( + sdf.read( + platform_config={ + "sm.mem.total_budget": "10000", + "sm.io_concurrency_level": "1", + } + ) + ) diff --git a/apis/python/tests/test_dense_nd_array.py b/apis/python/tests/test_dense_nd_array.py index d347f95625..14092a9e36 100644 --- a/apis/python/tests/test_dense_nd_array.py +++ b/apis/python/tests/test_dense_nd_array.py @@ -504,3 +504,45 @@ def test_read_to_unwritten_array(tmp_path, shape): with soma.DenseNDArray.open(uri, "r") as A: assert np.array_equal(np.ones(shape) * 255, A.read().to_numpy()) + + +def test_pass_configs(tmp_path): + uri = tmp_path.as_posix() + + with soma.DenseNDArray.create( + tmp_path.as_posix(), + type=pa.uint8(), + shape=(2, 2), + context=SOMATileDBContext(timestamp=1), + ) as a: + a.write( + (slice(0, 2), slice(0, 2)), + pa.Tensor.from_numpy(np.zeros((2, 2), dtype=np.uint8)), + ) + + # Pass a custom config to open + with soma.DenseNDArray.open( + uri, + "r", + context=soma.SOMATileDBContext( + {"sm.mem.total_budget": "0", "sm.io_concurrency_level": "0"} + ), + ) as sdf: + + # This errors out because 0 is not a valid value for the total memory + # budget or the number of threads + with pytest.raises(soma.SOMAError): + sdf.read() + + # This still errors out because read still sees that the number of + # threads is 0 and therefore invalid + with pytest.raises(soma.SOMAError): + sdf.read(platform_config={"sm.mem.total_budget": "10000"}) + + # With correct values, this reads without issue + sdf.read( + platform_config={ + "sm.mem.total_budget": "10000", + "sm.io_concurrency_level": "1", + } + ) diff --git a/apis/python/tests/test_sparse_nd_array.py b/apis/python/tests/test_sparse_nd_array.py index 
403e806b6d..8cc309389c 100644 --- a/apis/python/tests/test_sparse_nd_array.py +++ b/apis/python/tests/test_sparse_nd_array.py @@ -1883,3 +1883,47 @@ def test_global_writes(tmp_path): data, platform_config=soma.TileDBCreateOptions(), ) + + +def test_pass_configs(tmp_path): + uri = tmp_path.as_posix() + + with soma.SparseNDArray.create( + tmp_path.as_posix(), type=pa.uint8(), shape=(3,) + ) as a: + data = pa.Table.from_pydict( + { + "soma_dim_0": pa.array([0, 1, 2], type=pa.int64()), + "soma_data": pa.array([1, 2, 3], type=pa.uint8()), + } + ) + a.write(data) + + # Pass a custom config to open + with soma.SparseNDArray.open( + uri, + "r", + context=soma.SOMATileDBContext( + {"sm.mem.total_budget": "0", "sm.io_concurrency_level": "0"} + ), + ) as sdf: + + # This errors out because 0 is not a valid value for the total memory + # budget or the number of threads + with pytest.raises(soma.SOMAError): + next(sdf.read().tables()) + + # This still errors out because read still sees that the number of + # threads is 0 and therefore invalid + with pytest.raises(soma.SOMAError): + next(sdf.read(platform_config={"sm.mem.total_budget": "10000"}).tables()) + + # With correct values, this reads without issue + next( + sdf.read( + platform_config={ + "sm.mem.total_budget": "10000", + "sm.io_concurrency_level": "1", + } + ).tables() + )