-
-
class TaskWarrior(object):
    """Thin wrapper that drives the ``task`` command-line program.

    Every operation is performed by spawning ``task`` as a subprocess with
    the instance configuration passed as ``rc.<key>=<value>`` arguments.
    """

    def __init__(self, data_location='~/.task', create=True):
        """Remember the data directory and optionally create it.

        :param data_location: directory handed to ``task`` as
            ``rc.data.location``; a leading ``~`` is expanded.
        :param create: when true (the default), create the directory if it
            does not exist yet.
        """
        # Expand "~" *before* touching the filesystem; otherwise a literal
        # "~" directory would be created relative to the working directory.
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)

    @staticmethod
    def _to_text(data):
        """Return subprocess output as ``str``, decoding bytes when needed."""
        # On Python 2 pipe output is already ``str``; on Python 3 it is bytes.
        if isinstance(data, str):
            return data
        return data.decode('utf-8')

    def _get_command_args(self, args, config_override=None):
        """Build the full argv list for one ``task`` invocation.

        :param args: command-specific arguments, appended last.
        :param config_override: optional mapping whose entries are layered
            over ``self.config`` for this call only.
        :returns: list of strings starting with ``'task'``.
        """
        # NOTE(review): 'rc:/' presumably keeps the user's own taskrc from
        # being loaded -- confirm against the taskwarrior documentation.
        command_args = ['task', 'rc:/']
        config = self.config.copy()
        if config_override:
            config.update(config_override)
        command_args.extend(
            'rc.{0}={1}'.format(key, value) for key, value in config.items()
        )
        command_args.extend(args)
        return command_args

    def execute_command(self, args, config_override=None):
        """Run ``task`` with ``args`` and return its stdout split into lines.

        :param args: command-specific arguments.
        :param config_override: optional per-call configuration overrides.
        :returns: list of stdout lines (``['']`` when there is no output).
        :raises TaskWarriorException: when the process exits with a non-zero
            status; the message is the last line of stderr if there is one.
        """
        p = subprocess.Popen(
            self._get_command_args(args, config_override),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = p.communicate()
        stdout = self._to_text(stdout)
        stderr = self._to_text(stderr)
        if p.returncode:
            error_lines = stderr.strip().splitlines()
            if error_lines:
                error_msg = error_lines[-1]
            else:
                # Failure without any stderr output: still report something.
                error_msg = 'task exited with status {0}'.format(p.returncode)
            raise TaskWarriorException(error_msg)
        return stdout.strip().split('\n')

    def filter_tasks(self, filter_obj):
        """Export the tasks matching ``filter_obj`` as ``Task`` objects.

        :param filter_obj: object whose ``get_filter_params()`` returns the
            list of filter arguments.
        :returns: list of ``Task`` instances, one per exported JSON line.
        """
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            # Each task is one JSON object per line, possibly followed by a
            # comma; bare brackets (array-style output) carry no task.
            line = line.strip().strip(',')
            if not line or line in ('[', ']'):
                continue
            tasks.append(Task(self, json.loads(line)))
        return tasks

    def add_task(self, description, project=None):
        """Add a task with ``description``, optionally within ``project``."""
        args = ['add', description]
        if project is not None:
            args.append('project:{0}'.format(project))
        self.execute_command(args)

    def delete_task(self, task_id):
        """Delete the task identified by ``task_id`` without confirmation."""
        # str() lets callers pass integer ids; Popen only accepts strings.
        args = [str(task_id), 'rc.confirmation:no', 'delete']
        self.execute_command(args)

    def complete_task(self, task_id):
        """Mark the task identified by ``task_id`` as done."""
        args = [str(task_id), 'done']
        self.execute_command(args)

    def import_tasks(self, tasks):
        """Import ``tasks`` (a JSON-serialisable object) through a temp file.

        The temporary file is removed again whether or not the import
        succeeds.
        """
        fd, path = tempfile.mkstemp()
        try:
            # Wrap the descriptor returned by mkstemp so it is closed instead
            # of being leaked by opening the path a second time.
            with os.fdopen(fd, 'w') as f:
                f.write(json.dumps(tasks))
            self.execute_command(['import', path])
        finally:
            os.remove(path)

    def merge_with(self, path, push=False):
        """Run ``task merge`` against ``path``.

        :param path: merge source; normalised to end in exactly one ``/``.
        :param push: sets ``merge.autopush`` to ``yes`` or ``no`` for this
            call.
        """
        path = path.rstrip('/') + '/'
        args = ['merge', path]
        self.execute_command(args, config_override={
            'merge.autopush': 'yes' if push else 'no',
        })