pass
class ReadOnlyDictView(object):
    """
    Simplified read-only view of a dict.

    The wrapped dict is referenced, not copied, so the view always reflects
    its current content. Every object handed out by the view is a deep copy,
    which means that modifying a returned object never changes the wrapped
    dict. The view itself offers no methods for storing or deleting entries.
    """

    def __init__(self, viewed_dict):
        # Keep a reference only; copies are made lazily on each access.
        self.viewed_dict = viewed_dict

    def __getitem__(self, key):
        """Return a deep copy of the value stored under ``key``."""
        stored = self.viewed_dict[key]
        return copy.deepcopy(stored)

    def __contains__(self, k):
        """Tell whether ``k`` is a key of the wrapped dict."""
        return k in self.viewed_dict

    def __iter__(self):
        """Yield a deep copy of each key of the wrapped dict."""
        for key in self.viewed_dict:
            yield copy.deepcopy(key)

    def __len__(self):
        """Return the number of entries in the wrapped dict."""
        size = len(self.viewed_dict)
        return size

    def get(self, key, default=None):
        """
        Return a deep copy of the value stored under ``key``.

        When the key is absent, a deep copy of ``default`` is returned
        instead.
        """
        found = self.viewed_dict.get(key, default)
        return copy.deepcopy(found)

    def items(self):
        """Return a list with a deep copy of every (key, value) pair."""
        return [copy.deepcopy(pair) for pair in self.viewed_dict.items()]

    def values(self):
        """Return a list with a deep copy of every value."""
        return [copy.deepcopy(item) for item in self.viewed_dict.values()]
+
+
class SerializingObject(object):
"""
Common ancestor for TaskResource & TaskFilter, since they both
not export empty-valued attributes) if the attribute
is not iterable (e.g. list or set), in which case
an empty iterable should be used.
+
+ Normalizing methods should hold the following contract:
+ - They are used to validate and normalize the user input.
+ Any attribute value that comes from the user (during Task
+ initialization, assigning values to Task attributes, or
+ filtering by user-provided values of attributes) is first
+ validated and normalized using the normalize_{key} method.
+ - If validation or normalization fails, normalizer is expected
+ to raise ValueError.
"""
def _deserialize(self, key, value):
localized = value
return localized
-
+
def normalize_uuid(self, value):
    """
    Validate a user-supplied UUID value.

    The value is accepted, and returned unchanged, only when it is a
    non-empty text string (``six.text_type``).

    Raises ValueError for any other type and for the empty string.
    """
    is_text = isinstance(value, six.text_type)
    if is_text and value != '':
        return value

    # Enforce sane UUID
    raise ValueError("UUID must be a valid non-empty string.")
class TaskResource(SerializingObject):
for (key, value) in six.iteritems(kwargs))
self._original_data = copy.deepcopy(self._data)
+ # Provide read only access to the original data
+ self.original = ReadOnlyDictView(self._original_data)
+
def __unicode__(self):
return self['description']
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
return stdout.strip('\n')
- def execute_command(self, args, config_override={}):
+ def execute_command(self, args, config_override={}, allow_failure=True):
command_args = self._get_command_args(
args, config_override=config_override)
logger.debug(' '.join(command_args))
p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
- if p.returncode:
+ if p.returncode and allow_failure:
if stderr.strip():
error_msg = stderr.strip().splitlines()[-1]
else:
raise TaskWarriorException(error_msg)
return stdout.strip().split('\n')
def enforce_recurrence(self):
    """
    Trigger generation of recurrent tasks.

    Runs an arbitrary report command (``next``); running a report is what
    makes the generation of recurrent tasks happen. The output of the
    command is discarded.

    ``allow_failure=False`` is passed because, as visible in
    execute_command, the error path is guarded by
    ``p.returncode and allow_failure``: with False, a non-zero exit
    status of the report does not raise.
    """
    # TODO: Make a version-dependent enforcement once
    #       TW-1531 is handled
    report_args = ['next']
    self.execute_command(report_args, allow_failure=False)
+
def filter_tasks(self, filter_obj):
+ self.enforce_recurrence()
args = ['export', '--'] + filter_obj.get_filter_params()
tasks = []
for line in self.execute_command(args):