From 5ec65e2ab09175981a0ab2136cf052303ae01f87 Mon Sep 17 00:00:00 2001 From: Gold87 <91761103+Gold872@users.noreply.github.com> Date: Sat, 14 Dec 2024 14:03:35 -0500 Subject: [PATCH] Camera stream optimizations (#156) Simplified the logic for the mjpeg stream decoding and display, along with adding text for bandwidth and FPS * Use stream builder instead of flutter hooks * Remove StreamManager, instead have one change notifier to hold the state and decoding streams * Only start a camera stream when the widget becomes focused * Rebuild the widget when focus changes, automatically restarting streams properly --- lib/services/log.dart | 4 + lib/widgets/mjpeg.dart | 491 ++++++++---------- .../nt_widgets/multi-topic/camera_stream.dart | 85 +-- pubspec.lock | 8 - pubspec.yaml | 1 - .../multi-topic/camera_stream_test.dart | 41 ++ 6 files changed, 327 insertions(+), 303 deletions(-) diff --git a/lib/services/log.dart b/lib/services/log.dart index 8343523e..8cdafd3a 100644 --- a/lib/services/log.dart +++ b/lib/services/log.dart @@ -62,6 +62,10 @@ class Log { void debug(dynamic message, [dynamic error, StackTrace? stackTrace]) { log(Level.debug, message, error, stackTrace); } + + void trace(dynamic message, [dynamic error, StackTrace? stackTrace]) { + log(Level.trace, message, error, stackTrace); + } } Log get logger => Log.instance; diff --git a/lib/widgets/mjpeg.dart b/lib/widgets/mjpeg.dart index fc6c1dca..3fa17c61 100644 --- a/lib/widgets/mjpeg.dart +++ b/lib/widgets/mjpeg.dart @@ -4,32 +4,10 @@ import 'dart:io'; import 'package:flutter/foundation.dart'; import 'package:flutter/material.dart'; -import 'package:flutter_hooks/flutter_hooks.dart'; import 'package:http/http.dart'; import 'package:visibility_detector/visibility_detector.dart'; -class _MjpegStateNotifier extends ChangeNotifier { - bool _mounted = true; - bool _visible = true; - - _MjpegStateNotifier() : super(); - - bool get mounted => _mounted; - - bool get visible => _visible; - - set visible(value) { - _visible = value; - notifyListeners(); - } - - @override - void dispose() { - _mounted = false; - notifyListeners(); - super.dispose(); - } -} +import 'package:elastic_dashboard/services/log.dart'; /// A preprocessor for each JPEG frame from an MJPEG stream. class MjpegPreprocessor { @@ -37,107 +15,132 @@ class MjpegPreprocessor { } /// An Mjpeg. -class Mjpeg extends HookWidget { - final streamKey = UniqueKey(); - final MjpegStreamState mjpegStream; +class Mjpeg extends StatefulWidget { + final MjpegController controller; final BoxFit? fit; + final bool expandToFit; final double? width; final double? height; final WidgetBuilder? loading; final Widget Function(BuildContext contet, dynamic error, dynamic stack)? error; - Mjpeg({ - required this.mjpegStream, + const Mjpeg({ + required this.controller, this.width, this.height, this.fit, + this.expandToFit = false, this.error, this.loading, super.key, }); + @override + State createState() => _MjpegState(); +} + +class _MjpegState extends State { + final streamKey = UniqueKey(); + + late void Function() listener; + + @override + void initState() { + listener = () => setState(() {}); + widget.controller.addListener(listener); + super.initState(); + } + + @override + void dispose() { + widget.controller.removeListener(listener); + + widget.controller.setMounted(streamKey, false); + widget.controller.setVisible(streamKey, false); + + super.dispose(); + } + + @override + void didUpdateWidget(Mjpeg oldWidget) { + final controller = widget.controller; + final oldController = oldWidget.controller; + + if (oldController != controller) { + oldController.removeListener(listener); + controller.addListener(listener); + + controller.setMounted(streamKey, oldController.isMounted(streamKey)); + controller.setVisible(streamKey, oldController.isVisible(streamKey)); + } + super.didUpdateWidget(oldWidget); + } + @override Widget build(BuildContext context) { - final image = useState(null); - final state = useMemoized(() => _MjpegStateNotifier()); - final visible = useListenable(state); - final errorState = useState?>(null); - isMounted() => context.mounted; - - final manager = useMemoized( - () => _StreamManager( - mjpegStream: mjpegStream, - mounted: isMounted, - visible: () => visible.visible, - ), - [ - visible.visible, - isMounted(), - mjpegStream, - ]); - - final key = useMemoized(() => UniqueKey(), [manager]); - - useEffect(() { - errorState.value = null; - manager.updateStream(streamKey, image, errorState); - return () { - if (visible.visible && isMounted()) { - return; - } - mjpegStream.cancelSubscription(streamKey); - }; - }, [manager]); + final controller = widget.controller; + + controller.setMounted(streamKey, context.mounted); + + if (controller.isVisible(streamKey)) { + controller.startStream(); + } - if (errorState.value != null && kDebugMode) { + if (controller.errorState.value != null && kDebugMode) { return SizedBox( - width: width, - height: height, - child: error == null + width: widget.width, + height: widget.height, + child: widget.error == null ? Center( child: Padding( padding: const EdgeInsets.all(8.0), child: Text( - '${errorState.value}', + '${controller.errorState.value}', textAlign: TextAlign.center, style: const TextStyle(color: Colors.red), ), ), ) - : error!(context, errorState.value!.first, errorState.value!.last), + : widget.error!(context, controller.errorState.value!.first, + controller.errorState.value!.last), ); } - if ((image.value == null && mjpegStream.previousImage == null) || - errorState.value != null) { - return SizedBox( - width: width, - height: height, - child: loading == null - ? const Center(child: CircularProgressIndicator()) - : loading!(context)); - } - return VisibilityDetector( - key: key, - child: Image( - image: image.value ?? mjpegStream.previousImage!, - width: width, - height: height, - gaplessPlayback: true, - fit: fit, - ), + key: streamKey, + child: StreamBuilder?>( + stream: controller.imageStream.stream, + builder: (context, snapshot) { + if ((snapshot.data == null && controller.previousImage == null) || + controller.errorState.value != null) { + return SizedBox( + width: widget.width, + height: widget.height, + child: widget.loading?.call(context) ?? + const Center(child: CircularProgressIndicator()), + ); + } + + return Image.memory( + Uint8List.fromList(snapshot.data ?? controller.previousImage!), + width: widget.width, + height: widget.height, + gaplessPlayback: true, + fit: widget.fit, + scale: (widget.expandToFit) ? 1e-6 : 1.0, + ); + }), onVisibilityChanged: (VisibilityInfo info) { - if (visible.mounted) { - visible.visible = info.visibleFraction != 0; + if (controller.isMounted(streamKey)) { + controller.setVisible(streamKey, info.visibleFraction != 0); } }, ); } } -class MjpegStreamState { +class MjpegController extends ChangeNotifier { static const _trigger = 0xFF; static const _soi = 0xD8; static const _eoi = 0xD9; @@ -147,220 +150,188 @@ class MjpegStreamState { final Duration timeout; final Map headers; Client httpClient = Client(); - Stream>? byteStream; + + StreamSubscription>? _rawSubscription; + + ValueNotifier bandwidth = ValueNotifier(0); + ValueNotifier framesPerSecond = ValueNotifier(0); + + Timer? _metricsTimer; + + int _bitCount = 0; + int _frameCount = 0; + + ValueNotifier?> errorState = ValueNotifier(null); + StreamController?> imageStream = StreamController.broadcast(); + List? previousImage; final MjpegPreprocessor? preprocessor; - MemoryImage? previousImage; + final Set _mountedKeys = {}; + final Set _visibleKeys = {}; + + bool isVisible(Key key) => _visibleKeys.contains(key); - final Map _subscriptions = {}; + void setVisible(Key key, bool value) { + logger.trace('Setting visibility to $value for $stream'); + if (value) { + bool hasChanged = !_visibleKeys.contains(key); + _visibleKeys.add(key); - StreamSubscription? _bitSubscription; - int bitCount = 0; - double bandwidth = 0.0; + if (hasChanged) { + logger.trace( + 'Visibility changed to true, notifying listeners for mjpeg stream'); + notifyListeners(); + } + } else { + _visibleKeys.remove(key); + + if (_visibleKeys.isEmpty) { + stopStream(); + } + } + } - late final Timer bandwidthTimer; + bool isMounted(Key key) => _mountedKeys.contains(key); - MjpegStreamState({ + void setMounted(Key key, bool value) { + logger.trace('Setting mounted to $value for $stream'); + if (value) { + _mountedKeys.add(key); + } else { + _mountedKeys.remove(key); + } + } + + bool get isStreaming => _rawSubscription != null; + + MjpegController({ required this.stream, this.isLive = true, this.timeout = const Duration(seconds: 5), this.headers = const {}, this.preprocessor, }) { - bandwidthTimer = Timer.periodic(const Duration(seconds: 1), (timer) { - bandwidth = bitCount / 1e6; + errorState.addListener(notifyListeners); + } - bitCount = 0; - }); + @override + void dispose() { + errorState.removeListener(notifyListeners); + stopStream(); + imageStream.close(); + super.dispose(); } - void dispose({bool deleting = false}) { - for (StreamSubscription subscription in _subscriptions.values) { - subscription.cancel(); + void startStream() async { + if (isStreaming) { + return; + } + logger.debug('Starting camera stream on URL $stream'); + Stream>? byteStream; + try { + final request = Request('GET', Uri.parse(stream)); + request.headers.addAll(headers); + final response = await httpClient.send(request).timeout( + timeout); //timeout is to prevent process to hang forever in some case + + if (response.statusCode >= 200 && response.statusCode < 300) { + byteStream = response.stream; + } else { + if (_mountedKeys.isNotEmpty) { + errorState.value = [ + HttpException('Stream returned ${response.statusCode} status'), + StackTrace.current + ]; + imageStream.add(null); + } + stopStream(); + } + } catch (error, stack) { + // we ignore those errors in case play/pause is triggers + if (!error + .toString() + .contains('Connection closed before full header was received')) { + if (_mountedKeys.isNotEmpty) { + errorState.value = [error, stack]; + imageStream.add(null); + } + } } - _subscriptions.clear(); - _bitSubscription?.cancel(); - _bitSubscription = null; - byteStream = null; - httpClient.close(); - httpClient = Client(); - bitCount = 0; - if (deleting) { - bandwidthTimer.cancel(); + if (byteStream == null) { + return; } - } - void cancelSubscription(Key key) { - if (_subscriptions.containsKey(key)) { - _subscriptions.remove(key)!.cancel(); + var buffer = []; + _rawSubscription = byteStream.listen((data) { + _bitCount += data.length * Uint8List.bytesPerElement * 8; + _handleData(buffer, data); + }); - if (_subscriptions.isEmpty) { - dispose(); - } - } + _metricsTimer = Timer.periodic(const Duration(seconds: 1), _updateMetrics); } - void sendImage( - ValueNotifier image, - ValueNotifier errorState, - List chunks, { - required bool Function() mounted, - }) async { - // pass image through preprocessor sending to [Image] for rendering - final List? imageData; - - if (preprocessor != null) { - imageData = preprocessor?.process(chunks); - } else { - imageData = chunks; - } + void _updateMetrics(_) { + bandwidth.value = _bitCount / 1e6; + framesPerSecond.value = _frameCount; - if (imageData == null) return; + _bitCount = 0; + _frameCount = 0; + } - final imageMemory = MemoryImage(Uint8List.fromList(imageData)); - previousImage?.evict(); - previousImage = imageMemory; - if (mounted()) { - errorState.value = null; - image.value = imageMemory; - } + void stopStream() async { + logger.debug('Stopping camera stream on URL $stream'); + await _rawSubscription?.cancel(); + _metricsTimer?.cancel(); + _rawSubscription = null; + _bitCount = 0; + _frameCount = 0; + httpClient.close(); + httpClient = Client(); } - void _onDataReceived({ - required List carry, - required List chunk, - required ValueNotifier image, - required ValueNotifier?> errorState, - required bool Function() mounted, - }) async { - if (carry.isNotEmpty && carry.last == _trigger) { - if (chunk.first == _eoi) { - carry.add(chunk.first); - sendImage(image, errorState, carry, mounted: mounted); - carry = []; + void _handleNewPacket(List packet) { + logger.trace('Handling a ${packet.length} byte packet'); + previousImage = packet; + List imageData = preprocessor?.process(packet) ?? packet; + imageStream.add(imageData); + _frameCount++; + } + + void _handleData(List buffer, List data) { + if (buffer.isNotEmpty && buffer.last == _trigger) { + if (data.first == _eoi) { + buffer.add(data.first); + _handleNewPacket(buffer); + buffer = []; if (!isLive) { dispose(); } } } - - for (var i = 0; i < chunk.length - 1; i++) { - final d = chunk[i]; - final d1 = chunk[i + 1]; + for (var i = 0; i < data.length - 1; i++) { + final d = data[i]; + final d1 = data[i + 1]; if (d == _trigger && d1 == _soi) { - carry = []; - carry.add(d); - } else if (d == _trigger && d1 == _eoi && carry.isNotEmpty) { - carry.add(d); - carry.add(d1); - - sendImage(image, errorState, carry, mounted: mounted); - carry = []; + buffer = []; + buffer.add(d); + } else if (d == _trigger && d1 == _eoi && buffer.isNotEmpty) { + buffer.add(d); + buffer.add(d1); + + _handleNewPacket(buffer); + buffer = []; if (!isLive) { dispose(); } - } else if (carry.isNotEmpty) { - carry.add(d); - if (i == chunk.length - 2) { - carry.add(d1); - } - } - } - } - - void updateStream( - Key key, - ValueNotifier image, - ValueNotifier?> errorState, { - required bool Function() visible, - required bool Function() mounted, - }) async { - if (byteStream == null && visible() && mounted()) { - try { - final request = Request('GET', Uri.parse(stream)); - request.headers.addAll(headers); - final response = await httpClient.send(request).timeout( - timeout); //timeout is to prevent process to hang forever in some case - - if (response.statusCode >= 200 && response.statusCode < 300) { - byteStream = response.stream.asBroadcastStream(); - - _bitSubscription = byteStream!.listen((data) { - bitCount += data.length * Uint8List.bytesPerElement * 8; - }); - } else { - if (mounted()) { - errorState.value = [ - HttpException('Stream returned ${response.statusCode} status'), - StackTrace.current - ]; - image.value = null; - } - dispose(); - } - } catch (error, stack) { - // we ignore those errors in case play/pause is triggers - if (!error - .toString() - .contains('Connection closed before full header was received')) { - if (mounted()) { - errorState.value = [error, stack]; - image.value = null; - } + } else if (buffer.isNotEmpty) { + buffer.add(d); + if (i == buffer.length - 2) { + buffer.add(d1); } } } - - if (byteStream == null) { - return; - } - - var carry = []; - _subscriptions.putIfAbsent( - key, - () => byteStream!.listen((chunk) { - if (!visible() || !mounted()) { - carry.clear(); - return; - } - _onDataReceived( - carry: carry, - chunk: chunk, - image: image, - errorState: errorState, - mounted: mounted, - ); - }, onError: (error, stack) { - try { - if (mounted()) { - errorState.value = [error, stack]; - image.value = null; - } - } finally { - dispose(); - } - }, cancelOnError: true)); - } -} - -class _StreamManager { - final MjpegStreamState mjpegStream; - - final bool Function() mounted; - final bool Function() visible; - - _StreamManager({ - required this.mjpegStream, - required this.mounted, - required this.visible, - }); - - void updateStream(Key key, ValueNotifier image, - ValueNotifier?> errorState) async { - mjpegStream.updateStream(key, image, errorState, - visible: visible, mounted: mounted); } } diff --git a/lib/widgets/nt_widgets/multi-topic/camera_stream.dart b/lib/widgets/nt_widgets/multi-topic/camera_stream.dart index 96646eb0..56336c04 100644 --- a/lib/widgets/nt_widgets/multi-topic/camera_stream.dart +++ b/lib/widgets/nt_widgets/multi-topic/camera_stream.dart @@ -25,13 +25,7 @@ class CameraStreamModel extends MultiTopicNTWidgetModel { int? _fps; Size? _resolution; - MemoryImage? _lastDisplayedImage; - - MjpegStreamState? mjpegStream; - - MemoryImage? get lastDisplayedImage => _lastDisplayedImage; - - set lastDisplayedImage(value) => _lastDisplayedImage = value; + MjpegController? controller; int? get quality => _quality; @@ -91,7 +85,12 @@ class CameraStreamModel extends MultiTopicNTWidgetModel { .toList(); if (resolution != null && resolution.length > 1) { - _resolution = Size(resolution[0].toDouble(), resolution[1].toDouble()); + if (resolution[0] % 2 != 0) { + resolution[0] += 1; + } + if (resolution[0] > 0 && resolution[1] > 0) { + _resolution = Size(resolution[0].toDouble(), resolution[1].toDouble()); + } } } @@ -164,7 +163,12 @@ class CameraStreamModel extends MultiTopicNTWidgetModel { return; } - resolution = Size(newWidth.toDouble(), + if (newWidth! % 2 != 0) { + // Won't allow += for some reason + newWidth = newWidth! + 1; + } + + resolution = Size(newWidth!.toDouble(), resolution?.height.toDouble() ?? 0); }); }, @@ -232,19 +236,15 @@ class CameraStreamModel extends MultiTopicNTWidgetModel { @override void disposeWidget({bool deleting = false}) { if (deleting) { - _lastDisplayedImage?.evict(); - mjpegStream?.previousImage?.evict(); - mjpegStream?.dispose(deleting: deleting); + controller?.dispose(); } super.disposeWidget(deleting: deleting); } void closeClient() { - _lastDisplayedImage?.evict(); - _lastDisplayedImage = mjpegStream?.previousImage; - mjpegStream?.dispose(); - mjpegStream = null; + controller?.dispose(); + controller = null; } } @@ -281,13 +281,11 @@ class CameraStreamWidget extends NTWidget { return Stack( fit: StackFit.expand, children: [ - if (model.mjpegStream?.previousImage != null || - model.lastDisplayedImage != null) + if (model.controller?.previousImage != null) Opacity( opacity: 0.35, - child: Image( - image: model.mjpegStream?.previousImage ?? - model.lastDisplayedImage!, + child: Image.memory( + Uint8List.fromList(model.controller!.previousImage!), fit: BoxFit.contain, ), ), @@ -309,28 +307,47 @@ class CameraStreamWidget extends NTWidget { ); } - bool createNewWidget = model.mjpegStream == null; + bool createNewWidget = model.controller == null; String stream = model.getUrlWithParameters(streams.last); createNewWidget = - createNewWidget || (model.mjpegStream?.stream != stream); + createNewWidget || (model.controller?.stream != stream); if (createNewWidget) { - model.lastDisplayedImage?.evict(); - model.mjpegStream?.dispose(deleting: true); + model.controller?.dispose(); - model.mjpegStream = MjpegStreamState(stream: stream); + model.controller = MjpegController(stream: stream); } - return Stack( - fit: StackFit.expand, - children: [ - Mjpeg( - mjpegStream: model.mjpegStream!, - fit: BoxFit.contain, - ), - ], + return IntrinsicWidth( + child: Column( + mainAxisAlignment: MainAxisAlignment.center, + children: [ + Row( + children: [ + ValueListenableBuilder( + valueListenable: model.controller!.framesPerSecond, + builder: (context, value, child) => Text('FPS: $value'), + ), + const Spacer(), + ValueListenableBuilder( + valueListenable: model.controller!.bandwidth, + builder: (context, value, child) => + Text('Bandwidth: ${value.toStringAsFixed(2)} Mbps'), + ), + ], + ), + Flexible( + child: Mjpeg( + controller: model.controller!, + fit: BoxFit.contain, + expandToFit: true, + ), + ), + const Text(''), + ], + ), ); }, ); diff --git a/pubspec.lock b/pubspec.lock index 0652010b..35f7cf27 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -406,14 +406,6 @@ packages: url: "https://pub.dev" source: hosted version: "1.6.0" - flutter_hooks: - dependency: "direct main" - description: - name: flutter_hooks - sha256: cde36b12f7188c85286fba9b38cc5a902e7279f36dd676967106c041dc9dde70 - url: "https://pub.dev" - source: hosted - version: "0.20.5" flutter_launcher_icons: dependency: "direct main" description: diff --git a/pubspec.yaml b/pubspec.yaml index 0c01a383..8560f9b1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -21,7 +21,6 @@ dependencies: flutter_colorpicker: ^1.1.0 flutter_context_menu: ^0.1.3 flutter_fancy_tree_view: ^1.1.1 - flutter_hooks: ^0.20.0 flutter_launcher_icons: ^0.13.1 github: ^9.17.0 http: ^1.2.0 diff --git a/test/widgets/nt_widgets/multi-topic/camera_stream_test.dart b/test/widgets/nt_widgets/multi-topic/camera_stream_test.dart index b5ac84ca..7a84186c 100644 --- a/test/widgets/nt_widgets/multi-topic/camera_stream_test.dart +++ b/test/widgets/nt_widgets/multi-topic/camera_stream_test.dart @@ -69,6 +69,47 @@ void main() { expect(cameraStreamModel.getUrlWithParameters('0.0.0.0'), '0.0.0.0?'); }); + test('Camera stream from json (with invalid resolution)', () { + NTWidgetModel cameraStreamModel = NTWidgetBuilder.buildNTModelFromJson( + ntConnection, + preferences, + 'Camera Stream', + {...cameraStreamJson}..update('resolution', (_) => [101.0, 100.0]), + ); + + expect(cameraStreamModel.type, 'Camera Stream'); + expect(cameraStreamModel.runtimeType, CameraStreamModel); + + if (cameraStreamModel is! CameraStreamModel) { + return; + } + expect(cameraStreamModel.resolution, const Size(102.0, 100.0)); + + expect(cameraStreamModel.getUrlWithParameters('0.0.0.0'), + '0.0.0.0?resolution=102x100&fps=60&compression=50'); + }); + + test('Camera stream from json (with negative resolution)', () { + NTWidgetModel cameraStreamModel = NTWidgetBuilder.buildNTModelFromJson( + ntConnection, + preferences, + 'Camera Stream', + {...cameraStreamJson}..update('resolution', (_) => [-1, 100.0]), + ); + + expect(cameraStreamModel.type, 'Camera Stream'); + expect(cameraStreamModel.runtimeType, CameraStreamModel); + + if (cameraStreamModel is! CameraStreamModel) { + return; + } + + expect(cameraStreamModel.resolution, isNull); + + expect(cameraStreamModel.getUrlWithParameters('0.0.0.0'), + '0.0.0.0?fps=60&compression=50'); + }); + test('Camera stream to json', () { CameraStreamModel cameraStreamModel = CameraStreamModel( ntConnection: ntConnection,