X-Git-Url: https://git.madduck.net/etc/taskwarrior.git/blobdiff_plain/5f02f81952c8267636cc57e977dc5d257320129f..60313555419249ecfc10511346c0b87a6bd20614:/tasklib/task.py?ds=sidebyside diff --git a/tasklib/task.py b/tasklib/task.py index 4681418..411ce7a 100644 --- a/tasklib/task.py +++ b/tasklib/task.py @@ -216,10 +216,6 @@ class TaskQuerySet(object): class TaskWarrior(object): - DEFAULT_FILTERS = { - 'status': 'pending', - } - def __init__(self, data_location='~/.task', create=True): if not os.path.exists(data_location): os.makedirs(data_location) @@ -228,25 +224,28 @@ class TaskWarrior(object): } self.tasks = TaskQuerySet(self) - def _get_command_args(self, args): + def _get_command_args(self, args, config_override={}): command_args = ['task', 'rc:/'] - for item in self.config.items(): + config = self.config.copy() + config.update(config_override) + for item in config.items(): command_args.append('rc.{0}={1}'.format(*item)) command_args.extend(args) return command_args - def _execute_command(self, args): + def execute_command(self, args, config_override={}): p = subprocess.Popen(self._get_command_args(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() if p.returncode: - raise TaskWarriorException(stderr.strip()) + error_msg = stderr.strip().splitlines()[-1] + raise TaskWarriorException(error_msg) return stdout.strip().split('\n') def filter_tasks(self, filter_obj): args = ['export', '--'] + filter_obj.get_filter_params() tasks = [] - for line in self._execute_command(args): + for line in self.execute_command(args): if line: tasks.append(Task(self, json.loads(line.strip(',')))) return tasks @@ -255,19 +254,26 @@ class TaskWarrior(object): args = ['add', description] if project is not None: args.append('project:{0}'.format(project)) - self._execute_command(args) + self.execute_command(args) def delete_task(self, task_id): args = [task_id, 'rc.confirmation:no', 'delete'] - self._execute_command(args) + self.execute_command(args) def complete_task(self, task_id): args = [task_id, 'done'] - self._execute_command(args) + self.execute_command(args) def import_tasks(self, tasks): fd, path = tempfile.mkstemp() with open(path, 'w') as f: f.write(json.dumps(tasks)) args = ['import', path] - self._execute_command(args) + self.execute_command(args) + + def merge_with(self, path, push=False): + path = path.rstrip('/') + '/' + args = ['merge', path] + self.execute_command(args, config_override={ + 'merge.autopush': 'yes' if push else 'no', + })