import json
import logging
import os
+import pytz
import six
import sys
import subprocess
+import tzlocal
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
REPR_OUTPUT_SIZE = 10
VERSION_2_4_0 = six.u('2.4.0')
logger = logging.getLogger(__name__)
+local_zone = tzlocal.get_localzone()
class TaskWarriorException(Exception):
def timestamp_serializer(self, date):
if not date:
return ''
+
+ # Any serialized timestamp should be localized, we need to
+ # convert to UTC before converting to string (DATE_FORMAT uses UTC)
+ date = date.astimezone(pytz.utc)
+
return date.strftime(DATE_FORMAT)
def timestamp_deserializer(self, date_str):
if not date_str:
return None
- return datetime.datetime.strptime(date_str, DATE_FORMAT)
+
+ # Return timestamp localized in the local zone
+ naive_timestamp = datetime.datetime.strptime(date_str, DATE_FORMAT)
+ localized_timestamp = pytz.utc.localize(naive_timestamp)
+ return localized_timestamp.astimezone(local_zone)
def serialize_entry(self, value):
return self.timestamp_serializer(value)
def deserialize_entry(self, value):
return self.timestamp_deserializer(value)
+ def normalize_entry(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_modified(self, value):
return self.timestamp_serializer(value)
def deserialize_modified(self, value):
return self.timestamp_deserializer(value)
+ def normalize_modified(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_due(self, value):
return self.timestamp_serializer(value)
def deserialize_due(self, value):
return self.timestamp_deserializer(value)
+ def normalize_due(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_scheduled(self, value):
return self.timestamp_serializer(value)
def deserialize_scheduled(self, value):
return self.timestamp_deserializer(value)
+ def normalize_scheduled(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_until(self, value):
return self.timestamp_serializer(value)
def deserialize_until(self, value):
return self.timestamp_deserializer(value)
+ def normalize_until(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_wait(self, value):
return self.timestamp_serializer(value)
def deserialize_wait(self, value):
return self.timestamp_deserializer(value)
+ def normalize_wait(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_annotations(self, value):
value = value if value is not None else []
uuids = raw_uuids.split(',')
return set(self.warrior.tasks.get(uuid=uuid) for uuid in uuids if uuid)
+ def datetime_normalizer(self, value):
+ """
+ Normalizes date/datetime value (considered to come from user input)
+ to localized datetime value. Following conversions happen:
+
+ naive date -> localized datetime with the same date, and time=midnight
+ naive datetime -> localized datetime with the same value
+ localized datetime -> localized datetime (no conversion)
+ """
+
+ if (isinstance(value, datetime.date)
+ and not isinstance(value, datetime.datetime)):
+ # Convert to local midnight
+ value_full = datetime.datetime.combine(value, datetime.time.min)
+ localized = local_zone.localize(value_full)
+ elif isinstance(value, datetime.datetime) and value.tzinfo is None:
+ # Convert to localized datetime object
+ localized = local_zone.localize(value)
+ else:
+ # If the value is already localized, there is no need to change
+ # time zone at this point. Also None is a valid value too.
+ localized = value
+
+ return localized
+
+
class TaskResource(SerializingObject):
read_only_fields = []
def __setitem__(self, key, value):
if key in self.read_only_fields:
raise RuntimeError('Field \'%s\' is read-only' % key)
+
+ # Normalize the user input before saving it
+ value = self._normalize(key, value)
self._data[key] = value
def __str__(self):
# Replace the value with empty string, since that is the
# convention in TW for empty values
attribute_key = key.split('.')[0]
+
+ # Since this is user input, we need to normalize before we serialize
+ value = self._normalize(key, value)
value = self._serialize(attribute_key, value)
# If we are filtering by uuid:, do not use uuid keyword
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
return stdout.strip('\n')
- def execute_command(self, args, config_override={}):
+ def execute_command(self, args, config_override={}, allow_failure=True):
command_args = self._get_command_args(
args, config_override=config_override)
logger.debug(' '.join(command_args))
p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
- if p.returncode:
+ if p.returncode and allow_failure:
if stderr.strip():
error_msg = stderr.strip().splitlines()[-1]
else:
raise TaskWarriorException(error_msg)
return stdout.strip().split('\n')
+ def enforce_recurrence(self):
+ # Run arbitrary report command which will trigger generation
+ # of recurrent tasks.
+ # TODO: Make a version dependant enforcement once
+ # TW-1531 is handled
+ self.execute_command(['next'], allow_failure=False)
+
def filter_tasks(self, filter_obj):
+ self.enforce_recurrence()
args = ['export', '--'] + filter_obj.get_filter_params()
tasks = []
for line in self.execute_command(args):