import json
import logging
import os
+import pytz
import six
import sys
import subprocess
+import tzlocal
# strftime/strptime pattern used by the timestamp (de)serializers; the
# trailing 'Z' is a literal character, so values are expected to be in UTC.
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
# NOTE(review): presumably caps the number of items shown in a repr();
# its use is not visible in this part of the file -- confirm.
REPR_OUTPUT_SIZE = 10
# Text-type version constant; presumably compared against the detected
# TaskWarrior version -- confirm against the callers.
VERSION_2_4_0 = six.u('2.4.0')
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Local time zone, looked up once at import time and used to localize
# naive and deserialized datetime values.
+local_zone = tzlocal.get_localzone()
class TaskWarriorException(Exception):
"""
Common ancestor for TaskResource & TaskFilter, since they both
need to serialize arguments.
+
+ Serializing method should hold the following contract:
+ - any empty value (meaning removal of the attribute)
+ is serialized into an empty string
+ - None denotes an empty value for any attribute
+
+ Deserializing method should hold the following contract:
+ - None denotes an empty value for any attribute that is not
+ iterable (this is only a safeguard, since TaskWarrior
+ currently does not export empty-valued attributes)
+ - for an iterable attribute (e.g. list or set), an empty
+ iterable should be used instead of None.
"""
def _deserialize(self, key, value):
lambda x: x if x is not None else '')
return dehydrate_func(value)
def _normalize(self, key, value):
    """
    Run user input through the matching normalize_<key> hook.

    A method named 'normalize_<key>' is looked up on this object and
    applied to the value. When the object has no such method, the value
    is returned unchanged. This is applied to user input both when it is
    used as a filter and when it is stored as a Task attribute.
    """

    def passthrough(raw):
        # Fallback used for attributes without a dedicated normalizer
        return raw

    hook_name = 'normalize_{0}'.format(key)
    hook = getattr(self, hook_name, passthrough)

    return hook(value)
+
def timestamp_serializer(self, date):
    """
    Turn a localized datetime into a DATE_FORMAT string.

    Any falsy value (such as None) is serialized into an empty string.
    Otherwise the value is converted to UTC and formatted with
    DATE_FORMAT.
    """

    if date:
        # Any serialized timestamp should be localized; DATE_FORMAT uses
        # UTC, so convert before rendering the string.
        utc_date = date.astimezone(pytz.utc)
        return utc_date.strftime(DATE_FORMAT)

    return ''
def timestamp_deserializer(self, date_str):
    """
    Parse a DATE_FORMAT string into a datetime in the local zone.

    A falsy input (None or an empty string) yields None. Otherwise the
    string is parsed with DATE_FORMAT, marked as UTC and then converted
    to the module's local_zone.
    """

    if date_str:
        parsed = datetime.datetime.strptime(date_str, DATE_FORMAT)
        # The parsed value is naive; tag it as UTC first, then move it
        # into the local zone.
        return pytz.utc.localize(parsed).astimezone(local_zone)

    return None
def serialize_entry(self, value):
    """Serialize the 'entry' attribute with timestamp_serializer."""
    serialized = self.timestamp_serializer(value)
    return serialized

def deserialize_entry(self, value):
    """Deserialize the 'entry' attribute with timestamp_deserializer."""
    deserialized = self.timestamp_deserializer(value)
    return deserialized

def normalize_entry(self, value):
    """Normalize user input for 'entry' with datetime_normalizer."""
    normalized = self.datetime_normalizer(value)
    return normalized
+
def serialize_modified(self, value):
    """Serialize the 'modified' attribute with timestamp_serializer."""
    serialized = self.timestamp_serializer(value)
    return serialized

def deserialize_modified(self, value):
    """Deserialize the 'modified' attribute with timestamp_deserializer."""
    deserialized = self.timestamp_deserializer(value)
    return deserialized

def normalize_modified(self, value):
    """Normalize user input for 'modified' with datetime_normalizer."""
    normalized = self.datetime_normalizer(value)
    return normalized
+
def serialize_due(self, value):
    """Serialize the 'due' attribute with timestamp_serializer."""
    serialized = self.timestamp_serializer(value)
    return serialized

def deserialize_due(self, value):
    """Deserialize the 'due' attribute with timestamp_deserializer."""
    deserialized = self.timestamp_deserializer(value)
    return deserialized

def normalize_due(self, value):
    """Normalize user input for 'due' with datetime_normalizer."""
    normalized = self.datetime_normalizer(value)
    return normalized
