-
-
Notifications
You must be signed in to change notification settings - Fork 39.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Refactor] Break
QGFImageFile
's _save
function into smaller pieces (
- Loading branch information
Showing
1 changed file
with
168 additions
and
136 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,11 @@ | ||
# Copyright 2021 Nick Brassel (@tzarc) | ||
# Copyright 2023 Pablo Martinez (@elpekenin) <[email protected]> | ||
# SPDX-License-Identifier: GPL-2.0-or-later | ||
|
||
# Quantum Graphics File "QGF" Image File Format. | ||
# See https://docs.qmk.fm/#/quantum_painter_qgf for more information. | ||
|
||
import functools | ||
from colorsys import rgb_to_hsv | ||
from types import FunctionType | ||
from PIL import Image, ImageFile, ImageChops | ||
|
@@ -15,6 +17,12 @@ def o24(i): | |
return o16(i & 0xFFFF) + o8((i & 0xFF0000) >> 16) | ||
|
||
|
||
# Helper to convert from RGB888 to the QMK "dialect" of HSV888 | ||
def rgb888_to_qmk_hsv888(e): | ||
hsv = rgb_to_hsv(e[0] / 255.0, e[1] / 255.0, e[2] / 255.0) | ||
return (int(hsv[0] * 255.0), int(hsv[1] * 255.0), int(hsv[2] * 255.0)) | ||
|
||
|
||
######################################################################################################################## | ||
|
||
|
||
|
@@ -60,6 +68,14 @@ def write(self, fp): | |
+ o16(self.frame_count) # frame count | ||
) | ||
|
||
@property | ||
def image_size(self): | ||
return self.image_width, self.image_height | ||
|
||
@image_size.setter | ||
def image_size(self, size): | ||
self.image_width, self.image_height = size | ||
|
||
|
||
######################################################################################################################## | ||
|
||
|
@@ -180,6 +196,14 @@ def write(self, fp): | |
+ o16(self.bottom) # bottom | ||
) | ||
|
||
@property | ||
def bbox(self): | ||
return self.left, self.top, self.right, self.bottom | ||
|
||
@bbox.setter | ||
def bbox(self, bbox): | ||
self.left, self.top, self.right, self.bottom = bbox | ||
|
||
|
||
######################################################################################################################## | ||
|
||
|
@@ -221,51 +245,167 @@ def _accept(prefix): | |
return False | ||
|
||
|
||
def _save(im, fp, filename): | ||
def _for_all_frames(x: FunctionType, /, images): | ||
frame_num = 0 | ||
last_frame = None | ||
for frame in images: | ||
# Get number of of frames in this image | ||
nfr = getattr(frame, "n_frames", 1) | ||
for idx in range(nfr): | ||
frame.seek(idx) | ||
frame.load() | ||
copy = frame.copy().convert("RGB") | ||
x(frame_num, copy, last_frame) | ||
last_frame = copy | ||
frame_num += 1 | ||
|
||
|
||
def _compress_image(frame, last_frame, *, use_rle, use_deltas, format_, **_kwargs): | ||
# Convert the original frame so we can do comparisons | ||
converted = qmk.painter.convert_requested_format(frame, format_) | ||
graphic_data = qmk.painter.convert_image_bytes(converted, format_) | ||
|
||
# Convert the raw data to RLE-encoded if requested | ||
raw_data = graphic_data[1] | ||
if use_rle: | ||
rle_data = qmk.painter.compress_bytes_qmk_rle(graphic_data[1]) | ||
use_raw_this_frame = not use_rle or len(raw_data) <= len(rle_data) | ||
image_data = raw_data if use_raw_this_frame else rle_data | ||
|
||
# Work out if a delta frame is smaller than injecting it directly | ||
use_delta_this_frame = False | ||
bbox = None | ||
if use_deltas and last_frame is not None: | ||
# If we want to use deltas, then find the difference | ||
diff = ImageChops.difference(frame, last_frame) | ||
|
||
# Get the bounding box of those differences | ||
bbox = diff.getbbox() | ||
|
||
# If we have a valid bounding box... | ||
if bbox: | ||
# ...create the delta frame by cropping the original. | ||
delta_frame = frame.crop(bbox) | ||
|
||
# Convert the delta frame to the requested format | ||
delta_converted = qmk.painter.convert_requested_format(delta_frame, format_) | ||
delta_graphic_data = qmk.painter.convert_image_bytes(delta_converted, format_) | ||
|
||
# Work out how large the delta frame is going to be with compression etc. | ||
delta_raw_data = delta_graphic_data[1] | ||
if use_rle: | ||
delta_rle_data = qmk.painter.compress_bytes_qmk_rle(delta_graphic_data[1]) | ||
delta_use_raw_this_frame = not use_rle or len(delta_raw_data) <= len(delta_rle_data) | ||
delta_image_data = delta_raw_data if delta_use_raw_this_frame else delta_rle_data | ||
|
||
# If the size of the delta frame (plus delta descriptor) is smaller than the original, use that instead | ||
# This ensures that if a non-delta is overall smaller in size, we use that in preference due to flash | ||
# sizing constraints. | ||
if (len(delta_image_data) + QGFFrameDeltaDescriptorV1.length) < len(image_data): | ||
# Copy across all the delta equivalents so that the rest of the processing acts on those | ||
graphic_data = delta_graphic_data | ||
raw_data = delta_raw_data | ||
rle_data = delta_rle_data | ||
use_raw_this_frame = delta_use_raw_this_frame | ||
image_data = delta_image_data | ||
use_delta_this_frame = True | ||
|
||
# Default to whole image | ||
bbox = bbox or [0, 0, *frame.size] | ||
# Fix sze (as per #20296), we need to cast first as tuples are inmutable | ||
bbox = list(bbox) | ||
bbox[2] -= 1 | ||
bbox[3] -= 1 | ||
|
||
return { | ||
"bbox": bbox, | ||
"graphic_data": graphic_data, | ||
"image_data": image_data, | ||
"use_delta_this_frame": use_delta_this_frame, | ||
"use_raw_this_frame": use_raw_this_frame, | ||
} | ||
|
||
|
||
# Helper function to save each frame to the output file | ||
def _write_frame(idx, frame, last_frame, *, fp, frame_offsets, **kwargs): | ||
# Not an argument of the function as it would consume from **kwargs | ||
format_ = kwargs["format_"] | ||
|
||
# (potentially) Apply RLE and/or delta, and work out output image's information | ||
outputs = _compress_image(frame, last_frame, **kwargs) | ||
bbox = outputs["bbox"] | ||
graphic_data = outputs["graphic_data"] | ||
image_data = outputs["image_data"] | ||
use_delta_this_frame = outputs["use_delta_this_frame"] | ||
use_raw_this_frame = outputs["use_raw_this_frame"] | ||
|
||
# Write out the frame descriptor | ||
frame_offsets.frame_offsets[idx] = fp.tell() | ||
vprint(f'{f"Frame {idx:3d} base":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
frame_descriptor = QGFFrameDescriptorV1() | ||
frame_descriptor.is_delta = use_delta_this_frame | ||
frame_descriptor.is_transparent = False | ||
frame_descriptor.format = format_['image_format_byte'] | ||
frame_descriptor.compression = 0x00 if use_raw_this_frame else 0x01 # See qp.h, painter_compression_t | ||
frame_descriptor.delay = frame.info.get('duration', 1000) # If we're not an animation, just pretend we're delaying for 1000ms | ||
frame_descriptor.write(fp) | ||
|
||
# Write out the palette if required | ||
if format_['has_palette']: | ||
palette = graphic_data[0] | ||
palette_descriptor = QGFFramePaletteDescriptorV1() | ||
|
||
# Convert all palette entries to HSV888 and write to the output | ||
palette_descriptor.palette_entries = list(map(rgb888_to_qmk_hsv888, palette)) | ||
vprint(f'{f"Frame {idx:3d} palette":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
palette_descriptor.write(fp) | ||
|
||
# Write out the delta info if required | ||
if use_delta_this_frame: | ||
# Set up the rendering location of where the delta frame should be situated | ||
delta_descriptor = QGFFrameDeltaDescriptorV1() | ||
delta_descriptor.bbox = bbox | ||
|
||
# Write the delta frame to the output | ||
vprint(f'{f"Frame {idx:3d} delta":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
delta_descriptor.write(fp) | ||
|
||
# Write out the data for this frame to the output | ||
data_descriptor = QGFFrameDataDescriptorV1() | ||
data_descriptor.data = image_data | ||
vprint(f'{f"Frame {idx:3d} data":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
data_descriptor.write(fp) | ||
|
||
|
||
def _save(im, fp, _filename): | ||
"""Helper method used by PIL to write to an output file. | ||
""" | ||
# Work out from the parameters if we need to do anything special | ||
encoderinfo = im.encoderinfo.copy() | ||
append_images = list(encoderinfo.get("append_images", [])) | ||
verbose = encoderinfo.get("verbose", False) | ||
use_deltas = encoderinfo.get("use_deltas", True) | ||
use_rle = encoderinfo.get("use_rle", True) | ||
|
||
# Helper for inline verbose prints | ||
def vprint(s): | ||
if verbose: | ||
print(s) | ||
# Helper for prints, noop taking any args if not verbose | ||
global vprint | ||
verbose = encoderinfo.get("verbose", False) | ||
vprint = print if verbose else lambda *_args, **_kwargs: None | ||
|
||
# Helper to iterate through all frames in the input image | ||
def _for_all_frames(x: FunctionType): | ||
frame_num = 0 | ||
last_frame = None | ||
for frame in [im] + append_images: | ||
# Get number of of frames in this image | ||
nfr = getattr(frame, "n_frames", 1) | ||
for idx in range(nfr): | ||
frame.seek(idx) | ||
frame.load() | ||
copy = frame.copy().convert("RGB") | ||
x(frame_num, copy, last_frame) | ||
last_frame = copy | ||
frame_num += 1 | ||
append_images = list(encoderinfo.get("append_images", [])) | ||
for_all_frames = functools.partial(_for_all_frames, images=[im, *append_images]) | ||
|
||
# Collect all the frame sizes | ||
frame_sizes = [] | ||
_for_all_frames(lambda idx, frame, last_frame: frame_sizes.append(frame.size)) | ||
for_all_frames(lambda _idx, frame, _last_frame: frame_sizes.append(frame.size)) | ||
|
||
# Make sure all frames are the same size | ||
if len(list(set(frame_sizes))) != 1: | ||
if len(set(frame_sizes)) != 1: | ||
raise ValueError("Mismatching sizes on frames") | ||
|
||
# Write out the initial graphics descriptor (and write a dummy value), so that we can come back and fill in the | ||
# correct values once we've written all the frames to the output | ||
graphics_descriptor_location = fp.tell() | ||
graphics_descriptor = QGFGraphicsDescriptor() | ||
graphics_descriptor.frame_count = len(frame_sizes) | ||
graphics_descriptor.image_width = frame_sizes[0][0] | ||
graphics_descriptor.image_height = frame_sizes[0][1] | ||
graphics_descriptor.image_size = frame_sizes[0] | ||
vprint(f'{"Graphics descriptor block":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
graphics_descriptor.write(fp) | ||
|
||
|
@@ -276,117 +416,9 @@ def _for_all_frames(x: FunctionType): | |
vprint(f'{"Frame offsets block":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
frame_offsets.write(fp) | ||
|
||
# Helper function to save each frame to the output file | ||
def _write_frame(idx, frame, last_frame): | ||
# If we replace the frame we're going to output with a delta, we can override it here | ||
this_frame = frame | ||
location = (0, 0) | ||
size = frame.size | ||
|
||
# Work out the format we're going to use | ||
format = encoderinfo["qmk_format"] | ||
|
||
# Convert the original frame so we can do comparisons | ||
converted = qmk.painter.convert_requested_format(this_frame, format) | ||
graphic_data = qmk.painter.convert_image_bytes(converted, format) | ||
|
||
# Convert the raw data to RLE-encoded if requested | ||
raw_data = graphic_data[1] | ||
if use_rle: | ||
rle_data = qmk.painter.compress_bytes_qmk_rle(graphic_data[1]) | ||
use_raw_this_frame = not use_rle or len(raw_data) <= len(rle_data) | ||
image_data = raw_data if use_raw_this_frame else rle_data | ||
|
||
# Work out if a delta frame is smaller than injecting it directly | ||
use_delta_this_frame = False | ||
if use_deltas and last_frame is not None: | ||
# If we want to use deltas, then find the difference | ||
diff = ImageChops.difference(frame, last_frame) | ||
|
||
# Get the bounding box of those differences | ||
bbox = diff.getbbox() | ||
|
||
# If we have a valid bounding box... | ||
if bbox: | ||
# ...create the delta frame by cropping the original. | ||
delta_frame = frame.crop(bbox) | ||
delta_location = (bbox[0], bbox[1]) | ||
delta_size = (bbox[2] - bbox[0], bbox[3] - bbox[1]) | ||
|
||
# Convert the delta frame to the requested format | ||
delta_converted = qmk.painter.convert_requested_format(delta_frame, format) | ||
delta_graphic_data = qmk.painter.convert_image_bytes(delta_converted, format) | ||
|
||
# Work out how large the delta frame is going to be with compression etc. | ||
delta_raw_data = delta_graphic_data[1] | ||
if use_rle: | ||
delta_rle_data = qmk.painter.compress_bytes_qmk_rle(delta_graphic_data[1]) | ||
delta_use_raw_this_frame = not use_rle or len(delta_raw_data) <= len(delta_rle_data) | ||
delta_image_data = delta_raw_data if delta_use_raw_this_frame else delta_rle_data | ||
|
||
# If the size of the delta frame (plus delta descriptor) is smaller than the original, use that instead | ||
# This ensures that if a non-delta is overall smaller in size, we use that in preference due to flash | ||
# sizing constraints. | ||
if (len(delta_image_data) + QGFFrameDeltaDescriptorV1.length) < len(image_data): | ||
# Copy across all the delta equivalents so that the rest of the processing acts on those | ||
this_frame = delta_frame | ||
location = delta_location | ||
size = delta_size | ||
converted = delta_converted | ||
graphic_data = delta_graphic_data | ||
raw_data = delta_raw_data | ||
rle_data = delta_rle_data | ||
use_raw_this_frame = delta_use_raw_this_frame | ||
image_data = delta_image_data | ||
use_delta_this_frame = True | ||
|
||
# Write out the frame descriptor | ||
frame_offsets.frame_offsets[idx] = fp.tell() | ||
vprint(f'{f"Frame {idx:3d} base":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
frame_descriptor = QGFFrameDescriptorV1() | ||
frame_descriptor.is_delta = use_delta_this_frame | ||
frame_descriptor.is_transparent = False | ||
frame_descriptor.format = format['image_format_byte'] | ||
frame_descriptor.compression = 0x00 if use_raw_this_frame else 0x01 # See qp.h, painter_compression_t | ||
frame_descriptor.delay = frame.info['duration'] if 'duration' in frame.info else 1000 # If we're not an animation, just pretend we're delaying for 1000ms | ||
frame_descriptor.write(fp) | ||
|
||
# Write out the palette if required | ||
if format['has_palette']: | ||
palette = graphic_data[0] | ||
palette_descriptor = QGFFramePaletteDescriptorV1() | ||
|
||
# Helper to convert from RGB888 to the QMK "dialect" of HSV888 | ||
def rgb888_to_qmk_hsv888(e): | ||
hsv = rgb_to_hsv(e[0] / 255.0, e[1] / 255.0, e[2] / 255.0) | ||
return (int(hsv[0] * 255.0), int(hsv[1] * 255.0), int(hsv[2] * 255.0)) | ||
|
||
# Convert all palette entries to HSV888 and write to the output | ||
palette_descriptor.palette_entries = list(map(rgb888_to_qmk_hsv888, palette)) | ||
vprint(f'{f"Frame {idx:3d} palette":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
palette_descriptor.write(fp) | ||
|
||
# Write out the delta info if required | ||
if use_delta_this_frame: | ||
# Set up the rendering location of where the delta frame should be situated | ||
delta_descriptor = QGFFrameDeltaDescriptorV1() | ||
delta_descriptor.left = location[0] | ||
delta_descriptor.top = location[1] | ||
delta_descriptor.right = location[0] + size[0] - 1 | ||
delta_descriptor.bottom = location[1] + size[1] - 1 | ||
|
||
# Write the delta frame to the output | ||
vprint(f'{f"Frame {idx:3d} delta":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
delta_descriptor.write(fp) | ||
|
||
# Write out the data for this frame to the output | ||
data_descriptor = QGFFrameDataDescriptorV1() | ||
data_descriptor.data = image_data | ||
vprint(f'{f"Frame {idx:3d} data":26s} {fp.tell():5d}d / {fp.tell():04X}h') | ||
data_descriptor.write(fp) | ||
|
||
# Iterate over each if the input frames, writing it to the output in the process | ||
_for_all_frames(_write_frame) | ||
write_frame = functools.partial(_write_frame, format_=encoderinfo["qmk_format"], fp=fp, use_deltas=encoderinfo.get("use_deltas", True), use_rle=encoderinfo.get("use_rle", True), frame_offsets=frame_offsets) | ||
for_all_frames(write_frame) | ||
|
||
# Go back and update the graphics descriptor now that we can determine the final file size | ||
graphics_descriptor.total_file_size = fp.tell() | ||
|