]>
git.madduck.net Git - etc/taskwarrior.git/blobdiff - tasklib/task.py
madduck's git repository
Every one of the projects in this repository is available at the canonical
URL git://git.madduck.net/madduck/pub/<projectpath> — see
each project's metadata for the exact URL.
All patches and comments are welcome. Please squash your changes to logical
commits before using git-format-patch and git-send-email to
patches@ git. madduck. net .
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
SSH access, as well as push access can be individually
arranged .
If you use my repositories frequently, consider adding the following
snippet to ~/.gitconfig and using the third clone URL listed for each
project:
[url "git://git.madduck.net/madduck/"]
insteadOf = madduck:
VERSION_2_4_0 = six.u('2.4.0')
VERSION_2_4_1 = six.u('2.4.1')
VERSION_2_4_2 = six.u('2.4.2')
VERSION_2_4_0 = six.u('2.4.0')
VERSION_2_4_1 = six.u('2.4.1')
VERSION_2_4_2 = six.u('2.4.2')
+VERSION_2_4_3 = six.u('2.4.3')
logger = logging.getLogger(__name__)
local_zone = tzlocal.get_localzone()
logger = logging.getLogger(__name__)
local_zone = tzlocal.get_localzone()
+ def __init__(self, warrior):
+ self.warrior = warrior
+
def _deserialize(self, key, value):
hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
lambda x: x if x != '' else None)
def _deserialize(self, key, value):
hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
lambda x: x if x != '' else None)
# If the value is already localized, there is no need to change
# time zone at this point. Also None is a valid value too.
localized = value
# If the value is already localized, there is no need to change
# time zone at this point. Also None is a valid value too.
localized = value
- elif isinstance(value, six.string_types):
+ elif (isinstance(value, six.string_types)
+ and self.warrior.version >= VERSION_2_4_0):
# For strings, use 'task calc' to evaluate the string to datetime
# For strings, use 'task calc' to evaluate the string to datetime
+ # available since TW 2.4.0
args = value.split()
result = self.warrior.execute_command(['calc'] + args)
naive = datetime.datetime.strptime(result[0], DATE_FORMAT_CALC)
args = value.split()
result = self.warrior.execute_command(['calc'] + args)
naive = datetime.datetime.strptime(result[0], DATE_FORMAT_CALC)
def __init__(self, task, data={}):
self.task = task
self._load_data(data)
def __init__(self, task, data={}):
self.task = task
self._load_data(data)
+ super(TaskAnnotation, self).__init__(task.warrior)
def remove(self):
self.task.remove_annotation(self)
def remove(self):
self.task.remove_annotation(self)
return task
def __init__(self, warrior, **kwargs):
return task
def __init__(self, warrior, **kwargs):
+ super(Task, self).__init__(warrior)
# Check that user is not able to set read-only value in __init__
for key in kwargs.keys():
# Check that user is not able to set read-only value in __init__
for key in kwargs.keys():
A set of parameters to filter the task list with.
"""
A set of parameters to filter the task list with.
"""
- def __init__(self, filter_params=[]):
+ def __init__(self, warrior, filter_params=[]):
self.filter_params = filter_params
self.filter_params = filter_params
+ super(TaskFilter, self).__init__(warrior)
def add_filter(self, filter_str):
self.filter_params.append(filter_str)
def add_filter(self, filter_str):
self.filter_params.append(filter_str)
return [f for f in self.filter_params if f]
def clone(self):
return [f for f in self.filter_params if f]
def clone(self):
+ c = self.__class__(self.warrior )
c.filter_params = list(self.filter_params)
return c
c.filter_params = list(self.filter_params)
return c
def __init__(self, warrior=None, filter_obj=None):
self.warrior = warrior
self._result_cache = None
def __init__(self, warrior=None, filter_obj=None):
self.warrior = warrior
self._result_cache = None
- self.filter_obj = filter_obj or TaskFilter()
+ self.filter_obj = filter_obj or TaskFilter(warrior )
def __deepcopy__(self, memo):
"""
def __deepcopy__(self, memo):
"""
if create and not os.path.exists(data_location):
os.makedirs(data_location)
if create and not os.path.exists(data_location):
os.makedirs(data_location)
+ self.version = self._get_version()
self.config = {
'data.location': data_location,
'confirmation': 'no',
'dependency.confirmation': 'no', # See TW-1483 or taskrc man page
'recurrence.confirmation': 'no', # Necessary for modifying R tasks
self.config = {
'data.location': data_location,
'confirmation': 'no',
'dependency.confirmation': 'no', # See TW-1483 or taskrc man page
'recurrence.confirmation': 'no', # Necessary for modifying R tasks
+ # 2.4.3 onwards supports 0 as infite bulk, otherwise set just
+ # arbitrary big number which is likely to be large enough
+ 'bulk': 0 if self.version > VERSION_2_4_3 else 100000,
}
self.tasks = TaskQuerySet(self)
}
self.tasks = TaskQuerySet(self)
- self.version = self._get_version()
def _get_command_args(self, args, config_override={}):
command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
def _get_command_args(self, args, config_override={}):
command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
else:
error_msg = stdout.strip()
raise TaskWarriorException(error_msg)
else:
error_msg = stdout.strip()
raise TaskWarriorException(error_msg)
- return stdout.strip().split('\n')
+ return stdout.r strip().split('\n')
def enforce_recurrence(self):
# Run arbitrary report command which will trigger generation
def enforce_recurrence(self):
# Run arbitrary report command which will trigger generation