-
Notifications
You must be signed in to change notification settings - Fork 0
/
imagezmq.py
392 lines (302 loc) · 13.3 KB
/
imagezmq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
""" imagezmq: Transport OpenCV images via ZMQ.
Classes that transport OpenCV images from one computer to another. For example,
OpenCV images gathered by a Raspberry Pi camera could be sent to another
computer for displaying the images using cv2.imshow() or for further image
processing. See API and Usage Examples for details.
Copyright (c) 2019 by Jeff Bass.
License: MIT, see LICENSE for more details.
"""
import zmq
import numpy as np
import cv2
class ImageSender():
    """Opens a zmq socket and sends images.

    Opens a zmq (REQ or PUB) socket on the image sending computer, often a
    Raspberry Pi, that will be sending OpenCV images and related text
    messages to the hub computer. Provides methods to send images or send
    jpg compressed images.

    Two kinds of ZMQ message patterns are possible in imagezmq:
    REQ/REP: an image is sent and the sender waits for a reply ("blocking").
    PUB/SUB: an image is sent and no reply is sent or expected
             ("non-blocking").

    There are advantages and disadvantages for each message pattern.
    See the documentation for a full description of REQ/REP and PUB/SUB.
    The default is REQ/REP for the ImageSender class and the ImageHub class.

    An ImageSender can be used as a context manager; the socket and context
    are then released by close() when the ``with`` block exits.

    Arguments:
      connect_to: the tcp address:port. In REQ/REP mode the REQ socket
                  connects to this address; in PUB/SUB mode the PUB socket
                  binds to it.
      REQ_REP: (optional) if true (the default), a REQ socket will be
               created; if false, a PUB socket will be created.
    """

    def __init__(self, connect_to='tcp://127.0.0.1:5555', REQ_REP=True):
        """Initializes zmq socket for sending images to the hub.

        If REQ_REP is true (the default), a REQ socket is created and
        connected to connect_to. It must connect to a matching REP socket
        on the ImageHub().

        If REQ_REP is false, a PUB socket is created and bound to
        connect_to. A SUB socket on the ImageHub() must connect to it.
        """
        if REQ_REP:
            # REQ/REP mode, this is a blocking scenario
            self.init_reqrep(connect_to)
        else:
            # PUB/SUB mode, non-blocking scenario
            self.init_pubsub(connect_to)

    def init_reqrep(self, address):
        """Creates a REQ socket and connects it to address.

        Also points send_image / send_jpg at the REQ/REP implementations.
        """
        self.zmq_context = SerializingContext()
        self.zmq_socket = self.zmq_context.socket(zmq.REQ)
        self.zmq_socket.connect(address)

        # Assign corresponding send methods for REQ/REP mode
        self.send_image = self.send_image_reqrep
        self.send_jpg = self.send_jpg_reqrep

    def init_pubsub(self, address):
        """Creates a PUB socket and binds it to address.

        Also points send_image / send_jpg at the PUB/SUB implementations.
        """
        self.zmq_context = SerializingContext()
        self.zmq_socket = self.zmq_context.socket(zmq.PUB)
        # Unlike REQ/REP mode, the sender is the side that binds here.
        self.zmq_socket.bind(address)

        # Assign corresponding send methods for PUB/SUB mode
        self.send_image = self.send_image_pubsub
        self.send_jpg = self.send_jpg_pubsub

    def send_image(self, msg, image):
        """This is a placeholder. It is replaced on each instance by either
        the REQ/REP or the PUB/SUB sending method, depending on the REQ_REP
        option value.

        Arguments:
          msg: text message or image name.
          image: OpenCV image to send to hub.

        Returns:
          A text reply from hub in REQ/REP mode or nothing in PUB/SUB mode.
        """
        pass

    def _send_contiguous(self, msg, image):
        """Sends image with msg, making the array C-contiguous first.

        The array is handed to the socket with copy=False, so it is copied
        into a contiguous buffer here only when it is not one already.
        """
        if not image.flags['C_CONTIGUOUS']:
            image = np.ascontiguousarray(image)
        self.zmq_socket.send_array(image, msg, copy=False)

    def send_image_reqrep(self, msg, image):
        """Sends OpenCV image and msg to hub computer in REQ/REP mode.

        Blocks until the hub's reply has been received.

        Arguments:
          msg: text message or image name.
          image: OpenCV image to send to hub.

        Returns:
          The reply received from the hub.
        """
        self._send_contiguous(msg, image)
        return self.zmq_socket.recv()  # receive the reply message

    def send_image_pubsub(self, msg, image):
        """Sends OpenCV image and msg to hub computer in PUB/SUB mode. If
        there is no hub computer subscribed to this socket, then image and
        msg are discarded.

        Arguments:
          msg: text message or image name.
          image: OpenCV image to send to hub.

        Returns:
          Nothing; there is no reply from hub computer in PUB/SUB mode.
        """
        self._send_contiguous(msg, image)

    def send_jpg(self, msg, jpg_buffer):
        """This is a placeholder. It is replaced on each instance by either
        the REQ/REP or the PUB/SUB sending method, depending on the REQ_REP
        option value.

        Arguments:
          msg: image name or message text.
          jpg_buffer: bytestring containing the jpg image to send to hub.

        Returns:
          A text reply from hub in REQ/REP mode or nothing in PUB/SUB mode.
        """

    def send_jpg_reqrep(self, msg, jpg_buffer):
        """Sends msg text and jpg buffer to hub computer in REQ/REP mode.

        Blocks until the hub's reply has been received.

        Arguments:
          msg: image name or message text.
          jpg_buffer: bytestring containing the jpg image to send to hub.

        Returns:
          The reply received from the hub.
        """
        self.zmq_socket.send_jpg(msg, jpg_buffer, copy=False)
        return self.zmq_socket.recv()  # receive the reply message

    def send_jpg_pubsub(self, msg, jpg_buffer):
        """Sends msg text and jpg buffer to hub computer in PUB/SUB mode. If
        there is no hub computer subscribed to this socket, then image and
        msg are discarded.

        Arguments:
          msg: image name or message text.
          jpg_buffer: bytestring containing the jpg image to send to hub.

        Returns:
          Nothing; there is no reply from the hub computer in PUB/SUB mode.
        """
        self.zmq_socket.send_jpg(msg, jpg_buffer, copy=False)

    def close(self):
        """Closes the zmq socket, then terminates the zmq context.

        NOTE(review): terminating a context can block while messages are
        still queued on its sockets -- confirm the linger setting suits the
        deployment.
        """
        self.zmq_socket.close()
        self.zmq_context.term()

    def __enter__(self):
        """Enables use of ImageSender in a ``with`` statement.

        Returns:
          self.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Closes the socket and context when the ``with`` block exits.

        Any exception raised inside the block is propagated unchanged.
        """
        self.close()
