git.madduck.net Git - etc/vim.git/blob - src/blib2to3/pgen2/driver.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you read over the Git project's submission guidelines and adhere to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Bump sphinx from 7.2.5 to 7.2.6 in /docs (#3895)
[etc/vim.git] / src / blib2to3 / pgen2 / driver.py
1 # Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 # Modifications:
5 # Copyright 2006 Google, Inc. All Rights Reserved.
6 # Licensed to PSF under a Contributor Agreement.
7
8 """Parser driver.
9
10 This provides a high-level interface to parse a file into a syntax tree.
11
12 """
13
14 __author__ = "Guido van Rossum <guido@python.org>"
15
16 __all__ = ["Driver", "load_grammar"]
17
18 # Python imports
19 import io
20 import logging
21 import os
22 import pkgutil
23 import sys
24 from contextlib import contextmanager
25 from dataclasses import dataclass, field
26 from logging import Logger
27 from typing import IO, Any, Iterable, Iterator, List, Optional, Tuple, Union, cast
28
29 from blib2to3.pgen2.grammar import Grammar
30 from blib2to3.pgen2.tokenize import GoodTokenInfo
31 from blib2to3.pytree import NL
32
33 # Pgen imports
34 from . import grammar, parse, pgen, token, tokenize
35
# Type alias for a filesystem path argument: either a plain string or an
# os.PathLike object that yields a string.
Path = Union[str, "os.PathLike[str]"]
37
38
@dataclass
class ReleaseRange:
    """A window of look-ahead tokens recorded by ``TokenProxy``.

    The window begins at ``start`` and stays open (``end`` is ``None``) until
    ``lock()`` is called, at which point ``end`` is fixed from the number of
    tokens collected so far.
    """

    # Position at which this window begins.
    start: int
    # Exclusive end position; remains None until lock() has been called.
    end: Optional[int] = None
    # Tokens collected while the window was open, in the order they arrived.
    tokens: List[Any] = field(default_factory=list)

    def lock(self) -> None:
        """Close the window: ``end`` becomes ``start`` plus the token count."""
        self.end = self.start + len(self.tokens)
