]> git.madduck.net Git - etc/vim.git/blobdiff - src/blib2to3/pgen2/driver.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Delay worker count determination
[etc/vim.git] / src / blib2to3 / pgen2 / driver.py
index 052c94883cf5ae6eca9c8676d95ebc50586cd1c6..daf271dfa9ab50d8b22c58405b694edfff3ede41 100644 (file)
@@ -16,7 +16,6 @@ __author__ = "Guido van Rossum <guido@python.org>"
 __all__ = ["Driver", "load_grammar"]
 
 # Python imports
-import codecs
 import io
 import os
 import logging
@@ -24,50 +23,121 @@ import pkgutil
 import sys
 from typing import (
     Any,
-    Callable,
+    cast,
     IO,
     Iterable,
     List,
     Optional,
     Text,
+    Iterator,
     Tuple,
+    TypeVar,
+    Generic,
     Union,
-    Sequence,
 )
+from contextlib import contextmanager
+from dataclasses import dataclass, field
 
 # Pgen imports
 from . import grammar, parse, token, tokenize, pgen
 from logging import Logger
-from blib2to3.pytree import _Convert, NL
+from blib2to3.pytree import NL
 from blib2to3.pgen2.grammar import Grammar
+from blib2to3.pgen2.tokenize import GoodTokenInfo
 
 Path = Union[str, "os.PathLike[str]"]
 
 
+@dataclass
+class ReleaseRange:
+    start: int
+    end: Optional[int] = None
+    tokens: List[Any] = field(default_factory=list)
+
+    def lock(self) -> None:
+        total_eaten = len(self.tokens)
+        self.end = self.start + total_eaten
+
+
+class TokenProxy:
+    def __init__(self, generator: Any) -> None:
+        self._tokens = generator
+        self._counter = 0
+        self._release_ranges: List[ReleaseRange] = []
+
+    @contextmanager
+    def release(self) -> Iterator["TokenProxy"]:
+        release_range = ReleaseRange(self._counter)
+        self._release_ranges.append(release_range)
+        try:
+            yield self
+        finally:
+            # Lock the last release range to the final position that
+            # has been eaten.
+            release_range.lock()
+
+    def eat(self, point: int) -> Any:
+        eaten_tokens = self._release_ranges[-1].tokens
+        if point < len(eaten_tokens):
+            return eaten_tokens[point]
+        else:
+            while point >= len(eaten_tokens):
+                token = next(self._tokens)
+                eaten_tokens.append(token)
+            return token
+
+    def __iter__(self) -> "TokenProxy":
+        return self
+
+    def __next__(self) -> Any:
+        # If the current position is already compromised (looked up)
+        # return the eaten token, if not just go further on the given
+        # token producer.
+        for release_range in self._release_ranges:
+            assert release_range.end is not None
+
+            start, end = release_range.start, release_range.end
+            if start <= self._counter < end:
+                token = release_range.tokens[self._counter - start]
+                break
+        else:
+            token = next(self._tokens)
+        self._counter += 1
+        return token
+
+    def can_advance(self, to: int) -> bool:
+        # Try to eat, fail if it can't. The eat operation is cached
+        # so there wont be any additional cost of eating here
+        try:
+            self.eat(to)
+        except StopIteration:
+            return False
+        else:
+            return True
+
+
 class Driver(object):
-    def __init__(
-        self,
-        grammar: Grammar,
-        convert: Optional[_Convert] = None,
-        logger: Optional[Logger] = None,
-    ) -> None:
+    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
         self.grammar = grammar
         if logger is None:
             logger = logging.getLogger(__name__)
         self.logger = logger
-        self.convert = convert
 
-    def parse_tokens(self, tokens: Iterable[Any], debug: bool = False) -> NL:
+    def parse_tokens(self, tokens: Iterable[GoodTokenInfo], debug: bool = False) -> NL:
         """Parse a series of tokens and return the syntax tree."""
         # XXX Move the prefix computation into a wrapper around tokenize.
-        p = parse.Parser(self.grammar, self.convert)
-        p.setup()
+        proxy = TokenProxy(tokens)
+
+        p = parse.Parser(self.grammar)
+        p.setup(proxy=proxy)
+
         lineno = 1
         column = 0
-        indent_columns = []
+        indent_columns: List[int] = []
         type = value = start = end = line_text = None
         prefix = ""
-        for quintuple in tokens:
+
+        for quintuple in proxy:
             type, value, start, end, line_text = quintuple
             if start != (lineno, column):
                 assert (lineno, column) <= start, ((lineno, column), start)
@@ -89,6 +159,7 @@ class Driver(object):
             if type == token.OP:
                 type = grammar.opmap[value]
             if debug:
+                assert type is not None
                 self.logger.debug(
                     "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                 )
@@ -100,7 +171,7 @@ class Driver(object):
             elif type == token.DEDENT:
                 _indent_col = indent_columns.pop()
                 prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
-            if p.addtoken(type, value, (prefix, start)):
+            if p.addtoken(cast(int, type), value, (prefix, start)):
                 if debug:
                     self.logger.debug("Stop.")
                 break
@@ -128,7 +199,7 @@ class Driver(object):
         return self.parse_stream_raw(stream, debug)
 
     def parse_file(
-        self, filename: Path, encoding: Optional[Text] = None, debug: bool = False,
+        self, filename: Path, encoding: Optional[Text] = None, debug: bool = False
     ) -> NL:
         """Parse a file and return the syntax tree."""
         with io.open(filename, "r", encoding=encoding) as stream:
@@ -192,14 +263,13 @@ def load_grammar(
         logger = logging.getLogger(__name__)
     gp = _generate_pickle_name(gt) if gp is None else gp
     if force or not _newer(gp, gt):
-        logger.info("Generating grammar tables from %s", gt)
         g: grammar.Grammar = pgen.generate_grammar(gt)
         if save:
-            logger.info("Writing grammar tables to %s", gp)
             try:
                 g.dump(gp)
-            except OSError as e:
-                logger.info("Writing failed: %s", e)
+            except OSError:
+                # Ignore error, caching is not vital.
+                pass
     else:
         g = grammar.Grammar()
         g.load(gp)