git.madduck.net Git - etc/vim.git/blob - src/blib2to3/pgen2/driver.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Fix an infinite loop when using `# fmt: on/off` ... (#3158)
[etc/vim.git] / src / blib2to3 / pgen2 / driver.py
1 # Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 # Modifications:
5 # Copyright 2006 Google, Inc. All Rights Reserved.
6 # Licensed to PSF under a Contributor Agreement.
7
8 """Parser driver.
9
10 This provides a high-level interface to parse a file into a syntax tree.
11
12 """
13
__author__ = "Guido van Rossum <guido@python.org>"

# Names exported by ``from <this module> import *``.
__all__ = ["Driver", "load_grammar"]
17
18 # Python imports
19 import io
20 import os
21 import logging
22 import pkgutil
23 import sys
24 from typing import (
25     Any,
26     cast,
27     IO,
28     Iterable,
29     List,
30     Optional,
31     Text,
32     Iterator,
33     Tuple,
34     TypeVar,
35     Generic,
36     Union,
37 )
38 from contextlib import contextmanager
39 from dataclasses import dataclass, field
40
41 # Pgen imports
42 from . import grammar, parse, token, tokenize, pgen
43 from logging import Logger
44 from blib2to3.pytree import NL
45 from blib2to3.pgen2.grammar import Grammar
46 from blib2to3.pgen2.tokenize import GoodTokenInfo
47
# Type alias for a filesystem path: either a plain string or an
# ``os.PathLike`` yielding ``str``.  The PathLike half is quoted so it is
# kept as a forward reference rather than subscripted at import time.
Path = Union[str, "os.PathLike[str]"]
49
50
@dataclass
class ReleaseRange:
    """A run of tokens that were read ahead, starting at stream index ``start``.

    ``tokens`` holds the buffered tokens in order.  ``end`` stays ``None``
    until :meth:`lock` is called, at which point it becomes the exclusive
    upper bound of the range.
    """

    start: int
    end: Optional[int] = None
    tokens: List[Any] = field(default_factory=list)

    def lock(self) -> None:
        """Freeze ``end`` at ``start`` plus the number of buffered tokens."""
        buffered_count = len(self.tokens)
        self.end = buffered_count + self.start
60
61
class TokenProxy:
    """Iterator over a token producer that supports buffered look-ahead.

    Inside a ``with proxy.release():`` scope, :meth:`eat` reads tokens ahead
    of the current position and buffers them.  Ordinary iteration afterwards
    replays buffered tokens whose index falls in a locked range and otherwise
    pulls from the underlying producer.
    """

    def __init__(self, generator: Any) -> None:
        self._tokens = generator
        # Index of the next token to be handed out by __next__.
        self._counter = 0
        self._release_ranges: List[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        """Open a look-ahead scope anchored at the current position."""
        lookahead = ReleaseRange(self._counter)
        self._release_ranges.append(lookahead)
        try:
            yield self
        finally:
            # Pin the range's end to however many tokens were buffered
            # while the scope was open.
            lookahead.lock()

    def eat(self, point: int) -> Any:
        """Return the token at offset ``point`` within the newest range.

        Tokens are pulled from the producer and buffered until that offset
        exists; ``StopIteration`` from the producer propagates to the caller.
        """
        buffered = self._release_ranges[-1].tokens
        while len(buffered) <= point:
            buffered.append(next(self._tokens))
        return buffered[point]

    def __iter__(self) -> "TokenProxy":
        return self

    def __next__(self) -> Any:
        """Return the next token, replaying a buffered one when available."""
        position = self._counter
        for rng in self._release_ranges:
            assert rng.end is not None

            if rng.start <= position < rng.end:
                # This position was already read ahead; serve the cached copy.
                tok = rng.tokens[position - rng.start]
                break
        else:
            tok = next(self._tokens)
        self._counter = position + 1
        return tok

    def can_advance(self, to: int) -> bool:
        """Report whether offset ``to`` can be reached via :meth:`eat`."""
        # eat() buffers whatever it reads, so probing here costs nothing extra
        # when the same offset is eaten again later.
        try:
            self.eat(to)
        except StopIteration:
            return False
        return True
117
118
class Driver(object):
    """Feed token streams to a ``parse.Parser`` and return the resulting tree.

    Besides driving the parser, :meth:`parse_tokens` reconstructs the
    "prefix" of every token: the whitespace, comments and blank lines that
    appear in the source between the previous token and this one.
    """

    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
        """Store *grammar* and *logger*.

        When *logger* is ``None`` the module logger
        (``logging.getLogger(__name__)``) is used instead.
        """
        self.grammar = grammar
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger

    def parse_tokens(self, tokens: Iterable[GoodTokenInfo], debug: bool = False) -> NL:
        """Parse a series of tokens and return the syntax tree.

        Each item of *tokens* is unpacked as
        ``(type, value, start, end, line_text)`` where ``start`` and ``end``
        are ``(line, column)`` pairs.  With *debug* set, every token handed
        to the parser is logged at debug level.

        Raises ``parse.ParseError`` ("incomplete input") when the tokens run
        out before ``p.addtoken`` returns a truthy value.
        """
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        p = parse.Parser(self.grammar)
        p.setup(proxy=proxy)

        # (lineno, column) is the position just past the last token seen;
        # anything between it and the next token's start goes into `prefix`.
        lineno = 1
        column = 0
        # Stack of indentation widths, pushed on INDENT and popped on DEDENT.
        indent_columns: List[int] = []
        # NOTE(review): `type` shadows the builtin inside this method.
        type = value = start = end = line_text = None
        prefix = ""

        for quintuple in proxy:
            type, value, start, end, line_text = quintuple
            if start != (lineno, column):
                # There is a gap between the previous token and this one.
                assert (lineno, column) <= start, ((lineno, column), start)
                s_lineno, s_column = start
                if lineno < s_lineno:
                    # Skipped lines are represented as bare newlines.
                    prefix += "\n" * (s_lineno - lineno)
                    lineno = s_lineno
                    column = 0
                if column < s_column:
                    # Copy the skipped text on the token's own line verbatim.
                    prefix += line_text[column:s_column]
                    column = s_column
            if type in (tokenize.COMMENT, tokenize.NL):
                # Comments and NL tokens are never given to the parser; their
                # text is folded into the prefix of the next real token.
                prefix += value
                lineno, column = end
                if value.endswith("\n"):
                    lineno += 1
                    column = 0
                continue
            if type == token.OP:
                # Replace the generic OP type with the operator-specific one.
                type = grammar.opmap[value]
            if debug:
                assert type is not None
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                )
            if type == token.INDENT:
                indent_columns.append(len(value))
                # The INDENT token itself is passed with empty prefix and
                # value; the pending prefix plus the indentation text is
                # held in `_prefix` and becomes the prefix of the next token.
                _prefix = prefix + value
                prefix = ""
                value = ""
            elif type == token.DEDENT:
                _indent_col = indent_columns.pop()
                # Split the pending prefix: the first part stays with the
                # DEDENT token, the remainder is carried to the next token.
                prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
            if p.addtoken(cast(int, type), value, (prefix, start)):
                # A truthy result from addtoken ends the loop.
                if debug:
                    self.logger.debug("Stop.")
                break
            prefix = ""
            if type in {token.INDENT, token.DEDENT}:
                prefix = _prefix
            lineno, column = end
            if value.endswith("\n"):
                lineno += 1
                column = 0
        else:
            # We never broke out -- EOF is too soon (how can this happen???)
            assert start is not None
            raise parse.ParseError("incomplete input", type, value, (prefix, start))
        assert p.rootnode is not None
        return p.rootnode

    def parse_stream_raw(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree.

        The stream is tokenized line by line through ``stream.readline``.
        """
        tokens = tokenize.generate_tokens(stream.readline, grammar=self.grammar)
        return self.parse_tokens(tokens, debug)

    def parse_stream(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree.

        Delegates directly to :meth:`parse_stream_raw`.
        """
        return self.parse_stream_raw(stream, debug)

    def parse_file(
        self, filename: Path, encoding: Optional[Text] = None, debug: bool = False
    ) -> NL:
        """Parse a file and return the syntax tree.

        The file is opened in text mode with the given *encoding* and closed
        again once parsing has finished or failed.
        """
        with io.open(filename, "r", encoding=encoding) as stream:
            return self.parse_stream(stream, debug)

    def parse_string(self, text: Text, debug: bool = False) -> NL:
        """Parse a string and return the syntax tree."""
        tokens = tokenize.generate_tokens(
            io.StringIO(text).readline, grammar=self.grammar
        )
        return self.parse_tokens(tokens, debug)

    def _partially_consume_prefix(self, prefix: Text, column: int) -> Tuple[Text, Text]:
        """Split *prefix* into ``(consumed, remainder)`` at an indentation drop.

        The prefix is scanned character by character.  Leading spaces and
        tabs of a line each count as one column.  The split happens at the
        start of the first newline-terminated line that has non-blank content
        and fewer than *column* leading columns; everything before it is
        ``consumed`` and the rest is ``remainder``.  If no such line exists,
        the complete lines collected so far are returned as ``consumed`` and
        the text still being accumulated as ``remainder``.
        """
        lines: List[str] = []
        current_line = ""
        current_column = 0
        # True once the line's first non-indentation character has been seen;
        # from then on only the terminating newline matters.
        wait_for_nl = False
        for char in prefix:
            current_line += char
            if wait_for_nl:
                if char == "\n":
                    if current_line.strip() and current_column < column:
                        # This line is indented less than `column`: stop
                        # here and hand it, and all that follows, back.
                        res = "".join(lines)
                        return res, prefix[len(res) :]

                    lines.append(current_line)
                    current_line = ""
                    current_column = 0
                    wait_for_nl = False
            elif char in " \t":
                current_column += 1
            elif char == "\n":
                # unexpected empty line
                # (only the column count resets; the text stays in
                # current_line and is kept with the following line)
                current_column = 0
            else:
                # indent is finished
                wait_for_nl = True
        return "".join(lines), current_line
241
242
243 def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> Text:
244     head, tail = os.path.splitext(gt)
245     if tail == ".txt":
246         tail = ""
247     name = head + tail + ".".join(map(str, sys.version_info)) + ".pickle"
248     if cache_dir:
249         return os.path.join(cache_dir, os.path.basename(name))
250     else:
251         return name
252
253
def load_grammar(
    gt: Text = "Grammar.txt",
    gp: Optional[Text] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Load the grammar (maybe from a pickle).

    *gt* is the grammar text file and *gp* the pickle path; when *gp* is
    ``None`` it is derived from *gt* with ``_generate_pickle_name``.  The
    pickle is loaded when it is up to date according to ``_newer`` and
    *force* is false.  Otherwise the tables are regenerated from *gt* and,
    if *save* is true, written to the pickle path.  A failed write is logged
    and otherwise ignored.
    """
    log = logging.getLogger(__name__) if logger is None else logger
    pickle_path = gp if gp is not None else _generate_pickle_name(gt)

    if not force and _newer(pickle_path, gt):
        # The pickle is current: just load it.
        loaded = grammar.Grammar()
        loaded.load(pickle_path)
        return loaded

    log.info("Generating grammar tables from %s", gt)
    generated: grammar.Grammar = pgen.generate_grammar(gt)
    if save:
        log.info("Writing grammar tables to %s", pickle_path)
        try:
            generated.dump(pickle_path)
        except OSError as exc:
            # Best effort: an unwritable cache must not prevent parsing.
            log.info("Writing failed: %s", exc)
    return generated
278
279
280 def _newer(a: Text, b: Text) -> bool:
281     """Inquire whether file a was written since file b."""
282     if not os.path.exists(a):
283         return False
284     if not os.path.exists(b):
285         return True
286     return os.path.getmtime(a) >= os.path.getmtime(b)
287
288
def load_packaged_grammar(
    package: str, grammar_source: Text, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Load a grammar, preferring the source file over the packaged pickle.

    If *grammar_source* is an existing file, ``load_grammar(grammar_source)``
    is called, which keeps its automatic regeneration behaviour; the pickle
    is placed in *cache_dir* when one is given.

    Otherwise the pickled grammar is read with
        pkgutil.get_data(package, pickled_grammar)
    where *pickled_grammar* is computed from the base name of
    *grammar_source* by adding the Python version and a ``.pickle``
    extension.

    """
    if not os.path.isfile(grammar_source):
        # No source file on disk: fall back to the pickle shipped as data.
        pickled_name = _generate_pickle_name(
            os.path.basename(grammar_source), cache_dir
        )
        data = pkgutil.get_data(package, pickled_name)
        assert data is not None
        packaged = grammar.Grammar()
        packaged.loads(data)
        return packaged

    if cache_dir:
        gp = _generate_pickle_name(grammar_source, cache_dir)
    else:
        gp = None
    return load_grammar(grammar_source, gp=gp)
311
312
def main(*args: Text) -> bool:
    """Script entry point: (re)generate grammar pickle files.

    Every argument is the path of a grammar text file and is passed to
    load_grammar with ``save=True, force=True``.  Without arguments the
    paths are taken from ``sys.argv[1:]``.  Always returns ``True``.
    """
    grammar_files = args if args else tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for grammar_file in grammar_files:
        load_grammar(grammar_file, save=True, force=True)
    return True
324
325
# When run as a script, call main() and exit with status 0 if it returned a
# truthy value, 1 otherwise.
if __name__ == "__main__":
    sys.exit(int(not main()))