import re
import six
import subprocess
+import copy
from .task import Task, TaskQuerySet
from .filters import TaskWarriorFilter
logger = logging.getLogger(__name__)
+
class Backend(object):
@abc.abstractproperty
def _get_version(self):
p = subprocess.Popen(
- ['task', '--version'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ ['task', '--version'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
return stdout.strip('\n')
if serialized_value is '':
escaped_serialized_value = ''
else:
- escaped_serialized_value = six.u("'{0}'").format(serialized_value)
+ escaped_serialized_value = six.u("'{0}'").format(
+ serialized_value)
- format_default = lambda task: six.u("{0}:{1}").format(field,
- escaped_serialized_value)
+ format_default = lambda task: six.u("{0}:{1}").format(
+ field, escaped_serialized_value)
format_func = getattr(self, 'format_{0}'.format(field),
format_default)
# Removed dependencies need to be prefixed with '-'
return 'depends:' + ','.join(
- [t['uuid'] for t in added] +
- ['-' + t['uuid'] for t in removed]
- )
+ [t['uuid'] for t in added] +
+ ['-' + t['uuid'] for t in removed]
+ )
def format_description(self, task):
# Task version older than 2.4.0 ignores first word of the
def get_config(self):
raw_output = self.execute_command(
- ['show'],
- config_override={'verbose': 'nothing'}
- )
+ ['show'],
+ config_override={'verbose': 'nothing'}
+ )
config = dict()
config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].+$)')
taskfilter.add_filter_param(key, value)
output = self.execute_command(['export', '--'] +
- taskfilter.get_filter_params())
+ taskfilter.get_filter_params())
# If more than 1 task has been matched still, raise an exception
if not valid(output):
raise TaskWarriorException(
"Unique identifiers {0} with description: {1} matches "
"multiple tasks: {2}".format(
- task['uuid'] or task['id'], task['description'], output)
+ task['uuid'] or task['id'], task['description'], output)
)
return json.loads(output[0])
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
local_zone = tzlocal.get_localzone()
+
class SerializingObject(object):
"""
Common ancestor for TaskResource & TaskWarriorFilter, since they both
localized datetime -> localized datetime (no conversion)
"""
- if (isinstance(value, datetime.date)
- and not isinstance(value, datetime.datetime)):
+ if (
+ isinstance(value, datetime.date)
+ and not isinstance(value, datetime.datetime)
+ ):
# Convert to local midnight
value_full = datetime.datetime.combine(value, datetime.time.min)
localized = local_zone.localize(value_full)
"not: {}".format(value))
return value
-
-
from __future__ import print_function
import copy
-import datetime
import importlib
import json
import logging
if update_original:
self._original_data = copy.deepcopy(self._data)
-
def __getitem__(self, key):
# This is a workaround to make TaskResource non-iterable
# over simple index-based iteration
# to pass that to TaskWarrior.
data_tuples = filter(lambda t: t[1] is not '', data_tuples)
data = dict(data_tuples)
- return json.dumps(data, separators=(',',':'))
+ return json.dumps(data, separators=(',', ':'))
@property
def _modified_fields(self):
# If the tasks are not saved, compare the actual instances
return id(self) == id(other)
-
def __hash__(self):
if self['uuid']:
# For saved Tasks, just define equality by equality of uuids
else:
self._load_data(new_data)
+
class TaskQuerySet(object):
"""
Represents a lazy lookup for a task objects.