import asyncio
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
-from functools import partial
+from functools import partial, wraps
import keyword
import os
from pathlib import Path
import tokenize
import sys
from typing import (
- Dict, Generic, Iterable, Iterator, List, Optional, Set, Tuple, Type, TypeVar, Union
+ Callable,
+ Dict,
+ Generic,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Type,
+ TypeVar,
+ Union,
)
from attr import dataclass, Factory
NodeType = int
LeafID = int
Priority = int
+Index = int
LN = Union[Leaf, Node]
+SplitFunc = Callable[['Line', bool], Iterator['Line']]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg='red', err=True)
depth: int = 0
leaves: List[Leaf] = Factory(list)
- comments: Dict[LeafID, Leaf] = Factory(dict)
+ comments: List[Tuple[Index, Leaf]] = Factory(list)
bracket_tracker: BracketTracker = Factory(BracketTracker)
inside_brackets: bool = False
has_for: bool = False
self.bracket_tracker.mark(leaf)
self.maybe_remove_trailing_comma(leaf)
self.maybe_increment_for_loop_variable(leaf)
- if self.maybe_adapt_standalone_comment(leaf):
- return
if not self.append_comment(leaf):
self.leaves.append(leaf)
def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
    """Append `leaf` like :func:`append()`, refusing invalid standalone comment layouts.

    The checks only apply while the bracket tracker reports depth 0.

    Raises ValueError when this line already is a standalone comment, or when
    `leaf` is a standalone comment and the line already holds leaves.
    """
    outside_brackets = self.bracket_tracker.depth == 0

    # Nothing may follow a standalone comment on the same line.
    if outside_brackets and self.is_comment:
        raise ValueError("cannot append to standalone comments")

    # A standalone comment has to be the first leaf of its line.
    if outside_brackets and self.leaves and leaf.type == STANDALONE_COMMENT:
        raise ValueError("cannot append standalone comments to a populated line")

    self.append(leaf, preformatted=preformatted)
+
@property
def is_comment(self) -> bool:
    """Is this line a standalone comment?

    True only when the line holds exactly one leaf and that leaf is a
    standalone comment.
    """
    if len(self.leaves) != 1:
        return False

    return self.leaves[0].type == STANDALONE_COMMENT
@property
def is_decorator(self) -> bool:
and self.leaves[0].value == 'yield'
)
@property
def contains_standalone_comments(self) -> bool:
    """Does any leaf on this line have the standalone comment type?

    If so, the line needs to be split before emitting.
    """
    return any(leaf.type == STANDALONE_COMMENT for leaf in self.leaves)
