-
-
Notifications
You must be signed in to change notification settings - Fork 30.9k
/
translation.py
494 lines (413 loc) · 17 KB
/
translation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
"""Translation string lookup helpers."""
from __future__ import annotations
import asyncio
from collections.abc import Iterable, Mapping
from contextlib import suppress
from dataclasses import dataclass
import logging
import pathlib
import string
from typing import Any
from homeassistant.const import (
EVENT_CORE_CONFIG_UPDATE,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import Event, HomeAssistant, async_get_hass, callback
from homeassistant.loader import (
Integration,
async_get_config_flows,
async_get_integrations,
bind_hass,
)
from homeassistant.util.json import load_json
from . import singleton
_LOGGER = logging.getLogger(__name__)
TRANSLATION_FLATTEN_CACHE = "translation_flatten_cache"
LOCALE_EN = "en"
def recursive_flatten(
prefix: str, data: dict[str, dict[str, Any] | str]
) -> dict[str, str]:
"""Return a flattened representation of dict data."""
output: dict[str, str] = {}
for key, value in data.items():
if isinstance(value, dict):
output.update(recursive_flatten(f"{prefix}{key}.", value))
else:
output[f"{prefix}{key}"] = value
return output
def _load_translations_files_by_language(
translation_files: dict[str, dict[str, pathlib.Path]],
) -> dict[str, dict[str, Any]]:
"""Load and parse translation.json files."""
loaded: dict[str, dict[str, Any]] = {}
for language, component_translation_file in translation_files.items():
loaded_for_language: dict[str, Any] = {}
loaded[language] = loaded_for_language
for component, translation_file in component_translation_file.items():
loaded_json = load_json(translation_file)
if not isinstance(loaded_json, dict):
_LOGGER.warning(
"Translation file is unexpected type %s. Expected dict for %s",
type(loaded_json),
translation_file,
)
continue
loaded_for_language[component] = loaded_json
return loaded
def build_resources(
translation_strings: dict[str, dict[str, dict[str, Any] | str]],
components: set[str],
category: str,
) -> dict[str, dict[str, Any] | str]:
"""Build the resources response for the given components."""
# Build response
return {
component: category_strings
for component in components
if (component_strings := translation_strings.get(component))
and (category_strings := component_strings.get(category))
}
async def _async_get_component_strings(
hass: HomeAssistant,
languages: Iterable[str],
components: set[str],
integrations: dict[str, Integration],
) -> dict[str, dict[str, Any]]:
"""Load translations."""
translations_by_language: dict[str, dict[str, Any]] = {}
# Determine paths of missing components/platforms
files_to_load_by_language: dict[str, dict[str, pathlib.Path]] = {}
loaded_translations_by_language: dict[str, dict[str, Any]] = {}
has_files_to_load = False
for language in languages:
file_name = f"{language}.json"
files_to_load: dict[str, pathlib.Path] = {
domain: integration.file_path / "translations" / file_name
for domain in components
if (
(integration := integrations.get(domain))
and integration.has_translations
)
}
files_to_load_by_language[language] = files_to_load
has_files_to_load |= bool(files_to_load)
if has_files_to_load:
loaded_translations_by_language = await hass.async_add_executor_job(
_load_translations_files_by_language, files_to_load_by_language
)
for language in languages:
loaded_translations = loaded_translations_by_language.setdefault(language, {})
for domain in components:
# Translations that miss "title" will get integration put in.
component_translations = loaded_translations.setdefault(domain, {})
if "title" not in component_translations and (
integration := integrations.get(domain)
):
component_translations["title"] = integration.name
translations_by_language.setdefault(language, {}).update(loaded_translations)
return translations_by_language
@dataclass(slots=True)
class _TranslationsCacheData:
"""Data for the translation cache.
This class contains data that is designed to be shared
between multiple instances of the translation cache so
we only have to load the data once.
"""
loaded: dict[str, set[str]]
cache: dict[str, dict[str, dict[str, dict[str, str]]]]
class _TranslationCache:
"""Cache for flattened translations."""
__slots__ = ("hass", "cache_data", "lock")
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the cache."""
self.hass = hass
self.cache_data = _TranslationsCacheData({}, {})
self.lock = asyncio.Lock()
@callback
def async_is_loaded(self, language: str, components: set[str]) -> bool:
"""Return if the given components are loaded for the language."""
return components.issubset(self.cache_data.loaded.get(language, set()))
async def async_load(
self,
language: str,
components: set[str],
) -> None:
"""Load resources into the cache."""
loaded = self.cache_data.loaded.setdefault(language, set())
if components_to_load := components - loaded:
# Translations are never unloaded so if there are no components to load
# we can skip the lock which reduces contention when multiple different
# translations categories are being fetched at the same time which is
# common from the frontend.
async with self.lock:
# Check components to load again, as another task might have loaded
# them while we were waiting for the lock.
if components_to_load := components - loaded:
await self._async_load(language, components_to_load)
async def async_fetch(
self,
language: str,
category: str,
components: set[str],
) -> dict[str, str]:
"""Load resources into the cache and return them."""
await self.async_load(language, components)
return self.get_cached(language, category, components)
def get_cached(
self,
language: str,
category: str,
components: set[str],
) -> dict[str, str]:
"""Read resources from the cache."""
category_cache = self.cache_data.cache.get(language, {}).get(category, {})
# If only one component was requested, return it directly
# to avoid merging the dictionaries and keeping additional
# copies of the same data in memory.
if len(components) == 1 and (component := next(iter(components))):
return category_cache.get(component, {})
result: dict[str, str] = {}
for component in components.intersection(category_cache):
result.update(category_cache[component])
return result
async def _async_load(self, language: str, components: set[str]) -> None:
"""Populate the cache for a given set of components."""
loaded = self.cache_data.loaded
_LOGGER.debug(
"Cache miss for %s: %s",
language,
components,
)
# Fetch the English resources, as a fallback for missing keys
languages = [LOCALE_EN] if language == LOCALE_EN else [LOCALE_EN, language]
integrations: dict[str, Integration] = {}
ints_or_excs = await async_get_integrations(self.hass, components)
for domain, int_or_exc in ints_or_excs.items():
if isinstance(int_or_exc, Exception):
_LOGGER.warning(
"Failed to load integration for translation: %s", int_or_exc
)
continue
integrations[domain] = int_or_exc
translation_by_language_strings = await _async_get_component_strings(
self.hass, languages, components, integrations
)
# English is always the fallback language so we load them first
self._build_category_cache(
language, components, translation_by_language_strings[LOCALE_EN]
)
if language != LOCALE_EN:
# Now overlay the requested language on top of the English
self._build_category_cache(
language, components, translation_by_language_strings[language]
)
loaded_english_components = loaded.setdefault(LOCALE_EN, set())
# Since we just loaded english anyway we can avoid loading
# again if they switch back to english.
if loaded_english_components.isdisjoint(components):
self._build_category_cache(
LOCALE_EN, components, translation_by_language_strings[LOCALE_EN]
)
loaded_english_components.update(components)
loaded[language].update(components)
def _validate_placeholders(
self,
language: str,
updated_resources: dict[str, str],
cached_resources: dict[str, str] | None = None,
) -> dict[str, str]:
"""Validate if updated resources have same placeholders as cached resources."""
if cached_resources is None:
return updated_resources
mismatches: set[str] = set()
for key, value in updated_resources.items():
if key not in cached_resources:
continue
try:
tuples = list(string.Formatter().parse(value))
except ValueError:
_LOGGER.error(
("Error while parsing localized (%s) string %s"), language, key
)
continue
updated_placeholders = {tup[1] for tup in tuples if tup[1] is not None}
tuples = list(string.Formatter().parse(cached_resources[key]))
cached_placeholders = {tup[1] for tup in tuples if tup[1] is not None}
if updated_placeholders != cached_placeholders:
_LOGGER.error(
(
"Validation of translation placeholders for localized (%s) string "
"%s failed: (%s != %s)"
),
language,
key,
updated_placeholders,
cached_placeholders,
)
mismatches.add(key)
for mismatch in mismatches:
del updated_resources[mismatch]
return updated_resources
@callback
def _build_category_cache(
self,
language: str,
components: set[str],
translation_strings: dict[str, dict[str, Any]],
) -> None:
"""Extract resources into the cache."""
resource: dict[str, Any] | str
cached = self.cache_data.cache.setdefault(language, {})
categories = {
category
for component in translation_strings.values()
for category in component
}
for category in categories:
new_resources = build_resources(translation_strings, components, category)
category_cache = cached.setdefault(category, {})
for component, resource in new_resources.items():
component_cache = category_cache.setdefault(component, {})
if not isinstance(resource, dict):
component_cache[f"component.{component}.{category}"] = resource
continue
prefix = f"component.{component}.{category}."
flat = recursive_flatten(prefix, resource)
flat = self._validate_placeholders(language, flat, component_cache)
component_cache.update(flat)
@bind_hass
async def async_get_translations(
hass: HomeAssistant,
language: str,
category: str,
integrations: Iterable[str] | None = None,
config_flow: bool | None = None,
) -> dict[str, str]:
"""Return all backend translations.
If integration is specified, load it for that one.
Otherwise, default to loaded integrations combined with config flow
integrations if config_flow is true.
"""
if integrations is None and config_flow:
components = (await async_get_config_flows(hass)) - hass.config.components
elif integrations is not None:
components = set(integrations)
else:
components = hass.config.top_level_components
return await _async_get_translations_cache(hass).async_fetch(
language, category, components
)
@callback
def async_get_cached_translations(
hass: HomeAssistant,
language: str,
category: str,
integration: str | None = None,
) -> dict[str, str]:
"""Return all cached backend translations.
If integration is specified, return translations for it.
Otherwise, default to all loaded integrations.
"""
components = {integration} if integration else hass.config.top_level_components
return _async_get_translations_cache(hass).get_cached(
language, category, components
)
@singleton.singleton(TRANSLATION_FLATTEN_CACHE)
def _async_get_translations_cache(hass: HomeAssistant) -> _TranslationCache:
"""Return the translation cache."""
return _TranslationCache(hass)
@callback
def async_setup(hass: HomeAssistant) -> None:
"""Create translation cache and register listeners for translation loaders.
Listeners load translations for every loaded component and after config change.
"""
cache = _TranslationCache(hass)
current_language = hass.config.language
_async_get_translations_cache(hass)
@callback
def _async_load_translations_filter(event_data: Mapping[str, Any]) -> bool:
"""Filter out unwanted events."""
nonlocal current_language
if (
new_language := event_data.get("language")
) and new_language != current_language:
current_language = new_language
return True
return False
async def _async_load_translations(event: Event) -> None:
new_language = event.data["language"]
_LOGGER.debug("Loading translations for language: %s", new_language)
await cache.async_load(new_language, hass.config.components)
hass.bus.async_listen(
EVENT_CORE_CONFIG_UPDATE,
_async_load_translations,
event_filter=_async_load_translations_filter,
)
async def async_load_integrations(hass: HomeAssistant, integrations: set[str]) -> None:
"""Load translations for integrations."""
await _async_get_translations_cache(hass).async_load(
hass.config.language, integrations
)
@callback
def async_translations_loaded(hass: HomeAssistant, components: set[str]) -> bool:
"""Return if the given components are loaded for the language."""
return _async_get_translations_cache(hass).async_is_loaded(
hass.config.language, components
)
@callback
def async_get_exception_message(
translation_domain: str,
translation_key: str,
translation_placeholders: dict[str, str] | None = None,
) -> str:
"""Return a translated exception message.
Defaults to English, requires translations to already be cached.
"""
language = "en"
hass = async_get_hass()
localize_key = (
f"component.{translation_domain}.exceptions.{translation_key}.message"
)
translations = async_get_cached_translations(hass, language, "exceptions")
if localize_key in translations:
if message := translations[localize_key]:
message = message.rstrip(".")
if not translation_placeholders:
return message
with suppress(KeyError):
message = message.format(**translation_placeholders)
return message
# We return the translation key when was not found in the cache
return translation_key
@callback
def async_translate_state(
hass: HomeAssistant,
state: str,
domain: str,
platform: str | None,
translation_key: str | None,
device_class: str | None,
) -> str:
"""Translate provided state using cached translations for currently selected language."""
if state in [STATE_UNAVAILABLE, STATE_UNKNOWN]:
return state
language = hass.config.language
if platform is not None and translation_key is not None:
localize_key = (
f"component.{platform}.entity.{domain}.{translation_key}.state.{state}"
)
translations = async_get_cached_translations(hass, language, "entity")
if localize_key in translations:
return translations[localize_key]
translations = async_get_cached_translations(hass, language, "entity_component")
if device_class is not None:
localize_key = (
f"component.{domain}.entity_component.{device_class}.state.{state}"
)
if localize_key in translations:
return translations[localize_key]
localize_key = f"component.{domain}.entity_component._.state.{state}"
if localize_key in translations:
return translations[localize_key]
return state