-
Notifications
You must be signed in to change notification settings - Fork 1
/
FooThread.py
313 lines (257 loc) · 9.03 KB
/
FooThread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Refactored by Malcolm Jones to work with GTK+3 PyGobject( aka PyGI ). Mar 2016.
# Demo application showing how one can combine the python
# threading module with GObject signals to make a simple thread
# manager class which can be used to stop horrible blocking GUIs.
#
# (c) 2008, John Stowers <[email protected]>
#
# This program serves as an example, and can be freely used, copied, derived
# and redistributed by anyone. No warranty is implied or given.
import os
import sys
import time
import threading
import random
import re
import logging
import traceback
import pprint
from functools import wraps
from gettext import gettext as _
from signal import signal, SIGWINCH, SIGKILL, SIGTERM

# The Python 2 module names StringIO / ConfigParser / Queue do not exist on
# Python 3 (which the f-strings in this file require).  Import the Python 3
# equivalents under the old names so that code written against
# ``StringIO.StringIO``, ``ConfigParser.ConfigParser`` or ``Queue.Queue``
# keeps resolving.
import configparser as ConfigParser
import io as StringIO
import queue as Queue

import gi
# The required Gtk version has to be declared before Gtk is imported from
# gi.repository.
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GObject

from IPython.core.debugger import Tracer
from IPython.core import ultratb
from colorlog import ColoredFormatter

# Kept from the original.
# NOTE(review): reported as a deprecated no-op in recent PyGObject releases -
# confirm before removing.
GObject.threads_init()

# Send uncaught exceptions through IPython's verbose traceback formatter;
# call_pdb=True asks it to open the debugger afterwards.
sys.excepthook = ultratb.FormattedTB(mode='Verbose',
                                     color_scheme='Linux',
                                     call_pdb=True,
                                     ostream=sys.__stdout__)

# Shared pretty-printer used for ad-hoc debug output.
pp = pprint.PrettyPrinter(indent=4)
def setup_logger():
    """Create and return this module's logger with coloured console output.

    A stream handler using a ``ColoredFormatter`` is attached to the logger
    named after this module, and the logger level is set to DEBUG.  Each
    record is prefixed with the name of the emitting thread.

    :returns: the configured logger
    :rtype: logging.Logger
    """
    # Colour applied to the level name of each record.
    level_colors = {
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'red',
    }
    # Colour applied to the message text (exposed to the format string as
    # ``message_log_color``).
    message_colors = {
        'message': {
            'ERROR': 'red',
            'CRITICAL': 'red',
            'DEBUG': 'yellow'
        }
    }
    record_layout = (
        "(%(threadName)-9s) %(log_color)s%(levelname)-8s%(reset)s "
        "%(message_log_color)s%(message)s"
    )

    console = logging.StreamHandler()
    console.setFormatter(ColoredFormatter(
        record_layout,
        datefmt=None,
        reset=True,
        log_colors=level_colors,
        secondary_log_colors=message_colors,
        style='%'
    ))

    module_logger = logging.getLogger(__name__)
    module_logger.addHandler(console)
    module_logger.setLevel(logging.DEBUG)
    return module_logger
def trace(func):
    """Decorator that logs, at DEBUG level, when *func* is entered and left.

    The "End" message is only logged when *func* returns normally; an
    exception raised by *func* propagates before it is written.

    :param func: Function to wrap
    :type func: callable
    :returns: a wrapper carrying *func*'s metadata (via ``functools.wraps``)
    :rtype: callable
    """
    def announce(template):
        # The name is looked up at call time, exactly when the message is
        # emitted.
        logger.debug(template.format(func.__name__))

    @wraps(func)
    def traced(*call_args, **call_kwargs):
        announce('Start {!r}')
        outcome = func(*call_args, **call_kwargs)
        announce('End {!r}')
        return outcome

    return traced
# Module-level logger, created once at import time; the ``trace`` decorator
# writes its Start/End messages through it.
logger = setup_logger()
class _IdleObject(GObject.GObject):
    """
    GObject.GObject subclass that always emits its signals in the main
    thread, by scheduling each emission on an idle handler instead of
    emitting directly.
    """

    @trace
    def __init__(self):
        GObject.GObject.__init__(self)

    @trace
    def emit(self, *signal_args):
        """Queue the emission of a signal rather than emitting it right away.

        :param signal_args: signal name followed by the signal's arguments,
            passed unchanged to ``GObject.GObject.emit``
        """
        real_emit = GObject.GObject.emit
        GObject.idle_add(real_emit, self, *signal_args)
