#!/usr/bin/env python3
+import multiprocessing
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO, TextIOWrapper
import os
from pathlib import Path
+from platform import system
import regex as re
import sys
from tempfile import TemporaryDirectory
black.assert_stable(source, actual, DEFAULT_MODE)
@patch("black.dump_to_file", dump_to_stderr)
- def test_function_trailing_comma_wip(self) -> None:
- source, expected = read_data("function_trailing_comma_wip")
- # sys.settrace(tracefunc)
- actual = fs(source)
- # sys.settrace(None)
+ def _test_wip(self) -> None:
+ source, expected = read_data("wip")
+ sys.settrace(tracefunc)
+ mode = replace(
+ DEFAULT_MODE,
+ experimental_string_processing=False,
+ target_versions={black.TargetVersion.PY38},
+ )
+ actual = fs(source, mode=mode)
+ sys.settrace(None)
self.assertFormatEqual(expected, actual)
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, DEFAULT_MODE)
+ @unittest.expectedFailure
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_trailing_comma_optional_parens_stability1(self) -> None:
+ source, _expected = read_data("trailing_comma_optional_parens1")
+ actual = fs(source)
+ black.assert_stable(source, actual, DEFAULT_MODE)
+
+ @unittest.expectedFailure
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_trailing_comma_optional_parens_stability2(self) -> None:
+ source, _expected = read_data("trailing_comma_optional_parens2")
+ actual = fs(source)
+ black.assert_stable(source, actual, DEFAULT_MODE)
+
+ @unittest.expectedFailure
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_trailing_comma_optional_parens_stability3(self) -> None:
+ source, _expected = read_data("trailing_comma_optional_parens3")
+ actual = fs(source)
+ black.assert_stable(source, actual, DEFAULT_MODE)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_expression(self) -> None:
source, expected = read_data("expression")
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, DEFAULT_MODE)
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_docstring_no_string_normalization(self) -> None:
+ """Like test_docstring but with string normalization off."""
+ source, expected = read_data("docstring_no_string_normalization")
+ mode = replace(DEFAULT_MODE, string_normalization=False)
+ actual = fs(source, mode=mode)
+ self.assertFormatEqual(expected, actual)
+ black.assert_equivalent(source, actual)
+ black.assert_stable(source, actual, mode)
+
def test_long_strings(self) -> None:
"""Tests for splitting long strings."""
source, expected = read_data("long_strings")
@patch("black.dump_to_file", dump_to_stderr)
def test_comments7(self) -> None:
source, expected = read_data("comments7")
- actual = fs(source)
+ mode = replace(DEFAULT_MODE, target_versions={black.TargetVersion.PY38})
+ actual = fs(source, mode=mode)
self.assertFormatEqual(expected, actual)
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, DEFAULT_MODE)
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, DEFAULT_MODE)
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_composition_no_trailing_comma(self) -> None:
+ source, expected = read_data("composition_no_trailing_comma")
+ mode = replace(DEFAULT_MODE, target_versions={black.TargetVersion.PY38})
+ actual = fs(source, mode=mode)
+ self.assertFormatEqual(expected, actual)
+ black.assert_equivalent(source, actual)
+ black.assert_stable(source, actual, DEFAULT_MODE)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_empty_lines(self) -> None:
source, expected = read_data("empty_lines")
src = (workspace / "test.py").resolve()
with src.open("w") as fobj:
fobj.write("print('hello')")
- self.invokeBlack([str(src), "--diff"])
- cache_file = black.get_cache_file(mode)
- self.assertFalse(cache_file.exists())
+ with patch("black.read_cache") as read_cache, patch(
+ "black.write_cache"
+ ) as write_cache:
+ self.invokeBlack([str(src), "--diff"])
+ cache_file = black.get_cache_file(mode)
+ self.assertFalse(cache_file.exists())
+ write_cache.assert_not_called()
+ read_cache.assert_not_called()
+
+ def test_no_cache_when_writeback_color_diff(self) -> None:
+ mode = DEFAULT_MODE
+ with cache_dir() as workspace:
+ src = (workspace / "test.py").resolve()
+ with src.open("w") as fobj:
+ fobj.write("print('hello')")
+ with patch("black.read_cache") as read_cache, patch(
+ "black.write_cache"
+ ) as write_cache:
+ self.invokeBlack([str(src), "--diff", "--color"])
+ cache_file = black.get_cache_file(mode)
+ self.assertFalse(cache_file.exists())
+ write_cache.assert_not_called()
+ read_cache.assert_not_called()
+
+ @event_loop()
+ def test_output_locking_when_writeback_diff(self) -> None:
+ with cache_dir() as workspace:
+ for tag in range(0, 4):
+ src = (workspace / f"test{tag}.py").resolve()
+ with src.open("w") as fobj:
+ fobj.write("print('hello')")
+ with patch("black.Manager", wraps=multiprocessing.Manager) as mgr:
+ self.invokeBlack(["--diff", str(workspace)], exit_code=0)
+ # this isn't quite doing what we want, but if it _isn't_
+ # called then we cannot be using the lock it provides
+ mgr.assert_called()
+
+ @event_loop()
+ def test_output_locking_when_writeback_color_diff(self) -> None:
+ with cache_dir() as workspace:
+ for tag in range(0, 4):
+ src = (workspace / f"test{tag}.py").resolve()
+ with src.open("w") as fobj:
+ fobj.write("print('hello')")
+ with patch("black.Manager", wraps=multiprocessing.Manager) as mgr:
+ self.invokeBlack(["--diff", "--color", str(workspace)], exit_code=0)
+ # this isn't quite doing what we want, but if it _isn't_
+ # called then we cannot be using the lock it provides
+ mgr.assert_called()
def test_no_cache_when_stdin(self) -> None:
mode = DEFAULT_MODE
black.assert_stable(source, actual, DEFAULT_MODE)
def test_single_file_force_pyi(self) -> None:
- reg_mode = DEFAULT_MODE
pyi_mode = replace(DEFAULT_MODE, is_pyi=True)
contents, expected = read_data("force_pyi")
with cache_dir() as workspace:
# verify cache with --pyi is separate
pyi_cache = black.read_cache(pyi_mode)
self.assertIn(path, pyi_cache)
- normal_cache = black.read_cache(reg_mode)
+ normal_cache = black.read_cache(DEFAULT_MODE)
self.assertNotIn(path, normal_cache)
- self.assertEqual(actual, expected)
+ self.assertFormatEqual(expected, actual)
+ black.assert_equivalent(contents, actual)
+ black.assert_stable(contents, actual, pyi_mode)
@event_loop()
def test_multi_file_force_pyi(self) -> None:
self.assertEqual(black.find_project_root((src_dir,)), src_dir.resolve())
self.assertEqual(black.find_project_root((src_python,)), src_dir.resolve())
+ def test_bpo_33660_workaround(self) -> None:
+ if system() == "Windows":
+ return
+
+ # https://bugs.python.org/issue33660
+
+ old_cwd = Path.cwd()
+ try:
+ root = Path("/")
+ os.chdir(str(root))
+ path = Path("workspace") / "project"
+ report = black.Report(verbose=True)
+ normalized_path = black.normalize_path_maybe_ignore(path, root, report)
+ self.assertEqual(normalized_path, "workspace/project")
+ finally:
+ os.chdir(str(old_cwd))
+
class BlackDTestCase(AioHTTPTestCase):
async def get_application(self) -> web.Application:
return tracefunc
stack = len(inspect.stack()) - 19
+ stack *= 2
filename = frame.f_code.co_filename
lineno = frame.f_lineno
func_sig_lineno = lineno - 1