class ImageHub():
    """Opens a zmq socket and receives images.

    Opens a zmq (REP or SUB) socket on the hub computer, for example,
    a Mac, that will be receiving and displaying or processing OpenCV images
    and related text messages. Provides methods to receive images or receive
    jpg compressed images.

    Two kinds of ZMQ message patterns are possible in imagezmq:
    REQ/REP: an image is sent and the sender waits for a reply ("blocking").
    PUB/SUB: an image is sent and no reply is sent or expected
             ("non-blocking").

    There are advantages and disadvantages for each message pattern.
    See the documentation for a full description of REQ/REP and PUB/SUB.
    The default is REQ/REP for the ImageSender class and the ImageHub class.

    An ImageHub can be used as a context manager; the socket and context
    are then released by close() when the ``with`` block exits.

    Arguments:
      open_port: (optional) the address the REP socket binds to in REQ/REP
                 mode, or the address of the PUB socket that the SUB socket
                 connects to in PUB/SUB mode.
      REQ_REP: (optional) if true (the default), a REP socket will be
               created; if false, a SUB socket will be created.
    """

    def __init__(self, open_port='tcp://*:5555', REQ_REP=True):
        """Initializes zmq socket to receive images and text.

        If REQ_REP is true (the default), a REP socket is created and bound
        to open_port. REQ sockets on ImageSender() instances connect to it.

        If REQ_REP is false, a SUB socket is created and connected to
        open_port, which must be a matching PUB socket on an ImageSender().
        """
        # Kept as given; connect() reads it to decide whether it applies.
        self.REQ_REP = REQ_REP
        if REQ_REP:
            # Init REP socket for blocking mode
            self.init_reqrep(open_port)
        else:
            # Connect to PUB socket for non-blocking mode
            self.init_pubsub(open_port)

    def init_reqrep(self, address):
        """Creates a REP socket and binds it to address."""
        self.zmq_context = SerializingContext()
        self.zmq_socket = self.zmq_context.socket(zmq.REP)
        self.zmq_socket.bind(address)

    def init_pubsub(self, address):
        """Creates a SUB socket, subscribes to everything and connects it
        to address.
        """
        self.zmq_context = SerializingContext()
        self.zmq_socket = self.zmq_context.socket(zmq.SUB)
        # Empty filter: accept every message from the publisher.
        self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.zmq_socket.connect(address)

    def connect(self, open_port):
        """In PUB/SUB mode, the hub can connect to multiple senders at the
        same time.

        Use this method to connect (and subscribe) to additional senders.
        In REQ/REP mode this method does nothing.

        Arguments:
          open_port: the PUB socket to connect to.
        """
        if not self.REQ_REP:
            # This makes sense only in PUB/SUB mode. The empty filter is
            # registered once per call; a second identical subscribe after
            # connecting would only repeat the same subscription.
            self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b'')
            self.zmq_socket.connect(open_port)
        return

    def recv_image(self, copy=False):
        """Receives OpenCV image and text msg.

        Arguments:
          copy: (optional) zmq copy flag, passed through to the socket.

        Returns:
          msg: text msg, often the image name.
          image: OpenCV image.
        """
        msg, image = self.zmq_socket.recv_array(copy=copy)
        return msg, image

    def recv_jpg(self, copy=False):
        """Receives text msg, jpg buffer.

        Arguments:
          copy: (optional) zmq copy flag, passed through to the socket.

        Returns:
          msg: text message, often image name.
          jpg_buffer: jpg compressed image, as returned by the socket.
            NOTE(review): per the pyzmq docs a non-copying receive
            (copy=False, the default here) yields a Frame rather than
            bytes -- confirm callers handle both.
        """
        msg, jpg_buffer = self.zmq_socket.recv_jpg(copy=copy)
        return msg, jpg_buffer

    def send_reply(self, reply_message=b'OK'):
        """Sends the zmq REP reply message.

        Arguments:
          reply_message: reply message text, often just the bytes b'OK'.
        """
        self.zmq_socket.send(reply_message)

    def close(self):
        """Closes the zmq socket, then terminates the zmq context.

        NOTE(review): terminating a context can block while messages are
        still queued on its sockets -- confirm the linger setting suits the
        deployment.
        """
        self.zmq_socket.close()
        self.zmq_context.term()

    def __enter__(self):
        """Enables use of ImageHub in a ``with`` statement.

        Returns:
          self.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Closes the socket and context when the ``with`` block exits.

        Any exception raised inside the block is propagated unchanged.
        """
        self.close()
