From 5eaca086dd22d7aabcad703a330cd12247f2a15e Mon Sep 17 00:00:00 2001 From: Martin Kubovcik Date: Sat, 2 Apr 2022 13:04:57 +0200 Subject: [PATCH 1/5] Denormalization layer --- keras/layers/preprocessing/BUILD | 33 ++ keras/layers/preprocessing/denormalization.py | 326 ++++++++++++++ .../preprocessing/denormalization_test.py | 413 ++++++++++++++++++ 3 files changed, 772 insertions(+) create mode 100644 keras/layers/preprocessing/denormalization.py create mode 100644 keras/layers/preprocessing/denormalization_test.py diff --git a/keras/layers/preprocessing/BUILD b/keras/layers/preprocessing/BUILD index ca9cd75ca4a..0aa7fdd70e1 100644 --- a/keras/layers/preprocessing/BUILD +++ b/keras/layers/preprocessing/BUILD @@ -128,6 +128,21 @@ py_library( ], ) +py_library( + name = "denormalization", + srcs = [ + "denormalization.py", + ], + srcs_version = "PY3", + deps = [ + ":preprocessing_utils", + "//:expect_numpy_installed", + "//:expect_tensorflow_installed", + "//keras:backend", + "//keras/engine", + ], +) + py_library( name = "integer_lookup", srcs = [ @@ -471,6 +486,24 @@ tf_py_test( ], ) +tf_py_test( + name = "denormalization_test", + srcs = ["denormalization_test.py"], + python_version = "PY3", + shard_count = 4, + tags = [ + "noasan", # TODO(b/337374867) fails with -fsanitize=null + ], + deps = [ + ":denormalization", + ":preprocessing_test_utils", + "//:expect_absl_installed", + "//:expect_tensorflow_installed", + "//keras", + "//keras/testing_infra:test_combinations", + ], +) + tf_py_test( name = "integer_lookup_test", srcs = ["integer_lookup_test.py"], diff --git a/keras/layers/preprocessing/denormalization.py b/keras/layers/preprocessing/denormalization.py new file mode 100644 index 00000000000..45f4838d80d --- /dev/null +++ b/keras/layers/preprocessing/denormalization.py @@ -0,0 +1,326 @@ +# Copyright 2019 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Denormalization preprocessing layer.""" + +# pylint: disable=g-classes-have-attributes +# pylint: disable=g-direct-tensorflow-import + +from keras import backend +from keras.engine import base_preprocessing_layer +from keras.layers.preprocessing import preprocessing_utils as utils +import numpy as np +import tensorflow.compat.v2 as tf +from tensorflow.python.util.tf_export import keras_export + + +@keras_export('keras.layers.Denormalization', + 'keras.layers.experimental.preprocessing.Denormalization') +class Denormalization(base_preprocessing_layer.PreprocessingLayer): + """A preprocessing layer which denormalizes continuous features. + + This layer will shift and scale inputs into a distribution of training data. It accomplishes this by precomputing the mean and + variance of the data, and calling `(input + mean) * sqrt(var)` at runtime. + + The mean and variance values for the layer must be either supplied on + construction or learned via `adapt()`. `adapt()` will compute the mean and + variance of the data and store them as the layer's weights. `adapt()` should + be called before `fit()`, `evaluate()`, or `predict()`. + + For an overview and full list of preprocessing layers, see the preprocessing + [guide](https://www.tensorflow.org/guide/keras/preprocessing_layers). + + Args: + axis: Integer, tuple of integers, or None. The axis or axes that should + have a separate mean and variance for each index in the shape. For + example, if shape is `(None, 5)` and `axis=1`, the layer will track 5 + separate mean and variance values for the last axis. If `axis` is set to + `None`, the layer will normalize all elements in the input by a scalar + mean and variance. Defaults to -1, where the last axis of the input is + assumed to be a feature dimension and is normalized per index. Note that + in the specific case of batched scalar inputs where the only axis is the + batch axis, the default will normalize each index in the batch + separately. In this case, consider passing `axis=None`. + mean: The mean value(s) to use during denormalization. The passed value(s) + will be broadcast to the shape of the kept axes above; if the value(s) + cannot be broadcast, an error will be raised when this layer's `build()` + method is called. + variance: The variance value(s) to use during denormalization. The passed + value(s) will be broadcast to the shape of the kept axes above; if the + value(s) cannot be broadcast, an error will be raised when this layer's + `build()` method is called. + + Examples: + + Calculate a global mean and variance by analyzing the dataset in `adapt()`. + + >>> adapt_data = np.array([1., 2., 3., 4., 5.], dtype='float32') + >>> input_data = np.array([1., 2., 3.], dtype='float32') + >>> layer = tf.keras.layers.Denormalization(axis=None) + >>> layer.adapt(adapt_data) + >>> layer(input_data) + + + Calculate a mean and variance for each index on the last axis. + + >>> adapt_data = np.array([[0., 7., 4.], + ... [2., 9., 6.], + ... [0., 7., 4.], + ... [2., 9., 6.]], dtype='float32') + >>> input_data = np.array([[0., 7., 4.]], dtype='float32') + >>> layer = tf.keras.layers.Denormalization(axis=-1) + >>> layer.adapt(adapt_data) + >>> layer(input_data) + + + Pass the mean and variance directly. + + >>> input_data = np.array([[1.], [2.], [3.]], dtype='float32') + >>> layer = tf.keras.layers.Denormalization(mean=3., variance=2.) + >>> layer(input_data) + + """ + + def __init__(self, axis=-1, mean=None, variance=None, **kwargs): + super().__init__(**kwargs) + base_preprocessing_layer.keras_kpl_gauge.get_cell('Denormalization').set(True) + + # Standardize `axis` to a tuple. + if axis is None: + axis = () + elif isinstance(axis, int): + axis = (axis,) + else: + axis = tuple(axis) + self.axis = axis + + # Set `mean` and `variance` if passed. + if isinstance(mean, tf.Variable): + raise ValueError('Denormalization does not support passing a Variable ' + 'for the `mean` init arg.') + if isinstance(variance, tf.Variable): + raise ValueError('Denormalization does not support passing a Variable ' + 'for the `variance` init arg.') + if (mean is not None) != (variance is not None): + raise ValueError( + 'When setting values directly, both `mean` and `variance` ' + 'must be set. Got mean: {} and variance: {}'.format(mean, variance)) + self.input_mean = mean + self.input_variance = variance + + def build(self, input_shape): + super().build(input_shape) + + if (isinstance(input_shape, (list, tuple)) and + all(isinstance(shape, tf.TensorShape) for shape in input_shape)): + raise ValueError('Denormalization only accepts a single input. If you are ' + 'passing a python list or tuple as a single input, ' + 'please convert to a numpy array or `tf.Tensor`.') + + input_shape = tf.TensorShape(input_shape).as_list() + ndim = len(input_shape) + + if any(a < -ndim or a >= ndim for a in self.axis): + raise ValueError('All `axis` values must be in the range [-ndim, ndim). ' + 'Found ndim: `{}`, axis: {}'.format(ndim, self.axis)) + + # Axes to be kept, replacing negative values with positive equivalents. + # Sorted to avoid transposing axes. + self._keep_axis = sorted([d if d >= 0 else d + ndim for d in self.axis]) + # All axes to be kept should have known shape. + for d in self._keep_axis: + if input_shape[d] is None: + raise ValueError( + 'All `axis` values to be kept must have known shape. Got axis: {}, ' + 'input shape: {}, with unknown axis at index: {}'.format( + self.axis, input_shape, d)) + # Axes to be reduced. + self._reduce_axis = [d for d in range(ndim) if d not in self._keep_axis] + # 1 if an axis should be reduced, 0 otherwise. + self._reduce_axis_mask = [ + 0 if d in self._keep_axis else 1 for d in range(ndim) + ] + # Broadcast any reduced axes. + self._broadcast_shape = [ + input_shape[d] if d in self._keep_axis else 1 for d in range(ndim) + ] + mean_and_var_shape = tuple(input_shape[d] for d in self._keep_axis) + + if self.input_mean is None: + self.adapt_mean = self.add_weight( + name='mean', + shape=mean_and_var_shape, + dtype=self.compute_dtype, + initializer='zeros', + trainable=False) + self.adapt_variance = self.add_weight( + name='variance', + shape=mean_and_var_shape, + dtype=self.compute_dtype, + initializer='ones', + trainable=False) + self.count = self.add_weight( + name='count', + shape=(), + dtype=tf.int64, + initializer='zeros', + trainable=False) + self.finalize_state() + else: + # In the no adapt case, make constant tensors for mean and variance with + # proper broadcast shape for use during call. + mean = self.input_mean * np.ones(mean_and_var_shape) + variance = self.input_variance * np.ones(mean_and_var_shape) + mean = tf.reshape(mean, self._broadcast_shape) + variance = tf.reshape(variance, self._broadcast_shape) + self.mean = tf.cast(mean, self.compute_dtype) + self.variance = tf.cast(variance, self.compute_dtype) + + # We override this method solely to generate a docstring. + def adapt(self, data, batch_size=None, steps=None): + """Computes the mean and variance of values in a dataset. + + Calling `adapt()` on a `Denormalization` layer is an alternative to passing in + `mean` and `variance` arguments during layer construction. A `Denormalization` + layer should always either be adapted over a dataset or passed `mean` and + `variance`. + + During `adapt()`, the layer will compute a `mean` and `variance` separately + for each position in each axis specified by the `axis` argument. To + calculate a single `mean` and `variance` over the input data, simply pass + `axis=None`. + + In order to make `Denormalization` efficient in any distribution context, the + computed mean and variance are kept static with respect to any compiled + `tf.Graph`s that call the layer. As a consequence, if the layer is adapted a + second time, any models using the layer should be re-compiled. For more + information see + `tf.keras.layers.experimental.preprocessing.PreprocessingLayer.adapt`. + + `adapt()` is meant only as a single machine utility to compute layer state. + To analyze a dataset that cannot fit on a single machine, see + [Tensorflow Transform](https://www.tensorflow.org/tfx/transform/get_started) + for a multi-machine, map-reduce solution. + + Arguments: + data: The data to train on. It can be passed either as a + `tf.data.Dataset`, or as a numpy array. + batch_size: Integer or `None`. + Number of samples per state update. + If unspecified, `batch_size` will default to 32. + Do not specify the `batch_size` if your data is in the + form of datasets, generators, or `keras.utils.Sequence` instances + (since they generate batches). + steps: Integer or `None`. + Total number of steps (batches of samples) + When training with input tensors such as + TensorFlow data tensors, the default `None` is equal to + the number of samples in your dataset divided by + the batch size, or 1 if that cannot be determined. If x is a + `tf.data` dataset, and 'steps' is None, the epoch will run until + the input dataset is exhausted. When passing an infinitely + repeating dataset, you must specify the `steps` argument. This + argument is not supported with array inputs. + """ + super().adapt(data, batch_size=batch_size, steps=steps) + + def update_state(self, data): + if self.input_mean is not None: + raise ValueError( + 'Cannot `adapt` a Denormalization layer that is initialized with ' + 'static `mean` and `variance`, you passed mean {} and variance {}.' + .format(self.input_mean, self.input_variance)) + + if not self.built: + raise RuntimeError('`build` must be called before `update_state`.') + + data = self._standardize_inputs(data) + data = tf.cast(data, self.adapt_mean.dtype) + batch_mean, batch_variance = tf.nn.moments(data, axes=self._reduce_axis) + batch_shape = tf.shape(data, out_type=self.count.dtype) + if self._reduce_axis: + batch_reduce_shape = tf.gather(batch_shape, self._reduce_axis) + batch_count = tf.reduce_prod(batch_reduce_shape) + else: + batch_count = 1 + + total_count = batch_count + self.count + batch_weight = ( + tf.cast(batch_count, dtype=self.compute_dtype) / + tf.cast(total_count, dtype=self.compute_dtype)) + existing_weight = 1. - batch_weight + + total_mean = self.adapt_mean * existing_weight + batch_mean * batch_weight + # The variance is computed using the lack-of-fit sum of squares + # formula (see https://en.wikipedia.org/wiki/Lack-of-fit_sum_of_squares). + total_variance = ((self.adapt_variance + + (self.adapt_mean - total_mean)**2) * existing_weight + + (batch_variance + + (batch_mean - total_mean)**2) * batch_weight) + self.adapt_mean.assign(total_mean) + self.adapt_variance.assign(total_variance) + self.count.assign(total_count) + + def reset_state(self): # pylint: disable=method-hidden + if self.input_mean is not None or not self.built: + return + + self.adapt_mean.assign(tf.zeros_like(self.adapt_mean)) + self.adapt_variance.assign(tf.ones_like(self.adapt_variance)) + self.count.assign(tf.zeros_like(self.count)) + + def finalize_state(self): + if self.input_mean is not None or not self.built: + return + + # In the adapt case, we make constant tensors for mean and variance with + # proper broadcast shape and dtype each time `finalize_state` is called. + self.mean = tf.reshape(self.adapt_mean, self._broadcast_shape) + self.mean = tf.cast(self.mean, self.compute_dtype) + self.variance = tf.reshape(self.adapt_variance, self._broadcast_shape) + self.variance = tf.cast(self.variance, self.compute_dtype) + + def call(self, inputs): + inputs = self._standardize_inputs(inputs) + # The base layer automatically casts floating-point inputs, but we + # explicitly cast here to also allow integer inputs to be passed + inputs = tf.cast(inputs, self.compute_dtype) + return ((inputs + self.mean) * + tf.maximum(tf.sqrt(self.variance), backend.epsilon())) + + def compute_output_shape(self, input_shape): + return input_shape + + def compute_output_signature(self, input_spec): + return input_spec + + def get_config(self): + config = super().get_config() + config.update({ + 'axis': self.axis, + 'mean': utils.listify_tensors(self.input_mean), + 'variance': utils.listify_tensors(self.input_variance), + }) + return config + + def _standardize_inputs(self, inputs): + inputs = tf.convert_to_tensor(inputs) + if inputs.dtype != self.compute_dtype: + inputs = tf.cast(inputs, self.compute_dtype) + return inputs diff --git a/keras/layers/preprocessing/denormalization_test.py b/keras/layers/preprocessing/denormalization_test.py new file mode 100644 index 00000000000..a3b79d2467f --- /dev/null +++ b/keras/layers/preprocessing/denormalization_test.py @@ -0,0 +1,413 @@ +# Copyright 2019 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for keras.layers.preprocessing.denormalization.""" + +import tensorflow.compat.v2 as tf + +import os + +from absl.testing import parameterized + +import numpy as np + +import keras +from keras.testing_infra import test_combinations +from keras.testing_infra import test_utils +from keras.layers.preprocessing import denormalization +from keras.layers.preprocessing import preprocessing_test_utils +from keras.mixed_precision import policy + + +def _get_layer_computation_test_cases(): + test_cases = ({ + "adapt_data": np.array([[1.], [2.], [3.], [4.], [5.]], dtype=np.float32), + "axis": -1, + "test_data": np.array([[1.], [2.], [3.]], np.float32), + "expected": np.array([[-1.414214], [-.707107], [0]], np.float32), + "testcase_name": "2d_single_element" + }, { + "adapt_data": np.array([[1], [2], [3], [4], [5]], dtype=np.int32), + "axis": -1, + "test_data": np.array([[1], [2], [3]], np.int32), + "expected": np.array([[-1.414214], [-.707107], [0]], np.float32), + "testcase_name": "2d_int_data" + }, { + "adapt_data": np.array([[1.], [2.], [3.], [4.], [5.]], dtype=np.float32), + "axis": None, + "test_data": np.array([[1.], [2.], [3.]], np.float32), + "expected": np.array([[-1.414214], [-.707107], [0]], np.float32), + "testcase_name": "2d_single_element_none_axis" + }, { + "adapt_data": np.array([[1., 2., 3., 4., 5.]], dtype=np.float32), + "axis": None, + "test_data": np.array([[1.], [2.], [3.]], np.float32), + "expected": np.array([[-1.414214], [-.707107], [0]], np.float32), + "testcase_name": "2d_single_element_none_axis_flat_data" + }, { + "adapt_data": + np.array([[[1., 2., 3.], [2., 3., 4.]], [[3., 4., 5.], [4., 5., 6.]]], + np.float32), + "axis": + 1, + "test_data": + np.array([[[1., 2., 3.], [2., 3., 4.]], [[3., 4., 5.], [4., 5., 6.]]], + np.float32), + "expected": + np.array([[[-1.549193, -0.774597, 0.], [-1.549193, -0.774597, 0.]], + [[0., 0.774597, 1.549193], [0., 0.774597, 1.549193]]], + np.float32), + "testcase_name": + "3d_internal_axis" + }, { + "adapt_data": + np.array( + [[[1., 0., 3.], [2., 3., 4.]], [[3., -1., 5.], [4., 5., 8.]]], + np.float32), + "axis": (1, 2), + "test_data": + np.array( + [[[3., 1., -1.], [2., 5., 4.]], [[3., 0., 5.], [2., 5., 8.]]], + np.float32), + "expected": + np.array( + [[[1., 3., -5.], [-1., 1., -1.]], [[1., 1., 1.], [-1., 1., 1.]]], + np.float32), + "testcase_name": + "3d_multiple_axis" + }, { + "adapt_data": + np.zeros((3, 4)), + "axis": -1, + "test_data": + np.zeros((3, 4)), + "expected": + np.zeros((3, 4)), + "testcase_name": + "zero_variance" + }) + + crossed_test_cases = [] + # Cross above test cases with use_dataset in (True, False) + for use_dataset in (True, False): + for case in test_cases: + case = case.copy() + if use_dataset: + case["testcase_name"] = case["testcase_name"] + "_with_dataset" + case["use_dataset"] = use_dataset + crossed_test_cases.append(case) + + return crossed_test_cases + + +@test_combinations.run_all_keras_modes +class NormalizationTest(test_combinations.TestCase, + preprocessing_test_utils.PreprocessingLayerTest): + + def test_broadcasting_during_direct_setting(self): + layer = denormalization.Denormalization(axis=-1, mean=[1.0], variance=[2.0]) + output = layer(np.array([[1., 2.]])) + expected_output = [[4., 6.]] + self.assertAllClose(output, expected_output) + self.assertAllClose(layer.get_weights(), []) + + def test_broadcasting_during_direct_setting_with_tensors(self): + if not tf.executing_eagerly(): + self.skipTest("Only supported in TF2.") + + layer = denormalization.Denormalization( + axis=-1, + mean=tf.constant([1.0]), + variance=tf.constant([2.0])) + output = layer(np.array([[1., 2.]])) + expected_output = [[4., 6.]] + self.assertAllClose(output, expected_output) + self.assertAllClose(layer.get_weights(), []) + + def test_1d_data(self): + data = np.array([0., 2., 0., 2.]) + layer = denormalization.Denormalization(mean=1.0, variance=2.0) + output = layer(data) + self.assertListEqual(output.shape.as_list(), [4]) + self.assertAllClose(output, [2, 6, -2, 6]) + + def test_0d_data(self): + layer = denormalization.Denormalization(axis=None, mean=1.0, variance=2.0) + output = layer(0.) + self.assertListEqual(output.shape.as_list(), []) + self.assertAllClose(output, 2) + + def test_broadcasting_during_direct_setting_with_variables_fails(self): + with self.assertRaisesRegex(ValueError, "passing a Variable"): + _ = denormalization.Denormalization( + axis=-1, + mean=tf.Variable([1.0]), + variance=tf.Variable([2.0])) + + def test_keeping_an_unknown_axis_fails(self): + layer = denormalization.Denormalization(axis=-1) + with self.assertRaisesRegex(ValueError, "axis.*must have known shape"): + layer.build([None]) + + @parameterized.parameters( + # Out of bounds + {"axis": 3}, + {"axis": -4}, + # In a tuple + {"axis": (1, 3)}, + {"axis": (1, -4)}, + ) + def test_bad_axis_fail_build(self, axis): + layer = denormalization.Denormalization(axis=axis) + with self.assertRaisesRegex(ValueError, "in the range"): + layer.build([None, 2, 3]) + + def test_list_input(self): + with self.assertRaisesRegex( + ValueError, ("Normalization only accepts a single input. If you are " + "passing a python list or tuple as a single input, " + "please convert to a numpy array or `tf.Tensor`.")): + denormalization.Denormalization()([1, 2, 3]) + + def test_scalar_input(self): + with self.assertRaisesRegex(ValueError, + "axis.*values must be in the range"): + denormalization.Denormalization()(1) + + def test_output_dtype(self): + if not tf.__internal__.tf2.enabled(): + self.skipTest("set_global_policy only supported in TF2.") + # Output should respect an explicit dtype, and default to the global policy. + policy.set_global_policy("float64") + input_data = keras.Input(batch_size=16, shape=(1,)) + layer = denormalization.Denormalization(mean=1.0, variance=2.0, dtype="float16") + output = layer(input_data) + self.assertAllEqual(output.dtype, tf.float16) + layer = denormalization.Denormalization(mean=1.0, variance=2.0) + output = layer(input_data) + self.assertAllEqual(output.dtype, tf.float64) + + +@test_combinations.run_all_keras_modes(always_skip_v1=True) +class NormalizationAdaptTest(test_combinations.TestCase, + preprocessing_test_utils.PreprocessingLayerTest): + + def test_layer_api_compatibility(self): + cls = denormalization.Denormalization + output_data = test_utils.layer_test( + cls, + kwargs={"axis": -1}, + input_shape=(None, 3), + input_data=np.array([[3, 1, 2], [6, 5, 4]], dtype=np.float32), + validate_training=False, + adapt_data=np.array([[1, 2, 1], [2, 3, 4], [1, 2, 1], [2, 3, 4]])) + expected = np.array([[3., -3., -0.33333333], [9., 5., 1.]]) + self.assertAllClose(expected, output_data) + + @parameterized.named_parameters(*_get_layer_computation_test_cases()) + def test_layer_computation(self, adapt_data, axis, test_data, use_dataset, + expected): + input_shape = tuple([test_data.shape[i] for i in range(1, test_data.ndim)]) + if use_dataset: + # Keras APIs expect batched datasets + adapt_data = tf.data.Dataset.from_tensor_slices(adapt_data).batch( + test_data.shape[0] // 2) + test_data = tf.data.Dataset.from_tensor_slices(test_data).batch( + test_data.shape[0] // 2) + + layer = denormalization.Denormalization(axis=axis) + layer.adapt(adapt_data) + + input_data = keras.Input(shape=input_shape) + output = layer(input_data) + model = keras.Model(input_data, output) + model._run_eagerly = test_utils.should_run_eagerly() + output_data = model.predict(test_data) + self.assertAllClose(expected, output_data) + + def test_1d_unbatched_adapt(self): + ds = tf.data.Dataset.from_tensor_slices([ + [2., 0., 2., 0.], + [0., 2., 0., 2.], + ]) + layer = denormalization.Denormalization(axis=-1) + layer.adapt(ds) + output_ds = ds.map(layer) + self.assertAllClose( + list(output_ds.as_numpy_iterator()), [ + [1., -1., 1., -1.], + [-1., 1., -1., 1.], + ]) + + def test_0d_unbatched_adapt(self): + ds = tf.data.Dataset.from_tensor_slices([2., 0., 2., 0.]) + layer = denormalization.Denormalization(axis=None) + layer.adapt(ds) + output_ds = ds.map(layer) + self.assertAllClose(list(output_ds.as_numpy_iterator()), [1., -1., 1., -1.]) + + @parameterized.parameters( + # Results should be identical no matter how the axes are specified (3d). + {"axis": (1, 2)}, + {"axis": (2, 1)}, + {"axis": (1, -1)}, + {"axis": (-1, 1)}, + ) + def test_axis_permutations(self, axis): + layer = denormalization.Denormalization(axis=axis) + # data.shape = [2, 2, 3] + data = np.array([[[0., 1., 2.], [0., 2., 6.]], + [[2., 3., 4.], [3., 6., 10.]]]) + expect = np.array([[[-1., -1., -1.], [-1., -1., -1.]], + [[1., 1., 1.], [1., 1., 1.]]]) + layer.adapt(data) + self.assertAllClose(expect, layer(data)) + + def test_model_summary_after_layer_adapt(self): + data = np.array([[[0., 1., 2.], [0., 2., 6.]], + [[2., 3., 4.], [3., 6., 10.]]]) + layer = denormalization.Denormalization(axis=-1) + layer.adapt(data) + model = keras.Sequential( + [layer, + keras.layers.Dense(64, activation="relu"), + keras.layers.Dense(1)]) + model.summary() + + def test_multiple_adapts(self): + first_adapt = [[0], [2], [0], [2]] + second_adapt = [[2], [4], [2], [4]] + predict_input = [[2], [2]] + expected_first_output = [[1], [1]] + expected_second_output = [[-1], [-1]] + + inputs = keras.Input(shape=(1,), dtype=tf.int32) + layer = denormalization.Denormalization(axis=-1) + layer.adapt(first_adapt) + outputs = layer(inputs) + model = keras.Model(inputs=inputs, outputs=outputs) + + actual_output = model.predict(predict_input) + self.assertAllClose(actual_output, expected_first_output) + + # Re-adapt the layer on new inputs. + layer.adapt(second_adapt) + # Re-compile the model. + model.compile() + # `predict` should now use the new model state. + actual_output = model.predict(predict_input) + self.assertAllClose(actual_output, expected_second_output) + + @parameterized.parameters( + {"adapted": True}, + {"adapted": False}, + ) + def test_saved_model_tf(self, adapted): + input_data = [[0.], [2.], [0.], [2.]] + expected_output = [[-1.], [1.], [-1.], [1.]] + + inputs = keras.Input(shape=(1,), dtype=tf.float32) + if adapted: + layer = denormalization.Normalization(axis=-1) + layer.adapt(input_data) + else: + layer = denormalization.Normalization(mean=1., variance=2.) + outputs = layer(inputs) + model = keras.Model(inputs=inputs, outputs=outputs) + + output_data = model.predict(input_data) + self.assertAllClose(output_data, expected_output) + + # Save the model to disk. + output_path = os.path.join(self.get_temp_dir(), "tf_saved_model") + tf.saved_model.save(model, output_path) + loaded_model = tf.saved_model.load(output_path) + f = loaded_model.signatures["serving_default"] + + # Ensure that the loaded model is unique (so that the save/load is real) + self.assertIsNot(model, loaded_model) + + # Validate correctness of the new model. + new_output_data = f(tf.constant(input_data))["normalization"] + self.assertAllClose(new_output_data, expected_output) + + @parameterized.product( + save_format=["tf", "h5"], + adapt=[True, False], + ) + def test_saved_model_keras(self, save_format, adapt): + input_data = [[0.], [2.], [0.], [2.]] + expected_output = [[-1.], [1.], [-1.], [1.]] + + cls = denormalization.Denormalization + inputs = keras.Input(shape=(1,), dtype=tf.float32) + if adapt: + layer = cls(axis=-1) + layer.adapt(input_data) + else: + layer = cls(mean=1., variance=2.) + outputs = layer(inputs) + model = keras.Model(inputs=inputs, outputs=outputs) + + output_data = model.predict(input_data) + self.assertAllClose(output_data, expected_output) + + # Save the model to disk. + output_path = os.path.join(self.get_temp_dir(), "tf_keras_saved_model") + model.save(output_path, save_format=format) + loaded_model = keras.models.load_model( + output_path, custom_objects={"Normalization": cls}) + + # Ensure that the loaded model is unique (so that the save/load is real) + self.assertIsNot(model, loaded_model) + + # Validate correctness of the new model. + new_output_data = loaded_model.predict(input_data) + self.assertAllClose(new_output_data, expected_output) + + @parameterized.parameters( + {"adapted": True}, + {"adapted": False}, + ) + def test_saved_weights_keras(self, adapted): + input_data = [[0.], [2.], [0.], [2.]] + expected_output = [[-1.], [1.], [-1.], [1.]] + + cls = denormalization.Denormalization + inputs = keras.Input(shape=(1,), dtype=tf.float32) + if adapted: + layer = cls(axis=-1) + layer.adapt(input_data) + else: + layer = cls(mean=1., variance=2.) + outputs = layer(inputs) + model = keras.Model(inputs=inputs, outputs=outputs) + + output_data = model.predict(input_data) + self.assertAllClose(output_data, expected_output) + + # Save the model to disk. + output_path = os.path.join(self.get_temp_dir(), "tf_keras_saved_weights") + model.save_weights(output_path, save_format="tf") + new_model = keras.Model.from_config( + model.get_config(), custom_objects={"Normalization": cls}) + new_model.load_weights(output_path) + + # Validate correctness of the new model. + new_output_data = new_model.predict(input_data) + self.assertAllClose(new_output_data, expected_output) + + +if __name__ == "__main__": + tf.test.main() From 261c08f01dec10b40b09eebc734ebfb4e62e5b6f Mon Sep 17 00:00:00 2001 From: Martin Kubovcik Date: Mon, 4 Apr 2022 20:14:06 +0200 Subject: [PATCH 2/5] + invert --- keras/layers/preprocessing/BUILD | 33 -- keras/layers/preprocessing/denormalization.py | 326 -------------- .../preprocessing/denormalization_test.py | 413 ------------------ keras/layers/preprocessing/normalization.py | 10 +- .../preprocessing/normalization_test.py | 7 + 5 files changed, 15 insertions(+), 774 deletions(-) delete mode 100644 keras/layers/preprocessing/denormalization.py delete mode 100644 keras/layers/preprocessing/denormalization_test.py diff --git a/keras/layers/preprocessing/BUILD b/keras/layers/preprocessing/BUILD index 0aa7fdd70e1..ca9cd75ca4a 100644 --- a/keras/layers/preprocessing/BUILD +++ b/keras/layers/preprocessing/BUILD @@ -128,21 +128,6 @@ py_library( ], ) -py_library( - name = "denormalization", - srcs = [ - "denormalization.py", - ], - srcs_version = "PY3", - deps = [ - ":preprocessing_utils", - "//:expect_numpy_installed", - "//:expect_tensorflow_installed", - "//keras:backend", - "//keras/engine", - ], -) - py_library( name = "integer_lookup", srcs = [ @@ -486,24 +471,6 @@ tf_py_test( ], ) -tf_py_test( - name = "denormalization_test", - srcs = ["denormalization_test.py"], - python_version = "PY3", - shard_count = 4, - tags = [ - "noasan", # TODO(b/337374867) fails with -fsanitize=null - ], - deps = [ - ":denormalization", - ":preprocessing_test_utils", - "//:expect_absl_installed", - "//:expect_tensorflow_installed", - "//keras", - "//keras/testing_infra:test_combinations", - ], -) - tf_py_test( name = "integer_lookup_test", srcs = ["integer_lookup_test.py"], diff --git a/keras/layers/preprocessing/denormalization.py b/keras/layers/preprocessing/denormalization.py deleted file mode 100644 index 45f4838d80d..00000000000 --- a/keras/layers/preprocessing/denormalization.py +++ /dev/null @@ -1,326 +0,0 @@ -# Copyright 2019 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""Denormalization preprocessing layer.""" - -# pylint: disable=g-classes-have-attributes -# pylint: disable=g-direct-tensorflow-import - -from keras import backend -from keras.engine import base_preprocessing_layer -from keras.layers.preprocessing import preprocessing_utils as utils -import numpy as np -import tensorflow.compat.v2 as tf -from tensorflow.python.util.tf_export import keras_export - - -@keras_export('keras.layers.Denormalization', - 'keras.layers.experimental.preprocessing.Denormalization') -class Denormalization(base_preprocessing_layer.PreprocessingLayer): - """A preprocessing layer which denormalizes continuous features. - - This layer will shift and scale inputs into a distribution of training data. It accomplishes this by precomputing the mean and - variance of the data, and calling `(input + mean) * sqrt(var)` at runtime. - - The mean and variance values for the layer must be either supplied on - construction or learned via `adapt()`. `adapt()` will compute the mean and - variance of the data and store them as the layer's weights. `adapt()` should - be called before `fit()`, `evaluate()`, or `predict()`. - - For an overview and full list of preprocessing layers, see the preprocessing - [guide](https://www.tensorflow.org/guide/keras/preprocessing_layers). - - Args: - axis: Integer, tuple of integers, or None. The axis or axes that should - have a separate mean and variance for each index in the shape. For - example, if shape is `(None, 5)` and `axis=1`, the layer will track 5 - separate mean and variance values for the last axis. If `axis` is set to - `None`, the layer will normalize all elements in the input by a scalar - mean and variance. Defaults to -1, where the last axis of the input is - assumed to be a feature dimension and is normalized per index. Note that - in the specific case of batched scalar inputs where the only axis is the - batch axis, the default will normalize each index in the batch - separately. In this case, consider passing `axis=None`. - mean: The mean value(s) to use during denormalization. The passed value(s) - will be broadcast to the shape of the kept axes above; if the value(s) - cannot be broadcast, an error will be raised when this layer's `build()` - method is called. - variance: The variance value(s) to use during denormalization. The passed - value(s) will be broadcast to the shape of the kept axes above; if the - value(s) cannot be broadcast, an error will be raised when this layer's - `build()` method is called. - - Examples: - - Calculate a global mean and variance by analyzing the dataset in `adapt()`. - - >>> adapt_data = np.array([1., 2., 3., 4., 5.], dtype='float32') - >>> input_data = np.array([1., 2., 3.], dtype='float32') - >>> layer = tf.keras.layers.Denormalization(axis=None) - >>> layer.adapt(adapt_data) - >>> layer(input_data) - - - Calculate a mean and variance for each index on the last axis. - - >>> adapt_data = np.array([[0., 7., 4.], - ... [2., 9., 6.], - ... [0., 7., 4.], - ... [2., 9., 6.]], dtype='float32') - >>> input_data = np.array([[0., 7., 4.]], dtype='float32') - >>> layer = tf.keras.layers.Denormalization(axis=-1) - >>> layer.adapt(adapt_data) - >>> layer(input_data) - - - Pass the mean and variance directly. - - >>> input_data = np.array([[1.], [2.], [3.]], dtype='float32') - >>> layer = tf.keras.layers.Denormalization(mean=3., variance=2.) - >>> layer(input_data) - - """ - - def __init__(self, axis=-1, mean=None, variance=None, **kwargs): - super().__init__(**kwargs) - base_preprocessing_layer.keras_kpl_gauge.get_cell('Denormalization').set(True) - - # Standardize `axis` to a tuple. - if axis is None: - axis = () - elif isinstance(axis, int): - axis = (axis,) - else: - axis = tuple(axis) - self.axis = axis - - # Set `mean` and `variance` if passed. - if isinstance(mean, tf.Variable): - raise ValueError('Denormalization does not support passing a Variable ' - 'for the `mean` init arg.') - if isinstance(variance, tf.Variable): - raise ValueError('Denormalization does not support passing a Variable ' - 'for the `variance` init arg.') - if (mean is not None) != (variance is not None): - raise ValueError( - 'When setting values directly, both `mean` and `variance` ' - 'must be set. Got mean: {} and variance: {}'.format(mean, variance)) - self.input_mean = mean - self.input_variance = variance - - def build(self, input_shape): - super().build(input_shape) - - if (isinstance(input_shape, (list, tuple)) and - all(isinstance(shape, tf.TensorShape) for shape in input_shape)): - raise ValueError('Denormalization only accepts a single input. If you are ' - 'passing a python list or tuple as a single input, ' - 'please convert to a numpy array or `tf.Tensor`.') - - input_shape = tf.TensorShape(input_shape).as_list() - ndim = len(input_shape) - - if any(a < -ndim or a >= ndim for a in self.axis): - raise ValueError('All `axis` values must be in the range [-ndim, ndim). ' - 'Found ndim: `{}`, axis: {}'.format(ndim, self.axis)) - - # Axes to be kept, replacing negative values with positive equivalents. - # Sorted to avoid transposing axes. - self._keep_axis = sorted([d if d >= 0 else d + ndim for d in self.axis]) - # All axes to be kept should have known shape. - for d in self._keep_axis: - if input_shape[d] is None: - raise ValueError( - 'All `axis` values to be kept must have known shape. Got axis: {}, ' - 'input shape: {}, with unknown axis at index: {}'.format( - self.axis, input_shape, d)) - # Axes to be reduced. - self._reduce_axis = [d for d in range(ndim) if d not in self._keep_axis] - # 1 if an axis should be reduced, 0 otherwise. - self._reduce_axis_mask = [ - 0 if d in self._keep_axis else 1 for d in range(ndim) - ] - # Broadcast any reduced axes. - self._broadcast_shape = [ - input_shape[d] if d in self._keep_axis else 1 for d in range(ndim) - ] - mean_and_var_shape = tuple(input_shape[d] for d in self._keep_axis) - - if self.input_mean is None: - self.adapt_mean = self.add_weight( - name='mean', - shape=mean_and_var_shape, - dtype=self.compute_dtype, - initializer='zeros', - trainable=False) - self.adapt_variance = self.add_weight( - name='variance', - shape=mean_and_var_shape, - dtype=self.compute_dtype, - initializer='ones', - trainable=False) - self.count = self.add_weight( - name='count', - shape=(), - dtype=tf.int64, - initializer='zeros', - trainable=False) - self.finalize_state() - else: - # In the no adapt case, make constant tensors for mean and variance with - # proper broadcast shape for use during call. - mean = self.input_mean * np.ones(mean_and_var_shape) - variance = self.input_variance * np.ones(mean_and_var_shape) - mean = tf.reshape(mean, self._broadcast_shape) - variance = tf.reshape(variance, self._broadcast_shape) - self.mean = tf.cast(mean, self.compute_dtype) - self.variance = tf.cast(variance, self.compute_dtype) - - # We override this method solely to generate a docstring. - def adapt(self, data, batch_size=None, steps=None): - """Computes the mean and variance of values in a dataset. - - Calling `adapt()` on a `Denormalization` layer is an alternative to passing in - `mean` and `variance` arguments during layer construction. A `Denormalization` - layer should always either be adapted over a dataset or passed `mean` and - `variance`. - - During `adapt()`, the layer will compute a `mean` and `variance` separately - for each position in each axis specified by the `axis` argument. To - calculate a single `mean` and `variance` over the input data, simply pass - `axis=None`. - - In order to make `Denormalization` efficient in any distribution context, the - computed mean and variance are kept static with respect to any compiled - `tf.Graph`s that call the layer. As a consequence, if the layer is adapted a - second time, any models using the layer should be re-compiled. For more - information see - `tf.keras.layers.experimental.preprocessing.PreprocessingLayer.adapt`. - - `adapt()` is meant only as a single machine utility to compute layer state. - To analyze a dataset that cannot fit on a single machine, see - [Tensorflow Transform](https://www.tensorflow.org/tfx/transform/get_started) - for a multi-machine, map-reduce solution. - - Arguments: - data: The data to train on. It can be passed either as a - `tf.data.Dataset`, or as a numpy array. - batch_size: Integer or `None`. - Number of samples per state update. - If unspecified, `batch_size` will default to 32. - Do not specify the `batch_size` if your data is in the - form of datasets, generators, or `keras.utils.Sequence` instances - (since they generate batches). - steps: Integer or `None`. - Total number of steps (batches of samples) - When training with input tensors such as - TensorFlow data tensors, the default `None` is equal to - the number of samples in your dataset divided by - the batch size, or 1 if that cannot be determined. If x is a - `tf.data` dataset, and 'steps' is None, the epoch will run until - the input dataset is exhausted. When passing an infinitely - repeating dataset, you must specify the `steps` argument. This - argument is not supported with array inputs. - """ - super().adapt(data, batch_size=batch_size, steps=steps) - - def update_state(self, data): - if self.input_mean is not None: - raise ValueError( - 'Cannot `adapt` a Denormalization layer that is initialized with ' - 'static `mean` and `variance`, you passed mean {} and variance {}.' - .format(self.input_mean, self.input_variance)) - - if not self.built: - raise RuntimeError('`build` must be called before `update_state`.') - - data = self._standardize_inputs(data) - data = tf.cast(data, self.adapt_mean.dtype) - batch_mean, batch_variance = tf.nn.moments(data, axes=self._reduce_axis) - batch_shape = tf.shape(data, out_type=self.count.dtype) - if self._reduce_axis: - batch_reduce_shape = tf.gather(batch_shape, self._reduce_axis) - batch_count = tf.reduce_prod(batch_reduce_shape) - else: - batch_count = 1 - - total_count = batch_count + self.count - batch_weight = ( - tf.cast(batch_count, dtype=self.compute_dtype) / - tf.cast(total_count, dtype=self.compute_dtype)) - existing_weight = 1. - batch_weight - - total_mean = self.adapt_mean * existing_weight + batch_mean * batch_weight - # The variance is computed using the lack-of-fit sum of squares - # formula (see https://en.wikipedia.org/wiki/Lack-of-fit_sum_of_squares). - total_variance = ((self.adapt_variance + - (self.adapt_mean - total_mean)**2) * existing_weight + - (batch_variance + - (batch_mean - total_mean)**2) * batch_weight) - self.adapt_mean.assign(total_mean) - self.adapt_variance.assign(total_variance) - self.count.assign(total_count) - - def reset_state(self): # pylint: disable=method-hidden - if self.input_mean is not None or not self.built: - return - - self.adapt_mean.assign(tf.zeros_like(self.adapt_mean)) - self.adapt_variance.assign(tf.ones_like(self.adapt_variance)) - self.count.assign(tf.zeros_like(self.count)) - - def finalize_state(self): - if self.input_mean is not None or not self.built: - return - - # In the adapt case, we make constant tensors for mean and variance with - # proper broadcast shape and dtype each time `finalize_state` is called. - self.mean = tf.reshape(self.adapt_mean, self._broadcast_shape) - self.mean = tf.cast(self.mean, self.compute_dtype) - self.variance = tf.reshape(self.adapt_variance, self._broadcast_shape) - self.variance = tf.cast(self.variance, self.compute_dtype) - - def call(self, inputs): - inputs = self._standardize_inputs(inputs) - # The base layer automatically casts floating-point inputs, but we - # explicitly cast here to also allow integer inputs to be passed - inputs = tf.cast(inputs, self.compute_dtype) - return ((inputs + self.mean) * - tf.maximum(tf.sqrt(self.variance), backend.epsilon())) - - def compute_output_shape(self, input_shape): - return input_shape - - def compute_output_signature(self, input_spec): - return input_spec - - def get_config(self): - config = super().get_config() - config.update({ - 'axis': self.axis, - 'mean': utils.listify_tensors(self.input_mean), - 'variance': utils.listify_tensors(self.input_variance), - }) - return config - - def _standardize_inputs(self, inputs): - inputs = tf.convert_to_tensor(inputs) - if inputs.dtype != self.compute_dtype: - inputs = tf.cast(inputs, self.compute_dtype) - return inputs diff --git a/keras/layers/preprocessing/denormalization_test.py b/keras/layers/preprocessing/denormalization_test.py deleted file mode 100644 index a3b79d2467f..00000000000 --- a/keras/layers/preprocessing/denormalization_test.py +++ /dev/null @@ -1,413 +0,0 @@ -# Copyright 2019 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""Tests for keras.layers.preprocessing.denormalization.""" - -import tensorflow.compat.v2 as tf - -import os - -from absl.testing import parameterized - -import numpy as np - -import keras -from keras.testing_infra import test_combinations -from keras.testing_infra import test_utils -from keras.layers.preprocessing import denormalization -from keras.layers.preprocessing import preprocessing_test_utils -from keras.mixed_precision import policy - - -def _get_layer_computation_test_cases(): - test_cases = ({ - "adapt_data": np.array([[1.], [2.], [3.], [4.], [5.]], dtype=np.float32), - "axis": -1, - "test_data": np.array([[1.], [2.], [3.]], np.float32), - "expected": np.array([[-1.414214], [-.707107], [0]], np.float32), - "testcase_name": "2d_single_element" - }, { - "adapt_data": np.array([[1], [2], [3], [4], [5]], dtype=np.int32), - "axis": -1, - "test_data": np.array([[1], [2], [3]], np.int32), - "expected": np.array([[-1.414214], [-.707107], [0]], np.float32), - "testcase_name": "2d_int_data" - }, { - "adapt_data": np.array([[1.], [2.], [3.], [4.], [5.]], dtype=np.float32), - "axis": None, - "test_data": np.array([[1.], [2.], [3.]], np.float32), - "expected": np.array([[-1.414214], [-.707107], [0]], np.float32), - "testcase_name": "2d_single_element_none_axis" - }, { - "adapt_data": np.array([[1., 2., 3., 4., 5.]], dtype=np.float32), - "axis": None, - "test_data": np.array([[1.], [2.], [3.]], np.float32), - "expected": np.array([[-1.414214], [-.707107], [0]], np.float32), - "testcase_name": "2d_single_element_none_axis_flat_data" - }, { - "adapt_data": - np.array([[[1., 2., 3.], [2., 3., 4.]], [[3., 4., 5.], [4., 5., 6.]]], - np.float32), - "axis": - 1, - "test_data": - np.array([[[1., 2., 3.], [2., 3., 4.]], [[3., 4., 5.], [4., 5., 6.]]], - np.float32), - "expected": - np.array([[[-1.549193, -0.774597, 0.], [-1.549193, -0.774597, 0.]], - [[0., 0.774597, 1.549193], [0., 0.774597, 1.549193]]], - np.float32), - "testcase_name": - "3d_internal_axis" - }, { - "adapt_data": - np.array( - [[[1., 0., 3.], [2., 3., 4.]], [[3., -1., 5.], [4., 5., 8.]]], - np.float32), - "axis": (1, 2), - "test_data": - np.array( - [[[3., 1., -1.], [2., 5., 4.]], [[3., 0., 5.], [2., 5., 8.]]], - np.float32), - "expected": - np.array( - [[[1., 3., -5.], [-1., 1., -1.]], [[1., 1., 1.], [-1., 1., 1.]]], - np.float32), - "testcase_name": - "3d_multiple_axis" - }, { - "adapt_data": - np.zeros((3, 4)), - "axis": -1, - "test_data": - np.zeros((3, 4)), - "expected": - np.zeros((3, 4)), - "testcase_name": - "zero_variance" - }) - - crossed_test_cases = [] - # Cross above test cases with use_dataset in (True, False) - for use_dataset in (True, False): - for case in test_cases: - case = case.copy() - if use_dataset: - case["testcase_name"] = case["testcase_name"] + "_with_dataset" - case["use_dataset"] = use_dataset - crossed_test_cases.append(case) - - return crossed_test_cases - - -@test_combinations.run_all_keras_modes -class NormalizationTest(test_combinations.TestCase, - preprocessing_test_utils.PreprocessingLayerTest): - - def test_broadcasting_during_direct_setting(self): - layer = denormalization.Denormalization(axis=-1, mean=[1.0], variance=[2.0]) - output = layer(np.array([[1., 2.]])) - expected_output = [[4., 6.]] - self.assertAllClose(output, expected_output) - self.assertAllClose(layer.get_weights(), []) - - def test_broadcasting_during_direct_setting_with_tensors(self): - if not tf.executing_eagerly(): - self.skipTest("Only supported in TF2.") - - layer = denormalization.Denormalization( - axis=-1, - mean=tf.constant([1.0]), - variance=tf.constant([2.0])) - output = layer(np.array([[1., 2.]])) - expected_output = [[4., 6.]] - self.assertAllClose(output, expected_output) - self.assertAllClose(layer.get_weights(), []) - - def test_1d_data(self): - data = np.array([0., 2., 0., 2.]) - layer = denormalization.Denormalization(mean=1.0, variance=2.0) - output = layer(data) - self.assertListEqual(output.shape.as_list(), [4]) - self.assertAllClose(output, [2, 6, -2, 6]) - - def test_0d_data(self): - layer = denormalization.Denormalization(axis=None, mean=1.0, variance=2.0) - output = layer(0.) - self.assertListEqual(output.shape.as_list(), []) - self.assertAllClose(output, 2) - - def test_broadcasting_during_direct_setting_with_variables_fails(self): - with self.assertRaisesRegex(ValueError, "passing a Variable"): - _ = denormalization.Denormalization( - axis=-1, - mean=tf.Variable([1.0]), - variance=tf.Variable([2.0])) - - def test_keeping_an_unknown_axis_fails(self): - layer = denormalization.Denormalization(axis=-1) - with self.assertRaisesRegex(ValueError, "axis.*must have known shape"): - layer.build([None]) - - @parameterized.parameters( - # Out of bounds - {"axis": 3}, - {"axis": -4}, - # In a tuple - {"axis": (1, 3)}, - {"axis": (1, -4)}, - ) - def test_bad_axis_fail_build(self, axis): - layer = denormalization.Denormalization(axis=axis) - with self.assertRaisesRegex(ValueError, "in the range"): - layer.build([None, 2, 3]) - - def test_list_input(self): - with self.assertRaisesRegex( - ValueError, ("Normalization only accepts a single input. If you are " - "passing a python list or tuple as a single input, " - "please convert to a numpy array or `tf.Tensor`.")): - denormalization.Denormalization()([1, 2, 3]) - - def test_scalar_input(self): - with self.assertRaisesRegex(ValueError, - "axis.*values must be in the range"): - denormalization.Denormalization()(1) - - def test_output_dtype(self): - if not tf.__internal__.tf2.enabled(): - self.skipTest("set_global_policy only supported in TF2.") - # Output should respect an explicit dtype, and default to the global policy. - policy.set_global_policy("float64") - input_data = keras.Input(batch_size=16, shape=(1,)) - layer = denormalization.Denormalization(mean=1.0, variance=2.0, dtype="float16") - output = layer(input_data) - self.assertAllEqual(output.dtype, tf.float16) - layer = denormalization.Denormalization(mean=1.0, variance=2.0) - output = layer(input_data) - self.assertAllEqual(output.dtype, tf.float64) - - -@test_combinations.run_all_keras_modes(always_skip_v1=True) -class NormalizationAdaptTest(test_combinations.TestCase, - preprocessing_test_utils.PreprocessingLayerTest): - - def test_layer_api_compatibility(self): - cls = denormalization.Denormalization - output_data = test_utils.layer_test( - cls, - kwargs={"axis": -1}, - input_shape=(None, 3), - input_data=np.array([[3, 1, 2], [6, 5, 4]], dtype=np.float32), - validate_training=False, - adapt_data=np.array([[1, 2, 1], [2, 3, 4], [1, 2, 1], [2, 3, 4]])) - expected = np.array([[3., -3., -0.33333333], [9., 5., 1.]]) - self.assertAllClose(expected, output_data) - - @parameterized.named_parameters(*_get_layer_computation_test_cases()) - def test_layer_computation(self, adapt_data, axis, test_data, use_dataset, - expected): - input_shape = tuple([test_data.shape[i] for i in range(1, test_data.ndim)]) - if use_dataset: - # Keras APIs expect batched datasets - adapt_data = tf.data.Dataset.from_tensor_slices(adapt_data).batch( - test_data.shape[0] // 2) - test_data = tf.data.Dataset.from_tensor_slices(test_data).batch( - test_data.shape[0] // 2) - - layer = denormalization.Denormalization(axis=axis) - layer.adapt(adapt_data) - - input_data = keras.Input(shape=input_shape) - output = layer(input_data) - model = keras.Model(input_data, output) - model._run_eagerly = test_utils.should_run_eagerly() - output_data = model.predict(test_data) - self.assertAllClose(expected, output_data) - - def test_1d_unbatched_adapt(self): - ds = tf.data.Dataset.from_tensor_slices([ - [2., 0., 2., 0.], - [0., 2., 0., 2.], - ]) - layer = denormalization.Denormalization(axis=-1) - layer.adapt(ds) - output_ds = ds.map(layer) - self.assertAllClose( - list(output_ds.as_numpy_iterator()), [ - [1., -1., 1., -1.], - [-1., 1., -1., 1.], - ]) - - def test_0d_unbatched_adapt(self): - ds = tf.data.Dataset.from_tensor_slices([2., 0., 2., 0.]) - layer = denormalization.Denormalization(axis=None) - layer.adapt(ds) - output_ds = ds.map(layer) - self.assertAllClose(list(output_ds.as_numpy_iterator()), [1., -1., 1., -1.]) - - @parameterized.parameters( - # Results should be identical no matter how the axes are specified (3d). - {"axis": (1, 2)}, - {"axis": (2, 1)}, - {"axis": (1, -1)}, - {"axis": (-1, 1)}, - ) - def test_axis_permutations(self, axis): - layer = denormalization.Denormalization(axis=axis) - # data.shape = [2, 2, 3] - data = np.array([[[0., 1., 2.], [0., 2., 6.]], - [[2., 3., 4.], [3., 6., 10.]]]) - expect = np.array([[[-1., -1., -1.], [-1., -1., -1.]], - [[1., 1., 1.], [1., 1., 1.]]]) - layer.adapt(data) - self.assertAllClose(expect, layer(data)) - - def test_model_summary_after_layer_adapt(self): - data = np.array([[[0., 1., 2.], [0., 2., 6.]], - [[2., 3., 4.], [3., 6., 10.]]]) - layer = denormalization.Denormalization(axis=-1) - layer.adapt(data) - model = keras.Sequential( - [layer, - keras.layers.Dense(64, activation="relu"), - keras.layers.Dense(1)]) - model.summary() - - def test_multiple_adapts(self): - first_adapt = [[0], [2], [0], [2]] - second_adapt = [[2], [4], [2], [4]] - predict_input = [[2], [2]] - expected_first_output = [[1], [1]] - expected_second_output = [[-1], [-1]] - - inputs = keras.Input(shape=(1,), dtype=tf.int32) - layer = denormalization.Denormalization(axis=-1) - layer.adapt(first_adapt) - outputs = layer(inputs) - model = keras.Model(inputs=inputs, outputs=outputs) - - actual_output = model.predict(predict_input) - self.assertAllClose(actual_output, expected_first_output) - - # Re-adapt the layer on new inputs. - layer.adapt(second_adapt) - # Re-compile the model. - model.compile() - # `predict` should now use the new model state. - actual_output = model.predict(predict_input) - self.assertAllClose(actual_output, expected_second_output) - - @parameterized.parameters( - {"adapted": True}, - {"adapted": False}, - ) - def test_saved_model_tf(self, adapted): - input_data = [[0.], [2.], [0.], [2.]] - expected_output = [[-1.], [1.], [-1.], [1.]] - - inputs = keras.Input(shape=(1,), dtype=tf.float32) - if adapted: - layer = denormalization.Normalization(axis=-1) - layer.adapt(input_data) - else: - layer = denormalization.Normalization(mean=1., variance=2.) - outputs = layer(inputs) - model = keras.Model(inputs=inputs, outputs=outputs) - - output_data = model.predict(input_data) - self.assertAllClose(output_data, expected_output) - - # Save the model to disk. - output_path = os.path.join(self.get_temp_dir(), "tf_saved_model") - tf.saved_model.save(model, output_path) - loaded_model = tf.saved_model.load(output_path) - f = loaded_model.signatures["serving_default"] - - # Ensure that the loaded model is unique (so that the save/load is real) - self.assertIsNot(model, loaded_model) - - # Validate correctness of the new model. - new_output_data = f(tf.constant(input_data))["normalization"] - self.assertAllClose(new_output_data, expected_output) - - @parameterized.product( - save_format=["tf", "h5"], - adapt=[True, False], - ) - def test_saved_model_keras(self, save_format, adapt): - input_data = [[0.], [2.], [0.], [2.]] - expected_output = [[-1.], [1.], [-1.], [1.]] - - cls = denormalization.Denormalization - inputs = keras.Input(shape=(1,), dtype=tf.float32) - if adapt: - layer = cls(axis=-1) - layer.adapt(input_data) - else: - layer = cls(mean=1., variance=2.) - outputs = layer(inputs) - model = keras.Model(inputs=inputs, outputs=outputs) - - output_data = model.predict(input_data) - self.assertAllClose(output_data, expected_output) - - # Save the model to disk. - output_path = os.path.join(self.get_temp_dir(), "tf_keras_saved_model") - model.save(output_path, save_format=format) - loaded_model = keras.models.load_model( - output_path, custom_objects={"Normalization": cls}) - - # Ensure that the loaded model is unique (so that the save/load is real) - self.assertIsNot(model, loaded_model) - - # Validate correctness of the new model. - new_output_data = loaded_model.predict(input_data) - self.assertAllClose(new_output_data, expected_output) - - @parameterized.parameters( - {"adapted": True}, - {"adapted": False}, - ) - def test_saved_weights_keras(self, adapted): - input_data = [[0.], [2.], [0.], [2.]] - expected_output = [[-1.], [1.], [-1.], [1.]] - - cls = denormalization.Denormalization - inputs = keras.Input(shape=(1,), dtype=tf.float32) - if adapted: - layer = cls(axis=-1) - layer.adapt(input_data) - else: - layer = cls(mean=1., variance=2.) - outputs = layer(inputs) - model = keras.Model(inputs=inputs, outputs=outputs) - - output_data = model.predict(input_data) - self.assertAllClose(output_data, expected_output) - - # Save the model to disk. - output_path = os.path.join(self.get_temp_dir(), "tf_keras_saved_weights") - model.save_weights(output_path, save_format="tf") - new_model = keras.Model.from_config( - model.get_config(), custom_objects={"Normalization": cls}) - new_model.load_weights(output_path) - - # Validate correctness of the new model. - new_output_data = new_model.predict(input_data) - self.assertAllClose(new_output_data, expected_output) - - -if __name__ == "__main__": - tf.test.main() diff --git a/keras/layers/preprocessing/normalization.py b/keras/layers/preprocessing/normalization.py index be052a59ec5..4ecd2e98586 100644 --- a/keras/layers/preprocessing/normalization.py +++ b/keras/layers/preprocessing/normalization.py @@ -61,6 +61,7 @@ class Normalization(base_preprocessing_layer.PreprocessingLayer): value(s) will be broadcast to the shape of the kept axes above; if the value(s) cannot be broadcast, an error will be raised when this layer's `build()` method is called. + invert: If True, this layer will return the denormalized values of inputs. Default to False. Examples: @@ -98,7 +99,7 @@ class Normalization(base_preprocessing_layer.PreprocessingLayer): [ 0. ]], dtype=float32)> """ - def __init__(self, axis=-1, mean=None, variance=None, **kwargs): + def __init__(self, axis=-1, mean=None, variance=None, invert=False, **kwargs): super().__init__(**kwargs) base_preprocessing_layer.keras_kpl_gauge.get_cell('Normalization').set(True) @@ -124,6 +125,7 @@ def __init__(self, axis=-1, mean=None, variance=None, **kwargs): 'must be set. Got mean: {} and variance: {}'.format(mean, variance)) self.input_mean = mean self.input_variance = variance + self.invert = invert def build(self, input_shape): super().build(input_shape) @@ -302,7 +304,11 @@ def call(self, inputs): # The base layer automatically casts floating-point inputs, but we # explicitly cast here to also allow integer inputs to be passed inputs = tf.cast(inputs, self.compute_dtype) - return ((inputs - self.mean) / + if self.invert: + return ((inputs + self.mean) * + tf.maximum(tf.sqrt(self.variance), backend.epsilon())) + else: + return ((inputs - self.mean) / tf.maximum(tf.sqrt(self.variance), backend.epsilon())) def compute_output_shape(self, input_shape): diff --git a/keras/layers/preprocessing/normalization_test.py b/keras/layers/preprocessing/normalization_test.py index 4edf789089b..9fc2521ac4c 100644 --- a/keras/layers/preprocessing/normalization_test.py +++ b/keras/layers/preprocessing/normalization_test.py @@ -198,6 +198,13 @@ def test_output_dtype(self): output = layer(input_data) self.assertAllEqual(output.dtype, tf.float64) + def test_invert(self): + data = np.array([0., 2., 0., 2.]) + layer = normalization.Normalization(mean=2.0, variance=3.0, invert=True) + output = layer(data) + self.assertListEqual(output.shape.as_list(), [4]) + self.assertAllClose(output, [6, 12, 6, 12]) + @test_combinations.run_all_keras_modes(always_skip_v1=True) class NormalizationAdaptTest(test_combinations.TestCase, From 6d22887761f2cd6d695b4eac9b0bf6b0256087c8 Mon Sep 17 00:00:00 2001 From: Martin Kubovcik Date: Mon, 4 Apr 2022 20:37:36 +0200 Subject: [PATCH 3/5] better unit test --- keras/layers/preprocessing/normalization_test.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/keras/layers/preprocessing/normalization_test.py b/keras/layers/preprocessing/normalization_test.py index 9fc2521ac4c..472c4a21341 100644 --- a/keras/layers/preprocessing/normalization_test.py +++ b/keras/layers/preprocessing/normalization_test.py @@ -200,10 +200,12 @@ def test_output_dtype(self): def test_invert(self): data = np.array([0., 2., 0., 2.]) - layer = normalization.Normalization(mean=2.0, variance=3.0, invert=True) + layer = normalization.Normalization(mean=1.0, variance=1.0) + layer2 = normalization.Normalization(mean=1.0, variance=1.0, invert=True) output = layer(data) - self.assertListEqual(output.shape.as_list(), [4]) - self.assertAllClose(output, [6, 12, 6, 12]) + output2 = layer2(output) + self.assertListEqual(output2.shape.as_list(), [4]) + self.assertAllClose(output2, [0., 2., 0., 2.]) @test_combinations.run_all_keras_modes(always_skip_v1=True) From 3374be7d09a1e946e0f3b52bad502dae4988b29a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bc=2E=20Martin=20Kubov=C4=8D=C3=ADk?= <74611856+markub3327@users.noreply.github.com> Date: Tue, 5 Apr 2022 08:38:22 +0200 Subject: [PATCH 4/5] Update normalization_test.py --- .../preprocessing/normalization_test.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/keras/layers/preprocessing/normalization_test.py b/keras/layers/preprocessing/normalization_test.py index 472c4a21341..79cf334d3c8 100644 --- a/keras/layers/preprocessing/normalization_test.py +++ b/keras/layers/preprocessing/normalization_test.py @@ -200,13 +200,22 @@ def test_output_dtype(self): def test_invert(self): data = np.array([0., 2., 0., 2.]) - layer = normalization.Normalization(mean=1.0, variance=1.0) - layer2 = normalization.Normalization(mean=1.0, variance=1.0, invert=True) - output = layer(data) - output2 = layer2(output) + norm = normalization.Normalization(mean=1.0, variance=1.0) + inv_norm = normalization.Normalization(mean=1.0, variance=1.0, invert=True) + output = norm(data) + output2 = inv_norm(output) self.assertListEqual(output2.shape.as_list(), [4]) self.assertAllClose(output2, [0., 2., 0., 2.]) - + + def test_invert_adapt(self): + input_data = [[0.], [2.], [0.], [2.]] + norm = keras.layers.Normalization(axis=-1) + norm.adapt(input_data) + inv_norm = keras.layers.Normalization(axis=-1, invert=True) + inv_norm.adapt(input_data) + output = norm(input_data) + output2 = inv_norm(output) + self.assertAllClose(input_data, output2) @test_combinations.run_all_keras_modes(always_skip_v1=True) class NormalizationAdaptTest(test_combinations.TestCase, From 3a3d7b6c59e8f46eeeac617c484cd0d11a3c5c58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bc=2E=20Martin=20Kubov=C4=8D=C3=ADk?= <74611856+markub3327@users.noreply.github.com> Date: Wed, 6 Apr 2022 08:47:51 +0200 Subject: [PATCH 5/5] Update normalization.py --- keras/layers/preprocessing/normalization.py | 23 +++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/keras/layers/preprocessing/normalization.py b/keras/layers/preprocessing/normalization.py index 4ecd2e98586..ef2252d7328 100644 --- a/keras/layers/preprocessing/normalization.py +++ b/keras/layers/preprocessing/normalization.py @@ -97,6 +97,29 @@ class Normalization(base_preprocessing_layer.PreprocessingLayer): array([[-1.4142135 ], [-0.70710677], [ 0. ]], dtype=float32)> + + Using it in the invert manner for denormalizing the inputs with calculating a mean and variance for each index on the last axis. + + >>> adapt_data = np.array([[0., 7., 4.], + ... [2., 9., 6.], + ... [0., 7., 4.], + ... [2., 9., 6.]], dtype='float32') + >>> input_data = np.array([[1., 2., 3.]], dtype='float32') + >>> layer = tf.keras.layers.Normalization(axis=-1, invert=True) + >>> layer.adapt(adapt_data) + >>> layer(input_data) + + + Using it in the invert manner for denormalizing the inputs with passing the mean and variance directly. + + >>> input_data = np.array([[-1.4142135], [-0.70710677], [0.]], dtype='float32') + >>> layer = tf.keras.layers.Normalization(mean=3., variance=2., invert=True) + >>> layer(input_data) + """ def __init__(self, axis=-1, mean=None, variance=None, invert=False, **kwargs):