madduck's git repository
Every one of the projects in this repository is available at the canonical
URL git://git.madduck.net/madduck/pub/<projectpath> — see
each project's metadata for the exact URL.
All patches and comments are welcome. Please squash your changes to logical
commits before using git-format-patch and git-send-email to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
SSH access, as well as push access can be individually
arranged.
If you use my repositories frequently, consider adding the following
snippet to ~/.gitconfig and using the third clone URL listed for each
project:
[url "git://git.madduck.net/madduck/"]
insteadOf = madduck:
from __future__ import print_function
import copy
from __future__ import print_function
import copy
import json
import logging
import os
import json
import logging
import os
-from backends import TaskWarrior, TaskWarriorException
-from serializing import SerializingObject
+from .serializing import SerializingObject
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
-DATE_FORMAT_CALC = '%Y-%m-%dT%H:%M:%S'
REPR_OUTPUT_SIZE = 10
PENDING = 'pending'
COMPLETED = 'completed'
REPR_OUTPUT_SIZE = 10
PENDING = 'pending'
COMPLETED = 'completed'
+DELETED = 'deleted'
+WAITING = 'waiting'
+RECURRING = 'recurring'
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
-local_zone = tzlocal.get_localzone()
class ReadOnlyDictView(object):
class ReadOnlyDictView(object):
def __len__(self):
return len(self.viewed_dict)
def __len__(self):
return len(self.viewed_dict)
+ def __unicode__(self):
+ return six.u('ReadOnlyDictView: {0}'.format(repr(self.viewed_dict)))
+
+ __repr__ = __unicode__
+
def get(self, key, default=None):
return copy.deepcopy(self.viewed_dict.get(key, default))
def get(self, key, default=None):
return copy.deepcopy(self.viewed_dict.get(key, default))
if update_original:
self._original_data = copy.deepcopy(self._data)
if update_original:
self._original_data = copy.deepcopy(self._data)
def __getitem__(self, key):
# This is a workaround to make TaskResource non-iterable
# over simple index-based iteration
def __getitem__(self, key):
# This is a workaround to make TaskResource non-iterable
# over simple index-based iteration
# to pass that to TaskWarrior.
data_tuples = filter(lambda t: t[1] is not '', data_tuples)
data = dict(data_tuples)
# to pass that to TaskWarrior.
data_tuples = filter(lambda t: t[1] is not '', data_tuples)
data = dict(data_tuples)
- return json.dumps(data, separators=(',',':'))
+ return json.dumps(data, separators=(',', ':'))
@property
def _modified_fields(self):
@property
def _modified_fields(self):
def __init__(self, task, data=None):
self.task = task
self._load_data(data or dict())
def __init__(self, task, data=None):
self.task = task
self._load_data(data or dict())
- super(TaskAnnotation, self).__init__(task.warrior)
+ super(TaskAnnotation, self).__init__(task.backend)
def remove(self):
self.task.remove_annotation(self)
def remove(self):
self.task.remove_annotation(self)
# their data dics are the same
return self.task == other.task and self._data == other._data
# their data dics are the same
return self.task == other.task and self._data == other._data
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
- def from_input(cls, input_file=sys.stdin, modify=None, warrior=None):
+ def from_input(cls, input_file=sys.stdin, modify=None, backend=None):
"""
Creates a Task object, directly from the stdin, by reading one line.
If modify=True, two lines are used, first line interpreted as the
"""
Creates a Task object, directly from the stdin, by reading one line.
If modify=True, two lines are used, first line interpreted as the
modify = name.startswith('on-modify') if modify is None else modify
# Create the TaskWarrior instance if none passed
modify = name.startswith('on-modify') if modify is None else modify
# Create the TaskWarrior instance if none passed
+ if backend is None:
+ backends = importlib.import_module('tasklib.backends')
hook_parent_dir = os.path.dirname(os.path.dirname(sys.argv[0]))
hook_parent_dir = os.path.dirname(os.path.dirname(sys.argv[0]))
- warrior = TaskWarrior(data_location=hook_parent_dir)
+ backend = backends.TaskWarrior(data_location=hook_parent_dir)
# TaskWarrior instance is set to None
# TaskWarrior instance is set to None
# Load the data from the input
task._load_data(json.loads(input_file.readline().strip()))
# Load the data from the input
task._load_data(json.loads(input_file.readline().strip()))
- def __init__(self, warrior, **kwargs):
- super(Task, self).__init__(warrior)
+ def __init__(self, backend, **kwargs):
+ super(Task, self).__init__(backend)
# Check that user is not able to set read-only value in __init__
for key in kwargs.keys():
# Check that user is not able to set read-only value in __init__
for key in kwargs.keys():
# If the tasks are not saved, compare the actual instances
return id(self) == id(other)
# If the tasks are not saved, compare the actual instances
return id(self) == id(other)
+ def __ne__(self, other):
+ return not self.__eq__(other)
def __hash__(self):
if self['uuid']:
def __hash__(self):
if self['uuid']:
def pending(self):
return self['status'] == six.text_type('pending')
def pending(self):
return self['status'] == six.text_type('pending')
+ @property
+ def recurring(self):
+ return self['status'] == six.text_type('recurring')
+
@property
def active(self):
return self['start'] is not None
@property
def active(self):
return self['start'] is not None
else:
self._load_data(new_data)
else:
self._load_data(new_data)
class TaskQuerySet(object):
"""
Represents a lazy lookup for a task objects.
"""
class TaskQuerySet(object):
"""
Represents a lazy lookup for a task objects.
"""
- def __init__(self, warrior=None, filter_obj=None):
- self.warrior = warrior
+ def __init__(self, backend, filter_obj=None):
+ self.backend = backend
self._result_cache = None
self._result_cache = None
- self.filter_obj = filter_obj or TaskWarriorFilter(warrior)
+ self.filter_obj = filter_obj or self.backend.filter_class(backend)
def __deepcopy__(self, memo):
"""
Deep copy of a QuerySet doesn't populate the cache
"""
def __deepcopy__(self, memo):
"""
Deep copy of a QuerySet doesn't populate the cache
"""
+ obj = self.__class__(backend=self.backend)
for k, v in self.__dict__.items():
if k in ('_iter', '_result_cache'):
obj.__dict__[k] = None
for k, v in self.__dict__.items():
if k in ('_iter', '_result_cache'):
obj.__dict__[k] = None
if klass is None:
klass = self.__class__
filter_obj = self.filter_obj.clone()
if klass is None:
klass = self.__class__
filter_obj = self.filter_obj.clone()
- c = klass(warrior=self.warrior, filter_obj=filter_obj)
+ c = klass(backend=self.backend, filter_obj=filter_obj)
c.__dict__.update(kwargs)
return c
c.__dict__.update(kwargs)
return c
"""
Fetch the tasks which match the current filters.
"""
"""
Fetch the tasks which match the current filters.
"""
- return self.warrior.filter_tasks(self.filter_obj)
+ return self.backend.filter_tasks(self.filter_obj)
def completed(self):
return self.filter(status=COMPLETED)
def completed(self):
return self.filter(status=COMPLETED)
+ def deleted(self):
+ return self.filter(status=DELETED)
+
+ def waiting(self):
+ return self.filter(status=WAITING)
+
+ def recurring(self):
+ return self.filter(status=RECURRING)
+
def filter(self, *args, **kwargs):
"""
Returns a new TaskQuerySet with the given filters added.
def filter(self, *args, **kwargs):
"""
Returns a new TaskQuerySet with the given filters added.