Skip to content

Commit

Permalink
STY: Minor code-style improvements for _reader.py (#2847)
Browse files Browse the repository at this point in the history
* STY: Minor code-style improvements for _reader.py

* Fix tests

* Update pypdf/_reader.py

Co-authored-by: pubpub-zz <[email protected]>

* fix doc building warning

* Undo is_null_or_none

* Undo

* Undo

* TypeGuard refinement for is_null_or_none

Co-authored-by: pubpub-zz <[email protected]>

* Move function to bottom for type annotations

---------

Co-authored-by: pubpub-zz <[email protected]>
  • Loading branch information
MartinThoma and pubpub-zz authored Sep 20, 2024
1 parent a337664 commit 8dd9fcb
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 59 deletions.
108 changes: 61 additions & 47 deletions pypdf/_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,19 @@ def __init__(
self.xref_objStm: Dict[int, Tuple[Any, Any]] = {}
self.trailer = DictionaryObject()

self._page_id2num: Optional[
Dict[Any, Any]
] = None # map page indirect_reference number to Page Number
# map page indirect_reference number to page number
self._page_id2num: Optional[Dict[Any, Any]] = None

self._initialize_stream(stream)

self._override_encryption = False
self._encryption: Optional[Encryption] = None
if self.is_encrypted:
self._handle_encryption(password)
elif password is not None:
raise PdfReadError("Not an encrypted file")

def _initialize_stream(self, stream: Union[StrByteType, Path]) -> None:
if hasattr(stream, "mode") and "b" not in stream.mode:
logger_warning(
"PdfReader stream/file object is not in binary mode. "
Expand All @@ -142,31 +152,25 @@ def __init__(
self.read(stream)
self.stream = stream

def _handle_encryption(self, password: Optional[Union[str, bytes]]) -> None:
    """
    Set up decryption for an encrypted document and check the password.

    Args:
        password: Password supplied by the caller, or None to try the
            empty password.

    Raises:
        WrongPasswordError: If a password was supplied and verification
            reports ``PasswordType.NOT_DECRYPTED``.
    """
    # The flag is raised for the duration of the setup and cleared at the
    # end. NOTE(review): presumably this lets the trailer entries below be
    # read without triggering decryption — confirm against its consumers.
    self._override_encryption = True

    # Some documents may not have a /ID; fall back to an empty byte string
    # for the first ID element. Solves
    # https://github.com/py-pdf/pypdf/issues/608
    id_entry = self.trailer.get(TK.ID)
    if id_entry:
        id1_entry = id_entry[0].get_object().original_bytes
    else:
        id1_entry = b""

    encrypt_entry = cast(DictionaryObject, self.trailer[TK.ENCRYPT].get_object())
    self._encryption = Encryption.read(encrypt_entry, id1_entry)

    # Try the empty password if no password was provided.
    pwd = b"" if password is None else password
    verification = self._encryption.verify(pwd)

    # Only an explicitly supplied password that fails is an error; a failed
    # empty-password attempt is tolerated silently.
    if password is not None and verification == PasswordType.NOT_DECRYPTED:
        raise WrongPasswordError("Wrong password")

    self._override_encryption = False
self._encryption: Optional[Encryption] = None
if self.is_encrypted:
self._override_encryption = True
# Some documents may not have a /ID, use two empty
# byte strings instead. Solves
# https://github.com/py-pdf/pypdf/issues/608
id_entry = self.trailer.get(TK.ID)
id1_entry = id_entry[0].get_object().original_bytes if id_entry else b""
encrypt_entry = cast(
DictionaryObject, self.trailer[TK.ENCRYPT].get_object()
)
self._encryption = Encryption.read(encrypt_entry, id1_entry)

# try empty password if no password provided
pwd = password if password is not None else b""
if (
self._encryption.verify(pwd) == PasswordType.NOT_DECRYPTED
and password is not None
):
# raise if password provided
raise WrongPasswordError("Wrong password")
self._override_encryption = False
elif password is not None:
raise PdfReadError("Not encrypted file")

def __enter__(self) -> "PdfReader":
return self
Expand Down Expand Up @@ -286,13 +290,13 @@ def _get_page_number_by_indirect(
self, indirect_reference: Union[None, int, NullObject, IndirectObject]
) -> Optional[int]:
"""
Generate _page_id2num.
Retrieve the page number from an indirect reference.
Args:
indirect_reference:
indirect_reference: The indirect reference to locate.
Returns:
The page number or None
Page number or None.
"""
if self._page_id2num is None:
self._page_id2num = {
Expand Down Expand Up @@ -562,6 +566,12 @@ def _replace_object(self, indirect: IndirectObject, obj: PdfObject) -> PdfObject
return obj

def read(self, stream: StreamType) -> None:
"""
Read and process the PDF stream, extracting necessary data.
Args:
stream: The PDF file stream.
"""
self._basic_validation(stream)
self._find_eof_marker(stream)
startxref = self._find_startxref_pos(stream)
Expand Down Expand Up @@ -621,7 +631,7 @@ def read(self, stream: StreamType) -> None:
stream.seek(loc, 0) # return to where it was

def _basic_validation(self, stream: StreamType) -> None:
"""Ensure file is not empty. Read at most 5 bytes."""
"""Ensure the stream is valid and not empty."""
stream.seek(0, os.SEEK_SET)
try:
header_byte = stream.read(5)
Expand Down Expand Up @@ -819,6 +829,7 @@ def _read_standard_xref_table(self, stream: StreamType) -> None:
def _read_xref_tables_and_trailers(
self, stream: StreamType, startxref: Optional[int], xref_issue_nr: int
) -> None:
"""Read the cross-reference tables and trailers in the PDF stream."""
self.xref = {}
self.xref_free_entry = {}
self.xref_objStm = {}
Expand All @@ -843,28 +854,31 @@ def _read_xref_tables_and_trailers(
except Exception as e:
if TK.ROOT in self.trailer:
logger_warning(
f"Previous trailer can not be read {e.args}",
__name__,
f"Previous trailer cannot be read: {e.args}", __name__
)
break
else:
raise PdfReadError(f"trailer can not be read {e.args}")
trailer_keys = TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE
for key in trailer_keys:
if key in xrefstream and key not in self.trailer:
self.trailer[NameObject(key)] = xrefstream.raw_get(key)
if "/XRefStm" in xrefstream:
p = stream.tell()
stream.seek(cast(int, xrefstream["/XRefStm"]) + 1, 0)
self._read_pdf15_xref_stream(stream)
stream.seek(p, 0)
raise PdfReadError(f"Trailer cannot be read: {e.args}")
self._process_xref_stream(xrefstream)
if "/Prev" in xrefstream:
startxref = cast(int, xrefstream["/Prev"])
else:
break
else:
startxref = self._read_xref_other_error(stream, startxref)

def _process_xref_stream(self, xrefstream: DictionaryObject) -> None:
    """
    Merge trailer entries from an xref dictionary and follow ``/XRefStm``.

    Trailer keys already present in ``self.trailer`` are left untouched;
    only missing ones are copied over. If the dictionary has an
    ``/XRefStm`` entry, the cross-reference stream at that offset is read
    and the stream position is put back afterwards.

    Args:
        xrefstream: The dictionary to take trailer keys and the optional
            ``/XRefStm`` offset from.
    """
    for key in (TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE):
        if key in xrefstream and key not in self.trailer:
            self.trailer[NameObject(key)] = xrefstream.raw_get(key)

    if "/XRefStm" in xrefstream:
        # NOTE(review): this reads self.stream, not a stream argument. In
        # the visible _initialize_stream code, self.read(stream) runs
        # before self.stream is assigned — confirm the attribute is set
        # whenever this branch can be reached during the initial read.
        stream = self.stream
        saved_position = stream.tell()
        # _read_pdf15_xref_stream starts by stepping back one byte
        # (seek(-1, 1)), hence the + 1 on the target offset.
        stream.seek(cast(int, xrefstream["/XRefStm"]) + 1, 0)
        self._read_pdf15_xref_stream(stream)
        stream.seek(saved_position, 0)  # return to where it was

def _read_xref(self, stream: StreamType) -> Optional[int]:
self._read_standard_xref_table(stream)
if stream.read(1) == b"":
Expand Down Expand Up @@ -937,7 +951,7 @@ def _read_xref_other_error(
def _read_pdf15_xref_stream(
self, stream: StreamType
) -> Union[ContentStream, EncodedStreamObject, DecodedStreamObject]:
# PDF 1.5+ Cross-Reference Stream
"""Read the cross-reference stream for PDF 1.5+."""
stream.seek(-1, 1)
idnum, generation = self.read_object_header(stream)
xrefstream = cast(ContentStream, read_object(stream, self))
Expand Down Expand Up @@ -1065,6 +1079,7 @@ def _read_xref_subsections(
get_entry: Callable[[int], Union[int, Tuple[int, ...]]],
used_before: Callable[[int, Union[int, Tuple[int, ...]]], bool],
) -> None:
"""Read and process the subsections of the xref."""
for start, size in self._pairs(idx_pairs):
# The subsections must increase
for num in range(start, start + size):
Expand Down Expand Up @@ -1094,12 +1109,11 @@ def _read_xref_subsections(
raise PdfReadError(f"Unknown xref type: {xref_type}")

def _pairs(self, array: List[int]) -> Iterable[Tuple[int, int]]:
"""Iterate over pairs in the array."""
i = 0
while True:
while i + 1 < len(array):
yield array[i], array[i + 1]
i += 2
if (i + 1) >= len(array):
break

def decrypt(self, password: Union[str, bytes]) -> PasswordType:
"""
Expand Down
26 changes: 16 additions & 10 deletions pypdf/generic/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@
import codecs
import hashlib
import re
import sys
from binascii import unhexlify
from math import log10
from struct import iter_unpack
from typing import Any, Callable, ClassVar, Dict, Optional, Sequence, Union, cast

if sys.version_info[:2] >= (3, 10):
from typing import TypeGuard
else:
from typing_extensions import TypeGuard # PEP 647

from .._codecs import _pdfdoc_encoding_rev
from .._protocols import PdfObjectProtocol, PdfWriterProtocol
from .._utils import (
Expand Down Expand Up @@ -214,16 +220,6 @@ def __repr__(self) -> str:
return "NullObject"


def is_null_or_none(x: Any) -> bool:
    """
    Check whether a value is Python ``None`` or resolves to a PDF null.

    Args:
        x: Any value; non-``PdfObject`` values other than ``None`` are
            never considered null.

    Returns:
        True if x is None, or if x is a ``PdfObject`` whose
        ``get_object()`` result is a ``NullObject``.
    """
    if x is None:
        return True
    # get_object() is only called on PdfObject instances, so arbitrary
    # values (str, int, ...) fall through to False instead of raising.
    return isinstance(x, PdfObject) and isinstance(x.get_object(), NullObject)


class BooleanObject(PdfObject):
def __init__(self, value: Any) -> None:
self.value = value
Expand Down Expand Up @@ -853,3 +849,13 @@ def encode_pdfdocencoding(unicode_string: str) -> bytes:
-1,
"does not exist in translation table",
)


def is_null_or_none(x: Any) -> TypeGuard[Union[None, NullObject, IndirectObject]]:
    """
    Check whether a value is Python ``None`` or resolves to a PDF null.

    The ``TypeGuard`` return type lets static type checkers narrow ``x`` to
    ``None``, ``NullObject`` or ``IndirectObject`` in the branch where this
    returns True.

    Args:
        x: Any value; non-``PdfObject`` values other than ``None`` are
            never considered null.

    Returns:
        True if x is None, or if x is a ``PdfObject`` whose
        ``get_object()`` result is a ``NullObject``.
    """
    if x is None:
        return True
    # get_object() is only called on PdfObject instances, so arbitrary
    # values (str, int, ...) fall through to False instead of raising.
    return isinstance(x, PdfObject) and isinstance(x.get_object(), NullObject)
2 changes: 1 addition & 1 deletion tests/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def test_attempt_decrypt_unencrypted_pdf():
path = RESOURCE_ROOT / "crazyones.pdf"
with pytest.raises(PdfReadError) as exc:
PdfReader(path, password="nonexistent")
assert exc.value.args[0] == "Not encrypted file"
assert exc.value.args[0] == "Not an encrypted file"


@pytest.mark.skipif(not HAS_AES, reason="No AES implementation")
Expand Down
2 changes: 1 addition & 1 deletion tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ def test_reader(caplog):
url = "https://github.com/py-pdf/pypdf/files/9464742/shiv_resume.pdf"
name = "shiv_resume.pdf"
reader = PdfReader(BytesIO(get_data_from_url(url, name=name)))
assert "Previous trailer can not be read" in caplog.text
assert "Previous trailer cannot be read" in caplog.text
caplog.clear()
# first call requires some reparations...
reader.pages[0].extract_text()
Expand Down

0 comments on commit 8dd9fcb

Please sign in to comment.