from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.parse import ParseError
+from blib2to3.pgen2.tokenize import TokenizerConfig
__version__ = "19.3b0"
NUMERIC_UNDERSCORES = 3
TRAILING_COMMA_IN_CALL = 4
TRAILING_COMMA_IN_DEF = 5
+    # The following two feature flags are mutually exclusive, and exactly one
+    # must be set for every version of Python.
+ ASYNC_IS_VALID_IDENTIFIER = 6
+ ASYNC_IS_RESERVED_KEYWORD = 7
VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
- TargetVersion.PY27: set(),
- TargetVersion.PY33: {Feature.UNICODE_LITERALS},
- TargetVersion.PY34: {Feature.UNICODE_LITERALS},
- TargetVersion.PY35: {Feature.UNICODE_LITERALS, Feature.TRAILING_COMMA_IN_CALL},
+ TargetVersion.PY27: {Feature.ASYNC_IS_VALID_IDENTIFIER},
+ TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IS_VALID_IDENTIFIER},
+ TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IS_VALID_IDENTIFIER},
+ TargetVersion.PY35: {
+ Feature.UNICODE_LITERALS,
+ Feature.TRAILING_COMMA_IN_CALL,
+ Feature.ASYNC_IS_VALID_IDENTIFIER,
+ },
TargetVersion.PY36: {
Feature.UNICODE_LITERALS,
Feature.F_STRINGS,
Feature.NUMERIC_UNDERSCORES,
Feature.TRAILING_COMMA_IN_CALL,
Feature.TRAILING_COMMA_IN_DEF,
+ Feature.ASYNC_IS_VALID_IDENTIFIER,
},
TargetVersion.PY37: {
Feature.UNICODE_LITERALS,
Feature.NUMERIC_UNDERSCORES,
Feature.TRAILING_COMMA_IN_CALL,
Feature.TRAILING_COMMA_IN_DEF,
+ Feature.ASYNC_IS_RESERVED_KEYWORD,
},
TargetVersion.PY38: {
Feature.UNICODE_LITERALS,
Feature.NUMERIC_UNDERSCORES,
Feature.TRAILING_COMMA_IN_CALL,
Feature.TRAILING_COMMA_IN_DEF,
+ Feature.ASYNC_IS_RESERVED_KEYWORD,
},
}
return tiow.read(), encoding, newline
-def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
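+# A ParserConfig pairs a lib2to3 grammar with the tokenizer settings needed to
+# lex source code for that target (currently just how async/await are treated).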
+@dataclass(frozen=True)
+class ParserConfig:
+ grammar: Grammar
+ tokenizer_config: TokenizerConfig = TokenizerConfig()
+
+
+def get_parser_configs(target_versions: Set[TargetVersion]) -> List[ParserConfig]:
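+    """Return the parser configurations to try, in order, for the given targets."""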
if not target_versions:
# No target_version specified, so try all grammars.
return [
- pygram.python_grammar_no_print_statement_no_exec_statement,
- pygram.python_grammar_no_print_statement,
- pygram.python_grammar,
+ # Python 3.7+
+ ParserConfig(
+ pygram.python_grammar_no_print_statement_no_exec_statement,
+ TokenizerConfig(async_is_reserved_keyword=True),
+ ),
+ # Python 3.0-3.6
+ ParserConfig(
+ pygram.python_grammar_no_print_statement_no_exec_statement,
+ TokenizerConfig(async_is_reserved_keyword=False),
+ ),
+ # Python 2.7 with future print_function import
+ ParserConfig(pygram.python_grammar_no_print_statement),
+ # Python 2.7
+ ParserConfig(pygram.python_grammar),
]
elif all(version.is_python2() for version in target_versions):
# Python 2-only code, so try Python 2 grammars.
- return [pygram.python_grammar_no_print_statement, pygram.python_grammar]
+ return [
+ # Python 2.7 with future print_function import
+ ParserConfig(pygram.python_grammar_no_print_statement),
+ # Python 2.7
+ ParserConfig(pygram.python_grammar),
+ ]
else:
# Python 3-compatible code, so only try Python 3 grammar.
- return [pygram.python_grammar_no_print_statement_no_exec_statement]
+ configs = []
+        # If we have to support both interpretations of async, try parsing it as
+        # a keyword first.
+ if not supports_feature(target_versions, Feature.ASYNC_IS_VALID_IDENTIFIER):
+ # Python 3.7+
+ configs.append(
+ ParserConfig(
+ pygram.python_grammar_no_print_statement_no_exec_statement,
+ TokenizerConfig(async_is_reserved_keyword=True),
+ )
+ )
+ if not supports_feature(target_versions, Feature.ASYNC_IS_RESERVED_KEYWORD):
+ # Python 3.0-3.6
+ configs.append(
+ ParserConfig(
+ pygram.python_grammar_no_print_statement_no_exec_statement,
+ TokenizerConfig(async_is_reserved_keyword=False),
+ )
+ )
+        # At least one of the above branches must have been taken, because every
+        # Python version sets exactly one of the two ASYNC_IS_* feature flags.
+ return configs
def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
if src_txt[-1:] != "\n":
src_txt += "\n"
- for grammar in get_grammars(set(target_versions)):
- drv = driver.Driver(grammar, pytree.convert)
+ for parser_config in get_parser_configs(set(target_versions)):
+ drv = driver.Driver(
+ parser_config.grammar,
+ pytree.convert,
+ tokenizer_config=parser_config.tokenizer_config,
+ )
try:
result = drv.parse_string(src_txt, True)
break
class Driver(object):
- def __init__(self, grammar, convert=None, logger=None):
+ def __init__(
+ self,
+ grammar,
+ convert=None,
+ logger=None,
+ tokenizer_config=tokenize.TokenizerConfig(),
+ ):
self.grammar = grammar
if logger is None:
logger = logging.getLogger(__name__)
self.logger = logger
self.convert = convert
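+        # Tokenizer settings (e.g. whether async/await are reserved keywords),
+        # forwarded to generate_tokens() by parse_stream_raw() and parse_string().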
+ self.tokenizer_config = tokenizer_config
def parse_tokens(self, tokens, debug=False):
"""Parse a series of tokens and return the syntax tree."""
def parse_stream_raw(self, stream, debug=False):
"""Parse a stream and return the syntax tree."""
- tokens = tokenize.generate_tokens(stream.readline)
+ tokens = tokenize.generate_tokens(stream.readline, config=self.tokenizer_config)
return self.parse_tokens(tokens, debug)
def parse_stream(self, stream, debug=False):
def parse_string(self, text, debug=False):
"""Parse a string and return the syntax tree."""
- tokens = tokenize.generate_tokens(io.StringIO(text).readline)
+ tokens = tokenize.generate_tokens(
+ io.StringIO(text).readline,
+ config=self.tokenizer_config,
+ )
return self.parse_tokens(tokens, debug)
def _partially_consume_prefix(self, prefix, column):
from blib2to3.pytree import _Convert, _NL
from blib2to3.pgen2 import _Path
from blib2to3.pgen2.grammar import Grammar
+from blib2to3.pgen2.tokenize import TokenizerConfig
class Driver:
grammar: Grammar
logger: Logger
convert: _Convert
- def __init__(self, grammar: Grammar, convert: Optional[_Convert] = ..., logger: Optional[Logger] = ...) -> None: ...
+ def __init__(
+ self,
+ grammar: Grammar,
+ convert: Optional[_Convert] = ...,
+ logger: Optional[Logger] = ...,
+ tokenizer_config: TokenizerConfig = ...
+ ) -> None: ...
def parse_tokens(self, tokens: Iterable[Any], debug: bool = ...) -> _NL: ...
def parse_stream_raw(self, stream: IO[Text], debug: bool = ...) -> _NL: ...
def parse_stream(self, stream: IO[Text], debug: bool = ...) -> _NL: ...
import re
from codecs import BOM_UTF8, lookup
+from attr import dataclass
from blib2to3.pgen2.token import *
from . import token
tabsize = 8
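+# Feature flags for the tokenizer. With async_is_reserved_keyword=True, async and
+# await are always emitted as the ASYNC/AWAIT tokens (Python 3.7+ behaviour);
+# otherwise they are treated as keywords only inside an `async def` function.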
+@dataclass(frozen=True)
+class TokenizerConfig:
+ async_is_reserved_keyword: bool = False
+
class TokenError(Exception): pass
class StopTokenizing(Exception): pass
ut = Untokenizer()
return ut.untokenize(iterable)
-def generate_tokens(readline):
+def generate_tokens(readline, config: TokenizerConfig = TokenizerConfig()):
"""
The generate_tokens() generator requires one argument, readline, which
must be a callable object which provides the same interface as the
contline = None
indents = [0]
+ # If we know we're parsing 3.7+, we can unconditionally parse `async` and
+ # `await` as keywords.
+ async_is_reserved_keyword = config.async_is_reserved_keyword
# 'stashed' and 'async_*' are used for async/await parsing
stashed = None
async_def = False
yield (STRING, token, spos, epos, line)
elif initial.isidentifier(): # ordinary name
if token in ('async', 'await'):
- if async_def:
+ if async_is_reserved_keyword or async_def:
yield (ASYNC if token == 'async' else AWAIT,
token, spos, epos, line)
continue
# NOTE: Only elements from __all__ are present.
from typing import Callable, Iterable, Iterator, List, Text, Tuple
+from attr import dataclass
from blib2to3.pgen2.token import * # noqa
_TokenEater = Callable[[int, Text, _Coord, _Coord, Text], None]
_TokenInfo = Tuple[int, Text, _Coord, _Coord, Text]
+@dataclass(frozen=True)
+class TokenizerConfig:
+ async_is_reserved_keyword: bool = False
class TokenError(Exception): ...
class StopTokenizing(Exception): ...
--- /dev/null
+def async():
+ pass
+
+
+def await():
+ pass
+
+
+await = lambda: None
+async = lambda: None
+async()
+await()
+
+
+def sync_fn():
+ await = lambda: None
+ async = lambda: None
+ async()
+ await()
+
+
+async def async_fn():
+ await async_fn()
+
+
+# output
+def async():
+ pass
+
+
+def await():
+ pass
+
+
+await = lambda: None
+async = lambda: None
+async()
+await()
+
+
+def sync_fn():
+ await = lambda: None
+ async = lambda: None
+ async()
+ await()
+
+
+async def async_fn():
+ await async_fn()
self.async_inc, arange(8), batch_size=3
)
]
+
+def awaited_generator_value(n):
+ return (await awaitable for awaitable in awaitable_list)
+
+def make_arange(n):
+ return (i * 2 for i in range(n) if await wrap(i))
+
+
# output
self.async_inc, arange(8), batch_size=3
)
]
+
+
+def awaited_generator_value(n):
+ return (await awaitable for awaitable in awaitable_list)
+
+
+def make_arange(n):
+ return (i * 2 for i in range(n) if await wrap(i))
self.assertFormatEqual(expected, actual)
black.assert_stable(source, actual, mode)
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_async_as_identifier(self) -> None:
+ source_path = (THIS_DIR / "data" / "async_as_identifier.py").resolve()
+ source, expected = read_data("async_as_identifier")
+ actual = fs(source)
+ self.assertFormatEqual(expected, actual)
+ major, minor = sys.version_info[:2]
+ if major < 3 or (major <= 3 and minor < 7):
+ black.assert_equivalent(source, actual)
+ black.assert_stable(source, actual, black.FileMode())
+ # ensure black can parse this when the target is 3.6
+ self.invokeBlack([str(source_path), "--target-version", "py36"])
+        # but not when targeting 3.7, because async/await are no longer valid identifiers
+ self.invokeBlack([str(source_path), "--target-version", "py37"], exit_code=123)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_python37(self) -> None:
+ source_path = (THIS_DIR / "data" / "python37.py").resolve()
source, expected = read_data("python37")
actual = fs(source)
self.assertFormatEqual(expected, actual)
if major > 3 or (major == 3 and minor >= 7):
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+ # ensure black can parse this when the target is 3.7
+ self.invokeBlack([str(source_path), "--target-version", "py37"])
+ # but not on 3.6, because we use async as a reserved keyword
+ self.invokeBlack([str(source_path), "--target-version", "py36"], exit_code=123)
@patch("black.dump_to_file", dump_to_stderr)
def test_fmtonoff(self) -> None: