import abc
+import copy
import datetime
import json
import logging
import re
import six
import subprocess
+import copy
from .task import Task, TaskQuerySet
from .filters import TaskWarriorFilter
logger = logging.getLogger(__name__)
+
class Backend(object):
@abc.abstractproperty
pass
-class TaskWarrior(object):
+class TaskWarrior(Backend):
VERSION_2_1_0 = six.u('2.1.0')
VERSION_2_2_0 = six.u('2.2.0')
if not os.path.exists(self.taskrc_location):
self.taskrc_location = '/'
+ self._config = None
self.version = self._get_version()
self.config = {
'confirmation': 'no',
def _get_version(self):
    """Return the version string printed by ``task --version``.

    Both captured streams are decoded as UTF-8; only standard output is
    used, with leading and trailing newline characters stripped.
    """
    process = subprocess.Popen(
        ['task', '--version'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() yields (stdout, stderr); decode both, keep stdout.
    decoded_streams = [
        stream.decode('utf-8') for stream in process.communicate()
    ]
    version_output = decoded_streams[0]
    return version_output.strip('\n')
# An empty serialized value is passed through as an empty string; any
# other value is wrapped in single quotes.
# Compare by value, not identity: "is ''" only works when the string
# happens to be interned and emits a SyntaxWarning on Python 3.8+.
if serialized_value == '':
    escaped_serialized_value = ''
else:
    escaped_serialized_value = six.u("'{0}'").format(
        serialized_value)

# Fallback formatter producing "<field>:<escaped value>".
format_default = lambda task: six.u("{0}:{1}").format(
    field, escaped_serialized_value)

# Prefer a field-specific "format_<field>" method on self when one
# exists; otherwise use the fallback formatter above.
format_func = getattr(self, 'format_{0}'.format(field),
                      format_default)
# Removed dependencies need to be prefixed with '-'
return 'depends:' + ','.join(
- [t['uuid'] for t in added] +
- ['-' + t['uuid'] for t in removed]
- )
+ [t['uuid'] for t in added] +
+ ['-' + t['uuid'] for t in removed]
+ )
def format_description(self, task):
# Task version older than 2.4.0 ignores first word of the
# Public interface
@property
def config(self):
    """Return the TaskWarrior configuration as a dict.

    On first access the ``show`` command is executed (with the
    ``verbose`` override set to ``nothing``) and every output line that
    matches ``<key> <value>`` is stored as a key/value pair. The parsed
    dict is memoized in ``self._config``. Every access returns a deep
    copy, so callers cannot modify the memoized data.
    """
    # First, check if memoized information is available. Compare with
    # None so an empty (but already fetched) config is not re-fetched.
    if self._config is not None:
        return copy.deepcopy(self._config)

    # If not, fetch the config using the 'show' command
    raw_output = self.execute_command(
        ['show'],
        config_override={'verbose': 'nothing'}
    )

    # Key, whitespace, then a value starting with a non-space character.
    # ".*" (rather than ".+") lets single-character values such as "3"
    # match as well.
    config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].*$)')

    config = dict()
    # NOTE(review): assumes execute_command returns an iterable of
    # output lines -- confirm against its implementation.
    for line in raw_output:
        match = config_regex.match(line)
        if match:
            config[match.group('key')] = match.group('value').strip()

    # Memoize the config dict
    self._config = config

    return copy.deepcopy(config)
def execute_command(self, args, config_override=None, allow_failure=True,
return_all=False):
taskfilter.add_filter_param(key, value)
output = self.execute_command(['export', '--'] +
- taskfilter.get_filter_params())
+ taskfilter.get_filter_params())
# If more than one task is still matched, raise an exception
if not valid(output):
raise TaskWarriorException(
"Unique identifiers {0} with description: {1} matches "
"multiple tasks: {2}".format(
- task['uuid'] or task['id'], task['description'], output)
+ task['uuid'] or task['id'], task['description'], output)
)
return json.loads(output[0])