Merge pull request #15 from vaaaaanquish/joblib_sample
using multiprocessing
vaaaaanquish authored May 8, 2020
2 parents 926f230 + 22bb40e commit a0f645a
Showing 12 changed files with 239 additions and 105 deletions.
23 changes: 18 additions & 5 deletions .github/workflows/python_test.yml
@@ -33,15 +33,28 @@ jobs:
         exit 1
       fi
       echo "pass yapf"
-    - name: Test with pytest
-      run: |
-        poetry run python -m unittest discover -s ./test/unit_test/
-    - name: Test with mypy
+    - name: Lint with mypy
       run: |
         mypy --ignore-missing-imports ./cloudia/
         if [ $? != 0 ]; then
          echo "failed: mypy"
          exit 1
         fi
         echo "pass mypy"
+    - name: Unit Test
+      run: |
+        poetry run python -m unittest discover -s ./test/unit_test/
+    - name: Integration Test
+      run: |
+        poetry run python ./test/integration_test/test_cloudia_plot.py
+        if [ $? != 0 ]; then
+          echo "failed: cloudia_plot"
+          exit 1
+        fi
+        echo "pass cloudia_plot"
+        poetry run python ./test/integration_test/test_cloudia_pandas_plot.py
+        if [ $? != 0 ]; then
+          echo "failed: cloudia_pandas_plot"
+          exit 1
+        fi
+        echo "pass cloudia_pandas_plot"
6 changes: 2 additions & 4 deletions README.md
Expand Up @@ -91,9 +91,8 @@ Cloudia(
single_words=[], # It's not split word list, example: ["neural network"]
stop_words=STOPWORDS, # not count words, default is wordcloud.STOPWORDS
extract_postags=['名詞', '英単語', 'ローマ字文'], # part of speech for japanese
word_num=100, # max word num
parser=None, # morphological analysis instance for japanese
parse_func=None, # split text function, example: lambda x: x.split(',')
multiprocess=True # Flag for using multiprocessing
)
```

@@ -125,9 +124,8 @@ DataFrame.wc.plot(
     single_words=[], # It's not split word list, example: ["neural network"]
     stop_words=STOPWORDS, # not count words, default is wordcloud.STOPWORDS
     extract_postags=['名詞', '英単語', 'ローマ字文'], # part of speech for japanese
-    word_num=100, # max word num
-    parser=None, # morphological analysis instance for japanese
     parse_func=None, # split text function, example: lambda x: x.split(',')
+    multiprocess=True, # Flag for using multiprocessing
     dark_theme=False, # color theme
     title_size=12, # title text size
     row_num=3, # for example, 12 wordcloud, row_num=3 -> 4*3image
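For reference, a minimal sketch of the new flag in use. The sample data here is illustrative; the constructor and `plot()` calls mirror the integration tests added in this PR:

```python
import pandas as pd
from cloudia.main import Cloudia

df = pd.DataFrame({'text': ['hoge fuga', 'piyo hoge']})
# multiprocess=True (the default) parses texts in parallel via joblib;
# pass False to force single-threaded parsing.
Cloudia(df, multiprocess=False).plot()
```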
17 changes: 7 additions & 10 deletions cloudia/main.py
@@ -3,7 +3,9 @@
 import matplotlib.pyplot as plt
 import japanize_matplotlib
 from wordcloud import WordCloud, STOPWORDS
+
 from cloudia.word_data import WordData
+from cloudia.utils import default_parse_func


 class CloudiaBase:
@@ -12,16 +14,11 @@ def __init__(self,
                  single_words: List[str] = [],
                  stop_words: List[str] = STOPWORDS,
                  extract_postags: List[str] = ['名詞', '英単語', 'ローマ字文'],
-                 word_num: int = 100,
-                 parser: Any = None,
-                 parse_func: Any = None):
-        self.wd = WordData(data=data,
-                           single_words=single_words,
-                           stop_words=stop_words,
-                           extract_postags=extract_postags,
-                           word_num=word_num,
-                           parser=parser,
-                           parse_func=parse_func)
+                 parse_func: Any = default_parse_func,
+                 multiprocess: bool = True,
+                 **args):
+        args.update(dict(single_words=single_words, stop_words=stop_words, extract_postags=extract_postags))
+        self.wd = WordData(data, parse_func, multiprocess, **args)

     def make_wordcloud(self, dark_theme: bool, rate: int) -> List[Tuple[str, WordCloud]]:
         wordcloud_list = []
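The practical effect of the `**args` change above: options such as `single_words`, `stop_words`, and `extract_postags` now reach the parser as plain keyword arguments, so any callable with the same signature can stand in for the nagisa-based default. A sketch under that assumption; `comma_parse` is a hypothetical stand-in, not part of this PR:

```python
from cloudia.word_data import WordData

def comma_parse(text, single_words, extract_postags, stop_words):
    # Hypothetical replacement for default_parse_func: ignores the
    # Japanese-specific options and just splits on commas.
    return [w for w in text.split(',') if w and w not in stop_words]

wd = WordData('a,b,a', comma_parse, multiprocess=False,
              single_words=[], stop_words=['b'], extract_postags=[])
print(wd.words)  # [{'a': 1.0}] -- counts normalized by the max count
```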
38 changes: 11 additions & 27 deletions cloudia/pandas_accessor.py
@@ -1,10 +1,12 @@
 from typing import Any, List

-from cloudia.main import CloudiaBase, Cloudia
 import matplotlib.pyplot as plt
 from wordcloud import STOPWORDS
 import pandas as pd

+from cloudia.main import CloudiaBase, Cloudia
+from cloudia.utils import default_parse_func


 @pd.api.extensions.register_dataframe_accessor('wc')
 class CloudiaDataFrame(CloudiaBase):
@@ -15,22 +17,13 @@ def plot(self,
              single_words: List[str] = [],
              stop_words: List[str] = STOPWORDS,
              extract_postags: List[str] = ['名詞', '英単語', 'ローマ字文'],
-             word_num: int = 100,
-             parser: Any = None,
-             parse_func: Any = None,
+             parse_func: Any = default_parse_func,
              dark_theme: bool = False,
              title_size: int = 12,
              row_num: int = 3,
-             figsize_rate: int = 2):
-        Cloudia(
-            self.df,
-            single_words,
-            stop_words,
-            extract_postags,
-            word_num,
-            parser,
-            parse_func,
-        ).plot(dark_theme, title_size, row_num, figsize_rate)
+             figsize_rate: int = 2,
+             multiprocess: bool = True):
+        Cloudia(self.df, single_words, stop_words, extract_postags, parse_func, multiprocess).plot(dark_theme, title_size, row_num, figsize_rate)

     def save(self, fig_path: str, dark_theme: bool, **args: Any):
         self.plot(**args)
@@ -46,22 +39,13 @@ def plot(self,
              single_words: List[str] = [],
              stop_words: List[str] = STOPWORDS,
              extract_postags: List[str] = ['名詞', '英単語', 'ローマ字文'],
-             word_num: int = 100,
-             parser: Any = None,
-             parse_func: Any = None,
+             parse_func: Any = default_parse_func,
              dark_theme: bool = False,
              title_size: int = 12,
              row_num: int = 3,
-             figsize_rate: int = 2):
-        Cloudia(
-            self.series,
-            single_words,
-            stop_words,
-            extract_postags,
-            word_num,
-            parser,
-            parse_func,
-        ).plot(dark_theme, title_size, row_num, figsize_rate)
+             figsize_rate: int = 2,
+             multiprocess: bool = True):
+        Cloudia(self.series, single_words, stop_words, extract_postags, parse_func, multiprocess).plot(dark_theme, title_size, row_num, figsize_rate)

     def save(self, fig_path: str, dark_theme: bool, **args: Any):
         self.plot(**args)
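Both accessors now expose the flag; a quick usage sketch (the DataFrame is illustrative, and importing the package is what registers the `.wc` accessor, exactly as the integration tests below do):

```python
import pandas as pd
import cloudia  # noqa: F401 -- registers the .wc accessor

df = pd.DataFrame({'tweets': ['hoge fuga', 'fuga piyo']})
df.wc.plot(multiprocess=True)             # DataFrame accessor, parallel parse
df['tweets'].wc.plot(multiprocess=False)  # Series accessor, single-threaded
```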
28 changes: 28 additions & 0 deletions cloudia/utils.py
@@ -0,0 +1,28 @@
from collections import Counter
from typing import List
import re

from wurlitzer import pipes

with pipes() as (out, err):
    # https://github.com/clab/dynet/issues/1528
    import nagisa

NUM_REGEX = re.compile('^[0-9]+$')


def default_parse_func(text: str, single_words: List[str], extract_postags: List[str], stop_words: List[str]) -> List[str]:
    parser = nagisa.Tagger(single_word_list=single_words)
    for x in ['"', ';', ',', '(', ')', '\u3000']:
        text = text.replace(x, ' ')
    text = text.lower()
    return [x for x in parser.extract(text, extract_postags=extract_postags).words if len(x) > 1 and not NUM_REGEX.match(x) and x not in stop_words]


def function_wrapper(func):
    def _f(t, **kwargs):
        i = kwargs.pop('_index')
        d = Counter(func(t, **kwargs))
        return d, i

    return _f
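`function_wrapper` tags each worker's result with the `_index` the caller supplies, so results that come back from the pool out of order can be re-sorted into input order (which is what `_parallel_parse` in word_data.py does). A standalone sketch of the pattern, assuming only joblib and this module:

```python
from joblib import Parallel, delayed
from cloudia.utils import function_wrapper

texts = ['a a b', 'b c', 'a']
wrapped = function_wrapper(lambda t: t.split())  # _f returns (Counter, index)

results = Parallel(n_jobs=-1)([delayed(wrapped)(t, _index=i) for i, t in enumerate(texts)])
results.sort(key=lambda x: x[1])    # restore input order
counters = [c for c, _ in results]  # counters[0] == Counter({'a': 2, 'b': 1})
```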
79 changes: 44 additions & 35 deletions cloudia/word_data.py
@@ -1,35 +1,58 @@
-from typing import Any, List, Tuple, Dict
-import re
-
+from typing import Any, List, Tuple, Dict, Callable, Union
+from itertools import repeat, chain, zip_longest
 from collections import Counter

+from joblib import Parallel, delayed
 import pandas as pd
-from wurlitzer import pipes

-with pipes() as (out, err):
-    # https://github.com/clab/dynet/issues/1528
-    import nagisa
+from cloudia.utils import function_wrapper


 class WordData:
-    def __init__(self, data: Any, single_words: List[str], stop_words: List[str], extract_postags: List[str], word_num: int, parser: Any, parse_func: Any):
+    def __init__(self, data: Any, parse_func: Callable[..., List[str]], multiprocess: bool, **args):
         words, self.names = self._init_data(data)
-        self.word_num = word_num
-        self.single_words = single_words
-        self.extract_postags = extract_postags
-        self.stop_words = stop_words
-        self.parser = nagisa.Tagger(single_word_list=self.single_words) if not parser else parser
-        self.num_regex = re.compile('^[0-9]+$')
-        if parse_func:
-            self.words = [self.count(parse_func(x)) for x in words]
-        else:
-            self.words = [self.count(self.parse(x)) for x in words]
+        self.counter_list = self.parse(words, parse_func, multiprocess, **args)
+        self.words = [self.convert_weight(x) for x in self.counter_list]
+
+    def parse(self, words, parse_func: Callable[..., List[str]], multiprocess: bool, **args) -> List[Counter]:
+        if isinstance(words[0], list):
+            word_list_length = len(words[0])
+            words = list(chain.from_iterable(words))
+            words = self._parse(words, parse_func, multiprocess, **args)
+            words = list(zip_longest(*[iter(words)] * word_list_length))
+            words = [sum(w, Counter()) for w in words]
+        else:
+            words = self._parse(words, parse_func, multiprocess, **args)
+        return words
+
+    def convert_weight(self, c: Counter) -> Dict[str, float]:
+        most_common = c.most_common()
+        _max_count = most_common[0][1]
+        weight = {k: v / _max_count for k, v in most_common}
+        weight = {k: weight[k] for k in list(weight.keys())}
+        return weight
+
+    def _parse(self, words: List[str], parse_func: Callable[..., List[str]], multiprocess: bool, **args) -> Union[List[Counter], List[List[Counter]]]:
+        if multiprocess:
+            return self._parallel_parse(words, function_wrapper(parse_func), **args)
+        return self._single_thread_parse(words, parse_func, **args)
+
+    def _single_thread_parse(self, words: List[str], parse_func: Callable[..., List[str]], **args) -> List[Counter]:
+        return [Counter(parse_func(x, **args)) for x in words]
+
+    def _parallel_parse(self, words: List[str], parse_func: Callable, **args) -> List[List[Counter]]:
+        parsed_words = Parallel(n_jobs=-1)([delayed(parse_func)(w, **dict(**a, **{'_index': i})) for i, (w, a) in enumerate(zip(words, repeat(args)))])
+        parsed_words.sort(key=lambda x: x[1])
+        parsed_words = [t[0] for t in parsed_words]
+        return parsed_words

     def _init_data(self, data: Any) -> Tuple[List[str], List[str]]:
         # TODO: set assert
         words, names = [], []
         if isinstance(data, list):
             if isinstance(data[0], tuple):
                 if isinstance(data[0][1], pd.Series):
-                    words = [' '.join(d.values.tolist()) for n, d in data]
+                    words = [d.values.tolist() for n, d in data]
                     names = [n for n, d in data]
                 else:
                     words = [w for n, w in data]
@@ -38,7 +61,7 @@ def _init_data(self, data: Any) -> Tuple[List[str], List[str]]:
             words = data
             names = [f'word cloud {i+1}' for i in range(len(data))]
         elif isinstance(data[0], pd.Series):
-            words = [' '.join(d.values.tolist()) for d in data]
+            words = [d.values.tolist() for d in data]
             names = [d.name for d in data]
         elif isinstance(data, str):
             words = [data]
@@ -48,26 +71,12 @@ def _init_data(self, data: Any) -> Tuple[List[str], List[str]]:
             names = [data[0]]
         elif isinstance(data, pd.DataFrame):
             names = data.columns.tolist()
-            words = [' '.join(data[x].values.tolist()) for x in names]
+            words = [data[x].values.tolist() for x in names]
         elif isinstance(data, pd.Series):
-            words = [' '.join(data.values.tolist())]
+            words = [data.values.tolist()]
             names = [data.name]

         return words, names

-    def count(self, words: List[str]) -> Dict[str, float]:
-        c = Counter(words).most_common()
-        _max_count = c[0][1]
-        weight = {k: v / _max_count for k, v in c if k not in self.stop_words}
-        weight = {k: weight[k] for k in list(weight.keys())[:self.word_num]}
-        return weight
-
-    def parse(self, text: str) -> List[str]:
-        for x in ['"', ';', ',', '(', ')', '\u3000']:
-            text = text.replace(x, ' ')
-        text = text.lower()
-        return [x for x in self.parser.extract(text, extract_postags=self.extract_postags).words if len(x) > 1 and not self.num_regex.match(x)]
-
     def __iter__(self):
         for n, w in zip(self.names, self.words):
             yield n, w
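Two idioms in `parse()` above are worth unpacking. Because `_init_data` now returns each column as a list of raw cell values, columnar input is flattened with `chain.from_iterable` and parsed cell by cell; then `zip_longest(*[iter(words)] * word_list_length)` (the standard grouper idiom, which assumes equal-length columns) re-chunks the flat result, and `sum(w, Counter())` merges each chunk back into one Counter per column. A toy illustration of just those two steps:

```python
from collections import Counter
from itertools import zip_longest

parsed = [Counter({'a': 1}), Counter({'b': 1}),  # column 1, two cells
          Counter({'a': 2}), Counter({'c': 1})]  # column 2, two cells
groups = list(zip_longest(*[iter(parsed)] * 2))  # regroup in chunks of 2
merged = [sum(g, Counter()) for g in groups]
# merged == [Counter({'a': 1, 'b': 1}), Counter({'a': 2, 'c': 1})]
```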
18 changes: 15 additions & 3 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -21,6 +21,7 @@ wordcloud = "*"
 pandas = "*"
 matplotlib = "*"
 wurlitzer = "*"
+joblib = "*"
 japanize_matplotlib = "^1.1.1"

 [tool.poetry.scripts]
19 changes: 19 additions & 0 deletions test/integration_test/test_cloudia_pandas_plot.py
@@ -0,0 +1,19 @@
import sys
import pathlib
import traceback
import pandas as pd

if __name__ == '__main__':
    current_dir = pathlib.Path(__file__).resolve().parent
    sys.path.append(str(current_dir.parents[1]))
    import cloudia  # noqa

    try:
        for multiprocess in [True, False]:
            pd.DataFrame({'test': ['hoge']}).wc.plot(multiprocess=multiprocess)
            pd.DataFrame({'test': ['hoge']})['test'].wc.plot(multiprocess=multiprocess)
            pd.Series(['hoge']).wc.plot(multiprocess=multiprocess)
    except Exception:
        traceback.print_exc()
        sys.exit(1)
    sys.exit(0)
24 changes: 24 additions & 0 deletions test/integration_test/test_cloudia_plot.py
@@ -0,0 +1,24 @@
import sys
import pathlib
import traceback
import pandas as pd

if __name__ == '__main__':
    current_dir = pathlib.Path(__file__).resolve().parent
    sys.path.append(str(current_dir.parents[1]))
    from cloudia.main import Cloudia

    try:
        for multiprocess in [True, False]:
            Cloudia([('test', pd.Series(['hoge']))], multiprocess=multiprocess).plot()
            Cloudia([('test', 'hoge')], multiprocess=multiprocess).plot()
            Cloudia(['hoge'], multiprocess=multiprocess).plot()
            Cloudia([pd.Series(['hoge'])], multiprocess=multiprocess).plot()
            Cloudia('hoge', multiprocess=multiprocess).plot()
            Cloudia(('test', 'hoge'), multiprocess=multiprocess).plot()
            Cloudia(pd.DataFrame({'test': ['hoge']}), multiprocess=multiprocess).plot()
            Cloudia(pd.Series(['hoge']), multiprocess=multiprocess).plot()
    except Exception:
        traceback.print_exc()
        sys.exit(1)
    sys.exit(0)