BUG: Improve PdfWriter handling of context manager #2913

Open
wants to merge 17 commits into
base: main
78 changes: 48 additions & 30 deletions pypdf/_writer.py
@@ -152,11 +152,16 @@ class PdfWriter(PdfDocCommon):
Typically data is added from a :class:`PdfReader<pypdf.PdfReader>`.

Args:
* : 1st argument is assigned to fileobj or clone_from based on context:
assigned to clone_from if it is a str/path to a non-empty file, a non-empty stream, or a PdfReader,
else assigned to fileobj.

fileobj: output file/stream. To be used with context manager only.

clone_from: identical to fileobj (for compatibility)

incremental: If true, loads the document and sets the PdfWriter in incremental mode.


When writing incrementally, the original document is written first and new/modified
content is appended. To be used for signed documents/forms to keep the signature valid.

@@ -166,6 +171,7 @@ class PdfWriter(PdfDocCommon):

def __init__(
self,
*args: Any,
Collaborator

Why only as unnamed argument?

Collaborator Author

The unnamed parameter will be assigned to fileobj or clone_from. I do not really understand your question.
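
For readers following the thread, the dispatch rule stated in the new docstring can be sketched roughly as below. This is an illustration only, not the code in this PR: `classify_first_argument` is a hypothetical helper name, the `PdfReader` case is left out so the snippet has no dependencies, and stream size is measured with `seek(0, SEEK_END)` as an assumption of this sketch.

```python
import io
from pathlib import Path
from typing import IO, Any, Union


def classify_first_argument(arg: Union[str, Path, IO[Any]]) -> str:
    """Return "clone_from" or "fileobj" for a single positional argument.

    Hypothetical helper: it mirrors the rule in the docstring, not pypdf itself.
    """
    if isinstance(arg, (str, Path)):
        path = Path(arg)
        # A missing or empty file has nothing to clone, so treat it as output.
        if path.is_file() and path.stat().st_size > 0:
            return "clone_from"
        return "fileobj"
    # Stream: measure its size, then restore the caller's position.
    position = arg.tell()
    arg.seek(0, io.SEEK_END)
    size = arg.tell()
    arg.seek(position, io.SEEK_SET)
    return "clone_from" if size > 0 else "fileobj"
```

The point of the sketch is that the same call, for example `PdfWriter(path)`, changes meaning depending on what currently exists at `path`, which is the behavior the rest of this thread debates.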

Collaborator

I have been thinking about the best way to handle such cases and whether we really need this whole PR. IMHO keyword arguments should always be preferred over positional arguments, and keyword arguments have always worked correctly.

My recommendation would be to keep the old behavior, but deprecate unnamed arguments and make the constructor keyword-only in the future. This way, we force users to clearly express their intent without having to introduce further magic on our side.
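
A minimal sketch of what such a deprecation path might look like, assuming the old positional order is `fileobj`, `clone_from`, `incremental`. `KeywordOnlyWriter` is a hypothetical stand-in class, not pypdf's `PdfWriter`, and the warning text is invented for illustration.

```python
import warnings
from typing import Any, Optional


class KeywordOnlyWriter:
    """Hypothetical stand-in for a writer class; not pypdf's PdfWriter."""

    def __init__(
        self,
        *args: Any,
        fileobj: Optional[Any] = None,
        clone_from: Optional[Any] = None,
        incremental: bool = False,
    ) -> None:
        if args:
            # Keep the old positional behavior, but tell users it is going away.
            warnings.warn(
                "Positional arguments are deprecated; pass fileobj=, "
                "clone_from= or incremental= by keyword.",
                DeprecationWarning,
                stacklevel=2,
            )
            # Assumed old positional order (an assumption of this sketch).
            names = ("fileobj", "clone_from", "incremental")
            if len(args) > len(names):
                raise TypeError("too many positional arguments")
            given = dict(zip(names, args))
            fileobj = given.get("fileobj", fileobj)
            clone_from = given.get("clone_from", clone_from)
            incremental = given.get("incremental", incremental)
        self.fileobj = fileobj
        self.clone_from = clone_from
        self.incremental = incremental
```

Once the deprecation period ends, the `*args` branch could be dropped and the signature reduced to a bare `*` followed by the keyword parameters, so that positional calls fail with a `TypeError`.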

Collaborator Author

I prefer to keep it as it is now. First, in many cases it is easier not to have to type a parameter name. Second, we have had this syntax for some time. Let's open a discussion about it and see what feedback we get.

Collaborator

With your proposed approach, we get a sort of mixed handling which I consider even more harmful. Previously, you could pass all parameters positionally, by keyword, or as a mix of both. The current state of this PR drops all positional parameters except the first one. This is an undocumented breaking change which would need a deprecation process as well, so switching to keyword-only arguments with a deprecation process is not much different, while forcing users to actually think about what they want to do.

The number of characters to type should not really matter either. A few libraries have already migrated to keyword-only arguments because this makes everything more readable. Additionally, every modern IDE (and even some regular text editors) provides autocompletion based on the method signature.

fileobj: Union[None, PdfReader, StrByteType, Path] = "",
clone_from: Union[None, PdfReader, StrByteType, Path] = None,
incremental: bool = False,
@@ -202,50 +208,65 @@ def __init__(
self._ID: Union[ArrayObject, None] = None
self._info_obj: Optional[PdfObject]

if self.incremental:
if isinstance(fileobj, (str, Path)):
with open(fileobj, "rb") as f:
fileobj = BytesIO(f.read(-1))
if isinstance(fileobj, BytesIO):
fileobj = PdfReader(fileobj)
if not isinstance(fileobj, PdfReader):
raise PyPdfError("Invalid type for incremental mode")
self._reader = fileobj # prev content is in _reader.stream
self._header = fileobj.pdf_header.encode()
self._readonly = True # !!!TODO: to be analysed
else:
self._header = b"%PDF-1.3"
self._info_obj = self._add_object(
DictionaryObject(
{NameObject("/Producer"): create_string_object("pypdf")}
)
)
manualset_fileobj = True
if len(args) > 0:
if fileobj == "":
fileobj = args[0]
manualset_fileobj = False
elif clone_from is None:
clone_from = args[0]

def _get_clone_from(
fileobj: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
clone_from: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
) -> Union[None, PdfReader, str, Path, IO[Any], BytesIO]:
if isinstance(fileobj, (str, Path, IO, BytesIO)) and (
fileobj == "" or clone_from is not None
manualset_fileobj: bool,
) -> Tuple[
Union[None, PdfReader, str, Path, IO[Any], BytesIO],
Union[None, PdfReader, str, Path, IO[Any], BytesIO],
]:
if manualset_fileobj or (
isinstance(fileobj, (str, Path, IO, BytesIO))
and (fileobj in ("", None) or clone_from is not None)
):
return clone_from
return clone_from, fileobj
cloning = True
if isinstance(fileobj, (str, Path)) and (
not Path(str(fileobj)).exists()
or Path(str(fileobj)).stat().st_size == 0
):
cloning = False

if isinstance(fileobj, (IO, BytesIO)):
t = fileobj.tell()
fileobj.seek(-1, 2)
if fileobj.tell() == 0:
cloning = False
fileobj.seek(t, 0)
if cloning:
clone_from = fileobj
return clone_from
return fileobj, None
return clone_from, fileobj

clone_from, fileobj = _get_clone_from(fileobj, clone_from, manualset_fileobj)

if self.incremental:
if isinstance(clone_from, (str, Path)):
with open(clone_from, "rb") as f:
clone_from = BytesIO(f.read(-1))
if isinstance(clone_from, (IO, BytesIO)):
clone_from = PdfReader(clone_from)
if not isinstance(clone_from, PdfReader):
raise PyPdfError("Invalid type for incremental mode")
self._reader = clone_from # prev content is in _reader.stream
self._header = clone_from.pdf_header.encode()
self._readonly = True # !!!TODO: to be analysed
else:
self._header = b"%PDF-1.3"
self._info_obj = self._add_object(
DictionaryObject(
{NameObject("/Producer"): create_string_object("pypdf")}
)
)

clone_from = _get_clone_from(fileobj, clone_from)
# to prevent overwriting
self.temp_fileobj = fileobj
self.fileobj = ""
@@ -354,10 +375,7 @@ def xmp_metadata(self, value: Optional[XmpInformation]) -> None:

def __enter__(self) -> "PdfWriter":
"""Store that writer is initialized by 'with'."""
t = self.temp_fileobj
self.__init__() # type: ignore
self.with_as_usage = True
self.fileobj = t # type: ignore
return self

def __exit__(
@@ -1393,7 +1411,7 @@ def write(self, stream: Union[Path, StrByteType]) -> Tuple[bool, IO[Any]]:

self.write_stream(stream)

if self.with_as_usage:
if my_file:
stream.close()

return my_file, stream
60 changes: 60 additions & 0 deletions tests/test_writer.py
@@ -2480,3 +2480,63 @@ def test_append_pdf_with_dest_without_page(caplog):
writer.append(reader)
assert "/__WKANCHOR_8" not in writer.named_destinations
assert len(writer.named_destinations) == 3


def test_writer_contextmanager():
"""To test the writer with context manager, cf #2912"""
pdf_path = str(RESOURCE_ROOT / "crazyones.pdf")
with PdfWriter(pdf_path) as w:
assert len(w.pages) > 0
assert not w.fileobj
with open(pdf_path, "rb") as f, PdfWriter(f) as w:
assert len(w.pages) > 0
assert not w.fileobj
with open(pdf_path, "rb") as f, PdfWriter(BytesIO(f.read(-1))) as w:
assert len(w.pages) > 0
assert not w.fileobj

try:
with NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
tmp_file = Path(tmp.name)
with PdfWriter(tmp_file) as w:
assert len(w.pages) == 0

with open(tmp_file, "wb") as f1, open(pdf_path, "rb") as f:
f1.write(f.read(-1))
with PdfWriter(tmp_file) as w:
assert len(w.pages) > 0
assert tmp_file.stat().st_size > 0

with PdfWriter(tmp_file, incremental=True) as w:
assert w._reader
assert not w.fileobj
assert tmp_file.stat().st_size > 0

with PdfWriter(clone_from=tmp_file) as w:
assert len(w.pages) > 0
assert not w.fileobj
assert tmp_file.stat().st_size > 0

with PdfWriter(fileobj=tmp_file) as w:
assert len(w.pages) == 0
assert 8 <= tmp_file.stat().st_size <= 1024

b = BytesIO()
with PdfWriter(fileobj=b) as w:
assert len(w.pages) == 0
assert not b.closed
assert 8 <= len(b.getbuffer()) <= 1024

with NamedTemporaryFile(mode="wb", suffix=".pdf", delete=True) as tmp:
with PdfWriter(pdf_path, fileobj=tmp, incremental=True) as w:
assert w._reader
assert not tmp.closed
assert Path(tmp.name).stat().st_size == Path(pdf_path).stat().st_size

with PdfWriter(tmp_file) as w:
assert len(w.pages) == 0

except Exception as e:
raise e
finally:
tmp_file.unlink()
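
The stream-ownership behavior that the tests above check (a `BytesIO` or temporary file passed as `fileobj` stays open after the `with` block) can be sketched independently of pypdf. This is a simplified illustration under stated assumptions: `OwningWriter` is a hypothetical class, the payload is placeholder bytes rather than a valid PDF, and writing on every exit is a choice of this sketch.

```python
import io
from pathlib import Path
from typing import IO, Optional, Tuple, Union

Target = Union[str, Path, IO[bytes]]


class OwningWriter:
    """Hypothetical minimal writer for illustration; not pypdf's PdfWriter."""

    def __init__(self, fileobj: Optional[Target] = None) -> None:
        self.fileobj = fileobj
        self.payload = b"%PDF-1.3\n"  # placeholder bytes, not a valid PDF

    def write(self, target: Target) -> Tuple[bool, IO[bytes]]:
        # Track whether this method opened the stream itself.
        opened_here = isinstance(target, (str, Path))
        stream = open(target, "wb") if opened_here else target
        try:
            stream.write(self.payload)
        finally:
            # Close only what was opened here; caller-owned streams stay open.
            if opened_here:
                stream.close()
        return opened_here, stream

    def __enter__(self) -> "OwningWriter":
        return self

    def __exit__(self, exc_type, exc, traceback) -> None:
        # Assumption of this sketch: write on exit whenever a target was given.
        if self.fileobj is not None:
            self.write(self.fileobj)


# Usage: the caller's buffer is written on exit but left open.
buffer = io.BytesIO()
with OwningWriter(buffer):
    pass
assert not buffer.closed
```

Tying the close to "opened here" rather than to "used inside a `with` block" is the same idea as the change from `self.with_as_usage` to `my_file` in the `write` hunk above.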