# scrcpy_client_tests.py
# Forked from AlLongley/py-scrcpy.
# Original listing: 96 lines (77 loc), 2.7 KB.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import scrcpy_client
import io
import sys
import numpy as np
import time
import socket
import unittest
import logging
# When True (and OpenCV is importable) the frame test displays every frame.
SHOWFRAMES = False
if SHOWFRAMES:
    try:
        import cv2
    except ImportError:
        # OpenCV is optional; without it the preview is simply turned off.
        SHOWFRAMES = False

# Recorded session that is replayed in place of a live connection:
# ~136 frames from the game HyperFlex, with PTS meta enabled.
MOCKFILE = "mocksession_hflex_withmeta"

# Server settings that were active while the session above was recorded.
scrcpy_client.SVR_sendFrameMeta = "true"
scrcpy_client.SVR_tunnelForward = "true"
scrcpy_client.SVR_crop = "9999:9999:0:0"
scrcpy_client.SVR_bitRate = 999999999
scrcpy_client.SVR_maxSize = 600
class MockSocket():
    """Replay a previously recorded socket session from a file.

    Drop-in stand-in for ``socket.socket`` in these tests: ``connect`` opens
    the recording named by the module-level ``MOCKFILE`` and ``recv`` hands
    back successive chunks of it.
    """

    def __init__(self, *args):
        """Accept and ignore whatever arguments a real socket would take."""
        # Set ``fd`` first so ``close``/``__del__`` are safe even if the
        # line below raises.
        self.fd = None
        self.filename = MOCKFILE

    def connect(self, *args):
        """Open the recording for reading; the address argument is ignored."""
        # Release a handle left over from an earlier connect() instead of
        # leaking it.
        self.close()
        self.fd = open(self.filename, 'rb')

    def recv(self, buffersize, *args):
        """Return up to ``buffersize`` bytes of the recording.

        Returns ``b''`` once the recording is exhausted. Extra positional
        arguments (socket flags) are ignored.

        Raises:
            OSError: if called before ``connect`` or after ``close``.
        """
        if self.fd is None:
            raise OSError("MockSocket is not connected")
        return self.fd.read(buffersize)

    def close(self):
        """Close the recording if it is open; safe to call repeatedly."""
        # getattr: the attribute may be missing if __init__ never completed.
        fd = getattr(self, "fd", None)
        self.fd = None
        if fd is not None:
            fd.close()

    def __del__(self):
        # Last-resort cleanup for callers that never call close().
        self.close()
class TestClientMockConnect(unittest.TestCase):
    """Exercise the scrcpy client against a recorded session.

    Every test gets a fresh client whose socket is replaced by ``MockSocket``,
    so no device or network connection is involved.
    """

    def setUp(self):
        """Create a client, swap in the mock socket, connect and start it."""
        self.SCRCPY = scrcpy_client.SCRCPY_client()
        # Replace the socket with our mock file-based "socket".
        # The attribute being overwritten lives on the ``socket`` object that
        # scrcpy_client imported (presumably the shared stdlib module), so
        # put the original back after each test. The cleanup is registered
        # before the assertions below so it also runs if setUp fails.
        original_socket = scrcpy_client.socket.socket
        self.addCleanup(
            setattr, scrcpy_client.socket, "socket", original_socket)
        scrcpy_client.socket.socket = MockSocket
        self.assertTrue(self.SCRCPY.connect())
        self.assertTrue(self.SCRCPY.start_processing())
        # Give FFmpeg a moment to catch up
        time.sleep(2.0)

    def test_resolution_recieved(self):
        """The client reports a width and height greater than one."""
        self.assertGreater(self.SCRCPY.WIDTH, 1)
        self.assertGreater(self.SCRCPY.HEIGHT, 1)

    def test_ffmpeg_running(self):
        """``ffm.poll()`` is still None after start-up."""
        self.assertIsNone(self.SCRCPY.ffm.poll())

    # Disabled in the original file (kept for reference):
    #
    # def test_ffmpeg_detected_stream(self):
    #     ffinfo = ''.join(self.SCRCPY.FFmpeg_info)
    #     self.assertTrue(
    #         "Stream #0:0 -> #0:0 (h264 -> rawvideo)" in ffinfo)

    def test_frames_recieved(self):
        """More than ten 3-dimensional frames come out of the recording."""
        frames_counted = 0
        while True:
            frm = self.SCRCPY.get_next_frame()
            # Anything that is not an ndarray ends the stream.
            if not isinstance(frm, np.ndarray):
                break
            self.assertEqual(len(frm.shape), 3)
            self.assertGreater(sum(frm.shape), 3)
            frames_counted += 1
            if SHOWFRAMES:
                frm = cv2.cvtColor(frm, cv2.COLOR_RGB2BGR)
                cv2.imshow("image", frm)
                cv2.waitKey(1000 // 60)  # CAP 60FPS
        self.assertGreater(frames_counted, 10)
        #print("Recieved:",frames_counted)
if __name__ == '__main__':
    # Route log records of level DEBUG and above to stdout, then hand
    # control to the unittest runner.
    logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
    unittest.main()