class _FooThread(threading.Thread, _IdleObject):
    """
    Cancellable thread which uses gobject signals to return information
    to the GUI.

    Signals:
        ``completed`` - emitted once when ``run`` finishes (also after a
        cancel).
        ``progress`` - emitted after every step with the percentage done
        (float).
    """

    __gsignals__ = {
        "completed": (
            GObject.SignalFlags.RUN_LAST, None, []),
        "progress": (
            GObject.SignalFlags.RUN_LAST, None, [
                GObject.TYPE_FLOAT])  # percent complete
    }

    @trace
    def __init__(self, *args):
        """
        :param args: ``args[0]`` is the number of work steps to perform,
            ``args[1]`` is the thread name
        :raises IndexError: if fewer than two positional arguments are given
        """
        threading.Thread.__init__(self)
        _IdleObject.__init__(self)
        self.cancelled = False
        self.data = args[0]
        # ``name`` is a property of threading.Thread, so this assignment
        # already names the thread; the extra (deprecated) setName() call
        # the code used to make was redundant.
        self.name = args[1]

    @trace
    def cancel(self):
        """
        Threads in python are not cancellable, so we implement our own
        cancellation logic: ``run`` checks this flag before every step.
        """
        self.cancelled = True

    @trace
    def run(self):
        """Do ``self.data`` steps of simulated work, reporting progress.

        Each step sleeps 0.1 s and then emits ``progress``.  The loop stops
        early once ``cancel`` has been called.  ``completed`` is emitted in
        either case.
        """
        print("Running %s" % str(self))
        for i in range(self.data):
            if self.cancelled:
                break
            time.sleep(0.1)
            self.emit("progress", i / float(self.data) * 100)
        self.emit("completed")
class FooThreadManager:
    """
    Manages many FooThreads. This involves starting and stopping
    said threads, and respecting a maximum num of concurrent threads limit
    """

    @trace
    def __init__(self, maxConcurrentThreads):
        """
        :param maxConcurrentThreads: maximum number of threads allowed to
            run at the same time
        """
        self.maxConcurrentThreads = maxConcurrentThreads
        # stores all threads, running or stopped
        self.fooThreads = {}
        # the pending thread args are used as an index for the stopped threads
        self.pendingFooThreadArgs = []

    @trace
    def _register_thread_completed(self, thread, *args):
        """
        Handler for a thread's "completed" signal: forgets the finished
        thread and starts one pending thread if there is space.

        :param thread: the thread that emitted the signal
        :param args: the argument tuple the thread was created with; it is
            the thread's key in ``self.fooThreads``
        """
        del self.fooThreads[args]
        running = len(self.fooThreads) - len(self.pendingFooThreadArgs)

        print("%s completed. %s running, %s pending" % (
            thread, running, len(self.pendingFooThreadArgs)))

        if running < self.maxConcurrentThreads:
            try:
                args = self.pendingFooThreadArgs.pop()
            except IndexError:
                # nothing is waiting for a slot
                pass
            else:
                print("Starting pending %s" % self.fooThreads[args])
                self.fooThreads[args].start()

    @trace
    def make_thread(self, completedCb, progressCb, userData, *args):
        """
        Makes a thread with args. The thread will be started when there is
        a free slot.  If a thread with the same args is already registered,
        no new thread is created.

        :param completedCb: callback connected to the "completed" signal
        :param progressCb: callback connected to the "progress" signal
        :param userData: extra argument handed to both callbacks
        :param args: positional arguments for ``_FooThread``; the tuple is
            also the dictionary key of the thread
        """
        pp.pprint('BOSSJONES - pp.pprint userData')
        pp.pprint(userData)

        # Counted before the new thread is registered.
        running = len(self.fooThreads) - len(self.pendingFooThreadArgs)

        if args in self.fooThreads:
            return

        thread = _FooThread(*args)
        # signals run in the order connected. Connect the user completed
        # callback first incase they wish to do something
        # before we delete the thread
        thread.connect("completed", completedCb, userData)
        thread.connect("completed", self._register_thread_completed, *args)
        thread.connect("progress", progressCb, userData)
        # This is why we use args, not kwargs, because args are hashable
        self.fooThreads[args] = thread

        if running < self.maxConcurrentThreads:
            print("Starting %s" % thread)
            self.fooThreads[args].start()
        else:
            print("Queing %s" % thread)
            self.pendingFooThreadArgs.append(args)

    @trace
    def stop_all_threads(self, block=False):
        """
        Stops all threads. If block is True then actually wait for the thread
        to finish (may block the UI)

        :param block: when True, join every thread that is currently alive
        """
        for thread in self.fooThreads.values():
            thread.cancel()
            # Thread.isAlive() was removed in Python 3.9; is_alive() is the
            # supported spelling.  Threads that were never started are not
            # alive and are therefore not joined.
            if block and thread.is_alive():
                thread.join()
