X-Git-Url: https://git.madduck.net/etc/vim.git/blobdiff_plain/788268bc39a87d37a24d203fa5ee7b3953af3446..7d062ecd5f14124a99daf452c46054ada656ad8b:/src/blib2to3/pgen2/driver.py

diff --git a/src/blib2to3/pgen2/driver.py b/src/blib2to3/pgen2/driver.py
index 81940f7..daf271d 100644
--- a/src/blib2to3/pgen2/driver.py
+++ b/src/blib2to3/pgen2/driver.py
@@ -16,7 +16,6 @@ __author__ = "Guido van Rossum <guido@python.org>"
 __all__ = ["Driver", "load_grammar"]
 
 # Python imports
-import codecs
 import io
 import os
 import logging
@@ -24,50 +23,121 @@ import pkgutil
 import sys
 from typing import (
     Any,
-    Callable,
+    cast,
     IO,
     Iterable,
     List,
     Optional,
     Text,
+    Iterator,
     Tuple,
+    TypeVar,
+    Generic,
     Union,
-    Sequence,
 )
+from contextlib import contextmanager
+from dataclasses import dataclass, field
 
 # Pgen imports
 from . import grammar, parse, token, tokenize, pgen
 from logging import Logger
-from blib2to3.pytree import _Convert, NL
+from blib2to3.pytree import NL
 from blib2to3.pgen2.grammar import Grammar
+from blib2to3.pgen2.tokenize import GoodTokenInfo
 
 Path = Union[str, "os.PathLike[str]"]
 
 
+@dataclass
+class ReleaseRange:
+    start: int
+    end: Optional[int] = None
+    tokens: List[Any] = field(default_factory=list)
+
+    def lock(self) -> None:
+        total_eaten = len(self.tokens)
+        self.end = self.start + total_eaten
+
+
+class TokenProxy:
+    def __init__(self, generator: Any) -> None:
+        self._tokens = generator
+        self._counter = 0
+        self._release_ranges: List[ReleaseRange] = []
+
+    @contextmanager
+    def release(self) -> Iterator["TokenProxy"]:
+        release_range = ReleaseRange(self._counter)
+        self._release_ranges.append(release_range)
+        try:
+            yield self
+        finally:
+            # Lock the last release range to the final position that
+            # has been eaten.
+            release_range.lock()
+
+    def eat(self, point: int) -> Any:
+        eaten_tokens = self._release_ranges[-1].tokens
+        if point < len(eaten_tokens):
+            return eaten_tokens[point]
+        else:
+            while point >= len(eaten_tokens):
+                token = next(self._tokens)
+                eaten_tokens.append(token)
+            return token
+
+    def __iter__(self) -> "TokenProxy":
+        return self
+
+    def __next__(self) -> Any:
+        # If the current position falls inside a range that was already
+        # looked ahead (eaten), return the cached token; otherwise just
+        # advance the underlying token producer.
+        for release_range in self._release_ranges:
+            assert release_range.end is not None
+
+            start, end = release_range.start, release_range.end
+            if start <= self._counter < end:
+                token = release_range.tokens[self._counter - start]
+                break
+        else:
+            token = next(self._tokens)
+        self._counter += 1
+        return token
+
+    def can_advance(self, to: int) -> bool:
+        # Try to eat, fail if it can't. The eat operation is cached
+        # so there won't be any additional cost of eating here.
+        try:
+            self.eat(to)
+        except StopIteration:
+            return False
+        else:
+            return True
+
+
 class Driver(object):
-    def __init__(
-        self,
-        grammar: Grammar,
-        convert: Optional[_Convert] = None,
-        logger: Optional[Logger] = None,
-    ) -> None:
+    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
         self.grammar = grammar
         if logger is None:
             logger = logging.getLogger(__name__)
         self.logger = logger
-        self.convert = convert
 
-    def parse_tokens(self, tokens: Iterable[Any], debug: bool = False) -> NL:
+    def parse_tokens(self, tokens: Iterable[GoodTokenInfo], debug: bool = False) -> NL:
         """Parse a series of tokens and return the syntax tree."""
         # XXX Move the prefix computation into a wrapper around tokenize.
-        p = parse.Parser(self.grammar, self.convert)
-        p.setup()
+        proxy = TokenProxy(tokens)
+
+        p = parse.Parser(self.grammar)
+        p.setup(proxy=proxy)
+
         lineno = 1
         column = 0
-        indent_columns = []
+        indent_columns: List[int] = []
         type = value = start = end = line_text = None
         prefix = ""
-        for quintuple in tokens:
+
+        for quintuple in proxy:
             type, value, start, end, line_text = quintuple
             if start != (lineno, column):
                 assert (lineno, column) <= start, ((lineno, column), start)
@@ -89,6 +159,7 @@ class Driver(object):
             if type == token.OP:
                 type = grammar.opmap[value]
             if debug:
+                assert type is not None
                 self.logger.debug(
                     "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                 )
@@ -100,7 +171,7 @@ class Driver(object):
             elif type == token.DEDENT:
                 _indent_col = indent_columns.pop()
                 prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
-            if p.addtoken(type, value, (prefix, start)):
+            if p.addtoken(cast(int, type), value, (prefix, start)):
                 if debug:
                     self.logger.debug("Stop.")
                 break
@@ -192,14 +263,13 @@ def load_grammar(
         logger = logging.getLogger(__name__)
     gp = _generate_pickle_name(gt) if gp is None else gp
     if force or not _newer(gp, gt):
-        logger.info("Generating grammar tables from %s", gt)
         g: grammar.Grammar = pgen.generate_grammar(gt)
         if save:
-            logger.info("Writing grammar tables to %s", gp)
             try:
                 g.dump(gp)
-            except OSError as e:
-                logger.info("Writing failed: %s", e)
+            except OSError:
+                # Ignore error, caching is not vital.
+                pass
     else:
         g = grammar.Grammar()
         g.load(gp)