All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
17 class TaskWarriorException(Exception):
23 class DoesNotExist(Exception):
26 def __init__(self, warrior, data={}):
27 self.warrior = warrior
def __getitem__(self, key):
    """Return the value for ``key`` as produced by ``_get_field``."""
    field_value = self._get_field(key)
    return field_value
33 def __setitem__(self, key, val):
def __unicode__(self):
    """Return the 'description' entry of the raw data, or None if absent."""
    description = self._data.get('description')
    return description
def _get_field(self, key):
    """Read ``key`` from the raw data, converting it on the way out.

    If the object has an attribute named ``deserialize_<key>`` it is called
    with the stored value (None when the key is absent) and its result is
    returned; otherwise the stored value is returned unchanged.
    """
    def _passthrough(raw):
        # Used when no field-specific converter exists.
        return raw

    converter_name = 'deserialize_{0}'.format(key)
    converter = getattr(self, converter_name, _passthrough)
    stored = self._data.get(key)
    return converter(stored)
def _set_field(self, key, value):
    """Store ``value`` under ``key`` in the raw data, converting it first.

    If the object has an attribute named ``serialize_<key>`` it is called
    with ``value`` and its result is stored; otherwise ``value`` is stored
    unchanged.
    """
    def _passthrough(raw):
        # Used when no field-specific converter exists.
        return raw

    converter_name = 'serialize_{0}'.format(key)
    converter = getattr(self, converter_name, _passthrough)
    self._data[key] = converter(value)
def serialize_due(self, date):
    """Format ``date`` as a string using the module's DATE_FORMAT pattern."""
    formatted = date.strftime(DATE_FORMAT)
    return formatted
50 def deserialize_due(self, date_str):
53 return datetime.datetime.strptime(date_str, DATE_FORMAT)
55 def serialize_annotations(self, annotations):
56 ann_list = list(annotations)
58 ann['entry'] = ann['entry'].strftime(DATE_FORMAT)
61 def deserialize_annotations(self, annotations):
62 ann_list = list(annotations)
64 ann['entry'] = datetime.datetime.strptime(
65 ann['entry'], DATE_FORMAT)
def regenerate_uuid(self):
    """Assign a newly generated random (uuid4) UUID string to the 'uuid' field."""
    fresh = uuid.uuid4()
    self['uuid'] = str(fresh)
72 self.warrior.delete_task(self['uuid'])
75 self.warrior.complete_task(self['uuid'])
77 def save(self, delete_first=True):
78 if self['uuid'] and delete_first:
80 if not self['uuid'] or delete_first:
81 self.regenerate_uuid()
82 self.warrior.import_tasks([self._data])
84 __repr__ = __unicode__
87 class TaskFilter(object):
89 A set of parameters to filter the task list with.
def __init__(self, filter_params=None):
    """Create a filter holding the given parameter strings.

    filter_params -- optional list of filter strings. A list passed in is
    stored as-is (not copied); when omitted, the instance gets its own new
    empty list.
    """
    # A literal [] default is created once and shared by every instance
    # built without arguments, so appending a filter to one instance would
    # silently show up in all the others.
    if filter_params is None:
        filter_params = []
    self.filter_params = filter_params
def add_filter(self, filter_str):
    """Append the raw filter string ``filter_str`` to this filter's parameters."""
    params = self.filter_params
    params.append(filter_str)
def add_filter_param(self, key, value):
    """Append a ``key:value`` filter, writing each '__' in ``key`` as '.'.

    For example key 'due__before' with value 5 is stored as 'due.before:5'.
    """
    attribute = '.'.join(key.split('__'))
    entry = '{0}:{1}'.format(attribute, value)
    self.filter_params.append(entry)
def get_filter_params(self):
    """Return a new list of the stored filter parameters, minus falsy entries."""
    return list(filter(None, self.filter_params))
107 c.filter_params = list(self.filter_params)
111 class TaskQuerySet(object):
113 Represents a lazy lookup for a task objects.
def __init__(self, warrior=None, filter_obj=None):
    """Set up the query set with no cached results.

    warrior -- object stored on ``self.warrior``.
    filter_obj -- filter to use; a falsy value is replaced by a new
    ``TaskFilter()``.
    """
    self.warrior = warrior
    # None marks the result cache as not yet filled.
    self._result_cache = None
    if filter_obj:
        self.filter_obj = filter_obj
    else:
        self.filter_obj = TaskFilter()