48
49
class TokenProxy:
    """Iterator wrapper around a token producer that supports look-ahead.

    Tokens fetched ahead of time through ``eat()`` are stored in the most
    recently opened release range; ``__next__`` later replays them from the
    locked ranges before pulling anything new from the producer.
    """

    def __init__(self, generator: Any) -> None:
        self._tokens = generator
        # Number of tokens handed out through __next__ so far.
        self._counter = 0
        self._release_ranges: List[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        """Open a look-ahead window starting at the current position.

        The window is registered immediately and is locked when the ``with``
        block exits, whether normally or through an exception.
        """
        opened = ReleaseRange(self._counter)
        self._release_ranges.append(opened)
        try:
            yield self
        finally:
            # Freeze the window at however many tokens were eaten inside it.
            opened.lock()

    def eat(self, point: int) -> Any:
        """Return the token at offset *point* inside the newest window.

        Tokens are pulled from the producer until the window holds at least
        ``point + 1`` of them; anything already buffered is served as-is.
        ``StopIteration`` from the producer propagates to the caller.
        """
        buffered = self._release_ranges[-1].tokens
        while len(buffered) <= point:
            buffered.append(next(self._tokens))
        return buffered[point]

    def __iter__(self) -> "TokenProxy":
        return self

    def __next__(self) -> Any:
        """Return the next token, replaying looked-ahead tokens first."""
        # A position covered by a locked window was already pulled from the
        # producer during look-ahead, so it has to come from the buffer.
        for window in self._release_ranges:
            assert window.end is not None

            if window.start <= self._counter < window.end:
                result = window.tokens[self._counter - window.start]
                break
        else:
            # No window covers this position: advance the real producer.
            result = next(self._tokens)
        self._counter += 1
        return result

    def can_advance(self, to: int) -> bool:
        """Report whether offset *to* can be reached in the newest window.

        This simply attempts ``eat(to)``; since eaten tokens are buffered,
        a later ``eat`` of the same offset does not touch the producer again.
        """
        try:
            self.eat(to)
        except StopIteration:
            return False
        return True
105
106
class Driver:
    """Front end that feeds tokens to the pgen2 parser for one grammar."""

    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
        """Remember *grammar* and *logger* (default: this module's logger)."""
        self.grammar = grammar
        self.logger = logging.getLogger(__name__) if logger is None else logger

    def parse_tokens(self, tokens: Iterable[GoodTokenInfo], debug: bool = False) -> NL:
        """Run the parser over *tokens* and return the resulting root node.

        Whitespace, comments and NL tokens are not passed to the parser;
        they are accumulated into a prefix string that accompanies the next
        token that is. With *debug* set, each token given to the parser is
        logged at debug level.

        Raises ``parse.ParseError`` ("incomplete input") when the tokens run
        out before ``addtoken`` reports that parsing is finished.
        """
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        parser = parse.Parser(self.grammar)
        parser.setup(proxy=proxy)

        # (row, col) is where the previously handled token ended.
        row, col = 1, 0
        indent_stack: List[int] = []
        tok_type = tok_value = tok_start = tok_end = tok_line = None
        prefix = ""

        for tok_type, tok_value, tok_start, tok_end, tok_line in proxy:
            if tok_start != (row, col):
                # There is a gap between the previous token and this one;
                # reconstruct it (newlines, then in-line text) into the prefix.
                assert (row, col) <= tok_start, ((row, col), tok_start)
                start_row, start_col = tok_start
                if row < start_row:
                    prefix += "\n" * (start_row - row)
                    row, col = start_row, 0
                if col < start_col:
                    prefix += tok_line[col:start_col]
                    col = start_col
            if tok_type in (tokenize.COMMENT, tokenize.NL):
                # Never shown to the parser: folded into the prefix instead.
                prefix += tok_value
                row, col = tok_end
                if tok_value.endswith("\n"):
                    row, col = row + 1, 0
                continue
            if tok_type == token.OP:
                # Operators are looked up by their text in the grammar opmap.
                tok_type = grammar.opmap[tok_value]
            if debug:
                assert tok_type is not None
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[tok_type], tok_value, prefix
                )
            if tok_type == token.INDENT:
                indent_stack.append(len(tok_value))
                # The INDENT token reaches the parser with empty prefix and
                # value; its text is carried over to the following token.
                carried_prefix = prefix + tok_value
                prefix = ""
                tok_value = ""
            elif tok_type == token.DEDENT:
                closed_indent = indent_stack.pop()
                prefix, carried_prefix = self._partially_consume_prefix(
                    prefix, closed_indent
                )
            if parser.addtoken(cast(int, tok_type), tok_value, (prefix, tok_start)):
                if debug:
                    self.logger.debug("Stop.")
                break
            # Start a fresh prefix, except after INDENT/DEDENT where the
            # carried-over part belongs to the next token.
            prefix = carried_prefix if tok_type in {token.INDENT, token.DEDENT} else ""
            row, col = tok_end
            if tok_value.endswith("\n"):
                row, col = row + 1, 0
        else:
            # We never broke out -- EOF is too soon (how can this happen???)
            assert tok_start is not None
            raise parse.ParseError(
                "incomplete input", tok_type, tok_value, (prefix, tok_start)
            )
        assert parser.rootnode is not None
        return parser.rootnode

    def parse_stream_raw(self, stream: IO[str], debug: bool = False) -> NL:
        """Tokenize *stream* line by line and parse it into a syntax tree."""
        token_source = tokenize.generate_tokens(stream.readline, grammar=self.grammar)
        return self.parse_tokens(token_source, debug)

    def parse_stream(self, stream: IO[str], debug: bool = False) -> NL:
        """Parse *stream*; this delegates directly to ``parse_stream_raw``."""
        return self.parse_stream_raw(stream, debug)

    def parse_file(
        self, filename: Path, encoding: Optional[str] = None, debug: bool = False
    ) -> NL:
        """Open *filename* with *encoding* and parse its contents."""
        with open(filename, encoding=encoding) as source:
            return self.parse_stream(source, debug)

    def parse_string(self, text: str, debug: bool = False) -> NL:
        """Tokenize the in-memory *text* and parse it into a syntax tree."""
        reader = io.StringIO(text).readline
        token_source = tokenize.generate_tokens(reader, grammar=self.grammar)
        return self.parse_tokens(token_source, debug)

    def _partially_consume_prefix(self, prefix: str, column: int) -> Tuple[str, str]:
        """Split *prefix* into ``(consumed, remainder)`` at an indent boundary.

        Complete lines are consumed from the front of *prefix* until one is
        found that has non-whitespace content and whose leading run of spaces
        and tabs (each counted as one) is shorter than *column*; that line
        and everything after it form the remainder. If no such line exists,
        all complete lines are consumed and the remainder is the trailing
        text that has not been terminated by a newline.
        """
        finished: List[str] = []
        pending = ""
        width = 0
        # True once the current line's leading whitespace has ended.
        in_content = False
        for ch in prefix:
            pending += ch
            if in_content:
                if ch != "\n":
                    continue
                if pending.strip() and width < column:
                    consumed = "".join(finished)
                    return consumed, prefix[len(consumed) :]

                finished.append(pending)
                pending, width, in_content = "", 0, False
            elif ch in " \t":
                width += 1
            elif ch == "\n":
                # unexpected empty line
                width = 0
            else:
                # indent is finished
                in_content = True
        return "".join(finished), pending
