__all__ = ["Driver", "load_grammar"]
# Python imports
-import codecs
import io
-import os
import logging
+import os
import pkgutil
import sys
-from typing import (
- Any,
- Callable,
- IO,
- Iterable,
- List,
- Optional,
- Text,
- Tuple,
- Union,
- Sequence,
-)
-
-# Pgen imports
-from . import grammar, parse, token, tokenize, pgen
+from contextlib import contextmanager
+from dataclasses import dataclass, field
from logging import Logger
-from blib2to3.pytree import _Convert, NL
+from typing import IO, Any, Iterable, Iterator, List, Optional, Tuple, Union, cast
+
from blib2to3.pgen2.grammar import Grammar
+from blib2to3.pgen2.tokenize import GoodTokenInfo
+from blib2to3.pytree import NL
+
+# Pgen imports
+from . import grammar, parse, pgen, token, tokenize
Path = Union[str, "os.PathLike[str]"]
-class Driver(object):
- def __init__(
- self,
- grammar: Grammar,
- convert: Optional[_Convert] = None,
- logger: Optional[Logger] = None,
- ) -> None:
+@dataclass
+class ReleaseRange:
+ start: int
+ end: Optional[int] = None
+ tokens: List[Any] = field(default_factory=list)
+
+ def lock(self) -> None:
+ total_eaten = len(self.tokens)
+ self.end = self.start + total_eaten
+
+
+class TokenProxy:
+ def __init__(self, generator: Any) -> None:
+ self._tokens = generator
+ self._counter = 0
+ self._release_ranges: List[ReleaseRange] = []
+
+ @contextmanager
+ def release(self) -> Iterator["TokenProxy"]:
+ release_range = ReleaseRange(self._counter)
+ self._release_ranges.append(release_range)
+ try:
+ yield self
+ finally:
+ # Lock the last release range to the final position that
+ # has been eaten.
+ release_range.lock()
+
+ def eat(self, point: int) -> Any:
+ eaten_tokens = self._release_ranges[-1].tokens
+ if point < len(eaten_tokens):
+ return eaten_tokens[point]
+ else:
+ while point >= len(eaten_tokens):
+ token = next(self._tokens)
+ eaten_tokens.append(token)
+ return token
+
+ def __iter__(self) -> "TokenProxy":
+ return self
+
+ def __next__(self) -> Any:
+ # If the current position is already compromised (looked up)
+ # return the eaten token, if not just go further on the given
+ # token producer.
+ for release_range in self._release_ranges:
+ assert release_range.end is not None
+
+ start, end = release_range.start, release_range.end
+ if start <= self._counter < end:
+ token = release_range.tokens[self._counter - start]
+ break
+ else:
+ token = next(self._tokens)
+ self._counter += 1
+ return token
+
+ def can_advance(self, to: int) -> bool:
+ # Try to eat, fail if it can't. The eat operation is cached
+ # so there won't be any additional cost of eating here
+ try:
+ self.eat(to)
+ except StopIteration:
+ return False
+ else:
+ return True
+
+
+class Driver:
+ def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
self.grammar = grammar
if logger is None:
logger = logging.getLogger(__name__)
self.logger = logger
- self.convert = convert
- def parse_tokens(self, tokens: Iterable[Any], debug: bool = False) -> NL:
+ def parse_tokens(self, tokens: Iterable[GoodTokenInfo], debug: bool = False) -> NL:
"""Parse a series of tokens and return the syntax tree."""
# XXX Move the prefix computation into a wrapper around tokenize.
- p = parse.Parser(self.grammar, self.convert)
- p.setup()
+ proxy = TokenProxy(tokens)
+
+ p = parse.Parser(self.grammar)
+ p.setup(proxy=proxy)
+
lineno = 1
column = 0
- indent_columns = []
+ indent_columns: List[int] = []
type = value = start = end = line_text = None
prefix = ""
- for quintuple in tokens:
+
+ for quintuple in proxy:
type, value, start, end, line_text = quintuple
if start != (lineno, column):
assert (lineno, column) <= start, ((lineno, column), start)
if type == token.OP:
type = grammar.opmap[value]
if debug:
+ assert type is not None
self.logger.debug(
"%s %r (prefix=%r)", token.tok_name[type], value, prefix
)
elif type == token.DEDENT:
_indent_col = indent_columns.pop()
prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
- if p.addtoken(type, value, (prefix, start)):
+ if p.addtoken(cast(int, type), value, (prefix, start)):
if debug:
self.logger.debug("Stop.")
break
assert p.rootnode is not None
return p.rootnode
- def parse_stream_raw(self, stream: IO[Text], debug: bool = False) -> NL:
+ def parse_stream_raw(self, stream: IO[str], debug: bool = False) -> NL:
"""Parse a stream and return the syntax tree."""
tokens = tokenize.generate_tokens(stream.readline, grammar=self.grammar)
return self.parse_tokens(tokens, debug)
- def parse_stream(self, stream: IO[Text], debug: bool = False) -> NL:
+ def parse_stream(self, stream: IO[str], debug: bool = False) -> NL:
"""Parse a stream and return the syntax tree."""
return self.parse_stream_raw(stream, debug)
def parse_file(
- self, filename: Path, encoding: Optional[Text] = None, debug: bool = False,
+ self, filename: Path, encoding: Optional[str] = None, debug: bool = False
) -> NL:
"""Parse a file and return the syntax tree."""
- with io.open(filename, "r", encoding=encoding) as stream:
+ with open(filename, encoding=encoding) as stream:
return self.parse_stream(stream, debug)
- def parse_string(self, text: Text, debug: bool = False) -> NL:
+ def parse_string(self, text: str, debug: bool = False) -> NL:
"""Parse a string and return the syntax tree."""
tokens = tokenize.generate_tokens(
io.StringIO(text).readline, grammar=self.grammar
)
return self.parse_tokens(tokens, debug)
- def _partially_consume_prefix(self, prefix: Text, column: int) -> Tuple[Text, Text]:
+ def _partially_consume_prefix(self, prefix: str, column: int) -> Tuple[str, str]:
lines: List[str] = []
current_line = ""
current_column = 0
return "".join(lines), current_line
-def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> Text:
+def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> str:
head, tail = os.path.splitext(gt)
if tail == ".txt":
tail = ""
def load_grammar(
- gt: Text = "Grammar.txt",
- gp: Optional[Text] = None,
+ gt: str = "Grammar.txt",
+ gp: Optional[str] = None,
save: bool = True,
force: bool = False,
logger: Optional[Logger] = None,
logger = logging.getLogger(__name__)
gp = _generate_pickle_name(gt) if gp is None else gp
if force or not _newer(gp, gt):
- logger.info("Generating grammar tables from %s", gt)
g: grammar.Grammar = pgen.generate_grammar(gt)
if save:
- logger.info("Writing grammar tables to %s", gp)
try:
g.dump(gp)
- except OSError as e:
- logger.info("Writing failed: %s", e)
+ except OSError:
+ # Ignore error, caching is not vital.
+ pass
else:
g = grammar.Grammar()
g.load(gp)
return g
-def _newer(a: Text, b: Text) -> bool:
+def _newer(a: str, b: str) -> bool:
"""Inquire whether file a was written since file b."""
if not os.path.exists(a):
return False
def load_packaged_grammar(
- package: str, grammar_source: Text, cache_dir: Optional[Path] = None
+ package: str, grammar_source: str, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
"""Normally, loads a pickled grammar by doing
pkgutil.get_data(package, pickled_grammar)
return g
-def main(*args: Text) -> bool:
+def main(*args: str) -> bool:
"""Main program, when run as a script: produce grammar pickle files.
Calls load_grammar for each argument, a path to a grammar text file.