121 def __deepcopy__(self, memo):
123 Deep copy of a QuerySet doesn't populate the cache
125 obj = self.__class__()
126 for k,v in self.__dict__.items():
127 if k in ('_iter','_result_cache'):
128 obj.__dict__[k] = None
130 obj.__dict__[k] = copy.deepcopy(v, memo)
134 data = list(self[:REPR_OUTPUT_SIZE + 1])
135 if len(data) > REPR_OUTPUT_SIZE:
136 data[-1] = "...(remaining elements truncated)..."
140 if self._result_cache is None:
141 self._result_cache = list(self)
142 return len(self._result_cache)
145 if self._result_cache is None:
146 self._result_cache = self._execute()
147 return iter(self._result_cache)
def __getitem__(self, k):
    """Return the cached result(s) at index or slice ``k``.

    If the cache is still None it is first filled with ``list(self)``.
    """
    cache = self._result_cache
    if cache is None:
        cache = self._result_cache = list(self)
    return cache[k]
155 if self._result_cache is not None:
156 return bool(self._result_cache)
159 except StopIteration:
def __nonzero__(self):
    """Return whatever ``__bool__`` on the instance's class returns for ``self``."""
    cls = type(self)
    return cls.__bool__(self)
166 def _clone(self, klass=None, **kwargs):
168 klass = self.__class__
169 filter_obj = self.filter_obj.clone()
170 c = klass(warrior=self.warrior, filter_obj=filter_obj)
171 c.__dict__.update(kwargs)
176 Fetch the tasks which match the current filters.
178 return self.warrior.filter_tasks(self.filter_obj)
182 Returns a new TaskQuerySet that is a copy of the current one.
187 return self.filter(status=PENDING)
189 def filter(self, *args, **kwargs):
191 Returns a new TaskQuerySet with the given filters added.
193 clone = self._clone()
195 clone.filter_obj.add_filter(f)
196 for key, value in kwargs.items():
197 clone.filter_obj.add_filter_param(key, value)
200 def get(self, **kwargs):
202 Performs the query and returns a single object matching the given
205 clone = self.filter(**kwargs)
208 return clone._result_cache[0]
210 raise Task.DoesNotExist(
211 'Task matching query does not exist. '
212 'Lookup parameters were {0}'.format(kwargs))
214 'get() returned more than one Task -- it returned {0}! '
215 'Lookup parameters were {1}'.format(num, kwargs))
218 class TaskWarrior(object):
219 def __init__(self, data_location='~/.task', create=True):
220 if not os.path.exists(data_location):
221 os.makedirs(data_location)
223 'data.location': os.path.expanduser(data_location),
225 self.tasks = TaskQuerySet(self)
227 def _get_command_args(self, args, config_override={}):
228 command_args = ['task', 'rc:/']
229 config = self.config.copy()
230 config.update(config_override)
231 for item in config.items():
232 command_args.append('rc.{0}={1}'.format(*item))
233 command_args.extend(args)
236 def execute_command(self, args, config_override={}):
237 command_args = self._get_command_args(
238 args, config_override=config_override)
239 p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
240 stderr=subprocess.PIPE)
241 stdout, stderr = p.communicate()
243 error_msg = stderr.strip().splitlines()[-1]
244 raise TaskWarriorException(error_msg)
245 return stdout.strip().split('\n')
247 def filter_tasks(self, filter_obj):
248 args = ['export', '--'] + filter_obj.get_filter_params()
250 for line in self.execute_command(args):
252 tasks.append(Task(self, json.loads(line.strip(','))))
def add_task(self, description, project=None):
    """Run the 'add' command for ``description``.

    When ``project`` is not None, a ``project:<project>`` argument is
    appended after the description.
    """
    project_args = [] if project is None else ['project:{0}'.format(project)]
    self.execute_command(['add', description] + project_args)
def delete_task(self, task_id):
    """Run the 'delete' command on ``task_id`` with 'rc.confirmation:no'."""
    self.execute_command([task_id, 'rc.confirmation:no', 'delete'])
def complete_task(self, task_id):
    """Run the 'done' command on ``task_id``."""
    self.execute_command([task_id, 'done'])
def import_tasks(self, tasks):
    """Import ``tasks`` through a temporary JSON file.

    ``tasks`` is dumped with ``json.dumps`` into a temporary file, and the
    'import' command is then run on that file's path.

    tasks -- JSON-serializable data.
    """
    fd, path = tempfile.mkstemp()
    try:
        # mkstemp() returns a descriptor that is already open. Wrapping it
        # with os.fdopen() lets the with-block close it, instead of leaking
        # it while a second handle is opened by path.
        with os.fdopen(fd, 'w') as f:
            f.write(json.dumps(tasks))
        self.execute_command(['import', path])
    finally:
        # The file is only needed while the command runs; mkstemp() leaves
        # deletion to the caller.
        os.remove(path)
276 def merge_with(self, path, push=False):
277 path = path.rstrip('/') + '/'
278 args = ['merge', path]
279 self.execute_command(args, config_override={
280 'merge.autopush': 'yes' if push else 'no',