# Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Parser driver.

This provides a high-level interface to parse a file into a syntax tree.

"""

__author__ = "Guido van Rossum <guido@python.org>"

__all__ = ["Driver", "load_grammar"]
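
# A minimal usage sketch: build a Grammar (here from a grammar text file, the
# path is illustrative) and hand it to a Driver, then parse from any source.
#
#     from blib2to3.pgen2 import driver
#
#     g = driver.load_grammar("Grammar.txt")   # path to a grammar description
#     d = driver.Driver(g)
#     tree = d.parse_string("x = 1\n")         # returns a blib2to3.pytree node
#
# parse_file, parse_stream, and parse_tokens produce the same kind of tree
# from other input sources.
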
# Python imports
import io
import logging
import os
import pkgutil
import sys
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import IO, Any, Iterable, Iterator, List, Optional, Text, Tuple, Union, cast

# Pgen imports
from . import grammar, parse, token, tokenize, pgen
from logging import Logger
from blib2to3.pytree import NL
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.tokenize import GoodTokenInfo

Path = Union[str, "os.PathLike[str]"]


@dataclass
class ReleaseRange:
    start: int
    end: Optional[int] = None
    tokens: List[Any] = field(default_factory=list)

    def lock(self) -> None:
        total_eaten = len(self.tokens)
        self.end = self.start + total_eaten


class TokenProxy:
    def __init__(self, generator: Any) -> None:
        self._tokens = generator
        self._counter = 0
        self._release_ranges: List[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        release_range = ReleaseRange(self._counter)
        self._release_ranges.append(release_range)
        try:
            yield self
        finally:
            # Lock the last release range to the final position that
            # has been eaten.
            release_range.lock()

    def eat(self, point: int) -> Any:
        eaten_tokens = self._release_ranges[-1].tokens
        if point < len(eaten_tokens):
            return eaten_tokens[point]
        else:
            while point >= len(eaten_tokens):
                token = next(self._tokens)
                eaten_tokens.append(token)
            return token

    def __iter__(self) -> "TokenProxy":
        return self

    def __next__(self) -> Any:
        # If the current position has already been looked up, return the
        # eaten token; if not, just go further on the given token producer.
        for release_range in self._release_ranges:
            assert release_range.end is not None

            start, end = release_range.start, release_range.end
            if start <= self._counter < end:
                token = release_range.tokens[self._counter - start]
                break
        else:
            token = next(self._tokens)

        self._counter += 1
        return token

    def can_advance(self, to: int) -> bool:
        # Try to eat, fail if it can't. The eat operation is cached,
        # so there won't be any additional cost of eating here.
        try:
            self.eat(to)
        except StopIteration:
            return False
        else:
            return True

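
# Illustrative sketch of how the proxy supports speculative lookahead (the
# real call sites live in blib2to3.pgen2.parse, not in this module):
#
#     proxy = TokenProxy(iter(token_stream))
#     with proxy.release() as lookahead:
#         if lookahead.can_advance(2):     # pulls up to three tokens, cached
#             peeked = lookahead.eat(1)    # served from the cache
#     tok = next(proxy)                    # replays the cached tokens in order
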
class Driver(object):
    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
        self.grammar = grammar
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger

    def parse_tokens(self, tokens: Iterable[GoodTokenInfo], debug: bool = False) -> NL:
        """Parse a series of tokens and return the syntax tree."""
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        p = parse.Parser(self.grammar)
        p.setup(proxy=proxy)

        lineno = 1
        column = 0
        indent_columns: List[int] = []
        type = value = start = end = line_text = None
        prefix = ""

        for quintuple in proxy:
            type, value, start, end, line_text = quintuple
            if start != (lineno, column):
                assert (lineno, column) <= start, ((lineno, column), start)
                s_lineno, s_column = start
                if lineno < s_lineno:
                    prefix += "\n" * (s_lineno - lineno)
                    lineno = s_lineno
                    column = 0
                if column < s_column:
                    prefix += line_text[column:s_column]
                    column = s_column
            if type in (tokenize.COMMENT, tokenize.NL):
                prefix += value
                lineno, column = end
                if value.endswith("\n"):
                    lineno += 1
                    column = 0
                continue
            if type == token.OP:
                type = grammar.opmap[value]
            if debug:
                assert type is not None
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                )
            if type == token.INDENT:
                indent_columns.append(len(value))
                _prefix = prefix + value
                prefix = ""
                value = ""
            elif type == token.DEDENT:
                _indent_col = indent_columns.pop()
                prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
            if p.addtoken(cast(int, type), value, (prefix, start)):
                if debug:
                    self.logger.debug("Stop.")
                break
            prefix = ""
            if type in {token.INDENT, token.DEDENT}:
                prefix = _prefix
            lineno, column = end
            if value.endswith("\n"):
                lineno += 1
                column = 0
        else:
            # We never broke out -- EOF is too soon (how can this happen???)
            assert start is not None
            raise parse.ParseError("incomplete input", type, value, (prefix, start))
        assert p.rootnode is not None
        return p.rootnode
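
    # Prefix handling in a nutshell: whitespace, comments, and blank lines seen
    # before a significant token are accumulated into `prefix` and attached to
    # the node built from that token. For example, parsing "# hi\nx = 1\n"
    # gives the NAME leaf for "x" the prefix "# hi\n".
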
    def parse_stream_raw(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree."""
        tokens = tokenize.generate_tokens(stream.readline, grammar=self.grammar)
        return self.parse_tokens(tokens, debug)

    def parse_stream(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree."""
        return self.parse_stream_raw(stream, debug)

    def parse_file(
        self, filename: Path, encoding: Optional[Text] = None, debug: bool = False
    ) -> NL:
        """Parse a file and return the syntax tree."""
        with io.open(filename, "r", encoding=encoding) as stream:
            return self.parse_stream(stream, debug)

    def parse_string(self, text: Text, debug: bool = False) -> NL:
        """Parse a string and return the syntax tree."""
        tokens = tokenize.generate_tokens(
            io.StringIO(text).readline, grammar=self.grammar
        )
        return self.parse_tokens(tokens, debug)

    def _partially_consume_prefix(self, prefix: Text, column: int) -> Tuple[Text, Text]:
        lines: List[str] = []
        current_line = ""
        current_column = 0
        wait_for_nl = False
        for char in prefix:
            current_line += char
            if wait_for_nl:
                if char == "\n":
                    if current_line.strip() and current_column < column:
                        res = "".join(lines)
                        return res, prefix[len(res) :]

                    lines.append(current_line)
                    current_line = ""
                    current_column = 0
                    wait_for_nl = False
            elif char in " \t":
                current_column += 1
            elif char == "\n":
                # unexpected empty line
                current_column = 0
            else:
                # indent is broken
                wait_for_nl = True
        return "".join(lines), current_line


def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> Text:
    head, tail = os.path.splitext(gt)
    if tail == ".txt":
        tail = ""
    name = head + tail + ".".join(map(str, sys.version_info)) + ".pickle"
    if cache_dir:
        return os.path.join(cache_dir, os.path.basename(name))
    else:
        return name
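
# For example, gt="Grammar.txt" under CPython 3.8.10 yields
# "Grammar3.8.10.final.0.pickle" (or that basename inside cache_dir, when one
# is given), so every interpreter version gets its own pickle file.
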
def load_grammar(
    gt: Text = "Grammar.txt",
    gp: Optional[Text] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Load the grammar (maybe from a pickle)."""
    if logger is None:
        logger = logging.getLogger(__name__)
    gp = _generate_pickle_name(gt) if gp is None else gp
    if force or not _newer(gp, gt):
        logger.info("Generating grammar tables from %s", gt)
        g: grammar.Grammar = pgen.generate_grammar(gt)
        if save:
            logger.info("Writing grammar tables to %s", gp)
            try:
                g.dump(gp)
            except OSError as e:
                logger.info("Writing failed: %s", e)
    else:
        g = grammar.Grammar()
        g.load(gp)
    return g
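
# Cache behavior sketch: the first call regenerates the tables from the
# grammar text and pickles them next to it; later calls load the pickle as
# long as it is newer than the text file.
#
#     g = load_grammar("Grammar.txt")   # generates and writes the pickle
#     g = load_grammar("Grammar.txt")   # loads the pickle written above
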
def _newer(a: Text, b: Text) -> bool:
    """Inquire whether file a was written since file b."""
    if not os.path.exists(a):
        return False
    if not os.path.exists(b):
        return True
    return os.path.getmtime(a) >= os.path.getmtime(b)


def load_packaged_grammar(
    package: str, grammar_source: Text, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Normally, loads a pickled grammar by doing
        pkgutil.get_data(package, pickled_grammar)
    where *pickled_grammar* is computed from *grammar_source* by adding the
    Python version and using a ``.pickle`` extension.

    However, if *grammar_source* is an extant file, load_grammar(grammar_source)
    is called instead. This facilitates using a packaged grammar file when needed
    but preserves load_grammar's automatic regeneration behavior when possible.

    """
    if os.path.isfile(grammar_source):
        gp = _generate_pickle_name(grammar_source, cache_dir) if cache_dir else None
        return load_grammar(grammar_source, gp=gp)
    pickled_name = _generate_pickle_name(os.path.basename(grammar_source), cache_dir)
    data = pkgutil.get_data(package, pickled_name)
    assert data is not None
    g = grammar.Grammar()
    g.loads(data)
    return g
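
# Sketch of the two paths: load_packaged_grammar("blib2to3", "Grammar.txt")
# falls back to load_grammar("Grammar.txt") when that text file exists on
# disk, and otherwise unpickles the pre-built tables shipped inside the
# package via pkgutil.get_data().
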
def main(*args: Text) -> bool:
    """Main program, when run as a script: produce grammar pickle files.

    Calls load_grammar for each argument, a path to a grammar text file.
    """
    if not args:
        args = tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for gt in args:
        load_grammar(gt, save=True, force=True)
    return True


if __name__ == "__main__":
    sys.exit(int(not main()))
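
# Command-line usage sketch (assuming the package is importable):
#
#     python -m blib2to3.pgen2.driver Grammar.txt
#
# forces regeneration of the grammar pickle for the running interpreter.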