# Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

# Modifications:
# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Parser driver.

This provides a high-level interface to parse a file into a syntax tree.

"""

__author__ = "Guido van Rossum <guido@python.org>"

__all__ = ["Driver", "load_grammar"]

# Python imports
import io
import logging
import os
import pkgutil
import sys
from contextlib import contextmanager
from dataclasses import dataclass, field
from logging import Logger
from typing import IO, Any, Iterable, Iterator, List, Optional, Text, Tuple, Union

# Pgen imports
from . import grammar, parse, token, tokenize, pgen
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pytree import _Convert, NL

Path = Union[str, "os.PathLike[str]"]


@dataclass
class ReleaseRange:
    start: int
    end: Optional[int] = None
    tokens: List[Any] = field(default_factory=list)

    def lock(self) -> None:
        total_eaten = len(self.tokens)
        self.end = self.start + total_eaten


class TokenProxy:
    """Wrap a token generator so the parser can look ahead and then replay."""

    def __init__(self, generator: Any) -> None:
        self._tokens = generator
        self._counter = 0
        self._release_ranges: List[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        release_range = ReleaseRange(self._counter)
        self._release_ranges.append(release_range)
        try:
            yield self
        finally:
            # Lock the last release range to the final position that
            # got eaten.
            release_range.lock()

    def eat(self, point: int) -> Any:
        eaten_tokens = self._release_ranges[-1].tokens
        if point < len(eaten_tokens):
            return eaten_tokens[point]
        else:
            while point >= len(eaten_tokens):
                token = next(self._tokens)
                eaten_tokens.append(token)
            return token

    def __iter__(self) -> "TokenProxy":
        return self

    def __next__(self) -> Any:
        # If the current position is already compromised (looked up),
        # return the eaten token; if not, just go further on the given
        # token producer.
        for release_range in self._release_ranges:
            assert release_range.end is not None

            start, end = release_range.start, release_range.end
            if start <= self._counter < end:
                token = release_range.tokens[self._counter - start]
                break
        else:
            token = next(self._tokens)

        self._counter += 1
        return token

    def can_advance(self, to: int) -> bool:
        # Try to eat, fail if it can't. The eat operation is cached
        # so there won't be any additional cost of eating here.
        try:
            self.eat(to)
        except StopIteration:
            return False
        else:
            return True
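

# Illustrative sketch (not part of the original module): speculative lookahead
# with TokenProxy. A consumer opens a release range, peeks ahead with
# can_advance()/eat(), and the cached tokens are replayed by __next__ once the
# range is locked.
#
#     proxy = TokenProxy(tokenize.generate_tokens(io.StringIO("x = 1\n").readline))
#     with proxy.release() as peek:
#         if peek.can_advance(1):
#             first = peek.eat(0)    # cached, replayed later
#     tokens = list(proxy)           # includes the tokens peeked above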


class Driver(object):
    def __init__(
        self,
        grammar: Grammar,
        convert: Optional[_Convert] = None,
        logger: Optional[Logger] = None,
    ) -> None:
        self.grammar = grammar
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger
        self.convert = convert

    def parse_tokens(self, tokens: Iterable[Any], debug: bool = False) -> NL:
        """Parse a series of tokens and return the syntax tree."""
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        p = parse.Parser(self.grammar, self.convert)
        p.setup(proxy=proxy)

        lineno = 1
        column = 0
        indent_columns: List[int] = []
        type = value = start = end = line_text = None
        prefix = ""

        for quintuple in proxy:
            type, value, start, end, line_text = quintuple
            if start != (lineno, column):
                assert (lineno, column) <= start, ((lineno, column), start)
                s_lineno, s_column = start
                if lineno < s_lineno:
                    prefix += "\n" * (s_lineno - lineno)
                    lineno = s_lineno
                    column = 0
                if column < s_column:
                    prefix += line_text[column:s_column]
                    column = s_column
            if type in (tokenize.COMMENT, tokenize.NL):
                prefix += value
                lineno, column = end
                if value.endswith("\n"):
                    lineno += 1
                    column = 0
                continue
            if type == token.OP:
                type = grammar.opmap[value]
            if debug:
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                )
            if type == token.INDENT:
                indent_columns.append(len(value))
                _prefix = prefix + value
                prefix = ""
                value = ""
            elif type == token.DEDENT:
                _indent_col = indent_columns.pop()
                prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
            if p.addtoken(type, value, (prefix, start)):
                if debug:
                    self.logger.debug("Stop.")
                break
            prefix = ""
            if type in {token.INDENT, token.DEDENT}:
                prefix = _prefix
            lineno, column = end
            if value.endswith("\n"):
                lineno += 1
                column = 0
        else:
            # We never broke out -- EOF is too soon (how can this happen???)
            assert start is not None
            raise parse.ParseError("incomplete input", type, value, (prefix, start))
        assert p.rootnode is not None
        return p.rootnode

    def parse_stream_raw(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree."""
        tokens = tokenize.generate_tokens(stream.readline, grammar=self.grammar)
        return self.parse_tokens(tokens, debug)

    def parse_stream(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree."""
        return self.parse_stream_raw(stream, debug)

    def parse_file(
        self, filename: Path, encoding: Optional[Text] = None, debug: bool = False
    ) -> NL:
        """Parse a file and return the syntax tree."""
        with io.open(filename, "r", encoding=encoding) as stream:
            return self.parse_stream(stream, debug)

    def parse_string(self, text: Text, debug: bool = False) -> NL:
        """Parse a string and return the syntax tree."""
        tokens = tokenize.generate_tokens(
            io.StringIO(text).readline, grammar=self.grammar
        )
        return self.parse_tokens(tokens, debug)

    def _partially_consume_prefix(self, prefix: Text, column: int) -> Tuple[Text, Text]:
        lines: List[str] = []
        current_line = ""
        current_column = 0
        wait_for_nl = False
        for char in prefix:
            current_line += char
            if wait_for_nl:
                if char == "\n":
                    if current_line.strip() and current_column < column:
                        res = "".join(lines)
                        return res, prefix[len(res) :]

                    lines.append(current_line)
                    current_line = ""
                    current_column = 0
                    wait_for_nl = False
            elif char in " \t":
                current_column += 1
            elif char == "\n":
                # unexpected empty line
                current_column = 0
            else:
                # indent is broken
                wait_for_nl = True
        return "".join(lines), current_line
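

# Illustrative usage sketch (not part of the original module): build a Driver
# from a freshly loaded grammar and parse a small snippet. "Grammar.txt" is a
# placeholder path to a grammar definition file; real callers typically supply
# their own pre-pickled grammar and a convert callback.
#
#     g = load_grammar("Grammar.txt")
#     d = Driver(g, logger=logging.getLogger(__name__))
#     tree = d.parse_string("x = 1\n")   # returns the root node of the tree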


def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> Text:
    head, tail = os.path.splitext(gt)
    if tail == ".txt":
        tail = ""
    name = head + tail + ".".join(map(str, sys.version_info)) + ".pickle"
    if cache_dir:
        return os.path.join(cache_dir, os.path.basename(name))
    else:
        return name
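
# Worked example: for gt="Grammar.txt" with no cache_dir, the ".txt" suffix is
# dropped and the running interpreter's sys.version_info is appended, so on
# CPython 3.8.2 the result is "Grammar3.8.2.final.0.pickle".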


def load_grammar(
    gt: Text = "Grammar.txt",
    gp: Optional[Text] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Load the grammar (maybe from a pickle)."""
    if logger is None:
        logger = logging.getLogger(__name__)
    gp = _generate_pickle_name(gt) if gp is None else gp
    if force or not _newer(gp, gt):
        logger.info("Generating grammar tables from %s", gt)
        g: grammar.Grammar = pgen.generate_grammar(gt)
        if save:
            logger.info("Writing grammar tables to %s", gp)
            try:
                g.dump(gp)
            except OSError as e:
                logger.info("Writing failed: %s", e)
    else:
        g = grammar.Grammar()
        g.load(gp)
    return g


def _newer(a: Text, b: Text) -> bool:
    """Inquire whether file a was written since file b."""
    if not os.path.exists(a):
        return False
    if not os.path.exists(b):
        return True
    return os.path.getmtime(a) >= os.path.getmtime(b)


def load_packaged_grammar(
    package: str, grammar_source: Text, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Normally, loads a pickled grammar by doing
        pkgutil.get_data(package, pickled_grammar)
    where *pickled_grammar* is computed from *grammar_source* by adding the
    Python version and using a ``.pickle`` extension.

    However, if *grammar_source* is an extant file, load_grammar(grammar_source)
    is called instead. This facilitates using a packaged grammar file when needed
    but preserves load_grammar's automatic regeneration behavior when possible.

    """
    if os.path.isfile(grammar_source):
        gp = _generate_pickle_name(grammar_source, cache_dir) if cache_dir else None
        return load_grammar(grammar_source, gp=gp)
    pickled_name = _generate_pickle_name(os.path.basename(grammar_source), cache_dir)
    data = pkgutil.get_data(package, pickled_name)
    assert data is not None
    g = grammar.Grammar()
    g.loads(data)
    return g
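

# Illustrative sketch (not part of the original module): loading a grammar
# pickle that ships inside an installed package. The package name and grammar
# file name below are placeholders.
#
#     g = load_packaged_grammar("blib2to3", "Grammar.txt")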


def main(*args: Text) -> bool:
    """Main program, when run as a script: produce grammar pickle files.

    Calls load_grammar for each argument, a path to a grammar text file.
    """
    if not args:
        args = tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for gt in args:
        load_grammar(gt, save=True, force=True)
    return True


if __name__ == "__main__":
    sys.exit(int(not main()))
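
# Because of the relative imports above, this script entry point should be run
# with the -m form (assuming the blib2to3 package is importable), e.g.:
#
#     python -m blib2to3.pgen2.driver Grammar.txt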