diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index ad84e8db7c..6f6c2fb45c 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -106,6 +106,39 @@ def delete_entity_values(self, config: RepoConfig, join_keys: List[str]): logger.debug(f"Deleted {deleted_count} rows for entity {', '.join(join_keys)}") + def delete_table(self, config: RepoConfig, table: FeatureView): + """ + Delete all rows in Redis for a specific feature view + + Args: + config: Feast config + table: Feature view to delete + """ + client = self._get_client(config.online_store) + deleted_count = 0 + prefix = _redis_key_prefix(table.join_keys) + + redis_hash_keys = [_mmh3(f"{table.name}:{f.name}") for f in table.features] + redis_hash_keys.append(bytes(f"_ts:{table.name}", "utf8")) + + with client.pipeline(transaction=False) as pipe: + for _k in client.scan_iter( + b"".join([prefix, b"*", config.project.encode("utf8")]) + ): + _tables = { + _hk[4:] for _hk in client.hgetall(_k) if _hk.startswith(b"_ts:") + } + if bytes(table.name, "utf8") not in _tables: + continue + if len(_tables) == 1: + pipe.delete(_k) + else: + pipe.hdel(_k, *redis_hash_keys) + deleted_count += 1 + pipe.execute() + + logger.debug(f"Deleted {deleted_count} rows for feature view {table.name}") + @log_exceptions_and_usage(online_store="redis") def update( self, @@ -117,16 +150,19 @@ def update( partial: bool, ): """ - Look for join_keys (list of entities) that are not in use anymore - (usually this happens when the last feature view that was using specific compound key is deleted) - and remove all features attached to this "join_keys". + Delete data from feature views that are no longer in use. + + Args: + config: Feast config + tables_to_delete: Feature views to delete + tables_to_keep: Feature views to keep + entities_to_delete: Entities to delete + entities_to_keep: Entities to keep + partial: Whether to do a partial update """ - join_keys_to_keep = set(tuple(table.join_keys) for table in tables_to_keep) - join_keys_to_delete = set(tuple(table.join_keys) for table in tables_to_delete) - - for join_keys in join_keys_to_delete - join_keys_to_keep: - self.delete_entity_values(config, list(join_keys)) + for table in tables_to_delete: + self.delete_table(config, table) def teardown( self,