+
def serialize_scheduled(self, value):
    """Serialize the 'scheduled' attribute with timestamp_serializer."""
    serialized = self.timestamp_serializer(value)
    return serialized

def deserialize_scheduled(self, value):
    """Deserialize the 'scheduled' attribute with timestamp_deserializer."""
    deserialized = self.timestamp_deserializer(value)
    return deserialized

def normalize_scheduled(self, value):
    """Normalize user input for 'scheduled' with datetime_normalizer."""
    normalized = self.datetime_normalizer(value)
    return normalized
+
def serialize_until(self, value):
    """Serialize the 'until' attribute with timestamp_serializer."""
    serialized = self.timestamp_serializer(value)
    return serialized

def deserialize_until(self, value):
    """Deserialize the 'until' attribute with timestamp_deserializer."""
    deserialized = self.timestamp_deserializer(value)
    return deserialized

def normalize_until(self, value):
    """Normalize user input for 'until' with datetime_normalizer."""
    normalized = self.datetime_normalizer(value)
    return normalized
+
def serialize_wait(self, value):
    """Serialize the 'wait' attribute with timestamp_serializer."""
    serialized = self.timestamp_serializer(value)
    return serialized

def deserialize_wait(self, value):
    """Deserialize the 'wait' attribute with timestamp_deserializer."""
    deserialized = self.timestamp_deserializer(value)
    return deserialized

def normalize_wait(self, value):
    """Normalize user input for 'wait' with datetime_normalizer."""
    normalized = self.datetime_normalizer(value)
    return normalized
+
def serialize_annotations(self, value):
    """
    Serialize a collection of annotations into a list of dicts.

    None is treated as an empty collection. Each annotation's
    export_data() output is decoded from JSON, so the result holds
    dicts rather than JSON strings. An empty result is reported as an
    empty string.
    """

    if value is None:
        value = []

    # Round-tripping through json.loads is intentional: the serialized
    # value is meant to be a list of dicts, not a list of strings.
    exported = []
    for annotation in value:
        exported.append(json.loads(annotation.export_data()))

    if not exported:
        return ''

    return exported
+
def deserialize_annotations(self, data):
    """
    Build TaskAnnotation objects from raw annotation data.

    Each item of data is wrapped as TaskAnnotation(self, item). Falsy
    input (None or an empty collection) gives an empty list.
    """
    if not data:
        return []

    return [TaskAnnotation(self, item) for item in data]
return tags.split(',') if tags else []
return tags or []
def serialize_depends(self, value):
    """
    Serialize dependencies into a comma-separated string of uuids.

    None is treated as an empty set, which serializes to an empty
    string. Every item is expected to support item['uuid'].
    """
    if value is None:
        value = set()

    # Return the list of uuids
    uuids = [task['uuid'] for task in value]
    return ','.join(uuids)
def deserialize_depends(self, raw_uuids):
    """
    Resolve a comma-separated uuid string into a set of tasks.

    Falsy input (None or an empty string) gives an empty set. Empty
    segments between commas are skipped; every remaining uuid is looked
    up through self.warrior.tasks.get(uuid=...).
    """
    if not raw_uuids:
        # Convert None to empty string
        raw_uuids = ''

    resolved = set()
    for uuid in raw_uuids.split(','):
        if uuid:
            resolved.add(self.warrior.tasks.get(uuid=uuid))

    return resolved
def datetime_normalizer(self, value):
    """
    Normalize a date/datetime coming from user input to a localized
    datetime. The conversions are:

    naive date -> localized datetime, same date, time set to midnight
    naive datetime -> localized datetime with the same value
    localized datetime -> returned as is
    anything else (including None) -> returned as is
    """

    if isinstance(value, datetime.datetime):
        if value.tzinfo is None:
            # Naive datetime: attach the local zone
            return local_zone.localize(value)
        # Already localized, there is no need to change the time zone
        return value

    if isinstance(value, datetime.date):
        # Plain date: interpret it as local midnight of that day
        midnight = datetime.datetime.combine(value, datetime.time.min)
        return local_zone.localize(midnight)

    # None and other non-date values pass through untouched
    return value
+
+
class TaskResource(SerializingObject):
read_only_fields = []
def __setitem__(self, key, value):
    """
    Store a value under key after normalizing it.

    Raises RuntimeError when key is listed in read_only_fields. The
    value is passed through self._normalize before it is written into
    self._data.
    """
    if key not in self.read_only_fields:
        # Normalize the user input before saving it
        self._data[key] = self._normalize(key, value)
        return

    raise RuntimeError('Field \'%s\' is read-only' % key)
