git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Initial commit
[etc/taskwarrior.git] / tasklib / task.py
1 import json
2 import os
3 import subprocess
4 import tempfile
5 import uuid
6
7
# Status string used as the default ``status`` filter of TaskWarrior.get_tasks.
PENDING = 'pending'
9
10
class TaskWarriorException(Exception):
    """Error raised when the ``task`` command exits with a non-zero status."""
13
14
class Task(object):
    """A single task record bound to the warrior object that manages it.

    The task's attributes are held in a plain dict (``_data``) and are read
    and written with item access (``task['description']``).  Operations that
    touch the task database are delegated to ``self.warrior``.
    """

    def __init__(self, warrior, data=None):
        """Bind the task to *warrior* and adopt *data* as its attribute dict.

        ``warrior`` is the object whose ``delete_task``, ``complete_task`` and
        ``import_tasks`` methods are used by :meth:`delete`, :meth:`done` and
        :meth:`save`.  ``data`` is stored by reference (not copied); when it
        is omitted a fresh empty dict is created for this instance.
        """
        self.warrior = warrior
        # A literal ``{}`` default would be built once and shared by every
        # Task created without data, so a write through one instance would
        # show up in all the others.
        self._data = {} if data is None else data

    def __getitem__(self, key):
        """Return the attribute stored under *key*, or None if it is unset."""
        return self._data.get(key)

    def __setitem__(self, key, val):
        """Store *val* under *key* in the task's attribute dict."""
        self._data[key] = val

    def __unicode__(self):
        """Return the task's description (None when it has none)."""
        return self._data.get('description')

    def __repr__(self):
        """Return the task's description, or an empty string if it has none.

        ``repr()`` must return a string, so a missing description is mapped
        to ``''`` instead of being passed through as None.
        """
        return self._data.get('description') or ''

    def regenerate_uuid(self):
        """Replace the task's ``uuid`` attribute with a new random UUID."""
        self['uuid'] = str(uuid.uuid4())

    def delete(self):
        """Ask the warrior to delete the task identified by this uuid."""
        self.warrior.delete_task(self['uuid'])

    def done(self):
        """Ask the warrior to mark the task identified by this uuid as done."""
        self.warrior.complete_task(self['uuid'])

    def save(self, delete_first=True):
        """Write the task's attributes through the warrior's import.

        With ``delete_first`` true (the default) a task that already has a
        uuid is deleted first, and the task is then imported under a freshly
        generated uuid.  With ``delete_first`` false an existing uuid is kept
        and a uuid is only generated when the task has none.
        """
        if self['uuid'] and delete_first:
            self.delete()
        if not self['uuid'] or delete_first:
            self.regenerate_uuid()
        self.warrior.import_tasks([self._data])
47
48
class TaskWarrior(object):
    """Thin wrapper that drives the ``task`` command-line program.

    Every operation builds a ``task`` command line, runs it through the
    shell and, where applicable, parses the JSON it prints.  Settings in
    ``self.config`` are passed to each invocation as ``rc.<key>=<value>``
    overrides.
    """

    def __init__(self, data_location='~/.task', create=True):
        """Record the data directory and optionally create it.

        ``data_location`` may start with ``~``; it is expanded before use.
        When ``create`` is true (the default) the directory is created if it
        does not exist yet; when false the filesystem is left untouched.
        """
        # Expand first: checking/creating the raw '~/.task' would operate on
        # a literal '~' directory relative to the current working directory.
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }

    def _generate_command(self, command):
        """Return the full shell command line for the sub-command *command*.

        The result is ``task rc:/`` followed by one ``rc.<key>=<value>``
        override per entry in ``self.config`` and finally *command*, all
        joined with single spaces.
        """
        # NOTE(review): 'rc:/' presumably keeps task from reading the user's
        # own rc file -- confirm against the taskwarrior documentation.
        args = ['task', 'rc:/']
        for item in self.config.items():
            args.append('rc.{0}={1}'.format(*item))
        args.append(command)
        return ' '.join(args)

    def _execute(self, command):
        """Run *command* via ``task`` and return its stdout as a list of lines.

        Raises TaskWarriorException, carrying the stripped stderr text, when
        the process exits with a non-zero status.
        """
        # NOTE(review): the command is a single string interpreted by the
        # shell, so values containing spaces or shell metacharacters (task
        # descriptions, paths) are not protected -- callers must pass
        # shell-safe text.
        # universal_newlines=True makes the pipes yield text on Python 3 as
        # well, which the str-based split below requires.
        p = subprocess.Popen(self._generate_command(command), shell=True,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=True)
        stdout, stderr = p.communicate()
        if p.returncode:
            raise TaskWarriorException(stderr.strip())
        return stdout.strip().split('\n')

    def get_tasks(self, project=None, status=PENDING):
        """Return the tasks with the given *status* as a list of Task objects.

        When *project* is given the export is additionally filtered by it.
        Each non-empty output line has leading/trailing commas stripped and
        is parsed as JSON.
        """
        command = 'export status:{0}'.format(status)
        if project is not None:
            command += ' project:{0}'.format(project)
        tasks = []
        for line in self._execute(command):
            if line:
                tasks.append(Task(self, json.loads(line.strip(','))))
        return tasks

    def get_task(self, task_id):
        """Return the task exported for *task_id* as a Task.

        Only the first line of the export output is parsed.
        """
        command = '{0} export'.format(task_id)
        return Task(self, json.loads(self._execute(command)[0]))

    def add_task(self, description, project=None):
        """Add a task with *description*, optionally assigned to *project*."""
        args = ['add', description]
        if project is not None:
            args.append('project:{0}'.format(project))
        self._execute(' '.join(args))

    def delete_task(self, task_id):
        """Delete the task identified by *task_id*."""
        # NOTE(review): 'rc.confirmation:no' presumably suppresses the
        # interactive confirmation prompt -- confirm.
        self._execute('{0} rc.confirmation:no delete'.format(task_id))

    def complete_task(self, task_id):
        """Mark the task identified by *task_id* as done."""
        self._execute('{0} done'.format(task_id))

    def import_tasks(self, tasks):
        """Import *tasks* (a JSON-serialisable list) via ``task import``.

        The list is dumped as JSON to a temporary file whose path is handed
        to the ``import`` sub-command; the file is removed afterwards, also
        when the import fails.
        """
        fd, path = tempfile.mkstemp()
        try:
            # Wrap the descriptor mkstemp already opened instead of opening
            # the path a second time, so the descriptor is closed with the
            # file object rather than leaked.
            with os.fdopen(fd, 'w') as f:
                f.write(json.dumps(tasks))
            self._execute('import {0}'.format(path))
        finally:
            os.remove(path)