]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Improve get_tasks to allow arbitrary filtering
[etc/taskwarrior.git] / tasklib / task.py
1 import json
2 import os
3 import subprocess
4 import tempfile
5 import uuid
6
7
8 PENDING = 'pending'
9
10
11 class TaskWarriorException(Exception):
12     pass
13
14
15 class Task(object):
16
17     def __init__(self, warrior, data={}):
18         self.warrior = warrior
19         self._data = data
20
21     def __getitem__(self, key):
22         return self._data.get(key)
23
24     def __setitem__(self, key, val):
25         self._data[key] = val
26
27     def __unicode__(self):
28         return self._data.get('description')
29
30     def regenerate_uuid(self):
31         self['uuid'] = str(uuid.uuid4())
32
33     def delete(self):
34         self.warrior.delete_task(self['uuid'])
35
36     def done(self):
37         self.warrior.complete_task(self['uuid'])
38
39     def save(self, delete_first=True):
40         if self['uuid'] and delete_first:
41             self.delete()
42         if not self['uuid'] or delete_first:
43             self.regenerate_uuid()
44         self.warrior.import_tasks([self._data])
45
46     __repr__ = __unicode__
47
48
49 class TaskWarrior(object):
50     DEFAULT_FILTERS = {
51         'status': 'pending',
52     }
53
54     def __init__(self, data_location='~/.task', create=True):
55         if not os.path.exists(data_location):
56             os.makedirs(data_location)
57         self.config = {
58             'data.location': os.path.expanduser(data_location),
59         }
60
61     def _generate_command(self, command):
62         args = ['task', 'rc:/']
63         for item in self.config.items():
64             args.append('rc.{0}={1}'.format(*item))
65         args.append(command)
66         return ' '.join(args)
67
68     def _execute(self, command):
69         p = subprocess.Popen(self._generate_command(command), shell=True,
70                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
71         stdout, stderr = p.communicate()
72         if p.returncode:
73             raise TaskWarriorException(stderr.strip())
74         return stdout.strip().split('\n')
75
76     def _format_filter_kwarg(self, kwarg):
77         key, val = kwarg[0], kwarg[1]
78         if key in ['tag', 'tags']:
79             key = 'tags.equal'
80         key = key.replace('__', '.')
81         return '{0}:{1}'.format(key, val)
82
83     def get_tasks(self, **filter_kwargs):
84         filters = self.DEFAULT_FILTERS
85         filters.update(filter_kwargs)
86         filter_commands = ' '.join(map(self._format_filter_kwarg,
87                                        filters.items()))
88         command = '{0} export'.format(filter_commands)
89         tasks = []
90         for line in self._execute(command):
91             if line:
92                 tasks.append(Task(self, json.loads(line.strip(','))))
93         return tasks
94
95     def get_task(self, task_id):
96         command = '{0} export'.format(task_id)
97         return Task(self, json.loads(self._execute(command)[0]))
98
99     def add_task(self, description, project=None):
100         args = ['add', description]
101         if project is not None:
102             args.append('project:{0}'.format(project))
103         self._execute(' '.join(args))
104
105     def delete_task(self, task_id):
106         self._execute('{0} rc.confirmation:no delete'.format(task_id))
107
108     def complete_task(self, task_id):
109         self._execute('{0} done'.format(task_id))
110
111     def import_tasks(self, tasks):
112         fd, path = tempfile.mkstemp()
113         with open(path, 'w') as f:
114             f.write(json.dumps(tasks))
115         self._execute('import {0}'.format(path))