class Demo:
    """Small GTK window that starts, tracks and stops FooThreads.

    Two lists are shown: pending threads (name plus a progress bar) and
    completed threads (name only).
    """

    @trace
    def __init__(self):
        """Build the window, create the thread manager and run the GTK main
        loop.  The constructor calls ``Gtk.main()`` itself.
        """
        # build the GUI
        win = Gtk.Window()
        win.connect("delete_event", self.quit)
        # Keyword arguments are used for the widget constructors below;
        # PyGObject deprecates the positional forms, and Gtk.VBox is a
        # deprecated alias for a vertical Gtk.Box.
        box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4)
        win.add(box)

        addButton = Gtk.Button(label="Add Thread")
        addButton.connect("clicked", self.add_thread)
        box.pack_start(addButton, False, False, 0)

        stopButton = Gtk.Button(label="Stop All Threads")
        stopButton.connect("clicked", self.stop_threads)
        box.pack_start(stopButton, False, False, 0)

        # display threads in a treeview
        # pending rows: (thread name, progress in percent)
        self.pendingModel = Gtk.ListStore(
            GObject.TYPE_STRING, GObject.TYPE_INT)
        # completed rows: (thread name,)
        self.completeModel = Gtk.ListStore(GObject.TYPE_STRING)
        self._make_view(self.pendingModel, "Pending Threads", True, box)
        self._make_view(self.completeModel, "Completed Threads", False, box)

        # THE ACTUAL THREAD BIT
        self.manager = FooThreadManager(3)

        # Start the demo
        win.show_all()
        Gtk.main()

    @trace
    def _make_view(self, model, title, showProgress, vbox):
        """Create a tree view for *model* and pack it into *vbox*.

        :param model: list store to display
        :param title: header of the text column (model column 0)
        :param showProgress: when True, add a progress-bar column bound to
            model column 1
        :param vbox: container the view is packed into
        """
        view = Gtk.TreeView(model=model)
        view.append_column(Gtk.TreeViewColumn(
            title, Gtk.CellRendererText(), text=0))
        if showProgress:
            view.append_column(Gtk.TreeViewColumn(
                "Progress", Gtk.CellRendererProgress(), value=1))
        vbox.pack_start(view, True, True, 0)

    @trace
    def quit(self, sender, event):
        """Window "delete_event" handler: cancel all threads, wait for the
        running ones to finish, then leave the main loop.
        """
        self.manager.stop_all_threads(block=True)
        Gtk.main_quit()

    @trace
    def stop_threads(self, *args):
        """"Stop All Threads" button handler: cancel every thread without
        waiting for them.
        """
        # THE ACTUAL THREAD BIT
        self.manager.stop_all_threads()

    @trace
    def add_thread(self, sender):
        """"Add Thread" button handler: add a pending row and ask the
        manager for a thread with a random step count and a random name.
        """
        # make a thread and start it
        data = random.randint(20, 60)
        name = f"Thread #{random.randint(0, 1000)}"
        rowref = self.pendingModel.insert(0, (name, 0))

        # THE ACTUAL THREAD BIT
        self.manager.make_thread(
            self.thread_finished,  # completedCb
            self.thread_progress,  # progressCb
            rowref,                # userData
            data,                  # args[0]
            name)                  # args[1]

    @trace
    def thread_finished(self, thread, rowref):
        """"completed" callback: move the thread's row from the pending list
        to the completed list.
        """
        self.pendingModel.remove(rowref)
        self.completeModel.insert(0, (thread.name,))

    @trace
    def thread_progress(self, thread, progress, rowref):
        """"progress" callback: store the percentage, truncated to an int,
        in the thread's pending row.
        """
        self.pendingModel.set_value(rowref, 1, int(progress))
if __name__ == "__main__":
    # Demo's constructor shows the window and calls Gtk.main() itself, so
    # constructing it is all that is needed to run the application.
    demo = Demo()