Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support bytestream in Unstructured API #1082

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import os
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Union
from typing import Any, Dict, List, Literal, Optional, Tuple, Union

from haystack import Document, component, default_from_dict, default_to_dict
from haystack.components.converters.utils import normalize_metadata
from haystack.dataclasses.byte_stream import ByteStream
from haystack.utils import Secret, deserialize_secrets_inplace
from tqdm import tqdm

Expand Down Expand Up @@ -123,47 +124,42 @@ def from_dict(cls, data: Dict[str, Any]) -> "UnstructuredFileConverter":
@component.output_types(documents=List[Document])
def run(
    self,
    sources: List[Union[str, os.PathLike, ByteStream]],
    meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
):
    """
    Convert files or byte streams to Haystack Documents using the Unstructured API.

    :param sources: List of file paths or byte streams to convert.
        Paths can be files or directories. If a path is a directory, every file directly
        inside it is converted; subdirectories are ignored.
    :param meta: Optional metadata to attach to the Documents.
        This value can be a single dictionary (its content is added to every produced Document)
        or a list of dictionaries that is zipped one-to-one, in order, with `sources`.
        If `sources` contains directories, `meta` can only be a single dictionary, because a
        directory expands to an unknown number of files.
    :returns: A dictionary with the following key:
        - `documents`: List of Haystack Documents.

    :raises ValueError: If `meta` is a list and `sources` contains directories.
    """
    # Expand directories in place so every concrete source keeps its original position.
    # A list-valued `meta` is zipped against the sources, so regrouping paths and byte
    # streams (as a separate-lists approach would) would silently attach the wrong
    # metadata to the wrong Document when the two kinds are mixed.
    expanded_sources: List[Union[Path, ByteStream]] = []
    contains_directories = False
    for source in sources:
        if isinstance(source, ByteStream):
            expanded_sources.append(source)
            continue
        path = Path(source)
        if path.is_dir():
            contains_directories = True
            expanded_sources.extend(fp for fp in path.glob("*.*") if fp.is_file())
        else:
            expanded_sources.append(path)

    if contains_directories and isinstance(meta, list):
        error = """If providing directories in the `sources` parameter,
        `meta` can only be a dictionary (metadata applied to every file),
        and not a list. To specify different metadata for each file,
        provide an explicit list of direct paths instead."""
        raise ValueError(error)

    meta_list = normalize_metadata(meta, sources_count=len(expanded_sources))

    # Sources are converted sequentially to gently handle API failures.
    documents = []
    for source, metadata in tqdm(
        zip(expanded_sources, meta_list),
        desc="Converting files to Haystack Documents",
        disable=not self.progress_bar,
    ):
        elements = self._partition_source_into_elements(source=source)
        if isinstance(source, ByteStream):
            # Keep the stream's own meta, but let explicit user-provided meta win on conflicts
            # (previously the user meta was dropped entirely for byte streams).
            docs_for_source = self._create_documents(
                elements=elements,
                document_creation_mode=self.document_creation_mode,
                separator=self.separator,
                meta={**source.meta, **metadata},
            )
        else:
            docs_for_source = self._create_documents(
                elements=elements,
                document_creation_mode=self.document_creation_mode,
                separator=self.separator,
                meta=metadata,
                filepath=source,
            )
        documents.extend(docs_for_source)

    return {"documents": documents}

def _get_sources(
    self,
    sources: List[Union[str, os.PathLike, ByteStream]],
) -> Tuple[List[Path], List[Path], List[ByteStream]]:
    """
    Split `sources` into direct file paths, files found in directories, and byte streams.

    The previous annotation (`Union[List[str], List[os.PathLike], List[ByteStream]]`)
    only allowed homogeneous lists, while mixed lists of paths and byte streams are
    explicitly supported by `run`.

    :param sources: Mixed list of path-likes and `ByteStream` objects. Items that are
        neither are silently ignored, as are paths that exist but are neither a regular
        file nor a directory.
    :returns: Tuple of (direct file paths, files found directly inside directories,
        byte streams). Subdirectories are not recursed into.
    """
    filepaths: List[Path] = []
    directories: List[Path] = []
    byte_streams: List[ByteStream] = []

    # Single pass: classify each source by kind instead of re-scanning the list
    # once per category.
    for source in sources:
        if isinstance(source, ByteStream):
            byte_streams.append(source)
        elif isinstance(source, (str, os.PathLike)):
            path = Path(source)
            if path.is_dir():
                directories.append(path)
            elif path.is_file():
                filepaths.append(path)

    filepaths_in_directories = [
        filepath for directory in directories for filepath in directory.glob("*.*") if filepath.is_file()
    ]

    return filepaths, filepaths_in_directories, byte_streams

