X-Git-Url: https://git.madduck.net/etc/taskwarrior.git/blobdiff_plain/b28ec3e9b03f1595eaf4bb82ef1f01c3c4d1c2b6..ade25ab6ef26a73da25b3be09658ea39875dd6f3:/tasklib/task.py diff --git a/tasklib/task.py b/tasklib/task.py index 80f08e9..5a223e2 100644 --- a/tasklib/task.py +++ b/tasklib/task.py @@ -100,7 +100,7 @@ class TaskFilter(object): self.filter_params.append('{0}:{1}'.format(key, value)) def get_filter_params(self): - return ['({})'.format(f) for f in self.filter_params if f] + return [f for f in self.filter_params if f] def clone(self): c = self.__class__() @@ -175,7 +175,7 @@ class TaskQuerySet(object): """ Fetch the tasks which match the current filters. """ - return self.warrior._execute_filter(self.filter_obj) + return self.warrior.filter_tasks(self.filter_obj) def all(self): """ @@ -216,10 +216,6 @@ class TaskQuerySet(object): class TaskWarrior(object): - DEFAULT_FILTERS = { - 'status': 'pending', - } - def __init__(self, data_location='~/.task', create=True): if not os.path.exists(data_location): os.makedirs(data_location) @@ -228,25 +224,30 @@ class TaskWarrior(object): } self.tasks = TaskQuerySet(self) - def _get_command_args(self, args): + def _get_command_args(self, args, config_override={}): command_args = ['task', 'rc:/'] - for item in self.config.items(): + config = self.config.copy() + config.update(config_override) + for item in config.items(): command_args.append('rc.{0}={1}'.format(*item)) command_args.extend(args) return command_args - def _execute_command(self, args): - p = subprocess.Popen(self._get_command_args(args), - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + def execute_command(self, args, config_override={}): + command_args = self._get_command_args( + args, config_override=config_override) + p = subprocess.Popen(command_args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) stdout, stderr = p.communicate() if p.returncode: - raise TaskWarriorException(stderr.strip()) + error_msg = stderr.strip().splitlines()[-1] + raise TaskWarriorException(error_msg) return stdout.strip().split('\n') - def _execute_filter(self, filter_obj): - args = filter_obj.get_filter_params() + ['export'] + def filter_tasks(self, filter_obj): + args = ['export', '--'] + filter_obj.get_filter_params() tasks = [] - for line in self._execute_command(args): + for line in self.execute_command(args): if line: tasks.append(Task(self, json.loads(line.strip(',')))) return tasks @@ -255,19 +256,26 @@ class TaskWarrior(object): args = ['add', description] if project is not None: args.append('project:{0}'.format(project)) - self._execute_command(args) + self.execute_command(args) def delete_task(self, task_id): args = [task_id, 'rc.confirmation:no', 'delete'] - self._execute_command(args) + self.execute_command(args) def complete_task(self, task_id): args = [task_id, 'done'] - self._execute_command(args) + self.execute_command(args) def import_tasks(self, tasks): fd, path = tempfile.mkstemp() with open(path, 'w') as f: f.write(json.dumps(tasks)) args = ['import', path] - self._execute_command(args) + self.execute_command(args) + + def merge_with(self, path, push=False): + path = path.rstrip('/') + '/' + args = ['merge', path] + self.execute_command(args, config_override={ + 'merge.autopush': 'yes' if push else 'no', + })