Spaces:
Runtime error
Runtime error
# Copyright (c) 2006, Mathieu Fenniak | |
# All rights reserved. | |
# | |
# Redistribution and use in source and binary forms, with or without | |
# modification, are permitted provided that the following conditions are | |
# met: | |
# | |
# * Redistributions of source code must retain the above copyright notice, | |
# this list of conditions and the following disclaimer. | |
# * Redistributions in binary form must reproduce the above copyright notice, | |
# this list of conditions and the following disclaimer in the documentation | |
# and/or other materials provided with the distribution. | |
# * The name of the author may not be used to endorse or promote products | |
# derived from this software without specific prior written permission. | |
# | |
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
# POSSIBILITY OF SUCH DAMAGE. | |
import codecs | |
import decimal | |
import hashlib | |
import re | |
from binascii import unhexlify | |
from typing import Any, Callable, List, Optional, Tuple, Union, cast | |
from .._codecs import _pdfdoc_encoding_rev | |
from .._protocols import PdfObjectProtocol, PdfWriterProtocol | |
from .._utils import ( | |
StreamType, | |
b_, | |
deprecation_with_replacement, | |
hex_str, | |
hexencode, | |
logger_warning, | |
read_non_whitespace, | |
read_until_regex, | |
str_, | |
) | |
from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError | |
__author__ = "Mathieu Fenniak" | |
__author_email__ = "[email protected]" | |
class PdfObject(PdfObjectProtocol): | |
# function for calculating a hash value | |
hash_func: Callable[..., "hashlib._Hash"] = hashlib.sha1 | |
indirect_reference: Optional["IndirectObject"] | |
def hash_value_data(self) -> bytes: | |
return ("%s" % self).encode() | |
def hash_value(self) -> bytes: | |
return ( | |
"%s:%s" | |
% ( | |
self.__class__.__name__, | |
self.hash_func(self.hash_value_data()).hexdigest(), | |
) | |
).encode() | |
def clone( | |
self, | |
pdf_dest: PdfWriterProtocol, | |
force_duplicate: bool = False, | |
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), | |
) -> "PdfObject": | |
""" | |
clone object into pdf_dest (PdfWriterProtocol which is an interface for PdfWriter) | |
force_duplicate: in standard if the object has been already cloned and reference, | |
the copy is returned; when force_duplicate == True, a new copy is always performed | |
ignore_fields : list/tuple of Fields names (for dictionaries that will be ignored during cloning (apply also to childs duplication) | |
in standard, clone function call _reference_clone (see _reference) | |
""" | |
raise Exception("clone PdfObject") | |
def _reference_clone( | |
self, clone: Any, pdf_dest: PdfWriterProtocol | |
) -> PdfObjectProtocol: | |
""" | |
reference the object within the _objects of pdf_dest only if | |
indirect_reference attribute exists (which means the objects | |
was already identified in xref/xobjstm) | |
if object has been already referenced do nothing | |
""" | |
try: | |
if clone.indirect_reference.pdf == pdf_dest: | |
return clone | |
except Exception: | |
pass | |
if hasattr(self, "indirect_reference"): | |
ind = self.indirect_reference | |
i = len(pdf_dest._objects) + 1 | |
if ind is not None: | |
if id(ind.pdf) not in pdf_dest._id_translated: | |
pdf_dest._id_translated[id(ind.pdf)] = {} | |
if ind.idnum in pdf_dest._id_translated[id(ind.pdf)]: | |
obj = pdf_dest.get_object( | |
pdf_dest._id_translated[id(ind.pdf)][ind.idnum] | |
) | |
assert obj is not None | |
return obj | |
pdf_dest._id_translated[id(ind.pdf)][ind.idnum] = i | |
pdf_dest._objects.append(clone) | |
clone.indirect_reference = IndirectObject(i, 0, pdf_dest) | |
return clone | |
def get_object(self) -> Optional["PdfObject"]: | |
"""Resolve indirect references.""" | |
return self | |
def getObject(self) -> Optional["PdfObject"]: # pragma: no cover | |
deprecation_with_replacement("getObject", "get_object", "3.0.0") | |
return self.get_object() | |
def write_to_stream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: | |
raise NotImplementedError | |
class NullObject(PdfObject): | |
def clone( | |
self, | |
pdf_dest: PdfWriterProtocol, | |
force_duplicate: bool = False, | |
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), | |
) -> "NullObject": | |
"""clone object into pdf_dest""" | |
return cast("NullObject", self._reference_clone(NullObject(), pdf_dest)) | |
def write_to_stream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: | |
stream.write(b"null") | |
def read_from_stream(stream: StreamType) -> "NullObject": | |
nulltxt = stream.read(4) | |
if nulltxt != b"null": | |
raise PdfReadError("Could not read Null object") | |
return NullObject() | |
def writeToStream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: # pragma: no cover | |
deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0") | |
self.write_to_stream(stream, encryption_key) | |
def __repr__(self) -> str: | |
return "NullObject" | |
def readFromStream(stream: StreamType) -> "NullObject": # pragma: no cover | |
deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0") | |
return NullObject.read_from_stream(stream) | |
class BooleanObject(PdfObject): | |
def __init__(self, value: Any) -> None: | |
self.value = value | |
def clone( | |
self, | |
pdf_dest: PdfWriterProtocol, | |
force_duplicate: bool = False, | |
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), | |
) -> "BooleanObject": | |
"""clone object into pdf_dest""" | |
return cast( | |
"BooleanObject", self._reference_clone(BooleanObject(self.value), pdf_dest) | |
) | |
def __eq__(self, __o: object) -> bool: | |
if isinstance(__o, BooleanObject): | |
return self.value == __o.value | |
elif isinstance(__o, bool): | |
return self.value == __o | |
else: | |
return False | |
def __repr__(self) -> str: | |
return "True" if self.value else "False" | |
def write_to_stream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: | |
if self.value: | |
stream.write(b"true") | |
else: | |
stream.write(b"false") | |
def writeToStream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: # pragma: no cover | |
deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0") | |
self.write_to_stream(stream, encryption_key) | |
def read_from_stream(stream: StreamType) -> "BooleanObject": | |
word = stream.read(4) | |
if word == b"true": | |
return BooleanObject(True) | |
elif word == b"fals": | |
stream.read(1) | |
return BooleanObject(False) | |
else: | |
raise PdfReadError("Could not read Boolean object") | |
def readFromStream(stream: StreamType) -> "BooleanObject": # pragma: no cover | |
deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0") | |
return BooleanObject.read_from_stream(stream) | |
class IndirectObject(PdfObject): | |
def __init__(self, idnum: int, generation: int, pdf: Any) -> None: # PdfReader | |
self.idnum = idnum | |
self.generation = generation | |
self.pdf = pdf | |
def clone( | |
self, | |
pdf_dest: PdfWriterProtocol, | |
force_duplicate: bool = False, | |
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), | |
) -> "IndirectObject": | |
"""clone object into pdf_dest""" | |
if self.pdf == pdf_dest and not force_duplicate: | |
# Already duplicated and no extra duplication required | |
return self | |
if id(self.pdf) not in pdf_dest._id_translated: | |
pdf_dest._id_translated[id(self.pdf)] = {} | |
if not force_duplicate and self.idnum in pdf_dest._id_translated[id(self.pdf)]: | |
dup = pdf_dest.get_object(pdf_dest._id_translated[id(self.pdf)][self.idnum]) | |
else: | |
obj = self.get_object() | |
assert obj is not None | |
dup = obj.clone(pdf_dest, force_duplicate, ignore_fields) | |
assert dup is not None | |
assert dup.indirect_reference is not None | |
return dup.indirect_reference | |
def indirect_reference(self) -> "IndirectObject": # type: ignore[override] | |
return self | |
def get_object(self) -> Optional["PdfObject"]: | |
obj = self.pdf.get_object(self) | |
if obj is None: | |
return None | |
return obj.get_object() | |
def __repr__(self) -> str: | |
return f"IndirectObject({self.idnum!r}, {self.generation!r}, {id(self.pdf)})" | |
def __eq__(self, other: Any) -> bool: | |
return ( | |
other is not None | |
and isinstance(other, IndirectObject) | |
and self.idnum == other.idnum | |
and self.generation == other.generation | |
and self.pdf is other.pdf | |
) | |
def __ne__(self, other: Any) -> bool: | |
return not self.__eq__(other) | |
def write_to_stream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: | |
stream.write(b_(f"{self.idnum} {self.generation} R")) | |
def writeToStream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: # pragma: no cover | |
deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0") | |
self.write_to_stream(stream, encryption_key) | |
def read_from_stream(stream: StreamType, pdf: Any) -> "IndirectObject": # PdfReader | |
idnum = b"" | |
while True: | |
tok = stream.read(1) | |
if not tok: | |
raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY) | |
if tok.isspace(): | |
break | |
idnum += tok | |
generation = b"" | |
while True: | |
tok = stream.read(1) | |
if not tok: | |
raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY) | |
if tok.isspace(): | |
if not generation: | |
continue | |
break | |
generation += tok | |
r = read_non_whitespace(stream) | |
if r != b"R": | |
raise PdfReadError( | |
f"Error reading indirect object reference at byte {hex_str(stream.tell())}" | |
) | |
return IndirectObject(int(idnum), int(generation), pdf) | |
def readFromStream( | |
stream: StreamType, pdf: Any # PdfReader | |
) -> "IndirectObject": # pragma: no cover | |
deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0") | |
return IndirectObject.read_from_stream(stream, pdf) | |
class FloatObject(decimal.Decimal, PdfObject): | |
def __new__( | |
cls, value: Union[str, Any] = "0", context: Optional[Any] = None | |
) -> "FloatObject": | |
try: | |
return decimal.Decimal.__new__(cls, str_(value), context) | |
except Exception: | |
# If this isn't a valid decimal (happens in malformed PDFs) | |
# fallback to 0 | |
logger_warning(f"FloatObject ({value}) invalid; use 0.0 instead", __name__) | |
return decimal.Decimal.__new__(cls, "0.0") | |
def clone( | |
self, | |
pdf_dest: Any, | |
force_duplicate: bool = False, | |
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), | |
) -> "FloatObject": | |
"""clone object into pdf_dest""" | |
return cast("FloatObject", self._reference_clone(FloatObject(self), pdf_dest)) | |
def __repr__(self) -> str: | |
if self == self.to_integral(): | |
# If this is an integer, format it with no decimal place. | |
return str(self.quantize(decimal.Decimal(1))) | |
else: | |
# Otherwise, format it with a decimal place, taking care to | |
# remove any extraneous trailing zeros. | |
return f"{self:f}".rstrip("0") | |
def as_numeric(self) -> float: | |
return float(repr(self).encode("utf8")) | |
def write_to_stream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: | |
stream.write(repr(self).encode("utf8")) | |
def writeToStream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: # pragma: no cover | |
deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0") | |
self.write_to_stream(stream, encryption_key) | |
class NumberObject(int, PdfObject): | |
NumberPattern = re.compile(b"[^+-.0-9]") | |
def __new__(cls, value: Any) -> "NumberObject": | |
try: | |
return int.__new__(cls, int(value)) | |
except ValueError: | |
logger_warning(f"NumberObject({value}) invalid; use 0 instead", __name__) | |
return int.__new__(cls, 0) | |
def clone( | |
self, | |
pdf_dest: Any, | |
force_duplicate: bool = False, | |
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), | |
) -> "NumberObject": | |
"""clone object into pdf_dest""" | |
return cast("NumberObject", self._reference_clone(NumberObject(self), pdf_dest)) | |
def as_numeric(self) -> int: | |
return int(repr(self).encode("utf8")) | |
def write_to_stream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: | |
stream.write(repr(self).encode("utf8")) | |
def writeToStream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: # pragma: no cover | |
deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0") | |
self.write_to_stream(stream, encryption_key) | |
def read_from_stream(stream: StreamType) -> Union["NumberObject", "FloatObject"]: | |
num = read_until_regex(stream, NumberObject.NumberPattern) | |
if num.find(b".") != -1: | |
return FloatObject(num) | |
return NumberObject(num) | |
def readFromStream( | |
stream: StreamType, | |
) -> Union["NumberObject", "FloatObject"]: # pragma: no cover | |
deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0") | |
return NumberObject.read_from_stream(stream) | |
class ByteStringObject(bytes, PdfObject): | |
""" | |
Represents a string object where the text encoding could not be determined. | |
This occurs quite often, as the PDF spec doesn't provide an alternate way to | |
represent strings -- for example, the encryption data stored in files (like | |
/O) is clearly not text, but is still stored in a "String" object. | |
""" | |
def clone( | |
self, | |
pdf_dest: Any, | |
force_duplicate: bool = False, | |
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), | |
) -> "ByteStringObject": | |
"""clone object into pdf_dest""" | |
return cast( | |
"ByteStringObject", | |
self._reference_clone(ByteStringObject(bytes(self)), pdf_dest), | |
) | |
def original_bytes(self) -> bytes: | |
"""For compatibility with TextStringObject.original_bytes.""" | |
return self | |
def write_to_stream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: | |
bytearr = self | |
if encryption_key: | |
from .._security import RC4_encrypt | |
bytearr = RC4_encrypt(encryption_key, bytearr) # type: ignore | |
stream.write(b"<") | |
stream.write(hexencode(bytearr)) | |
stream.write(b">") | |
def writeToStream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: # pragma: no cover | |
deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0") | |
self.write_to_stream(stream, encryption_key) | |
class TextStringObject(str, PdfObject): | |
""" | |
Represents a string object that has been decoded into a real unicode string. | |
If read from a PDF document, this string appeared to match the | |
PDFDocEncoding, or contained a UTF-16BE BOM mark to cause UTF-16 decoding to | |
occur. | |
""" | |
def clone( | |
self, | |
pdf_dest: Any, | |
force_duplicate: bool = False, | |
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), | |
) -> "TextStringObject": | |
"""clone object into pdf_dest""" | |
obj = TextStringObject(self) | |
obj.autodetect_pdfdocencoding = self.autodetect_pdfdocencoding | |
obj.autodetect_utf16 = self.autodetect_utf16 | |
return cast("TextStringObject", self._reference_clone(obj, pdf_dest)) | |
autodetect_pdfdocencoding = False | |
autodetect_utf16 = False | |
def original_bytes(self) -> bytes: | |
""" | |
It is occasionally possible that a text string object gets created where | |
a byte string object was expected due to the autodetection mechanism -- | |
if that occurs, this "original_bytes" property can be used to | |
back-calculate what the original encoded bytes were. | |
""" | |
return self.get_original_bytes() | |
def get_original_bytes(self) -> bytes: | |
# We're a text string object, but the library is trying to get our raw | |
# bytes. This can happen if we auto-detected this string as text, but | |
# we were wrong. It's pretty common. Return the original bytes that | |
# would have been used to create this object, based upon the autodetect | |
# method. | |
if self.autodetect_utf16: | |
return codecs.BOM_UTF16_BE + self.encode("utf-16be") | |
elif self.autodetect_pdfdocencoding: | |
return encode_pdfdocencoding(self) | |
else: | |
raise Exception("no information about original bytes") | |
def write_to_stream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: | |
# Try to write the string out as a PDFDocEncoding encoded string. It's | |
# nicer to look at in the PDF file. Sadly, we take a performance hit | |
# here for trying... | |
try: | |
bytearr = encode_pdfdocencoding(self) | |
except UnicodeEncodeError: | |
bytearr = codecs.BOM_UTF16_BE + self.encode("utf-16be") | |
if encryption_key: | |
from .._security import RC4_encrypt | |
bytearr = RC4_encrypt(encryption_key, bytearr) | |
obj = ByteStringObject(bytearr) | |
obj.write_to_stream(stream, None) | |
else: | |
stream.write(b"(") | |
for c in bytearr: | |
if not chr(c).isalnum() and c != b" ": | |
# This: | |
# stream.write(b_(rf"\{c:0>3o}")) | |
# gives | |
# https://github.com/davidhalter/parso/issues/207 | |
stream.write(b_("\\%03o" % c)) | |
else: | |
stream.write(b_(chr(c))) | |
stream.write(b")") | |
def writeToStream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: # pragma: no cover | |
deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0") | |
self.write_to_stream(stream, encryption_key) | |
class NameObject(str, PdfObject): | |
delimiter_pattern = re.compile(rb"\s+|[\(\)<>\[\]{}/%]") | |
surfix = b"/" | |
renumber_table = { | |
"#": b"#23", | |
"(": b"#28", | |
")": b"#29", | |
"/": b"#2F", | |
**{chr(i): f"#{i:02X}".encode() for i in range(33)}, | |
} | |
def clone( | |
self, | |
pdf_dest: Any, | |
force_duplicate: bool = False, | |
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), | |
) -> "NameObject": | |
"""clone object into pdf_dest""" | |
return cast("NameObject", self._reference_clone(NameObject(self), pdf_dest)) | |
def write_to_stream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: | |
stream.write(self.renumber()) # b_(renumber(self))) | |
def writeToStream( | |
self, stream: StreamType, encryption_key: Union[None, str, bytes] | |
) -> None: # pragma: no cover | |
deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0") | |
self.write_to_stream(stream, encryption_key) | |
def renumber(self) -> bytes: | |
out = self[0].encode("utf-8") | |
if out != b"/": | |
logger_warning(f"Incorrect first char in NameObject:({self})", __name__) | |
for c in self[1:]: | |
if c > "~": | |
for x in c.encode("utf-8"): | |
out += f"#{x:02X}".encode() | |
else: | |
try: | |
out += self.renumber_table[c] | |
except KeyError: | |
out += c.encode("utf-8") | |
return out | |
def unnumber(sin: bytes) -> bytes: | |
i = sin.find(b"#", 0) | |
while i >= 0: | |
try: | |
sin = sin[:i] + unhexlify(sin[i + 1 : i + 3]) + sin[i + 3 :] | |
i = sin.find(b"#", i + 1) | |
except ValueError: | |
# if the 2 characters after # can not be converted to hexa | |
# we change nothing and carry on | |
i = i + 1 | |
return sin | |
def read_from_stream(stream: StreamType, pdf: Any) -> "NameObject": # PdfReader | |
name = stream.read(1) | |
if name != NameObject.surfix: | |
raise PdfReadError("name read error") | |
name += read_until_regex(stream, NameObject.delimiter_pattern, ignore_eof=True) | |
try: | |
# Name objects should represent irregular characters | |
# with a '#' followed by the symbol's hex number | |
name = NameObject.unnumber(name) | |
for enc in ("utf-8", "gbk"): | |
try: | |
ret = name.decode(enc) | |
return NameObject(ret) | |
except Exception: | |
pass | |
raise UnicodeDecodeError("", name, 0, 0, "Code Not Found") | |
except (UnicodeEncodeError, UnicodeDecodeError) as e: | |
if not pdf.strict: | |
logger_warning( | |
f"Illegal character in Name Object ({repr(name)})", __name__ | |
) | |
return NameObject(name.decode("charmap")) | |
else: | |
raise PdfReadError( | |
f"Illegal character in Name Object ({repr(name)})" | |
) from e | |
def readFromStream( | |
stream: StreamType, pdf: Any # PdfReader | |
) -> "NameObject": # pragma: no cover | |
deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0") | |
return NameObject.read_from_stream(stream, pdf) | |
def encode_pdfdocencoding(unicode_string: str) -> bytes: | |
retval = b"" | |
for c in unicode_string: | |
try: | |
retval += b_(chr(_pdfdoc_encoding_rev[c])) | |
except KeyError: | |
raise UnicodeEncodeError( | |
"pdfdocencoding", c, -1, -1, "does not exist in translation table" | |
) | |
return retval | |