#!/usr/bin/env python3 import contextlib import io import logging import re from collections.abc import Iterator from typing import ( Any, BinaryIO, Generic, TypeVar, Union, ) from pdfminer import psexceptions, settings from pdfminer.utils import choplist log = logging.getLogger(__name__) # Adding aliases for these exceptions for backwards compatibility PSException = psexceptions.PSException PSEOF = psexceptions.PSEOF PSSyntaxError = psexceptions.PSSyntaxError PSTypeError = psexceptions.PSTypeError PSValueError = psexceptions.PSValueError class PSObject: """Base class for all PS or PDF-related data types.""" class PSLiteral(PSObject): """A class that represents a PostScript literal. Postscript literals are used as identifiers, such as variable names, property names and dictionary keys. Literals are case sensitive and denoted by a preceding slash sign (e.g. "/Name") Note: Do not create an instance of PSLiteral directly. Always use PSLiteralTable.intern(). """ NameType = Union[str, bytes] def __init__(self, name: NameType) -> None: self.name = name def __repr__(self) -> str: name = self.name return f"/{name!r}" class PSKeyword(PSObject): """A class that represents a PostScript keyword. PostScript keywords are a dozen of predefined words. Commands and directives in PostScript are expressed by keywords. They are also used to denote the content boundaries. Note: Do not create an instance of PSKeyword directly. Always use PSKeywordTable.intern(). """ def __init__(self, name: bytes) -> None: self.name = name def __repr__(self) -> str: name = self.name return f"/{name!r}" _SymbolT = TypeVar("_SymbolT", PSLiteral, PSKeyword) class PSSymbolTable(Generic[_SymbolT]): """A utility class for storing PSLiteral/PSKeyword objects. Interned objects can be checked its identity with "is" operator. 
""" def __init__(self, klass: type[_SymbolT]) -> None: self.dict: dict[PSLiteral.NameType, _SymbolT] = {} self.klass: type[_SymbolT] = klass def intern(self, name: PSLiteral.NameType) -> _SymbolT: if name in self.dict: lit = self.dict[name] else: # Type confusion issue: PSKeyword always takes bytes as name # PSLiteral uses either str or bytes lit = self.klass(name) # type: ignore[arg-type] self.dict[name] = lit return lit PSLiteralTable = PSSymbolTable(PSLiteral) PSKeywordTable = PSSymbolTable(PSKeyword) LIT = PSLiteralTable.intern KWD = PSKeywordTable.intern KEYWORD_PROC_BEGIN = KWD(b"{") KEYWORD_PROC_END = KWD(b"}") KEYWORD_ARRAY_BEGIN = KWD(b"[") KEYWORD_ARRAY_END = KWD(b"]") KEYWORD_DICT_BEGIN = KWD(b"<<") KEYWORD_DICT_END = KWD(b">>") def literal_name(x: Any) -> str: if isinstance(x, PSLiteral): if isinstance(x.name, str): return x.name try: return str(x.name, "utf-8") except UnicodeDecodeError: return str(x.name) else: if settings.STRICT: raise PSTypeError(f"Literal required: {x!r}") return str(x) def keyword_name(x: Any) -> Any: if not isinstance(x, PSKeyword): if settings.STRICT: raise PSTypeError(f"Keyword required: {x!r}") else: name = x else: name = str(x.name, "utf-8", "ignore") return name EOL = re.compile(rb"[\r\n]") SPC = re.compile(rb"\s") NONSPC = re.compile(rb"\S") HEX = re.compile(rb"[0-9a-fA-F]") END_LITERAL = re.compile(rb"[#/%\[\]()<>{}\s]") END_HEX_STRING = re.compile(rb"[^\s0-9a-fA-F]") HEX_PAIR = re.compile(rb"[0-9a-fA-F]{2}|.") END_NUMBER = re.compile(rb"[^0-9]") END_KEYWORD = re.compile(rb"[#/%\[\]()<>{}\s]") END_STRING = re.compile(rb"[()\134]") OCT_STRING = re.compile(rb"[0-7]") ESC_STRING = { b"b": 8, b"t": 9, b"n": 10, b"f": 12, b"r": 13, b"(": 40, b")": 41, b"\\": 92, } PSBaseParserToken = Union[float, bool, PSLiteral, PSKeyword, bytes] class PSBaseParser: """Most basic PostScript parser that performs only tokenization.""" BUFSIZ = 4096 def __init__(self, fp: BinaryIO) -> None: self.fp = fp self.eof = False self.seek(0) def 
__repr__(self) -> str: return f"<{self.__class__.__name__}: {self.fp!r}, bufpos={self.bufpos}>" def flush(self) -> None: pass def close(self) -> None: self.flush() def tell(self) -> int: return self.bufpos + self.charpos def poll(self, pos: int | None = None, n: int = 80) -> None: pos0 = self.fp.tell() if not pos: pos = self.bufpos + self.charpos self.fp.seek(pos) log.debug(f"poll({pos}): {self.fp.read(n)!r}") self.fp.seek(pos0) def seek(self, pos: int) -> None: """Seeks the parser to the given position.""" log.debug(f"seek: {pos!r}") self.fp.seek(pos) # reset the status for nextline() self.bufpos = pos self.buf = b"" self.charpos = 0 # reset the status for nexttoken() self._parse1 = self._parse_main self._curtoken = b"" self._curtokenpos = 0 self._tokens: list[tuple[int, PSBaseParserToken]] = [] self.eof = False def fillbuf(self) -> bool: if self.charpos < len(self.buf): return False # fetch next chunk. self.bufpos = self.fp.tell() self.buf = self.fp.read(self.BUFSIZ) if not self.buf: raise PSEOF("Unexpected EOF") self.charpos = 0 return False def nextline(self) -> tuple[int, bytes]: """Fetches a next line that ends either with \\r or \\n.""" linebuf = b"" linepos = self.bufpos + self.charpos eol = False while 1: self.fillbuf() if eol: c = self.buf[self.charpos : self.charpos + 1] # handle b'\r\n' if c == b"\n": linebuf += c self.charpos += 1 break m = EOL.search(self.buf, self.charpos) if m: linebuf += self.buf[self.charpos : m.end(0)] self.charpos = m.end(0) if linebuf[-1:] == b"\r": eol = True else: break else: linebuf += self.buf[self.charpos :] self.charpos = len(self.buf) log.debug(f"nextline: {linepos!r}, {linebuf!r}") return (linepos, linebuf) def revreadlines(self) -> Iterator[bytes]: """Fetches a next line backward. This is used to locate the trailers at the end of a file. 
        # (body of revreadlines, whose signature/docstring precede this span)
        # Scan the file from the end in BUFSIZ chunks, yielding line tails.
        self.fp.seek(0, io.SEEK_END)
        pos = self.fp.tell()
        buf = b""
        while pos > 0:
            prevpos = pos
            pos = max(0, pos - self.BUFSIZ)
            self.fp.seek(pos)
            s = self.fp.read(prevpos - pos)
            if not s:
                break
            while 1:
                n = max(s.rfind(b"\r"), s.rfind(b"\n"))
                if n == -1:
                    # No EOL in this chunk: prepend it to the carry-over and
                    # continue with the previous chunk.
                    buf = s + buf
                    break
                # Yield from the last EOL to the end (includes the EOL byte).
                yield s[n:] + buf
                s = s[:n]
                buf = b""

    def _parse_main(self, s: bytes, i: int) -> int:
        """Dispatch state: pick the next token's state from its first byte.

        Returns the position at which scanning should resume.
        """
        m = NONSPC.search(s, i)
        if not m:
            # Only whitespace left in this chunk.
            return len(s)
        j = m.start(0)
        c = s[j : j + 1]
        # Remember where the token started (absolute stream position).
        self._curtokenpos = self.bufpos + j
        if c == b"%":
            self._curtoken = b"%"
            self._parse1 = self._parse_comment
            return j + 1
        elif c == b"/":
            self._curtoken = b""
            self._parse1 = self._parse_literal
            return j + 1
        elif c in b"-+" or c.isdigit():
            self._curtoken = c
            self._parse1 = self._parse_number
            return j + 1
        elif c == b".":
            self._curtoken = c
            self._parse1 = self._parse_float
            return j + 1
        elif c.isalpha():
            self._curtoken = c
            self._parse1 = self._parse_keyword
            return j + 1
        elif c == b"(":
            # Literal string; paren tracks nesting depth.
            self._curtoken = b""
            self.paren = 1
            self._parse1 = self._parse_string
            return j + 1
        elif c == b"<":
            # Either "<<" (dict begin) or a hex string; decided in _parse_wopen.
            self._curtoken = b""
            self._parse1 = self._parse_wopen
            return j + 1
        elif c == b">":
            self._curtoken = b""
            self._parse1 = self._parse_wclose
            return j + 1
        elif c == b"\x00":
            # NUL bytes are skipped silently.
            return j + 1
        else:
            # Any other single character is emitted as a one-byte keyword.
            self._add_token(KWD(c))
            return j + 1

    def _add_token(self, obj: PSBaseParserToken) -> None:
        """Queue a completed token together with its start position."""
        self._tokens.append((self._curtokenpos, obj))

    def _parse_comment(self, s: bytes, i: int) -> int:
        """Accumulate comment text until end-of-line."""
        m = EOL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        self._parse1 = self._parse_main
        # We ignore comments.
        # self._tokens.append(self._curtoken)
        return j

    def _parse_literal(self, s: bytes, i: int) -> int:
        """Accumulate a name literal (after '/') up to a delimiter."""
        m = END_LITERAL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"#":
            # '#xx' hex escape inside a name: collect digits in self.hex.
            self.hex = b""
            self._parse1 = self._parse_literal_hex
            return j + 1
        try:
            # Prefer a str name; keep raw bytes if not valid UTF-8.
            name: str | bytes = str(self._curtoken, "utf-8")
        except Exception:
            name = self._curtoken
        self._add_token(LIT(name))
        self._parse1 = self._parse_main
        return j

    def _parse_literal_hex(self, s: bytes, i: int) -> int:
        """Consume up to two hex digits of a '#xx' escape within a name."""
        c = s[i : i + 1]
        if HEX.match(c) and len(self.hex) < 2:
            self.hex += c
            return i + 1
        if self.hex:
            self._curtoken += bytes((int(self.hex, 16),))
        # Resume normal name parsing; position i is re-examined there.
        self._parse1 = self._parse_literal
        return i

    def _parse_number(self, s: bytes, i: int) -> int:
        """Accumulate an integer; switch to float state on '.'."""
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b".":
            self._curtoken += c
            self._parse1 = self._parse_float
            return j + 1
        # Malformed numbers are silently dropped (no token emitted).
        with contextlib.suppress(ValueError):
            self._add_token(int(self._curtoken))
        self._parse1 = self._parse_main
        return j

    def _parse_float(self, s: bytes, i: int) -> int:
        """Accumulate the fractional part of a real number."""
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        # Malformed floats are silently dropped (no token emitted).
        with contextlib.suppress(ValueError):
            self._add_token(float(self._curtoken))
        self._parse1 = self._parse_main
        return j

    def _parse_keyword(self, s: bytes, i: int) -> int:
        """Accumulate an alphabetic keyword; 'true'/'false' become booleans."""
        m = END_KEYWORD.search(s, i)
        if m:
            j = m.start(0)
            self._curtoken += s[i:j]
        else:
            self._curtoken += s[i:]
            return len(s)
        if self._curtoken == b"true":
            token: bool | PSKeyword = True
        elif self._curtoken == b"false":
            token = False
        else:
            token = KWD(self._curtoken)
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def _parse_string(self, s: bytes, i: int) -> int:
        """Accumulate a literal string '(...)', tracking paren nesting."""
        m = END_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"\\":
            # Backslash escape: handled one byte at a time in _parse_string_1.
            self.oct = b""
            self._parse1 = self._parse_string_1
            return j + 1
        if c == b"(":
            self.paren += 1
            self._curtoken += c
            return j + 1
        if c == b")":
            self.paren -= 1
            if self.paren:
                # WTF, they said balanced parens need no special treatment.
                self._curtoken += c
                return j + 1
        # Outermost ')' reached: the string token is complete.
        self._add_token(self._curtoken)
        self._parse1 = self._parse_main
        return j + 1

    def _parse_string_1(self, s: bytes, i: int) -> int:
        """Parse literal strings

        PDF Reference 3.2.3
        """
        c = s[i : i + 1]
        if OCT_STRING.match(c) and len(self.oct) < 3:
            # Up to three octal digits after the backslash.
            self.oct += c
            return i + 1
        elif self.oct:
            chrcode = int(self.oct, 8)
            assert chrcode < 256, f"Invalid octal {self.oct!r} ({chrcode})"
            self._curtoken += bytes((chrcode,))
            self._parse1 = self._parse_string
            return i
        elif c in ESC_STRING:
            self._curtoken += bytes((ESC_STRING[c],))
        elif c == b"\r" and len(s) > i + 1 and s[i + 1 : i + 2] == b"\n":
            # If current and next character is \r\n skip both because enters
            # after a \ are ignored
            i += 1
        # default action
        self._parse1 = self._parse_string
        return i + 1

    def _parse_wopen(self, s: bytes, i: int) -> int:
        """After '<': emit '<<' as dict-begin, otherwise start a hex string."""
        c = s[i : i + 1]
        if c == b"<":
            self._add_token(KEYWORD_DICT_BEGIN)
            self._parse1 = self._parse_main
            i += 1
        else:
            self._parse1 = self._parse_hexstring
        return i

    def _parse_wclose(self, s: bytes, i: int) -> int:
        """After '>': emit '>>' as dict-end; a lone '>' emits nothing."""
        c = s[i : i + 1]
        if c == b">":
            self._add_token(KEYWORD_DICT_END)
            i += 1
        self._parse1 = self._parse_main
        return i

    def _parse_hexstring(self, s: bytes, i: int) -> int:
        """Accumulate a hex string '<...>' and decode digit pairs to bytes."""
        m = END_HEX_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        # Strip whitespace, then decode each hex pair; a trailing lone digit
        # (matched by the `|.` alternative) is passed through unchanged.
        token = HEX_PAIR.sub(
            lambda m: bytes((int(m.group(0), 16),)),
            SPC.sub(b"", self._curtoken),
        )
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def nexttoken(self) -> tuple[int, PSBaseParserToken]:
        """Return the next (position, token) pair; raises PSEOF at end."""
        if self.eof:
            # It's not really unexpected, come on now...
            raise PSEOF("Unexpected EOF")
        while not self._tokens:
            try:
                changed_stream = self.fillbuf()
                if changed_stream and self._curtoken:
                    # Fixes #1157: if the stream is changed in the middle of a token,
                    # try to parse it by tacking on whitespace.
                    self._parse1(b"\n", 0)
                else:
                    self.charpos = self._parse1(self.buf, self.charpos)
            except PSEOF:
                # If we hit EOF in the middle of a token, try to parse
                # it by tacking on whitespace, and delay raising PSEOF
                # until next time around
                self.charpos = self._parse1(b"\n", 0)
                self.eof = True
                # Oh, so there wasn't actually a token there? OK.
                if not self._tokens:
                    raise
        token = self._tokens.pop(0)
        log.debug(f"nexttoken: {token!r}")
        return token


