]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Implement tasks.completed() and add more tests
[etc/taskwarrior.git] / tasklib / task.py
1 import copy
2 import datetime
3 import json
4 import logging
5 import os
6 import subprocess
7
8 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
9 REPR_OUTPUT_SIZE = 10
10 PENDING = 'pending'
11 COMPLETED = 'completed'
12
13 logger = logging.getLogger(__name__)
14
15
16 class TaskWarriorException(Exception):
17     pass
18
19
20 class Task(object):
21
22     class DoesNotExist(Exception):
23         pass
24
25     def __init__(self, warrior, data={}):
26         self.warrior = warrior
27         self._data = data
28         print data
29         self._modified_fields = set()
30
31     def __unicode__(self):
32         return self['description']
33
34     def __getitem__(self, key):
35         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
36                                lambda x: x)
37         return hydrate_func(self._data.get(key))
38
39     def __setitem__(self, key, value):
40         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
41                                  lambda x: x)
42         self._data[key] = dehydrate_func(value)
43         self._modified_fields.add(key)
44
45     def serialize_due(self, date):
46         return date.strftime(DATE_FORMAT)
47
48     def deserialize_due(self, date_str):
49         if not date_str:
50             return None
51         return datetime.datetime.strptime(date_str, DATE_FORMAT)
52
53     def serialize_annotations(self, annotations):
54         ann_list = list(annotations)
55         for ann in ann_list:
56             ann['entry'] = ann['entry'].strftime(DATE_FORMAT)
57         return ann_list
58
59     def deserialize_annotations(self, annotations):
60         ann_list = list(annotations)
61         for ann in ann_list:
62             ann['entry'] = datetime.datetime.strptime(
63                 ann['entry'], DATE_FORMAT)
64         return ann_list
65
66     def deserialize_tags(self, tags):
67         if isinstance(tags, basestring):
68             return tags.split(',') if tags else []
69         return tags
70
71     def serialize_tags(self, tags):
72         return ','.join(tags) if tags else ''
73
74     def delete(self):
75         self.warrior.execute_command([self['id'], 'delete'], config_override={
76             'confirmation': 'no',
77         })
78
79     def done(self):
80         self.warrior.execute_command([self['id'], 'done'])
81
82     def save(self):
83         args = [self['id'], 'modify'] if self['id'] else ['add']
84         args.extend(self._get_modified_fields_as_args())
85         self.warrior.execute_command(args)
86         self._modified_fields.clear()
87
88     def _get_modified_fields_as_args(self):
89         args = []
90         for field in self._modified_fields:
91             args.append('{}:{}'.format(field, self._data[field]))
92         return args
93
94     __repr__ = __unicode__
95
96
97 class TaskFilter(object):
98     """
99     A set of parameters to filter the task list with.
100     """
101
102     def __init__(self, filter_params=[]):
103         self.filter_params = filter_params
104
105     def add_filter(self, filter_str):
106         self.filter_params.append(filter_str)
107
108     def add_filter_param(self, key, value):
109         key = key.replace('__', '.')
110         self.filter_params.append('{0}:{1}'.format(key, value))
111
112     def get_filter_params(self):
113         return [f for f in self.filter_params if f]
114
115     def clone(self):
116         c = self.__class__()
117         c.filter_params = list(self.filter_params)
118         return c
119
120
121 class TaskQuerySet(object):
122     """
123     Represents a lazy lookup for a task objects.
124     """
125
126     def __init__(self, warrior=None, filter_obj=None):
127         self.warrior = warrior
128         self._result_cache = None
129         self.filter_obj = filter_obj or TaskFilter()
130
131     def __deepcopy__(self, memo):
132         """
133         Deep copy of a QuerySet doesn't populate the cache
134         """
135         obj = self.__class__()
136         for k, v in self.__dict__.items():
137             if k in ('_iter', '_result_cache'):
138                 obj.__dict__[k] = None
139             else:
140                 obj.__dict__[k] = copy.deepcopy(v, memo)
141         return obj
142
143     def __repr__(self):
144         data = list(self[:REPR_OUTPUT_SIZE + 1])
145         if len(data) > REPR_OUTPUT_SIZE:
146             data[-1] = "...(remaining elements truncated)..."
147         return repr(data)
148
149     def __len__(self):
150         if self._result_cache is None:
151             self._result_cache = list(self)
152         return len(self._result_cache)
153
154     def __iter__(self):
155         if self._result_cache is None:
156             self._result_cache = self._execute()
157         return iter(self._result_cache)
158
159     def __getitem__(self, k):
160         if self._result_cache is None:
161             self._result_cache = list(self)
162         return self._result_cache.__getitem__(k)
163
164     def __bool__(self):
165         if self._result_cache is not None:
166             return bool(self._result_cache)
167         try:
168             next(iter(self))
169         except StopIteration:
170             return False
171         return True
172
173     def __nonzero__(self):
174         return type(self).__bool__(self)
175
176     def _clone(self, klass=None, **kwargs):
177         if klass is None:
178             klass = self.__class__
179         filter_obj = self.filter_obj.clone()
180         c = klass(warrior=self.warrior, filter_obj=filter_obj)
181         c.__dict__.update(kwargs)
182         return c
183
184     def _execute(self):
185         """
186         Fetch the tasks which match the current filters.
187         """
188         return self.warrior.filter_tasks(self.filter_obj)
189
190     def all(self):
191         """
192         Returns a new TaskQuerySet that is a copy of the current one.
193         """
194         return self._clone()
195
196     def pending(self):
197         return self.filter(status=PENDING)
198
199     def completed(self):
200         return self.filter(status=COMPLETED)
201
202     def filter(self, *args, **kwargs):
203         """
204         Returns a new TaskQuerySet with the given filters added.
205         """
206         clone = self._clone()
207         for f in args:
208             clone.filter_obj.add_filter(f)
209         for key, value in kwargs.items():
210             clone.filter_obj.add_filter_param(key, value)
211         return clone
212
213     def get(self, **kwargs):
214         """
215         Performs the query and returns a single object matching the given
216         keyword arguments.
217         """
218         clone = self.filter(**kwargs)
219         num = len(clone)
220         if num == 1:
221             return clone._result_cache[0]
222         if not num:
223             raise Task.DoesNotExist(
224                 'Task matching query does not exist. '
225                 'Lookup parameters were {0}'.format(kwargs))
226         raise ValueError(
227             'get() returned more than one Task -- it returned {0}! '
228             'Lookup parameters were {1}'.format(num, kwargs))
229
230
231 class TaskWarrior(object):
232     def __init__(self, data_location='~/.task', create=True):
233         data_location = os.path.expanduser(data_location)
234         if not os.path.exists(data_location):
235             os.makedirs(data_location)
236         self.config = {
237             'data.location': os.path.expanduser(data_location),
238         }
239         self.tasks = TaskQuerySet(self)
240
241     def _get_command_args(self, args, config_override={}):
242         command_args = ['task', 'rc:/']
243         config = self.config.copy()
244         config.update(config_override)
245         for item in config.items():
246             command_args.append('rc.{0}={1}'.format(*item))
247         command_args.extend(map(str, args))
248         return command_args
249
250     def execute_command(self, args, config_override={}):
251         command_args = self._get_command_args(
252             args, config_override=config_override)
253         logger.debug(' '.join(command_args))
254         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
255                              stderr=subprocess.PIPE)
256         stdout, stderr = p.communicate()
257         if p.returncode:
258             error_msg = stderr.strip().splitlines()[-1]
259             raise TaskWarriorException(error_msg)
260         return stdout.strip().split('\n')
261
262     def filter_tasks(self, filter_obj):
263         args = ['export', '--'] + filter_obj.get_filter_params()
264         tasks = []
265         for line in self.execute_command(args):
266             if line:
267                 tasks.append(Task(self, json.loads(line.strip(','))))
268         return tasks
269
270     def merge_with(self, path, push=False):
271         path = path.rstrip('/') + '/'
272         self.execute_command(['merge', path], config_override={
273             'merge.autopush': 'yes' if push else 'no',
274         })
275
276     def undo(self):
277         self.execute_command(['undo'], config_override={
278             'confirmation': 'no',
279         })