diff --git a/sender/webcam.py b/sender/webcam.py index c63fd30..9d84cde 100644 --- a/sender/webcam.py +++ b/sender/webcam.py @@ -68,6 +68,9 @@ def parameters(self): interval_re = re.compile( rb"\t\t\tInterval: Discrete [0-9.]+s \([0-9]+\.0+ fps\)\Z" ) + frac_interval_re = re.compile( + rb"\t\t\tInterval: Discrete [0-9.]+s \([0-9]+\.0*[1-9][0-9]* fps\)\Z" + ) proc = subprocess.run( ("v4l2-ctl", "--list-formats-ext"), stdout=subprocess.PIPE, @@ -86,6 +89,9 @@ def parameters(self): elif interval_re.match(i): fps = int(i[22:].split(b"(", 1)[1].split(b".", 1)[0]) formats.append((width, height, fps, {"fmt": fmt})) + elif frac_interval_re.match(i): + # factional FPS not supported + continue elif i in ( b"", b"ioctl: VIDIOC_ENUM_FMT", diff --git a/tests/qvctests/integ.py b/tests/qvctests/integ.py index 0661b37..fb41da6 100644 --- a/tests/qvctests/integ.py +++ b/tests/qvctests/integ.py @@ -8,18 +8,20 @@ class TC_00_QVCTest(qubes.tests.extra.ExtraTestCase): def setUp(self): super(TC_00_QVCTest, self).setUp() - self.screenshare, self.proxy, self.view = self.create_vms( - ["share", "proxy", "view"]) - self.screenshare.start() - if self.screenshare.run('which qubes-video-companion', wait=True) != 0: + self.source, self.view = self.create_vms( + ["source", "view"]) + self.source.start() + if self.source.run('which qubes-video-companion', wait=True) != 0: self.skipTest('qubes-video-companion not installed') def wait_for_video0(self, vm): - vm.run( + retcode = vm.run( 'for i in `seq 30`; do ' ' v4l2-ctl --list-formats /dev/video0 2>/dev/null | grep -F "[0]" && break; ' ' sleep 0.5; ' - 'done; sleep 1', wait=True) + 'done; sleep 1; test -e /dev/video0', wait=True) + self.assertEqual(retcode, 0, + f"Timeout waiting for /dev/video0 in {vm.name}") def wait_for_video0_disconnect(self, vm): vm.run( @@ -37,12 +39,12 @@ def click_stop(self, vm, name): # send keys in separate call, to not send them just to the icon window vm.run('xdotool key Up Return', wait=True) - def capture_from_video(self, vm): + def capture_from_video(self, vm, extra_caps=""): # capture in destination, use gstreamer as it is installed already: gst_command = ( 'gst-launch-1.0 --quiet v4l2src num-buffers=1 ' '! videoconvert ' - '! video/x-raw,format=RGB ' + f'! video/x-raw,format=I420{extra_caps} ' '! fdsink' ) return vm.run( @@ -54,8 +56,6 @@ def capture_from_screen(self, vm): '! capsfilter caps=video/x-raw,format=BGRx,colorimetry=2:4:7:1 ' '! videoconvert ' '! video/x-raw,format=I420 ' - '! videoconvert ' - '! video/x-raw,format=RGB ' '! fdsink' ) return vm.run( @@ -77,7 +77,7 @@ def test_010_screenshare(self): self.qrexec_policy('qvc.ScreenShare', self.view.name, '@default', - target=self.screenshare.name) + target=self.source.name) p = self.view.run('qubes-video-companion screenshare', passio_popen=True, passio_stderr=True) # wait for device to appear, or a timeout @@ -88,11 +88,11 @@ def test_010_screenshare(self): p.returncode, *p.communicate())) # capture in source: - source_image = self.capture_from_screen(self.screenshare) + source_image = self.capture_from_screen(self.source) destination_image = self.capture_from_video(self.view) diff = self.compare_images(source_image, destination_image) self.assertLess(diff, 2.0) - self.click_stop(self.screenshare, 'screenshare') + self.click_stop(self.source, 'screenshare') # wait for device to disappear, or a timeout self.wait_for_video0_disconnect(self.view) stdout, stderr = p.communicate() @@ -104,65 +104,74 @@ def test_010_screenshare(self): print(stdout) print(stderr) + def apply_mask(self, image, width=640, height=480): + """Mask dynamic parts of vivid device output + + There are two: + - timestamp at 16,16 -> 170,32 + - counter + text movement at 16,96 -> 420,116 + + The image is assumed to be in I420 format. The mask is applied to + luminance channel only (which is the first 640*480 bytes). + """ + + image = list(image) + + def fill_black(x1, y1, x2, y2): + for y in range(y1, y2): + start = y*width+x1 + end = y*width+x2 + image[start:end] = bytes(end-start) + fill_black(16, 16, 170, 32) + fill_black(16, 96, 420, 116) + + return bytes(image) + + def test_020_webcam(self): - """Two stages test: screen share and then webcam + """Webcam test - screenshare -> proxy (screenshare), then proxy -> view (webcam) + source -> view (webcam) """ - self.proxy.start() - self.loop.run_until_complete(self.wait_for_session(self.proxy)) + self.loop.run_until_complete(self.wait_for_session(self.source)) self.view.start() self.loop.run_until_complete(self.wait_for_session(self.view)) - self.qrexec_policy('qvc.ScreenShare', - self.proxy.name, - '@default', - target=self.screenshare.name) self.qrexec_policy('qvc.Webcam', self.view.name, '@default', - target=self.proxy.name) - p = self.proxy.run('qubes-video-companion screenshare', - passio_popen=True, passio_stderr=True) + target=self.source.name) + ret = self.source.run("modprobe vivid", user="root", wait=True) + if ret != 0: + self.skipTest("Cannot load 'vivid' module") # wait for device to appear, or a timeout - self.wait_for_video0(self.proxy) - self.loop.run_until_complete(asyncio.sleep(3)) - if p.returncode is not None: - self.fail("'qubes-video-companion screenshare' exited early ({}): {} {}".format( - p.returncode, *p.communicate())) - # check if the screenshare device works - frame = self.capture_from_video(self.proxy) - self.assertTrue(frame) + self.wait_for_video0(self.source) - p2 = self.view.run('qubes-video-companion webcam', + p2 = self.view.run('qubes-video-companion -r 640x480x30 webcam', passio_popen=True, passio_stderr=True) self.wait_for_video0(self.view) if p2.returncode is not None: self.fail("'qubes-video-companion webcam' exited early ({}): {} {}".format( p2.returncode, *p2.communicate())) - source_image = self.capture_from_screen(self.screenshare) destination_image = self.capture_from_video(self.view) - diff = self.compare_images(source_image, destination_image) - self.assertLess(diff, 2.5) - self.click_stop(self.proxy, 'webcam') - self.click_stop(self.screenshare, 'screenshare') - self.wait_for_video0_disconnect(self.proxy) + destination_image = self.apply_mask(destination_image) + self.click_stop(self.source, 'webcam') + self.wait_for_video0_disconnect(self.view) stdout, stderr = p2.communicate() if p2.returncode != 0: - self.fail("'qubes-video-companion screenshare' failed ({}): {} {}".format( + self.fail("'qubes-video-companion webcam' failed ({}): {} {}".format( p2.returncode, stdout, stderr)) else: # just print print(stdout) print(stderr) - stdout, stderr = p.communicate() - if p.returncode != 0: - self.fail("'qubes-video-companion screenshare' failed ({}): {} {}".format( - p.returncode, stdout, stderr)) - else: - # just print - print(stdout) - print(stderr) + + # vivid supports only one client at a time, so capture source only + # after QVC disconnects + source_image = self.capture_from_video(self.source, ",width=640,height=480") + source_image = self.apply_mask(source_image) + diff = self.compare_images(source_image, destination_image) + self.assertLess(diff, 2.5) def list_tests():