+
def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
"""Remove trailing comma if there is one and it's safe."""
if not (
return False
if closing.type == token.RBRACE:
- self.leaves.pop()
+ self.remove_trailing_comma()
return True
if closing.type == token.RSQB:
comma = self.leaves[-1]
if comma.parent and comma.parent.type == syms.listmaker:
- self.leaves.pop()
+ self.remove_trailing_comma()
return True
# For parens let's check if it's safe to remove the comma. If the
break
if commas > 1:
- self.leaves.pop()
+ self.remove_trailing_comma()
return True
return False
return False
- def maybe_adapt_standalone_comment(self, comment: Leaf) -> bool:
- """Hack a standalone comment to act as a trailing comment for line splitting.
-
- If this line has brackets and a standalone `comment`, we need to adapt
- it to be able to still reformat the line.
-
- This is not perfect, the line to which the standalone comment gets
- appended will appear "too long" when splitting.
- """
- if not (
def append_comment(self, comment: Leaf) -> bool:
    """Add an inline or standalone comment to the line.

    Returns True when `comment` was recorded in `self.comments` against the
    index of the last leaf.  Returns False when it was not recorded.
    """
    is_standalone = comment.type == STANDALONE_COMMENT
    if is_standalone and self.bracket_tracker.any_open_brackets():
        # Standalone comment inside brackets: only strip its prefix here.
        comment.prefix = ''
        return False

    if comment.type != token.COMMENT:
        return False

    if not self.leaves:
        # No leaf to attach to yet, so the comment becomes a standalone one.
        comment.type = STANDALONE_COMMENT
        comment.prefix = ''
        return False

    self.comments.append((len(self.leaves) - 1, comment))
    return True
- def last_non_delimiter(self) -> Leaf:
- """Return the last non-delimiter on the line. Raise LookupError otherwise."""
- for i in range(len(self.leaves)):
- last = self.leaves[-i - 1]
- if not is_delimiter(last):
- return last
def comments_after(self, leaf: Leaf) -> Iterator[Leaf]:
    """Generate comments that should appear directly after `leaf`.

    `leaf` is located by identity among `self.leaves`; nothing is generated
    when it is not on this line.
    """
    position = next(
        (i for i, candidate in enumerate(self.leaves) if candidate is leaf), None
    )
    if position is None:
        return

    for attached_to, comment in self.comments:
        if attached_to == position:
            yield comment
+
def remove_trailing_comma(self) -> None:
    """Remove the trailing comma and move the comments attached to it.

    Every entry in `self.comments` recorded against the index of the last
    leaf is re-pointed at the index just before it, then the last leaf is
    popped.  The comments list is updated in place.
    """
    comma_index = len(self.leaves) - 1
    for i, (comment_index, comment) in enumerate(self.comments):
        if comment_index == comma_index:
            # Replacing an element keeps the list length, so this is safe
            # while iterating.
            self.comments[i] = (comma_index - 1, comment)
    self.leaves.pop()
def __str__(self) -> str:
"""Render the line."""
res = f'{first.prefix}{indent}{first.value}'
for leaf in leaves:
res += str(leaf)
- for comment in self.comments.values():
+ for _, comment in self.comments:
res += str(comment)
return res + '\n'
"""Does nothing and returns False."""
return False
- def maybe_adapt_standalone_comment(self, comment: Leaf) -> bool:
- """Does nothing and returns False."""
- return False
-
@dataclass
class EmptyLineTracker:
If `py36` is True, splitting may generate syntax that is only compatible
with Python 3.6 and later.
"""
- if isinstance(line, UnformattedLines):
+ if isinstance(line, UnformattedLines) or line.is_comment:
yield line
return
line_str = str(line).strip('\n')
- if len(line_str) <= line_length and '\n' not in line_str:
+ if (
+ len(line_str) <= line_length
+ and '\n' not in line_str # multiline strings
+ and not line.contains_standalone_comments
+ ):
yield line
return
+ split_funcs: List[SplitFunc]
if line.is_def:
split_funcs = [left_hand_split]
elif line.inside_brackets:
- split_funcs = [delimiter_split]
- if '\n' not in line_str:
- # Only attempt RHS if we don't have multiline strings or comments
- # on this line.
- split_funcs.append(right_hand_split)
+ split_funcs = [delimiter_split, standalone_comment_split, right_hand_split]
else:
split_funcs = [right_hand_split]
for split_func in split_funcs:
# split altogether.
result: List[Line] = []
try:
- for l in split_func(line, py36=py36):
+ for l in split_func(line, py36):
if str(l).strip('\n') == line_str:
raise CannotSplit("Split function returned an unchanged result")
):
for leaf in leaves:
result.append(leaf, preformatted=True)
- comment_after = line.comments.get(id(leaf))
- if comment_after:
+ for comment_after in line.comments_after(leaf):
result.append(comment_after, preformatted=True)
bracket_split_succeeded_or_raise(head, body, tail)
for result in (head, body, tail):
):
for leaf in leaves:
result.append(leaf, preformatted=True)
- comment_after = line.comments.get(id(leaf))
- if comment_after:
+ for comment_after in line.comments_after(leaf):
result.append(comment_after, preformatted=True)
bracket_split_succeeded_or_raise(head, body, tail)
for result in (head, body, tail):
)
def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
    """Decorate `split_func` so the first leaf of each line it yields gets a normalized prefix.

    Intended as a decorator for the relevant split functions.
    """

    @wraps(split_func)
    def split_wrapper(line: Line, py36: bool = False) -> Iterator[Line]:
        for produced in split_func(line, py36):
            first_leaf = produced.leaves[0]
            normalize_prefix(first_leaf, inside_brackets=True)
            yield produced

    return split_wrapper
+
+
+@dont_increase_indentation
def delimiter_split(line: Line, py36: bool = False) -> Iterator[Line]:
"""Split according to delimiters of the highest priority.
- This kind of split doesn't increase indentation.
If `py36` is True, the split will add trailing commas also in function
signatures that contain `*` and `**`.
"""
current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
lowest_depth = sys.maxsize
trailing_comma_safe = True
+
+ def append_to_line(leaf: Leaf) -> Iterator[Line]:
+ """Append `leaf` to current line or to new line if appending impossible."""
+ nonlocal current_line
+ try:
+ current_line.append_safe(leaf, preformatted=True)
+ except ValueError as ve:
+ yield current_line
+
+ current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
+ current_line.append(leaf)
+
for leaf in line.leaves:
- current_line.append(leaf, preformatted=True)
- comment_after = line.comments.get(id(leaf))
- if comment_after:
- current_line.append(comment_after, preformatted=True)
+ yield from append_to_line(leaf)
+
+ for comment_after in line.comments_after(leaf):
+ yield from append_to_line(comment_after)
+
lowest_depth = min(lowest_depth, leaf.bracket_depth)
if (
leaf.bracket_depth == lowest_depth
trailing_comma_safe = trailing_comma_safe and py36
leaf_priority = delimiters.get(id(leaf))
if leaf_priority == delimiter_priority:
- normalize_prefix(current_line.leaves[0], inside_brackets=True)
yield current_line
current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
and trailing_comma_safe
):
current_line.append(Leaf(token.COMMA, ','))
- normalize_prefix(current_line.leaves[0], inside_brackets=True)
+ yield current_line
+
+
@dont_increase_indentation
def standalone_comment_split(line: Line, py36: bool = False) -> Iterator[Line]:
    """Split standalone comments from the rest of the line.

    Leaves and their attached comments are replayed onto a fresh line; when
    `append_safe()` refuses a leaf, the line built so far is yielded and a
    new one is started with that leaf.

    `py36` is not used by this function.

    Raises CannotSplit when no leaf of `line` is a standalone comment at
    bracket depth 0.
    """
    if not any(
        leaf.type == STANDALONE_COMMENT and leaf.bracket_depth == 0
        for leaf in line.leaves
    ):
        raise CannotSplit("Line does not have any standalone comments")

    current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)

    def append_to_line(leaf: Leaf) -> Iterator[Line]:
        """Append `leaf` to current line or to new line if appending impossible."""
        nonlocal current_line
        try:
            current_line.append_safe(leaf, preformatted=True)
        except ValueError:
            # The current line cannot take this leaf: emit it and start over.
            yield current_line

            current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
            current_line.append(leaf)

    for leaf in line.leaves:
        yield from append_to_line(leaf)

        for comment_after in line.comments_after(leaf):
            yield from append_to_line(comment_after)

    # Emit whatever is left over, unless the last line is falsy.
    if current_line:
        yield current_line
--- /dev/null
+class C:
+
+ @pytest.mark.parametrize(
+ ("post_data", "message"),
+ [
+ # metadata_version errors.
+ (
+ {},
+ "None is an invalid value for Metadata-Version. "
+ "Error: This field is required. "
+ "see "
+ "https://packaging.python.org/specifications/core-metadata",
+ ),
+ (
+ {"metadata_version": "-1"},
+ "'-1' is an invalid value for Metadata-Version. "
+ "Error: Unknown Metadata Version "
+ "see "
+ "https://packaging.python.org/specifications/core-metadata",
+ ),
+ # name errors.
+ (
+ {"metadata_version": "1.2"},
+ "'' is an invalid value for Name. "
+ "Error: This field is required. "
+ "see "
+ "https://packaging.python.org/specifications/core-metadata",
+ ),
+ (
+ {"metadata_version": "1.2", "name": "foo-"},
+ "'foo-' is an invalid value for Name. "
+ "Error: Must start and end with a letter or numeral and "
+ "contain only ascii numeric and '.', '_' and '-'. "
+ "see "
+ "https://packaging.python.org/specifications/core-metadata",
+ ),
+ # version errors.
+ (
+ {"metadata_version": "1.2", "name": "example"},
+ "'' is an invalid value for Version. "
+ "Error: This field is required. "
+ "see "
+ "https://packaging.python.org/specifications/core-metadata",
+ ),
+ (
+ {"metadata_version": "1.2", "name": "example", "version": "dog"},
+ "'dog' is an invalid value for Version. "
+ "Error: Must start and end with a letter or numeral and "
+ "contain only ascii numeric and '.', '_' and '-'. "
+ "see "
+ "https://packaging.python.org/specifications/core-metadata",
+ ),
+ ],
+ )
+ def test_fails_invalid_post_data(
+ self, pyramid_config, db_request, post_data, message
+ ):
+ pyramid_config.testing_securitypolicy(userid=1)
+ db_request.POST = MultiDict(post_data)
+
+
+def foo(list_a, list_b):
+ results = (
+ User.query.filter(User.foo == 'bar').filter( # Because foo.
+ db.or_(User.field_a.astext.in_(list_a), User.field_b.astext.in_(list_b))
+ ).filter(
+ User.xyz.is_(None)
+ )
+ # Another comment about the filtering on is_quux goes here.
+ .filter(db.not_(User.is_pending.astext.cast(db.Boolean).is_(True))).order_by(
+ User.created_at.desc()
+ ).with_for_update(
+ key_share=True
+ ).all()
+ )
+ return results