class SerializingSocket(zmq.Socket):
    """zmq socket with numpy array and jpg buffer (de)serialization helpers.

    Modelled on PyZMQ serialization examples.

    Every message is two frames: a JSON metadata frame followed by the raw
    payload frame. Used for sending / receiving OpenCV images, which are
    numpy arrays, and for sending / receiving jpg compressed OpenCV images.
    """

    def send_array(self, A, msg='NoName', flags=0, copy=True, track=False):
        """Sends a numpy array together with its metadata and a text message.

        The first frame is JSON holding msg plus the dtype and shape needed
        to rebuild the array on the receiving side; the second frame is the
        array itself.

        Arguments:
          A: numpy array or OpenCV image.
          msg: (optional) array name, image name or text message.
          flags: (optional) zmq flags.
          copy: (optional) zmq copy flag.
          track: (optional) zmq track flag.

        Returns:
          Whatever the underlying send() of the array frame returns.
        """
        header = {
            'msg': msg,
            'dtype': str(A.dtype),
            'shape': A.shape,
        }
        # SNDMORE marks the header as the first part of a multipart message.
        self.send_json(header, flags | zmq.SNDMORE)
        return self.send(A, flags, copy=copy, track=track)

    def send_jpg(self,
                 msg='NoName',
                 jpg_buffer=b'00',
                 flags=0,
                 copy=True,
                 track=False):
        """Sends a jpg buffer together with a text message.

        The first frame is JSON holding msg; the second frame is the jpg
        bytestring of an OpenCV image.

        Arguments:
          msg: image name or text message.
          jpg_buffer: jpg buffer of compressed image to be sent.
          flags: (optional) zmq flags.
          copy: (optional) zmq copy flag.
          track: (optional) zmq track flag.

        Returns:
          Whatever the underlying send() of the jpg frame returns.
        """
        header = {'msg': msg}
        # SNDMORE marks the header as the first part of a multipart message.
        self.send_json(header, flags | zmq.SNDMORE)
        return self.send(jpg_buffer, flags, copy=copy, track=track)

    def recv_array(self, flags=0, copy=True, track=False):
        """Receives a numpy array with its metadata and text message.

        Reads the JSON metadata frame, then the payload frame, and rebuilds
        the array from the payload using the dtype and shape carried in the
        metadata.

        Arguments:
          flags: (optional) zmq flags, applied to both frames.
          copy: (optional) zmq copy flag, applied to the payload frame.
          track: (optional) zmq track flag, applied to the payload frame.

        Returns:
          msg: image name or text message.
          A: numpy array or OpenCV image reconstructed with dtype and shape.
        """
        header = self.recv_json(flags=flags)
        payload = self.recv(flags=flags, copy=copy, track=track)
        flat = np.frombuffer(payload, dtype=header['dtype'])
        return (header['msg'], flat.reshape(header['shape']))

    def recv_jpg(self, flags=0, copy=True, track=False):
        """Receives a jpg buffer and a text msg.

        Reads the JSON metadata frame, then the frame holding the jpg
        bytestring of an OpenCV image.

        Arguments:
          flags: (optional) zmq flags, applied to both frames.
          copy: (optional) zmq copy flag, applied to the jpg frame.
          track: (optional) zmq track flag, applied to the jpg frame.

        Returns:
          msg: image name or text message.
          jpg_buffer: the received jpg frame, containing the jpg image.
        """
        header = self.recv_json(flags=flags)  # metadata text
        payload = self.recv(flags=flags, copy=copy, track=track)
        return (header['msg'], payload)
class SerializingContext(zmq.Context):
    """zmq context whose socket class is set to SerializingSocket."""

    _socket_class = SerializingSocket