@staticmethod
def _create_documents(
filepath: Path,
elements: List[Element],
document_creation_mode: Literal["one-doc-per-file", "one-doc-per-page", "one-doc-per-element"],
separator: str,
meta: Dict[str, Any],
filepath: Optional[Path] = None,
) -> List[Document]:
"""
Create Haystack Documents from the elements returned by Unstructured.
Expand All @@ -190,15 +217,17 @@ def _create_documents(
if document_creation_mode == "one-doc-per-file":
text = separator.join([str(el) for el in elements])
metadata = copy.deepcopy(meta)
metadata["file_path"] = str(filepath)
if filepath:
metadata["file_path"] = str(filepath) # Only include file path if provided
docs = [Document(content=text, meta=metadata)]

elif document_creation_mode == "one-doc-per-page":
texts_per_page: defaultdict[int, str] = defaultdict(str)
meta_per_page: defaultdict[int, dict] = defaultdict(dict)
for el in elements:
metadata = copy.deepcopy(meta)
metadata["file_path"] = str(filepath)
if filepath:
metadata["file_path"] = str(filepath)
if hasattr(el, "metadata"):
metadata.update(el.metadata.to_dict())
page_number = int(metadata.get("page_number", 1))
Expand All @@ -211,28 +240,40 @@ def _create_documents(
elif document_creation_mode == "one-doc-per-element":
for index, el in enumerate(elements):
metadata = copy.deepcopy(meta)
metadata["file_path"] = str(filepath)
if filepath:
metadata["file_path"] = str(filepath)
metadata["element_index"] = index
if hasattr(el, "metadata"):
metadata.update(el.metadata.to_dict())
if hasattr(el, "category"):
metadata["category"] = el.category
doc = Document(content=str(el), meta=metadata)
docs.append(doc)

return docs

def _partition_source_into_elements(self, source: Union[Path, ByteStream]) -> List[Element]:
    """
    Partition a file path or byte stream into elements using the Unstructured API.

    :param source: Path to a file on disk, or a `ByteStream` holding the file's raw bytes.
    :returns: List of Unstructured `Element`s. Empty on failure: errors are logged and
        swallowed so that one bad source does not abort a whole batch.
    """
    import io  # local import: only needed for the ByteStream branch

    elements: List[Element] = []
    # Arguments shared by both branches, hoisted to avoid duplicating the call.
    common_kwargs: Dict[str, Any] = {
        "api_url": self.api_url,
        "api_key": self.api_key.resolve_value() if self.api_key else None,
        **self.unstructured_kwargs,
    }
    try:
        if isinstance(source, Path):
            elements = partition_via_api(filename=str(source), **common_kwargs)
        else:
            # `partition_via_api` expects a binary file-like object, not raw bytes,
            # so wrap the stream's data in BytesIO.
            stream_kwargs: Dict[str, Any] = {"file": io.BytesIO(source.data)}
            # Pass the original file name (not the whole meta dict stringified, which
            # was the previous behavior) so the API can pick the right partitioner
            # from the extension. Omit it entirely when unknown.
            original_name = source.meta.get("file_path")
            if original_name:
                stream_kwargs["metadata_filename"] = str(original_name)
            elements = partition_via_api(**stream_kwargs, **common_kwargs)
    except Exception as e:
        logger.warning(f"Unstructured could not process source {source}. Error: {e}")
    return elements
42 changes: 36 additions & 6 deletions integrations/unstructured/tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#
# SPDX-License-Identifier: Apache-2.0
import pytest
from haystack.dataclasses.byte_stream import ByteStream

from haystack_integrations.components.converters.unstructured import UnstructuredFileConverter


Expand Down Expand Up @@ -86,6 +88,34 @@ def test_run_one_doc_per_file(self, samples_path):
assert len(documents) == 1
assert documents[0].meta == {"file_path": str(pdf_path)}

@pytest.mark.integration
def test_run_one_doc_per_file_bytestream(self, samples_path):
    # A single PDF supplied as a ByteStream in "one-doc-per-file" mode
    # must yield exactly one Document.
    stream = ByteStream.from_file_path(samples_path / "sample_pdf.pdf")
    converter = UnstructuredFileConverter(
        api_url="http://localhost:8000/general/v0/general",
        document_creation_mode="one-doc-per-file",
    )

    result = converter.run([stream])

    assert len(result["documents"]) == 1
@pytest.mark.integration
def test_run_one_doc_per_page_bytestream(self, samples_path):
    # A four-page PDF supplied as a ByteStream in "one-doc-per-page" mode
    # must yield one Document per page, numbered sequentially from 1.
    stream = ByteStream.from_file_path(samples_path / "sample_pdf.pdf")
    converter = UnstructuredFileConverter(
        api_url="http://localhost:8000/general/v0/general",
        document_creation_mode="one-doc-per-page",
    )

    docs = converter.run([stream])["documents"]

    assert len(docs) == 4
    for page, doc in enumerate(docs, start=1):
        assert doc.meta["page_number"] == page

@pytest.mark.integration
def test_run_one_doc_per_page(self, samples_path):
pdf_path = samples_path / "sample_pdf.pdf"
Expand Down Expand Up @@ -127,7 +157,7 @@ def test_run_one_doc_per_file_with_meta(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-file"
)

documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"]
documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"]

assert len(documents) == 1
assert documents[0].meta["file_path"] == str(pdf_path)
Expand All @@ -143,7 +173,7 @@ def test_run_one_doc_per_page_with_meta(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page"
)

documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"]
documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"]
assert len(documents) == 4
for i, doc in enumerate(documents, start=1):
assert doc.meta["file_path"] == str(pdf_path)
Expand All @@ -159,7 +189,7 @@ def test_run_one_doc_per_element_with_meta(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element"
)

documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"]
documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"]

assert len(documents) > 4
first_element_index = 0
Expand All @@ -185,7 +215,7 @@ def test_run_one_doc_per_element_with_meta_list_two_files(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element"
)

documents = local_converter.run(paths=pdf_path, meta=meta)["documents"]
documents = local_converter.run(sources=pdf_path, meta=meta)["documents"]

assert len(documents) > 4
for doc in documents:
Expand All @@ -205,7 +235,7 @@ def test_run_one_doc_per_element_with_meta_list_folder_fail(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element"
)
with pytest.raises(ValueError):
local_converter.run(paths=pdf_path, meta=meta)["documents"]
local_converter.run(sources=pdf_path, meta=meta)["documents"]

@pytest.mark.integration
def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path):
Expand All @@ -216,7 +246,7 @@ def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element"
)

documents = local_converter.run(paths=pdf_path, meta=meta)["documents"]
documents = local_converter.run(sources=pdf_path, meta=meta)["documents"]

assert len(documents) > 4
for doc in documents:
Expand Down
Loading