All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
1 from __future__ import print_function
14 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
15 DATE_FORMAT_CALC = '%Y-%m-%dT%H:%M:%S'
18 COMPLETED = 'completed'
20 VERSION_2_1_0 = six.u('2.1.0')
21 VERSION_2_2_0 = six.u('2.2.0')
22 VERSION_2_3_0 = six.u('2.3.0')
23 VERSION_2_4_0 = six.u('2.4.0')
24 VERSION_2_4_1 = six.u('2.4.1')
25 VERSION_2_4_2 = six.u('2.4.2')
26 VERSION_2_4_3 = six.u('2.4.3')
27 VERSION_2_4_4 = six.u('2.4.4')
28 VERSION_2_4_5 = six.u('2.4.5')
30 logger = logging.getLogger(__name__)
31 local_zone = tzlocal.get_localzone()
34 class TaskWarriorException(Exception):
38 class ReadOnlyDictView(object):
40 Provides simplified read-only view upon dict object.
43 def __init__(self, viewed_dict):
44 self.viewed_dict = viewed_dict
46 def __getitem__(self, key):
47 return copy.deepcopy(self.viewed_dict.__getitem__(key))
49 def __contains__(self, k):
50 return self.viewed_dict.__contains__(k)
53 for value in self.viewed_dict:
54 yield copy.deepcopy(value)
57 return len(self.viewed_dict)
59 def get(self, key, default=None):
60 return copy.deepcopy(self.viewed_dict.get(key, default))
63 return [copy.deepcopy(v) for v in self.viewed_dict.items()]
66 return [copy.deepcopy(v) for v in self.viewed_dict.values()]
69 class SerializingObject(object):
71 Common ancestor for TaskResource & TaskFilter, since they both
72 need to serialize arguments.
74 Serializing method should hold the following contract:
75 - any empty value (meaning removal of the attribute)
76 is serialized into an empty string
77 - None denotes an empty value for any attribute
79 Deserializing method should hold the following contract:
80 - None denotes an empty value for any attribute (however,
81 this is here as a safeguard, TaskWarrior currently does
82 not export empty-valued attributes), unless the attribute
83 is iterable (e.g. list or set), in which case
84 an empty iterable should be used.
86 Normalizing methods should hold the following contract:
87 - They are used to validate and normalize the user input.
88 Any attribute value that comes from the user (during Task
89 initialization, assigning values to Task attributes, or
90 filtering by user-provided values of attributes) is first
91 validated and normalized using the normalize_{key} method.
92 - If validation or normalization fails, normalizer is expected
96 def __init__(self, warrior):
97 self.warrior = warrior
99 def _deserialize(self, key, value):
100 hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
101 lambda x: x if x != '' else None)
102 return hydrate_func(value)
104 def _serialize(self, key, value):
105 dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
106 lambda x: x if x is not None else '')
107 return dehydrate_func(value)
109 def _normalize(self, key, value):
111 Use normalize_<key> methods to normalize user input. Any user
112 input will be normalized at the moment it is used as filter,
113 or entered as a value of Task attribute.
116 # None value should not be converted by normalizer
120 normalize_func = getattr(self, 'normalize_{0}'.format(key),
123 return normalize_func(value)
125 def timestamp_serializer(self, date):
129 # Any serialized timestamp should be localized, we need to
130 # convert to UTC before converting to string (DATE_FORMAT uses UTC)
131 date = date.astimezone(pytz.utc)
133 return date.strftime(DATE_FORMAT)
135 def timestamp_deserializer(self, date_str):
139 # Return timestamp localized in the local zone
140 naive_timestamp = datetime.datetime.strptime(date_str, DATE_FORMAT)
141 localized_timestamp = pytz.utc.localize(naive_timestamp)
142 return localized_timestamp.astimezone(local_zone)
144 def serialize_entry(self, value):
145 return self.timestamp_serializer(value)
147 def deserialize_entry(self, value):
148 return self.timestamp_deserializer(value)
150 def normalize_entry(self, value):
151 return self.datetime_normalizer(value)
153 def serialize_modified(self, value):
154 return self.timestamp_serializer(value)
156 def deserialize_modified(self, value):
157 return self.timestamp_deserializer(value)
159 def normalize_modified(self, value):
160 return self.datetime_normalizer(value)
162 def serialize_start(self, value):
163 return self.timestamp_serializer(value)
165 def deserialize_start(self, value):
166 return self.timestamp_deserializer(value)
168 def normalize_start(self, value):
169 return self.datetime_normalizer(value)
171 def serialize_end(self, value):
172 return self.timestamp_serializer(value)
174 def deserialize_end(self, value):
175 return self.timestamp_deserializer(value)
177 def normalize_end(self, value):
178 return self.datetime_normalizer(value)
180 def serialize_due(self, value):
181 return self.timestamp_serializer(value)
183 def deserialize_due(self, value):
184 return self.timestamp_deserializer(value)
186 def normalize_due(self, value):
187 return self.datetime_normalizer(value)
189 def serialize_scheduled(self, value):
190 return self.timestamp_serializer(value)
192 def deserialize_scheduled(self, value):
193 return self.timestamp_deserializer(value)
195 def normalize_scheduled(self, value):
196 return self.datetime_normalizer(value)
198 def serialize_until(self, value):
199 return self.timestamp_serializer(value)
201 def deserialize_until(self, value):
202 return self.timestamp_deserializer(value)
204 def normalize_until(self, value):
205 return self.datetime_normalizer(value)
207 def serialize_wait(self, value):
208 return self.timestamp_serializer(value)
210 def deserialize_wait(self, value):
211 return self.timestamp_deserializer(value)
213 def normalize_wait(self, value):
214 return self.datetime_normalizer(value)
def serialize_annotations(self, value):
    """
    Serialize annotations into a list of plain dicts.

    Every annotation is exported to JSON and parsed straight back, so the
    result holds dicts rather than JSON strings. ``None`` or an empty
    collection of annotations serializes to an empty string.
    """
    if value is None:
        return ''

    exported = []
    for note in value:
        # The JSON round-trip is intentional: the serialized value is
        # a list of dicts, not a list of JSON documents.
        exported.append(json.loads(note.export_data()))

    return exported or ''
def deserialize_annotations(self, data):
    """
    Wrap every exported annotation dict in a TaskAnnotation bound to
    this object. Missing or empty data yields an empty list.
    """
    if not data:
        return []

    annotations = []
    for raw_annotation in data:
        annotations.append(TaskAnnotation(self, raw_annotation))
    return annotations
def serialize_tags(self, tags):
    """
    Join the tags into one comma-separated string. No tags (``None`` or
    an empty collection) serialize to an empty string.
    """
    if not tags:
        return ''
    return ','.join(tags)
231 def deserialize_tags(self, tags):
232 if isinstance(tags, six.string_types):
233 return tags.split(',') if tags else []
def serialize_depends(self, value):
    """
    Serialize dependencies as a comma-separated string of their UUIDs.

    ``None`` is treated as "no dependencies" and yields an empty string.
    """
    if value is None:
        return ''

    uuids = [dependency['uuid'] for dependency in value]
    return ','.join(uuids)
def deserialize_depends(self, raw_uuids):
    """
    Turn exported dependency UUIDs into a set of task objects.

    :param raw_uuids: ``None``, a comma-separated string of UUIDs (the
        form TW 2.4.4 exports) or a list of UUIDs (TW 2.4.5 and later).
    :returns: a set holding the result of
        ``self.warrior.tasks.get(uuid=...)`` for every non-empty UUID.
    """
    raw_uuids = raw_uuids or []  # Convert None to empty list

    if isinstance(raw_uuids, list):
        # TW 2.4.5 and later exports them as a list, no conversion needed.
        # isinstance (rather than an exact type check) also accepts list
        # subclasses instead of trying to call .split() on them.
        uuids = raw_uuids
    else:
        # TW 2.4.4 encodes list of dependencies as a single string
        uuids = raw_uuids.split(',')

    fetch_task = self.warrior.tasks.get
    return set(fetch_task(uuid=uuid) for uuid in uuids if uuid)
253 def datetime_normalizer(self, value):
255 Normalizes date/datetime value (considered to come from user input)
256 to localized datetime value. Following conversions happen:
258 naive date -> localized datetime with the same date, and time=midnight
259 naive datetime -> localized datetime with the same value
260 localized datetime -> localized datetime (no conversion)
263 if (isinstance(value, datetime.date)
264 and not isinstance(value, datetime.datetime)):
265 # Convert to local midnight
266 value_full = datetime.datetime.combine(value, datetime.time.min)
267 localized = local_zone.localize(value_full)
268 elif isinstance(value, datetime.datetime):
269 if value.tzinfo is None:
270 # Convert to localized datetime object
271 localized = local_zone.localize(value)
273 # If the value is already localized, there is no need to change
274 # time zone at this point. Also None is a valid value too.
276 elif (isinstance(value, six.string_types)
277 and self.warrior.version >= VERSION_2_4_0):
278 # For strings, use 'task calc' to evaluate the string to datetime
279 # available since TW 2.4.0
281 result = self.warrior.execute_command(['calc'] + args)
282 naive = datetime.datetime.strptime(result[0], DATE_FORMAT_CALC)
283 localized = local_zone.localize(naive)
285 raise ValueError("Provided value could not be converted to "
286 "datetime, its type is not supported: {}"
287 .format(type(value)))
291 def normalize_uuid(self, value):
293 if not isinstance(value, six.string_types) or value == '':
294 raise ValueError("UUID must be a valid non-empty string, "
295 "not: {}".format(value))
300 class TaskResource(SerializingObject):
301 read_only_fields = []
303 def _load_data(self, data):
304 self._data = dict((key, self._deserialize(key, value))
305 for key, value in data.items())
306 # We need to use a copy for original data, so that changes
307 # are not propagated.
308 self._original_data = copy.deepcopy(self._data)
310 def _update_data(self, data, update_original=False, remove_missing=False):
312 Low level update of the internal _data dict. Data which are coming as
313 updates should already be serialized. If update_original is True, the
314 original_data dict is updated as well.
316 self._data.update(dict((key, self._deserialize(key, value))
317 for key, value in data.items()))
319 # In certain situations, we want to treat missing keys as removals
321 for key in set(self._data.keys()) - set(data.keys()):
322 self._data[key] = None
325 self._original_data = copy.deepcopy(self._data)
328 def __getitem__(self, key):
329 # This is a workaround to make TaskResource non-iterable
330 # over simple index-based iteration
337 if key not in self._data:
338 self._data[key] = self._deserialize(key, None)
340 return self._data.get(key)
342 def __setitem__(self, key, value):
343 if key in self.read_only_fields:
344 raise RuntimeError('Field \'%s\' is read-only' % key)
346 # Normalize the user input before saving it
347 value = self._normalize(key, value)
348 self._data[key] = value
351 s = six.text_type(self.__unicode__())
353 s = s.encode('utf-8')
def export_data(self):
    """
    Exports current data contained in the Task as JSON.

    Every attribute is passed through ``self._serialize``; attributes
    whose serialized value equals the empty string are left out of the
    result.

    :returns: compact JSON string (no spaces after separators).
    """
    serialized = ((key, self._serialize(key, value))
                  for key, value in self._data.items())

    # Empty string denotes empty serialized value, we do not want
    # to pass that to TaskWarrior. The comparison is by value, not by
    # identity: whether two equal strings are the same object is an
    # implementation detail (and u'' is never '' on Python 2).
    data = dict(pair for pair in serialized if pair[1] != '')

    # We need to remove spaces for TW-1504, use custom separators
    return json.dumps(data, separators=(',', ':'))
375 def _modified_fields(self):
376 writable_fields = set(self._data.keys()) - set(self.read_only_fields)
377 for key in writable_fields:
378 new_value = self._data.get(key)
379 old_value = self._original_data.get(key)
381 # Make sure not to mark data removal as modified field if the
382 # field originally had some empty value
383 if key in self._data and not new_value and not old_value:
386 if new_value != old_value:
391 return bool(list(self._modified_fields))
394 class TaskAnnotation(TaskResource):
395 read_only_fields = ['entry', 'description']
397 def __init__(self, task, data={}):
399 self._load_data(data)
400 super(TaskAnnotation, self).__init__(task.warrior)
403 self.task.remove_annotation(self)
405 def __unicode__(self):
406 return self['description']
408 def __eq__(self, other):
409 # consider 2 annotations equal if they belong to the same task, and
410 # their data dicts are the same
411 return self.task == other.task and self._data == other._data
413 __repr__ = __unicode__
416 class Task(TaskResource):
417 read_only_fields = ['id', 'entry', 'urgency', 'uuid', 'modified']
419 class DoesNotExist(Exception):
422 class CompletedTask(Exception):
424 Raised when the operation cannot be performed on the completed task.
428 class DeletedTask(Exception):
430 Raised when the operation cannot be performed on the deleted task.
434 class ActiveTask(Exception):
436 Raised when the operation cannot be performed on the active task.
440 class InactiveTask(Exception):
442 Raised when the operation cannot be performed on an inactive task.
446 class NotSaved(Exception):
448 Raised when the operation cannot be performed on the task, because
449 it has not been saved to TaskWarrior yet.
454 def from_input(cls, input_file=sys.stdin, modify=None, warrior=None):
456 Creates a Task object, directly from the stdin, by reading one line.
457 If modify=True, two lines are used, first line interpreted as the
458 original state of the Task object, and second line as its new,
459 modified value. This is consistent with the TaskWarrior's hook
462 Object created by this method should not be saved, deleted
463 or refreshed, as it could create an infinite loop. For this
464 reason, TaskWarrior instance is set to None.
466 Input_file argument can be used to specify the input file,
467 but defaults to sys.stdin.
470 # Detect the hook type if not given directly
471 name = os.path.basename(sys.argv[0])
472 modify = name.startswith('on-modify') if modify is None else modify
474 # Create the TaskWarrior instance if none passed
476 hook_parent_dir = os.path.dirname(os.path.dirname(sys.argv[0]))
477 warrior = TaskWarrior(data_location=hook_parent_dir)
479 # TaskWarrior instance is set to None
482 # Load the data from the input
483 task._load_data(json.loads(input_file.readline().strip()))
485 # If this is a on-modify event, we are provided with additional
486 # line of input, which provides updated data
488 task._update_data(json.loads(input_file.readline().strip()),
def __init__(self, warrior, **kwargs):
    """
    Create a task bound to ``warrior`` with the given initial attributes.

    Keyword arguments are attribute values in the same form accepted by
    ``__setitem__``; each one is passed through ``self._normalize``.

    :raises RuntimeError: if a keyword names a read-only field.
    """
    super(Task, self).__init__(warrior)

    # Check that user is not able to set read-only value in __init__
    for field in kwargs:
        if field in self.read_only_fields:
            raise RuntimeError('Field \'%s\' is read-only' % field)

    # Values given here are normalized exactly like values assigned via
    # __setitem__, so that users of the library do not have to deal with
    # two different data formats. An explicit loop is used instead of
    # a dict comprehension because of Python 2.6 compatibility.
    normalized = {}
    for field, raw_value in six.iteritems(kwargs):
        normalized[field] = self._normalize(field, raw_value)

    self._data = normalized
    self._original_data = copy.deepcopy(normalized)

    # Provide read only access to the original data
    self.original = ReadOnlyDictView(self._original_data)
513 def __unicode__(self):
514 return self['description']
516 def __eq__(self, other):
517 if self['uuid'] and other['uuid']:
518 # For saved Tasks, just define equality by equality of uuids
519 return self['uuid'] == other['uuid']
521 # If the tasks are not saved, compare the actual instances
522 return id(self) == id(other)
527 # For saved Tasks, just define equality by equality of uuids
528 return self['uuid'].__hash__()
530 # If the tasks are not saved, return hash of instance id
531 return id(self).__hash__()
535 return self['status'] == six.text_type('completed')
539 return self['status'] == six.text_type('deleted')
543 return self['status'] == six.text_type('waiting')
547 return self['status'] == six.text_type('pending')
551 return self['start'] is not None
555 return self['uuid'] is not None or self['id'] is not None
557 def serialize_depends(self, cur_dependencies):
558 # Check that all the tasks are saved
559 for task in (cur_dependencies or set()):
561 raise Task.NotSaved('Task \'%s\' needs to be saved before '
562 'it can be set as dependency.' % task)
564 return super(Task, self).serialize_depends(cur_dependencies)
def format_depends(self):
    """
    Build the ``depends:`` argument describing the dependency changes.

    Taskwarrior does not accept redefining dependencies, so the argument
    lists the UUIDs that were added and, prefixed with '-', the UUIDs
    that were removed since the original data was stored.

    This cannot be part of serialize_depends, since the _data dictionary
    has to keep all dependencies, not just the added/removed ones.
    """
    previous = self._original_data.get('depends', set())
    current = self['depends']

    tokens = [task['uuid'] for task in current - previous]
    # Removed dependencies need to be prefixed with '-'
    tokens.extend('-' + task['uuid'] for task in previous - current)

    return 'depends:' + ','.join(tokens)
585 def format_description(self):
586 # Task version older than 2.4.0 ignores first word of the
587 # task description if description: prefix is used
588 if self.warrior.version < VERSION_2_4_0:
589 return self._data['description']
591 return six.u("description:'{0}'").format(self._data['description'] or '')
595 raise Task.NotSaved("Task needs to be saved before it can be deleted")
597 # Refresh the status, and raise exception if the task is deleted
598 self.refresh(only_fields=['status'])
601 raise Task.DeletedTask("Task was already deleted")
603 self.warrior.execute_command([self['uuid'], 'delete'])
605 # Refresh the status again, so that we have updated info stored
606 self.refresh(only_fields=['status', 'start', 'end'])
610 raise Task.NotSaved("Task needs to be saved before it can be started")
612 # Refresh, and raise exception if task is already completed/deleted
613 self.refresh(only_fields=['status'])
616 raise Task.CompletedTask("Cannot start a completed task")
618 raise Task.DeletedTask("Deleted task cannot be started")
620 raise Task.ActiveTask("Task is already active")
622 self.warrior.execute_command([self['uuid'], 'start'])
624 # Refresh the status again, so that we have updated info stored
625 self.refresh(only_fields=['status', 'start'])
629 raise Task.NotSaved("Task needs to be saved before it can be stopped")
631 # Refresh, and raise exception if task is already completed/deleted
632 self.refresh(only_fields=['status'])
635 raise Task.InactiveTask("Cannot stop an inactive task")
637 self.warrior.execute_command([self['uuid'], 'stop'])
639 # Refresh the status again, so that we have updated info stored
640 self.refresh(only_fields=['status', 'start'])
644 raise Task.NotSaved("Task needs to be saved before it can be completed")
646 # Refresh, and raise exception if task is already completed/deleted
647 self.refresh(only_fields=['status'])
650 raise Task.CompletedTask("Cannot complete a completed task")
652 raise Task.DeletedTask("Deleted task cannot be completed")
654 # Older versions of TW do not stop active task at completion
655 if self.warrior.version < VERSION_2_4_0 and self.active:
658 self.warrior.execute_command([self['uuid'], 'done'])
660 # Refresh the status again, so that we have updated info stored
661 self.refresh(only_fields=['status', 'start', 'end'])
664 if self.saved and not self.modified:
667 args = [self['uuid'], 'modify'] if self.saved else ['add']
668 args.extend(self._get_modified_fields_as_args())
669 output = self.warrior.execute_command(args)
671 # Parse out the new ID, if the task is being added for the first time
673 id_lines = [l for l in output if l.startswith('Created task ')]
675 # Complain loudly if it seems that more tasks were created
677 if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
678 raise TaskWarriorException("Unexpected output when creating "
679 "task: %s" % '\n'.join(id_lines))
681 # Circumvent the ID storage, since ID is considered read-only
682 identifier = id_lines[0].split(' ')[2].rstrip('.')
684 # Identifier can be either ID or UUID for completed tasks
686 self._data['id'] = int(identifier)
688 self._data['uuid'] = identifier
690 # Refreshing is very important here, as not only modification time
691 # is updated, but arbitrary attribute may have changed due hooks
692 # altering the data before saving
693 self.refresh(after_save=True)
695 def add_annotation(self, annotation):
697 raise Task.NotSaved("Task needs to be saved to add annotation")
699 args = [self['uuid'], 'annotate', annotation]
700 self.warrior.execute_command(args)
701 self.refresh(only_fields=['annotations'])
703 def remove_annotation(self, annotation):
705 raise Task.NotSaved("Task needs to be saved to remove annotation")
707 if isinstance(annotation, TaskAnnotation):
708 annotation = annotation['description']
709 args = [self['uuid'], 'denotate', annotation]
710 self.warrior.execute_command(args)
711 self.refresh(only_fields=['annotations'])
713 def _get_modified_fields_as_args(self):
716 def add_field(field):
717 # Add the output of format_field method to args list (defaults to
719 serialized_value = self._serialize(field, self._data[field])
721 # Empty values should not be enclosed in quotation marks, see
723 if serialized_value is '':
724 escaped_serialized_value = ''
726 escaped_serialized_value = six.u("'{0}'").format(serialized_value)
728 format_default = lambda: six.u("{0}:{1}").format(field,
729 escaped_serialized_value)
731 format_func = getattr(self, 'format_{0}'.format(field),
734 args.append(format_func())
736 # If we're modifying saved task, simply pass on all modified fields
738 for field in self._modified_fields:
740 # For new tasks, pass all fields that make sense
742 for field in self._data.keys():
743 if field in self.read_only_fields:
749 def refresh(self, only_fields=None, after_save=False):
750 # Raise error when trying to refresh a task that has not been saved
752 raise Task.NotSaved("Task needs to be saved to be refreshed")
754 # We need to use ID as backup for uuid here for the refreshes
755 # of newly saved tasks. Any other place in the code is fine
756 # with using UUID only.
757 args = [self['uuid'] or self['id'], 'export']
758 output = self.warrior.execute_command(args)
761 return len(output) == 1 and output[0].startswith('{')
763 # For older TW versions attempt to uniquely locate the task
764 # using the data we have if it has been just saved.
765 # This can happen when adding a completed task on older TW versions.
766 if (not valid(output) and self.warrior.version < VERSION_2_4_4
769 # Make a copy, removing ID and UUID. It's most likely invalid
770 # (ID 0) if it failed to match a unique task.
771 data = copy.deepcopy(self._data)
773 data.pop('uuid', None)
775 taskfilter = TaskFilter(self.warrior)
776 for key, value in data.items():
777 taskfilter.add_filter_param(key, value)
779 output = self.warrior.execute_command(['export', '--'] +
780 taskfilter.get_filter_params())
782 # If more than 1 task has been matched still, raise an exception
783 if not valid(output):
784 raise TaskWarriorException(
785 "Unique identifiers {0} with description: {1} matches "
786 "multiple tasks: {2}".format(
787 self['uuid'] or self['id'], self['description'], output)
790 new_data = json.loads(output[0])
793 [(k, new_data.get(k)) for k in only_fields])
794 self._update_data(to_update, update_original=True)
796 self._load_data(new_data)
798 class TaskFilter(SerializingObject):
800 A set of parameters to filter the task list with.
803 def __init__(self, warrior, filter_params=None):
804 self.filter_params = filter_params or []
805 super(TaskFilter, self).__init__(warrior)
807 def add_filter(self, filter_str):
808 self.filter_params.append(filter_str)
810 def add_filter_param(self, key, value):
811 key = key.replace('__', '.')
813 # Replace the value with empty string, since that is the
814 # convention in TW for empty values
815 attribute_key = key.split('.')[0]
817 # Since this is user input, we need to normalize before we serialize
818 value = self._normalize(attribute_key, value)
819 value = self._serialize(attribute_key, value)
821 # If we are filtering by uuid:, do not use uuid keyword
824 self.filter_params.insert(0, value)
826 # Surround value with apostrophes unless it's an empty string
827 value = "'%s'" % value if value else ''
829 # We enforce equality match by using 'is' (or 'none') modifier
830 # Without using this syntax, filter fails due to TW-1479
831 # which is, however, fixed in 2.4.5
832 if self.warrior.version < VERSION_2_4_5:
833 modifier = '.is' if value else '.none'
834 key = key + modifier if '.' not in key else key
836 self.filter_params.append(six.u("{0}:{1}").format(key, value))
def get_filter_params(self):
    """
    Return a new list with the collected filter parameters, leaving out
    any falsy entries (such as empty strings).
    """
    return list(filter(None, self.filter_params))
842 c = self.__class__(self.warrior)
843 c.filter_params = list(self.filter_params)
847 class TaskQuerySet(object):
849 Represents a lazy lookup for a task objects.
852 def __init__(self, warrior=None, filter_obj=None):
853 self.warrior = warrior
854 self._result_cache = None
855 self.filter_obj = filter_obj or TaskFilter(warrior)
857 def __deepcopy__(self, memo):
859 Deep copy of a QuerySet doesn't populate the cache
861 obj = self.__class__()
862 for k, v in self.__dict__.items():
863 if k in ('_iter', '_result_cache'):
864 obj.__dict__[k] = None
866 obj.__dict__[k] = copy.deepcopy(v, memo)
870 data = list(self[:REPR_OUTPUT_SIZE + 1])
871 if len(data) > REPR_OUTPUT_SIZE:
872 data[-1] = "...(remaining elements truncated)..."
876 if self._result_cache is None:
877 self._result_cache = list(self)
878 return len(self._result_cache)
881 if self._result_cache is None:
882 self._result_cache = self._execute()
883 return iter(self._result_cache)
885 def __getitem__(self, k):
886 if self._result_cache is None:
887 self._result_cache = list(self)
888 return self._result_cache.__getitem__(k)
891 if self._result_cache is not None:
892 return bool(self._result_cache)
895 except StopIteration:
899 def __nonzero__(self):
900 return type(self).__bool__(self)
902 def _clone(self, klass=None, **kwargs):
904 klass = self.__class__
905 filter_obj = self.filter_obj.clone()
906 c = klass(warrior=self.warrior, filter_obj=filter_obj)
907 c.__dict__.update(kwargs)
912 Fetch the tasks which match the current filters.
914 return self.warrior.filter_tasks(self.filter_obj)
918 Returns a new TaskQuerySet that is a copy of the current one.
923 return self.filter(status=PENDING)
926 return self.filter(status=COMPLETED)
928 def filter(self, *args, **kwargs):
930 Returns a new TaskQuerySet with the given filters added.
932 clone = self._clone()
934 clone.filter_obj.add_filter(f)
935 for key, value in kwargs.items():
936 clone.filter_obj.add_filter_param(key, value)
939 def get(self, **kwargs):
941 Performs the query and returns a single object matching the given
944 clone = self.filter(**kwargs)
947 return clone._result_cache[0]
949 raise Task.DoesNotExist(
950 'Task matching query does not exist. '
951 'Lookup parameters were {0}'.format(kwargs))
953 'get() returned more than one Task -- it returned {0}! '
954 'Lookup parameters were {1}'.format(num, kwargs))
957 class TaskWarrior(object):
958 def __init__(self, data_location=None, create=True, taskrc_location='~/.taskrc'):
959 self.taskrc_location = os.path.expanduser(taskrc_location)
961 # If taskrc does not exist, pass / to use defaults and avoid creating
962 # dummy .taskrc file by TaskWarrior
963 if not os.path.exists(self.taskrc_location):
964 self.taskrc_location = '/'
966 self.version = self._get_version()
968 'confirmation': 'no',
969 'dependency.confirmation': 'no', # See TW-1483 or taskrc man page
970 'recurrence.confirmation': 'no', # Necessary for modifying R tasks
972 # Defaults to on since 2.4.5, we expect off during parsing
975 # 2.4.3 onwards supports 0 as infinite bulk, otherwise set just
976 # an arbitrary big number which is likely to be large enough
977 'bulk': 0 if self.version >= VERSION_2_4_3 else 100000,
980 # Set data.location override if passed via kwarg
981 if data_location is not None:
982 data_location = os.path.expanduser(data_location)
983 if create and not os.path.exists(data_location):
984 os.makedirs(data_location)
985 self.config['data.location'] = data_location
987 self.tasks = TaskQuerySet(self)
def _get_command_args(self, args, config_override=None):
    """
    Build the complete ``task`` command line for the given arguments.

    :param args: command arguments; each one is converted to text.
    :param config_override: optional dict of configuration values that
        take precedence over ``self.config`` for this call only.
    :returns: list starting with ``task`` and the ``rc:`` file option,
        followed by one ``rc.<key>=<value>`` item per configuration
        entry and finally the arguments themselves.
    """
    command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]

    # Work on a copy so that per-call overrides never leak into the
    # instance-wide configuration. The default is None rather than
    # a shared dict literal, so no state can be carried between calls.
    config = self.config.copy()
    config.update(config_override or {})

    command_args.extend(
        'rc.{0}={1}'.format(key, value) for key, value in config.items())
    command_args.extend(map(six.text_type, args))
    return command_args
def _get_version(self):
    """
    Run ``task --version`` and return its standard output, decoded as
    UTF-8, with leading and trailing newlines removed.
    """
    process = subprocess.Popen(['task', '--version'],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)

    # Both streams are decoded; only standard output is used.
    decoded_streams = [stream.decode('utf-8')
                       for stream in process.communicate()]
    version_output = decoded_streams[0]
    return version_output.strip('\n')
1006 def get_config(self):
1007 raw_output = self.execute_command(
1009 config_override={'verbose': 'nothing'}
1013 config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].+$)')
1015 for line in raw_output:
1016 match = config_regex.match(line)
1018 config[match.group('key')] = match.group('value').strip()
1022 def execute_command(self, args, config_override={}, allow_failure=True,
1024 command_args = self._get_command_args(
1025 args, config_override=config_override)
1026 logger.debug(' '.join(command_args))
1027 p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
1028 stderr=subprocess.PIPE)
1029 stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
1030 if p.returncode and allow_failure:
1032 error_msg = stderr.strip()
1034 error_msg = stdout.strip()
1035 raise TaskWarriorException(error_msg)
1037 # Return the whole triplet only if explicitly asked for
1039 return stdout.rstrip().split('\n')
1041 return (stdout.rstrip().split('\n'),
1042 stderr.rstrip().split('\n'),
def enforce_recurrence(self):
    """
    Trigger generation of recurrent tasks by running an arbitrary
    report command.

    Only necessary for TW up to 2.4.1, fixed in 2.4.2; for other
    versions this method does nothing.
    """
    needs_trigger = self.version < VERSION_2_4_2
    if not needs_trigger:
        return

    # With allow_failure=False a non-zero exit status of the report
    # command does not raise TaskWarriorException.
    self.execute_command(['next'], allow_failure=False)
1053 def filter_tasks(self, filter_obj):
1054 self.enforce_recurrence()
1055 args = ['export', '--'] + filter_obj.get_filter_params()
1057 for line in self.execute_command(args):
1059 data = line.strip(',')
1061 filtered_task = Task(self)
1062 filtered_task._load_data(json.loads(data))
1063 tasks.append(filtered_task)
1065 raise TaskWarriorException('Invalid JSON: %s' % data)
def merge_with(self, path, push=False):
    """
    Run the ``merge`` command against ``path``.

    :param path: location to merge with; it is passed on with exactly
        one trailing slash.
    :param push: when true, ``merge.autopush`` is overridden to ``yes``
        for this command, otherwise to ``no``.
    """
    # Drop any trailing slashes, then add a single one back
    normalized_path = path.rstrip('/') + '/'

    if push:
        autopush = 'yes'
    else:
        autopush = 'no'
    overrides = {'merge.autopush': autopush}

    self.execute_command(['merge', normalized_path],
                         config_override=overrides)
1075 self.execute_command(['undo'])