-
-
class TaskWarrior(object):
    """Wrapper that drives the ``task`` command-line program via subprocess.

    Every operation builds an argument list, runs ``task`` with it and
    interprets the process output. Configuration overrides held in
    ``self.config`` are passed on each invocation as ``rc.<key>=<value>``.
    """

    DEFAULT_FILTERS = {
        'status': 'pending',
    }

    def __init__(self, data_location='~/.task', create=True):
        """Set up the wrapper for the given data directory.

        :param data_location: directory passed to ``task`` as
            ``rc.data.location``; a leading ``~`` is expanded.
        :param create: when true (the default), create the directory if it
            does not exist yet.
        """
        # Expand "~" *before* touching the filesystem; otherwise a literal
        # "~" directory would be created under the current working directory.
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)

    @staticmethod
    def _to_text(data):
        """Return subprocess output as ``str``, decoding bytes as UTF-8."""
        if isinstance(data, str):
            return data
        return data.decode('utf-8')

    def _get_command_args(self, args):
        """Return the full argv list for running ``task`` with ``args``.

        The list starts with ``task rc:/``, followed by one
        ``rc.<key>=<value>`` override per entry in ``self.config``, followed
        by ``args`` unchanged.
        """
        # NOTE(review): ``rc:/`` presumably keeps the user's own taskrc from
        # being loaded -- confirm against the Taskwarrior documentation.
        command_args = ['task', 'rc:/']
        command_args.extend(
            'rc.{0}={1}'.format(key, value)
            for key, value in self.config.items()
        )
        command_args.extend(args)
        return command_args

    def _execute_command(self, args):
        """Run ``task`` with ``args`` and return its stdout as a list of lines.

        :raises TaskWarriorException: with the stripped stderr text when the
            process exits with a non-zero return code.
        """
        p = subprocess.Popen(self._get_command_args(args),
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        # communicate() yields bytes on Python 3; normalise to text so the
        # str-based strip/split below works on both major versions.
        stdout = self._to_text(stdout)
        stderr = self._to_text(stderr)
        if p.returncode:
            raise TaskWarriorException(stderr.strip())
        return stdout.strip().split('\n')

    def _execute_filter(self, filter_obj):
        """Run ``export`` with the filter's parameters and return ``Task`` objects.

        Each non-empty output line is parsed as one JSON document after
        stripping leading/trailing commas.
        """
        args = filter_obj.get_filter_params() + ['export']
        return [
            Task(self, json.loads(line.strip(',')))
            for line in self._execute_command(args)
            if line
        ]

    def add_task(self, description, project=None):
        """Add a task with ``description``, optionally assigned to ``project``."""
        args = ['add', description]
        if project is not None:
            args.append('project:{0}'.format(project))
        self._execute_command(args)

    def delete_task(self, task_id):
        """Delete the task identified by ``task_id`` without confirmation."""
        # str() so that integer ids are accepted; Popen needs string argv.
        args = [str(task_id), 'rc.confirmation:no', 'delete']
        self._execute_command(args)

    def complete_task(self, task_id):
        """Mark the task identified by ``task_id`` as done."""
        # str() so that integer ids are accepted; Popen needs string argv.
        args = [str(task_id), 'done']
        self._execute_command(args)

    def import_tasks(self, tasks):
        """Serialise ``tasks`` to JSON in a temporary file and ``import`` it.

        The temporary file is removed afterwards, whether or not the import
        command succeeds.
        """
        fd, path = tempfile.mkstemp()
        try:
            # Wrap the descriptor mkstemp() already opened instead of opening
            # the path a second time, so the descriptor is not leaked.
            with os.fdopen(fd, 'w') as f:
                f.write(json.dumps(tasks))
            self._execute_command(['import', path])
        finally:
            os.remove(path)