]>
git.madduck.net Git - etc/taskwarrior.git/blobdiff - tasklib/task.py
madduck's git repository
Every one of the projects in this repository is available at the canonical
URL git://git.madduck.net/madduck/pub/<projectpath> — see
each project's metadata for the exact URL.
All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
I'd be especially grateful if you read the Git project's submission guidelines
and adhere to them.
SSH access, as well as push access, can be arranged individually.
If you use my repositories frequently, consider adding the following
snippet to ~/.gitconfig and using the third clone URL listed for each
project:
[url "git://git.madduck.net/madduck/"]
insteadOf = madduck:
def deserialize_entry(self, value):
return self.timestamp_deserializer(value)
def deserialize_entry(self, value):
return self.timestamp_deserializer(value)
+ def normalize_entry(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_modified(self, value):
return self.timestamp_serializer(value)
def deserialize_modified(self, value):
return self.timestamp_deserializer(value)
def serialize_modified(self, value):
return self.timestamp_serializer(value)
def deserialize_modified(self, value):
return self.timestamp_deserializer(value)
+ def normalize_modified(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_due(self, value):
return self.timestamp_serializer(value)
def deserialize_due(self, value):
return self.timestamp_deserializer(value)
def serialize_due(self, value):
return self.timestamp_serializer(value)
def deserialize_due(self, value):
return self.timestamp_deserializer(value)
+ def normalize_due(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_scheduled(self, value):
return self.timestamp_serializer(value)
def deserialize_scheduled(self, value):
return self.timestamp_deserializer(value)
def serialize_scheduled(self, value):
return self.timestamp_serializer(value)
def deserialize_scheduled(self, value):
return self.timestamp_deserializer(value)
+ def normalize_scheduled(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_until(self, value):
return self.timestamp_serializer(value)
def deserialize_until(self, value):
return self.timestamp_deserializer(value)
def serialize_until(self, value):
return self.timestamp_serializer(value)
def deserialize_until(self, value):
return self.timestamp_deserializer(value)
+ def normalize_until(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_wait(self, value):
return self.timestamp_serializer(value)
def deserialize_wait(self, value):
return self.timestamp_deserializer(value)
def serialize_wait(self, value):
return self.timestamp_serializer(value)
def deserialize_wait(self, value):
return self.timestamp_deserializer(value)
+ def normalize_wait(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_annotations(self, value):
value = value if value is not None else []
def serialize_annotations(self, value):
value = value if value is not None else []
uuids = raw_uuids.split(',')
return set(self.warrior.tasks.get(uuid=uuid) for uuid in uuids if uuid)
uuids = raw_uuids.split(',')
return set(self.warrior.tasks.get(uuid=uuid) for uuid in uuids if uuid)
- def normalize_datetime (self, value):
+ def datetime_normalizer (self, value):
"""
Normalizes date/datetime value (considered to come from user input)
to localized datetime value. Following conversions happen:
"""
Normalizes date/datetime value (considered to come from user input)
to localized datetime value. Following conversions happen:
if key in self.read_only_fields:
raise RuntimeError('Field \'%s\' is read-only' % key)
if key in self.read_only_fields:
raise RuntimeError('Field \'%s\' is read-only' % key)
- # Localize any naive date/datetime to the detected timezone
- if (isinstance(value, datetime.datetime) or
- isinstance(value, datetime.date)):
- value = self.normalize_datetime(value)
-
+ # Normalize the user input before saving it
+ value = self._normalize(key, value)
self._data[key] = value
def __str__(self):
self._data[key] = value
def __str__(self):
# convention in TW for empty values
attribute_key = key.split('.')[0]
# convention in TW for empty values
attribute_key = key.split('.')[0]
- # Since this is user input, we need to normalize datetime
- # objects
- if (isinstance(value, datetime.datetime) or
- isinstance(value, datetime.date)):
- value = self.normalize_datetime(value)
-
+ # Since this is user input, we need to normalize before we serialize
+ value = self._normalize(key, value)
value = self._serialize(attribute_key, value)
# If we are filtering by uuid:, do not use uuid keyword
value = self._serialize(attribute_key, value)
# If we are filtering by uuid:, do not use uuid keyword
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
return stdout.strip('\n')
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
return stdout.strip('\n')
- def execute_command(self, args, config_override={}):
+ def execute_command(self, args, config_override={}, allow_failure=True ):
command_args = self._get_command_args(
args, config_override=config_override)
logger.debug(' '.join(command_args))
p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
command_args = self._get_command_args(
args, config_override=config_override)
logger.debug(' '.join(command_args))
p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
+ if p.returncode and allow_failure :
if stderr.strip():
error_msg = stderr.strip().splitlines()[-1]
else:
if stderr.strip():
error_msg = stderr.strip().splitlines()[-1]
else:
raise TaskWarriorException(error_msg)
return stdout.strip().split('\n')
raise TaskWarriorException(error_msg)
return stdout.strip().split('\n')
+ def enforce_recurrence(self):
+ # Run arbitrary report command which will trigger generation
+ # of recurrent tasks.
+ # TODO: Make a version dependant enforcement once
+ # TW-1531 is handled
+ self.execute_command(['next'], allow_failure=False)
+
def filter_tasks(self, filter_obj):
def filter_tasks(self, filter_obj):
+ self.enforce_recurrence()
args = ['export', '--'] + filter_obj.get_filter_params()
tasks = []
for line in self.execute_command(args):
args = ['export', '--'] + filter_obj.get_filter_params()
tasks = []
for line in self.execute_command(args):