229
230
def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> str:
    """Return the pickle file name that corresponds to grammar file *gt*.

    A ``.txt`` extension is dropped (any other extension is kept), then the
    dot-joined ``sys.version_info`` fields and ``.pickle`` are appended. When
    *cache_dir* is truthy, only the base name of that result is used, placed
    inside *cache_dir*.
    """
    stem, ext = os.path.splitext(gt)
    if ext != ".txt":
        stem += ext
    version_tag = ".".join(str(part) for part in sys.version_info)
    pickle_name = stem + version_tag + ".pickle"
    if not cache_dir:
        return pickle_name
    return os.path.join(cache_dir, os.path.basename(pickle_name))
240
241
def load_grammar(
    gt: str = "Grammar.txt",
    gp: Optional[str] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Return the grammar for *gt*, using the pickle *gp* when it is current.

    *gp* defaults to the name derived from *gt* by ``_generate_pickle_name``.
    The pickle is loaded when *force* is false and ``_newer(gp, gt)`` holds;
    otherwise the grammar is generated from *gt* and, if *save* is true,
    written to *gp* on a best-effort basis.
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    if gp is None:
        gp = _generate_pickle_name(gt)

    if not force and _newer(gp, gt):
        # The pickle is up to date: load it instead of regenerating.
        cached = grammar.Grammar()
        cached.load(gp)
        return cached

    generated: grammar.Grammar = pgen.generate_grammar(gt)
    if save:
        try:
            generated.dump(gp)
        except OSError:
            # Ignore error, caching is not vital.
            pass
    return generated
265
266
def _newer(a: str, b: str) -> bool:
    """Tell whether file *a* is at least as recent as file *b*.

    A missing *a* is never newer; an existing *a* is always newer than a
    missing *b*. Otherwise the modification times are compared, and equal
    times count as newer.
    """
    return os.path.exists(a) and (
        not os.path.exists(b) or os.path.getmtime(a) >= os.path.getmtime(b)
    )
274
275
def load_packaged_grammar(
    package: str, grammar_source: str, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Load a grammar from package data, or from *grammar_source* if present.

    When *grammar_source* is an existing file, ``load_grammar`` is called on
    it, so the usual automatic regeneration applies; the pickle path is
    placed under *cache_dir* when one is given.

    Otherwise the pickled grammar is fetched with
        pkgutil.get_data(package, pickled_grammar)
    where *pickled_grammar* is computed from the base name of
    *grammar_source* by adding the Python version and a ``.pickle``
    extension.

    """
    if os.path.isfile(grammar_source):
        gp = None
        if cache_dir:
            gp = _generate_pickle_name(grammar_source, cache_dir)
        return load_grammar(grammar_source, gp=gp)

    source_basename = os.path.basename(grammar_source)
    pickled_name = _generate_pickle_name(source_basename, cache_dir)
    data = pkgutil.get_data(package, pickled_name)
    assert data is not None
    packaged = grammar.Grammar()
    packaged.loads(data)
    return packaged
298
299
def main(*args: str) -> bool:
    """Script entry point: regenerate and save grammar pickle files.

    Each argument is the path of a grammar text file and is passed to
    ``load_grammar`` with saving and forced regeneration enabled. When no
    arguments are given, the command-line arguments are used. Always
    returns True.
    """
    grammar_paths = args if args else tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for grammar_path in grammar_paths:
        load_grammar(grammar_path, save=True, force=True)
    return True
311
312
# When run as a script, exit with status 0 if main() returns True and 1
# otherwise.
if __name__ == "__main__":
    sys.exit(int(not main()))