+class SerializingObject(object):
+ """
+ Common ancestor for TaskResource & TaskFilter, since they both
+ need to serialize arguments.
+
+ Serializing method should hold the following contract:
+ - any empty value (meaning removal of the attribute)
+ is deserialized into a empty string
+ - None denotes a empty value for any attribute
+
+ Deserializing method should hold the following contract:
+ - None denotes a empty value for any attribute (however,
+ this is here as a safeguard, TaskWarrior currently does
+ not export empty-valued attributes) if the attribute
+ is not iterable (e.g. list or set), in which case
+ a empty iterable should be used.
+ """
+
+ def _deserialize(self, key, value):
+ hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
+ lambda x: x if x != '' else None)
+ return hydrate_func(value)
+
+ def _serialize(self, key, value):
+ dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
+ lambda x: x if x is not None else '')
+ return dehydrate_func(value)
+
+ def timestamp_serializer(self, date):
+ if not date:
+ return ''
+ return date.strftime(DATE_FORMAT)
+
+ def timestamp_deserializer(self, date_str):
+ if not date_str:
+ return None
+ return datetime.datetime.strptime(date_str, DATE_FORMAT)
+
+ def serialize_entry(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_entry(self, value):
+ return self.timestamp_deserializer(value)
+
+ def serialize_modified(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_modified(self, value):
+ return self.timestamp_deserializer(value)
+
+ def serialize_due(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_due(self, value):
+ return self.timestamp_deserializer(value)
+
+ def serialize_scheduled(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_scheduled(self, value):
+ return self.timestamp_deserializer(value)
+
+ def serialize_until(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_until(self, value):
+ return self.timestamp_deserializer(value)
+
+ def serialize_wait(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_wait(self, value):
+ return self.timestamp_deserializer(value)
+
+ def deserialize_annotations(self, data):
+ return [TaskAnnotation(self, d) for d in data] if data else []
+
+ def serialize_tags(self, tags):
+ return ','.join(tags) if tags else ''
+
+ def deserialize_tags(self, tags):
+ if isinstance(tags, six.string_types):
+ return tags.split(',') if tags else []
+ return tags or []
+
+ def serialize_depends(self, value):
+ # Return the list of uuids
+ value = value if value is not None else set()
+ return ','.join(task['uuid'] for task in value)
+
+ def deserialize_depends(self, raw_uuids):
+ raw_uuids = raw_uuids or '' # Convert None to empty string
+ uuids = raw_uuids.split(',')
+ return set(self.warrior.tasks.get(uuid=uuid) for uuid in uuids if uuid)
+
+
+class TaskResource(SerializingObject):
+ read_only_fields = []
+
+ def _load_data(self, data):
+ self._data = dict((key, self._deserialize(key, value))
+ for key, value in data.items())
+ # We need to use a copy for original data, so that changes
+ # are not propagated.
+ self._original_data = copy.deepcopy(self._data)
+
+ def _update_data(self, data, update_original=False):
+ """
+ Low level update of the internal _data dict. Data which are coming as
+ updates should already be serialized. If update_original is True, the
+ original_data dict is updated as well.
+ """
+ self._data.update(dict((key, self._deserialize(key, value))
+ for key, value in data.items()))
+
+ if update_original:
+ self._original_data = copy.deepcopy(self._data)