+# Version strings (unicode on both Python 2 and 3 via six.u) that
+# version-dependent code compares against, e.g. enforce_recurrence().
VERSION_2_2_0 = six.u('2.2.0')
VERSION_2_3_0 = six.u('2.3.0')
VERSION_2_4_0 = six.u('2.4.0')
+VERSION_2_4_1 = six.u('2.4.1')
+VERSION_2_4_2 = six.u('2.4.2')
logger = logging.getLogger(__name__)
local_zone = tzlocal.get_localzone()
pass
+class ReadOnlyDictView(object):
+    """
+    Provides simplified read-only view upon dict object.
+
+    The viewed dict is referenced, not copied, so later changes to it
+    remain visible through the view. Everything handed out to the caller
+    is a deep copy, hence mutating a returned object never affects the
+    viewed dict. No mutating methods are defined.
+    """
+
+    def __init__(self, viewed_dict):
+        self.viewed_dict = viewed_dict
+
+    def __getitem__(self, key):
+        # Raises KeyError for a missing key, exactly like the viewed dict
+        return copy.deepcopy(self.viewed_dict[key])
+
+    def __contains__(self, k):
+        return k in self.viewed_dict
+
+    def __iter__(self):
+        # Iterating a dict yields its keys
+        for key in self.viewed_dict:
+            yield copy.deepcopy(key)
+
+    def __len__(self):
+        return len(self.viewed_dict)
+
+    def get(self, key, default=None):
+        return copy.deepcopy(self.viewed_dict.get(key, default))
+
+    def keys(self):
+        # Returned as a list, consistently with items() and values()
+        return [copy.deepcopy(k) for k in self.viewed_dict]
+
+    def items(self):
+        return [copy.deepcopy(v) for v in self.viewed_dict.items()]
+
+    def values(self):
+        return [copy.deepcopy(v) for v in self.viewed_dict.values()]
+
+
class SerializingObject(object):
"""
Common ancestor for TaskResource & TaskFilter, since they both
not export empty-valued attributes) if the attribute
is not iterable (e.g. list or set), in which case
an empty iterable should be used.
+
+ Normalizing methods should hold the following contract:
+ - They are used to validate and normalize the user input.
+ Any attribute value that comes from the user (during Task
+ initialization, assigning values to Task attributes, or
+ filtering by user-provided values of attributes) is first
+ validated and normalized using the normalize_{key} method.
+ - If validation or normalization fails, normalizer is expected
+ to raise ValueError.
"""
def _deserialize(self, key, value):
or entered as a value of Task attribute.
"""
+ # None value should not be converted by normalizer
+ if value is None:
+ return None
+
normalize_func = getattr(self, 'normalize_{0}'.format(key),
lambda x: x)
def deserialize_entry(self, value):
return self.timestamp_deserializer(value)
+ def normalize_entry(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_modified(self, value):
return self.timestamp_serializer(value)
def deserialize_modified(self, value):
return self.timestamp_deserializer(value)
+ def normalize_modified(self, value):
+ return self.datetime_normalizer(value)
+
+ def serialize_start(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_start(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_start(self, value):
+ return self.datetime_normalizer(value)
+
+ def serialize_end(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_end(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_end(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_due(self, value):
return self.timestamp_serializer(value)
def deserialize_due(self, value):
return self.timestamp_deserializer(value)
+ def normalize_due(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_scheduled(self, value):
return self.timestamp_serializer(value)
def deserialize_scheduled(self, value):
return self.timestamp_deserializer(value)
+ def normalize_scheduled(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_until(self, value):
return self.timestamp_serializer(value)
def deserialize_until(self, value):
return self.timestamp_deserializer(value)
+ def normalize_until(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_wait(self, value):
return self.timestamp_serializer(value)
def deserialize_wait(self, value):
return self.timestamp_deserializer(value)
+ def normalize_wait(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_annotations(self, value):
value = value if value is not None else []
uuids = raw_uuids.split(',')
return set(self.warrior.tasks.get(uuid=uuid) for uuid in uuids if uuid)
- def normalize_datetime(self, value):
+ def datetime_normalizer(self, value):
"""
Normalizes date/datetime value (considered to come from user input)
to localized datetime value. Following conversions happen:
# If the value is already localized, there is no need to change
# time zone at this point. Also None is a valid value too.
localized = value
-
+
return localized
-
+
+    def normalize_uuid(self, value):
+        """
+        Validates a user-supplied UUID value.
+
+        Returns the value unchanged if it is a non-empty string,
+        raises ValueError otherwise.
+        """
+        # Enforce sane UUID
+        if not isinstance(value, six.string_types) or value == '':
+            # Explicit positional index, matching the '{0}' style used by
+            # the other format() calls in this module; bare '{}' is
+            # rejected by str.format on Python 2.6.
+            raise ValueError("UUID must be a valid non-empty string, "
+                             "not: {0}".format(value))
+
+        return value
class TaskResource(SerializingObject):
if key in self.read_only_fields:
raise RuntimeError('Field \'%s\' is read-only' % key)
- # Localize any naive date/datetime to the detected timezone
- if (isinstance(value, datetime.datetime) or
- isinstance(value, datetime.date)):
- value = self.normalize_datetime(value)
-
+ # Normalize the user input before saving it
+ value = self._normalize(key, value)
self._data[key] = value
def __str__(self):
pass
@classmethod
- def from_input(cls, input_file=sys.stdin, modify=None):
+ def from_input(cls, input_file=sys.stdin, modify=None, warrior=None):
"""
Creates a Task object, directly from the stdin, by reading one line.
If modify=True, two lines are used, first line interpreted as the
but defaults to sys.stdin.
"""
- # TaskWarrior instance is set to None
- task = cls(None)
-
# Detect the hook type if not given directly
name = os.path.basename(sys.argv[0])
modify = name.startswith('on-modify') if modify is None else modify
+ # Create the TaskWarrior instance if none passed
+ if warrior is None:
+ hook_parent_dir = os.path.dirname(os.path.dirname(sys.argv[0]))
+ warrior = TaskWarrior(data_location=hook_parent_dir)
+
+ # Create the Task bound to the TaskWarrior instance resolved above
+ task = cls(warrior)
+
# Load the data from the input
task._load_data(json.loads(input_file.readline().strip()))
for (key, value) in six.iteritems(kwargs))
self._original_data = copy.deepcopy(self._data)
+ # Provide read only access to the original data
+ self.original = ReadOnlyDictView(self._original_data)
+
def __unicode__(self):
return self['description']
self.warrior.execute_command([self['uuid'], 'delete'])
# Refresh the status again, so that we have updated info stored
+ self.refresh(only_fields=['status', 'start', 'end'])
+
+ def start(self):
+ """
+ Starts this task by running the 'start' command on its uuid via
+ self.warrior, then refreshes the 'status' and 'start' fields.
+
+ Raises Task.NotSaved if the task has not been saved yet,
+ Task.CompletedTask if it is completed and Task.DeletedTask if
+ it is deleted.
+ """
+ if not self.saved:
+ raise Task.NotSaved("Task needs to be saved before it can be started")
+
+ # Refresh, and raise exception if task is already completed/deleted
self.refresh(only_fields=['status'])
+ if self.completed:
+ raise Task.CompletedTask("Cannot start a completed task")
+ elif self.deleted:
+ raise Task.DeletedTask("Deleted task cannot be started")
+
+ self.warrior.execute_command([self['uuid'], 'start'])
+
+ # Refresh the status again, so that we have updated info stored
+ self.refresh(only_fields=['status', 'start'])
def done(self):
if not self.saved:
self.warrior.execute_command([self['uuid'], 'done'])
# Refresh the status again, so that we have updated info stored
- self.refresh(only_fields=['status'])
+ self.refresh(only_fields=['status', 'start', 'end'])
def save(self):
if self.saved and not self.modified:
# convention in TW for empty values
attribute_key = key.split('.')[0]
- # Since this is user input, we need to normalize datetime
- # objects
- if (isinstance(value, datetime.datetime) or
- isinstance(value, datetime.date)):
- value = self.normalize_datetime(value)
-
+ # Since this is user input, we need to normalize before we serialize
+ value = self._normalize(attribute_key, value)
value = self._serialize(attribute_key, value)
# If we are filtering by uuid:, do not use uuid keyword
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
return stdout.strip('\n')
- def execute_command(self, args, config_override={}):
+ def execute_command(self, args, config_override={}, allow_failure=True):
+ # Runs the command line built by self._get_command_args() in a
+ # subprocess and returns its stripped stdout split into lines.
+ #
+ # NOTE(review): despite its name, allow_failure=True is the mode in
+ # which a non-zero return code raises TaskWarriorException; with
+ # allow_failure=False the return code is ignored and stdout is
+ # returned as usual.
+ # NOTE(review): config_override={} is a mutable default argument; it
+ # is only passed through here, but confirm that _get_command_args
+ # never mutates it.
command_args = self._get_command_args(
args, config_override=config_override)
logger.debug(' '.join(command_args))
p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
- if p.returncode:
+ if p.returncode and allow_failure:
+ # Prefer the complete stderr output as the error message and
+ # fall back to stdout when stderr is empty.
if stderr.strip():
- error_msg = stderr.strip().splitlines()[-1]
+ error_msg = stderr.strip()
else:
error_msg = stdout.strip()
raise TaskWarriorException(error_msg)
return stdout.strip().split('\n')
+ def enforce_recurrence(self):
+ # Run arbitrary report command which will trigger generation
+ # of recurrent tasks.
+
+ # Only necessary for TW up to 2.4.1, fixed in 2.4.2.
+ # NOTE(review): VERSION_2_4_2 is a plain unicode string; if
+ # self.version is a string as well, this comparison is lexicographic
+ # (e.g. '2.4.10' < '2.4.2' is True) -- confirm.
+ if self.version < VERSION_2_4_2:
+ # With allow_failure=False, execute_command does not raise when
+ # the report command exits with a non-zero return code.
+ self.execute_command(['next'], allow_failure=False)
+
def filter_tasks(self, filter_obj):
+ self.enforce_recurrence()
args = ['export', '--'] + filter_obj.get_filter_params()
tasks = []
for line in self.execute_command(args):