# Stack slots may be occupied by any of:
#  * the name of a literal
#  * the PSBaseParserToken types
#  * list (via KEYWORD_ARRAY)
#  * dict (via KEYWORD_DICT)
#  * subclass-specific extensions (e.g. PDFStream, PDFObjRef) via ExtraT
ExtraT = TypeVar("ExtraT")
PSStackType = Union[str, float, bool, PSLiteral, bytes, list[Any], dict[Any, Any], ExtraT]
# A stack entry pairs an absolute stream position with the parsed value.
PSStackEntry = tuple[int, PSStackType[ExtraT]]


class PSStackParser(PSBaseParser, Generic[ExtraT]):
    """Parser that assembles tokens into objects using an explicit stack."""

    def __init__(self, fp: BinaryIO) -> None:
        PSBaseParser.__init__(self, fp)
        self.reset()

    def reset(self) -> None:
        """Clear the context stack, current stack and pending results."""
        # Saved (position, type, stack) frames for nested constructs.
        self.context: list[tuple[int, str | None, list[PSStackEntry[ExtraT]]]] = []
        self.curtype: str | None = None
        self.curstack: list[PSStackEntry[ExtraT]] = []
        self.results: list[PSStackEntry[ExtraT]] = []

    def seek(self, pos: int) -> None:
        PSBaseParser.seek(self, pos)
        self.reset()

    def push(self, *objs: PSStackEntry[ExtraT]) -> None:
        """Push one or more entries onto the current stack."""
        self.curstack.extend(objs)

    def pop(self, n: int) -> list[PSStackEntry[ExtraT]]:
        """Remove and return the top *n* entries of the current stack."""
        objs = self.curstack[-n:]
        self.curstack[-n:] = []
        return objs

    def popall(self) -> list[PSStackEntry[ExtraT]]:
        """Remove and return all entries of the current stack."""
        objs = self.curstack
        self.curstack = []
        return objs

    def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
        """Queue finished objects for nextobject() to return."""
        try:
            log.debug(f"add_results: {objs!r}")
        except Exception:
            # repr() of an object may itself fail; logging must not.
            log.debug("add_results: (unprintable object)")
        self.results.extend(objs)
    def start_type(self, pos: int, type: str) -> None:
        """Open a nested construct of the given type, saving the current frame."""
        self.context.append((pos, self.curtype, self.curstack))
        (self.curtype, self.curstack) = (type, [])
        log.debug(f"start_type: pos={pos!r}, type={type!r}")

    def end_type(self, type: str) -> tuple[int, list[PSStackType[ExtraT]]]:
        """Close the innermost construct, restoring the saved frame.

        :return: (start position, collected values) of the closed construct.
        :raises PSTypeError: if the innermost construct is not of *type*.
        """
        if self.curtype != type:
            raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
        # Keep only the values; the per-entry positions are discarded.
        objs = [obj for (_, obj) in self.curstack]
        (pos, self.curtype, self.curstack) = self.context.pop()
        log.debug(f"end_type: pos={pos!r}, type={type!r}, objs={objs!r}")
        return (pos, objs)

    def do_keyword(self, pos: int, token: PSKeyword) -> None:
        # Hook for subclasses to act on executable keywords; no-op here.
        pass

    def nextobject(self) -> PSStackEntry[ExtraT]:
        """Return the next complete object as a (position, object) entry.

        Arrays and dictionaries are represented as Python lists and
        dictionaries; construct types 'a' (array), 'd' (dict) and 'p'
        (procedure) are tracked via start_type/end_type.

        :return: keywords, literals, strings, numbers, arrays and dictionaries.
        """
        while not self.results:
            (pos, token) = self.nexttoken()
            if isinstance(token, (int, float, bool, str, bytes, PSLiteral)):
                # normal token
                self.push((pos, token))
            elif token == KEYWORD_ARRAY_BEGIN:
                # begin array
                self.start_type(pos, "a")
            elif token == KEYWORD_ARRAY_END:
                # end array
                try:
                    self.push(self.end_type("a"))
                except PSTypeError:
                    # Unbalanced ']' is tolerated unless strict mode is on.
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_DICT_BEGIN:
                # begin dictionary
                self.start_type(pos, "d")
            elif token == KEYWORD_DICT_END:
                # end dictionary
                try:
                    (pos, objs) = self.end_type("d")
                    if len(objs) % 2 != 0:
                        error_msg = f"Invalid dictionary construct: {objs!r}"
                        raise PSSyntaxError(error_msg)
                    # Pair up key/value; entries whose value is None are dropped.
                    d = {
                        literal_name(k): v
                        for (k, v) in choplist(2, objs)
                        if v is not None
                    }
                    self.push((pos, d))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_PROC_BEGIN:
                # begin proc
                self.start_type(pos, "p")
            elif token == KEYWORD_PROC_END:
                # end proc
                try:
                    self.push(self.end_type("p"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif isinstance(token, PSKeyword):
                log.debug(
                    f"do_keyword: pos={pos!r}, token={token!r}, stack={self.curstack!r}"
                )
                self.do_keyword(pos, token)
            else:
                # Defensive: nexttoken() should only yield the types above.
                log.error(
                    f"unknown token: pos={pos!r}, "
                    f"token={token!r}, stack={self.curstack!r}"
                )
                self.do_keyword(pos, token)
                raise PSException
            if self.context:
                # Still inside a nested construct; keep consuming tokens.
                continue
            else:
                # Top level reached: let subclasses emit completed results.
                self.flush()
        obj = self.results.pop(0)
        try:
            log.debug(f"nextobject: {obj!r}")
        except Exception:
            # repr() of an object may itself fail; logging must not.
            log.debug("nextobject: (unprintable object)")
        return obj