def __str__(self):
def __repr__(self):
    """Use the str() form of the object as its repr."""
    text = str(self)
    return text
def export_data(self):
    """
    Exports current data contained in the Task as JSON

    Every attribute in self._data is passed through self._serialize.
    Attributes whose serialized value equals the empty string are left
    out, since an empty string denotes an empty value and must not be
    passed to TaskWarrior.

    Returns a compact JSON string (no spaces after separators).
    """

    serialized = ((key, self._serialize(key, value))
                  for key, value in self._data.items())

    # Drop empty serialized values. Compare by equality: an identity test
    # against a string literal only works when the interpreter happens to
    # share the empty string object, and never matches a unicode u'' on
    # Python 2.
    data = dict((key, value) for key, value in serialized if value != '')

    # We need to remove spaces for TW-1504, use custom separators
    return json.dumps(data, separators=(',', ':'))
+
@property
def _modified_fields(self):
    """
    Yield the keys of writable fields whose value differs from the
    value held in self._original_data.

    Keys listed in read_only_fields are never reported. A field that is
    empty (falsy) both now and originally is not treated as modified.
    """
    for key in set(self._data.keys()) - set(self.read_only_fields):
        current = self._data.get(key)
        original = self._original_data.get(key)

        # Make sure not to mark data removal as modified field if the
        # field originally had some empty value
        if key in self._data and not (current or original):
            continue

        if current != original:
            yield key
+
@property
def modified(self):
    """Return True when self._modified_fields yields at least one key."""
    changed = list(self._modified_fields)
    return len(changed) > 0
+
class TaskAnnotation(TaskResource):
read_only_fields = ['entry', 'description']
# __init__ methods, that would be confusing
# Rather unfortunate syntax due to python2.6 compatibility
- self._load_data(dict((key, self._serialize(key, value))
- for (key, value) in six.iteritems(kwargs)))
+ self._data = dict((key, self._normalize(key, value))
+ for (key, value) in six.iteritems(kwargs))
+ self._original_data = copy.deepcopy(self._data)
def __unicode__(self):
    """Return the value stored under the 'description' key."""
    description = self['description']
    return description
# If the tasks are not saved, return hash of instance id
return id(self).__hash__()
- @property
- def _modified_fields(self):
- writable_fields = set(self._data.keys()) - set(self.read_only_fields)
- for key in writable_fields:
- new_value = self._data.get(key)
- old_value = self._original_data.get(key)
-
- # Make sure not to mark data removal as modified field if the
- # field originally had some empty value
- if key in self._data and not new_value and not old_value:
- continue
-
- if new_value != old_value:
- yield key
-
- @property
- def modified(self):
- return bool(list(self._modified_fields))
-
@property
def completed(self):
    """Tell whether the 'status' value equals the text 'completed'."""
    status = self['status']
    return status == six.text_type('completed')
def serialize_depends(self, cur_dependencies):
# Check that all the tasks are saved
- for task in cur_dependencies:
+ for task in (cur_dependencies or set()):
if not task.saved:
raise Task.NotSaved('Task \'%s\' needs to be saved before '
'it can be set as dependency.' % task)
else:
self._load_data(new_data)
- def export_data(self):
- """
- Exports current data contained in the Task as JSON
- """
-
- # We need to remove spaces for TW-1504, use custom separators
- data_tuples = ((key, self._serialize(key, value))
- for key, value in six.iteritems(self._data))
-
- # Empty string denotes empty serialized value, we do not want
- # to pass that to TaskWarrior.
- data_tuples = filter(lambda t: t[1] is not '', data_tuples)
- data = dict(data_tuples)
- return json.dumps(data, separators=(',',':'))
-
class TaskFilter(SerializingObject):
"""
A set of parameters to filter the task list with.
# Replace the value with empty string, since that is the
# convention in TW for empty values
attribute_key = key.split('.')[0]
+
+ # Since this is user input, we need to normalize before we serialize
+ value = self._normalize(key, value)
value = self._serialize(attribute_key, value)
# If we are filtering by uuid:, do not use uuid keyword
self.config = {
'data.location': os.path.expanduser(data_location),
'confirmation': 'no',
- 'dependency.confirmation': 'no', # See TW-1483 or taskrc man page
+ 'dependency.confirmation': 'no', # See TW-1483 or taskrc man page
+ 'recurrence.confirmation': 'no', # Necessary for modifying R tasks
}
self.tasks = TaskQuerySet(self)
self.version = self._get_version()