Skip to content

Commit

Permalink
Tasks hash ids update when run function definition changes (#252)
Browse files Browse the repository at this point in the history
* hash own codes

* fix

* [fix]

* fix lint

* fix

* [fix] bug

* [add] test_case

* [fix] change parameter description

* [add] document

* [fix] yapf

* [fix] add _visible_in_registry to declare two classes with the same name.

* fix docs

* [fix] filter error with getsource without try except

* fix

* fix
  • Loading branch information
ujiuji1259 authored Oct 15, 2021
1 parent 06ede5c commit 2a48aef
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
5 changes: 5 additions & 0 deletions docs/task_on_kart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ Please refer to :func:`~gokart.task.TaskOnKart.make_target` and described later
When using `.feather`, index will be converted to column at saving and restored to index at loading.
If you prefer not to save the index, set the `store_index_in_feather=False` parameter at `gokart.target.make_target()`.

.. note::
When you set `serialized_task_definition_check=True`, the task will be rerun whenever the source code of the task class is modified.
Please note that code defined outside the class is not taken into account.



TaskOnKart.load
----------------
Expand Down
19 changes: 19 additions & 0 deletions gokart/task.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
import inspect
import os
import types
from importlib import import_module
Expand Down Expand Up @@ -42,6 +43,10 @@ class TaskOnKart(luigi.Task):
description='If this is true, this task will not run only if all input and output files exist,'
' and all input files are modified before output file are modified.',
significant=False)
# Opt-in flag: when true, the task's own source code is added to the inputs of
# the task hash id, so editing the task's code yields a new unique id.
# The adjacent string literals are concatenated by Python, so the first one must
# end with a space to keep the help text readable ("present, this task ...").
serialized_task_definition_check = luigi.BoolParameter(default=False,
                                                       description='If this is true, even if all outputs are present, '
                                                       'this task will be executed if any changes have been made to the code.',
                                                       significant=False)
delete_unnecessary_output_files = luigi.BoolParameter(default=False, description='If this is true, delete unnecessary output files.', significant=False)
significant = luigi.BoolParameter(default=True,
description='If this is false, this task is not treated as a part of dependent tasks for the unique id.',
Expand Down Expand Up @@ -265,6 +270,18 @@ def dump(self, obj, target: Union[None, str, TargetOnKart] = None) -> None:
assert not obj.empty
self._get_output_target(target).dump(obj, lock_at_dump=self._lock_at_dump)

@staticmethod
def get_code(target_class) -> Set[str]:
    """Collect the source text of the code-bearing members of ``target_class``.

    Members are enumerated with ``inspect.getmembers`` and kept only when they
    are a method, function, frame or code object. The source of each kept
    member is read with ``inspect.getsource``.

    :param target_class: the object (class or instance) whose members are inspected.
    :return: a set with one source string per kept member; members with
        identical source text collapse into a single entry.
    """
    def _is_source_backed(member):
        # Accept only the object kinds selected for source extraction.
        predicates = (inspect.ismethod, inspect.isfunction, inspect.isframe, inspect.iscode)
        return any(predicate(member) for predicate in predicates)

    sources = set()
    for _name, member in inspect.getmembers(target_class, _is_source_backed):
        # No error handling here: anything inspect.getsource raises propagates to the caller.
        sources.add(inspect.getsource(member))
    return sources

def get_own_code(self) -> str:
    """Return the source code this task has beyond what ``TaskOnKart`` provides.

    The source strings collected from ``TaskOnKart`` are subtracted from the
    source strings collected from this instance, so only members whose source
    text differs from every ``TaskOnKart`` member remain.

    :return: the remaining source strings, sorted and concatenated into one
        string. Sorting makes the result independent of set iteration order.
    """
    gokart_codes = self.get_code(TaskOnKart)
    own_codes = self.get_code(self)
    # sorted() accepts any iterable and already returns a new list, so the
    # intermediate list() copy is unnecessary.
    return ''.join(sorted(own_codes - gokart_codes))

def make_unique_id(self):
unique_id = self.task_unique_id or self._make_hash_id()
if self.cache_unique_id:
Expand All @@ -281,6 +298,8 @@ def _to_str_params(task):
dependencies = [d for d in dependencies if d is not None]
dependencies.append(self.to_str_params(only_significant=True))
dependencies.append(self.__class__.__name__)
if self.serialized_task_definition_check:
dependencies.append(self.get_own_code())
return hashlib.md5(str(dependencies).encode()).hexdigest()

def _get_input_targets(self, target: Union[None, str, TargetOnKart]) -> Union[TargetOnKart, List[TargetOnKart]]:
Expand Down
28 changes: 28 additions & 0 deletions test/test_task_on_kart.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,34 @@ def test_make_target_with_processor(self):
self.assertEqual(target._processor, processor)
self.assertIsInstance(target, SingleFileTarget)

def test_get_own_code(self):
    # The own code of _DummyTask is expected to be exactly its `output` method.
    dummy = _DummyTask()
    expected_source = "def output(self):\nreturn None\n"
    actual_source = dummy.get_own_code()
    # Spaces are removed on both sides so indentation does not affect the comparison.
    self.assertEqual(actual_source.replace(' ', ''), expected_source.replace(' ', ''))

def test_make_unique_id_with_own_code(self):
    def _unique_ids(task_cls):
        # Build the id without the code check first, then with it.
        without_check = task_cls(serialized_task_definition_check=False).make_unique_id()
        with_check = task_cls(serialized_task_definition_check=True).make_unique_id()
        return without_check, with_check

    class _MyDummyTaskA(gokart.TaskOnKart):
        _visible_in_registry = False

        def run(self):
            self.dump('Hello, world!')

    original_id, original_id_with_code = _unique_ids(_MyDummyTaskA)
    # Turning the check on must change the id.
    self.assertNotEqual(original_id, original_id_with_code)

    # Redefine the task under the same class name, changing only the body of run().
    class _MyDummyTaskA(gokart.TaskOnKart):
        _visible_in_registry = False

        def run(self):
            modified_code = 'modify!!'
            self.dump(modified_code)

    modified_id, modified_id_with_code = _unique_ids(_MyDummyTaskA)
    # With the check off, the code change must not affect the id.
    self.assertEqual(modified_id, original_id)
    # With the check on, the code change must produce a different id.
    self.assertNotEqual(modified_id_with_code, original_id_with_code)

def test_compare_targets_of_different_tasks(self):
path1 = _DummyTask(param=1).make_target('test.txt')._target.path
path2 = _DummyTask(param=2).make_target('test.txt')._target.path
Expand Down

0 comments on commit 2a48aef

Please sign in to comment.