forked from elastic/connectors
-
Notifications
You must be signed in to change notification settings - Fork 0
/
google_drive.py
1344 lines (1094 loc) · 52 KB
/
google_drive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import asyncio
from functools import cached_property, partial
from aiogoogle import HTTPError
from connectors.access_control import (
ACCESS_CONTROL,
es_access_control_query,
prefix_identity,
)
from connectors.es.sink import OP_DELETE, OP_INDEX
from connectors.source import (
CURSOR_SYNC_TIMESTAMP,
BaseDataSource,
ConfigurableFieldValueError,
)
from connectors.sources.google import (
GoogleServiceAccountClient,
UserFields,
load_service_account_json,
remove_universe_domain,
validate_service_account_json,
)
from connectors.utils import (
EMAIL_REGEX_PATTERN,
iso_zulu,
validate_email_address,
)
# Human-readable service names handed to the service-account JSON helpers.
GOOGLE_DRIVE_SERVICE_NAME = "Google Drive"
GOOGLE_ADMIN_DIRECTORY_SERVICE_NAME = "Google Admin Directory"
CURSOR_GOOGLE_DRIVE_KEY = "google_drives"
RETRIES = 3
RETRY_INTERVAL = 2
# Default cap on open connections to the Google API (overridable via `max_concurrency`).
GOOGLE_API_MAX_CONCURRENCY = 25
# API call timeout: one minute.
DRIVE_API_TIMEOUT = 60
FOLDER_MIME_TYPE = "application/vnd.google-apps.folder"
# Partial-response field selector for Drive file resources.
DRIVE_ITEMS_FIELDS = ",".join(
    (
        "id",
        "createdTime",
        "driveId",
        "modifiedTime",
        "name",
        "size",
        "mimeType",
        "fileExtension",
        "webViewLink",
        "owners",
        "parents",
        "trashed",
        "trashedTime",
    )
)
DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS = DRIVE_ITEMS_FIELDS + ",permissions"
# Google Workspace documents must be exported to a TIKA-compatible format before extraction.
# 'text/plain' is preferred where possible to keep the content extraction workload small.
GOOGLE_MIME_TYPES_MAPPING = {
    "application/vnd.google-apps.document": "text/plain",
    "application/vnd.google-apps.presentation": "text/plain",
    "application/vnd.google-apps.spreadsheet": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
}
class SyncCursorEmpty(Exception):
    """Signals that an incremental sync cannot run because the sync cursor is empty."""
class GoogleDriveClient(GoogleServiceAccountClient):
    """A Google Drive client to handle API calls made to the Google Drive API (v3)."""

    def __init__(self, json_credentials, subject=None):
        """Initialize the GoogleServiceAccountClient superclass for the Drive API.

        Args:
            json_credentials (dict): Service account credentials json. This dict is
                modified in place: it is passed through `remove_universe_domain` and,
                when `subject` is given, its "subject" key is set.
            subject (str, optional): User email to impersonate (domain-wide delegation).
                When falsy, no impersonation is configured.
        """
        remove_universe_domain(json_credentials)
        if subject:
            json_credentials["subject"] = subject

        super().__init__(
            json_credentials=json_credentials,
            api="drive",
            api_version="v3",
            scopes=[
                "https://www.googleapis.com/auth/drive.readonly",
                "https://www.googleapis.com/auth/drive.metadata.readonly",
            ],
            api_timeout=DRIVE_API_TIMEOUT,
        )

    async def ping(self):
        """Issue a minimal `about.get` request to verify connectivity."""
        return await self.api_call(resource="about", method="get", fields="kind")

    async def list_drives(self):
        """Page through all shared drives visible to the client.

        Yields:
            dict: One response page per API call; drive (id, name) entries are
                under the `drives` key.
        """
        async for drive in self.api_call_paged(
            resource="drives",
            method="list",
            fields="nextPageToken,drives(id,name)",
            pageSize=100,
        ):
            yield drive

    async def get_all_drives(self):
        """Retrieve all shared drives from Google Drive.

        Returns:
            dict: Mapping between drive id and its name.
        """
        drives = {}
        async for page in self.list_drives():
            drives_chunk = page.get("drives", [])
            for drive in drives_chunk:
                drives[drive["id"]] = drive["name"]

        return drives

    async def list_folders(self):
        """Page through all non-trashed folders across all drives.

        Yields:
            dict: One response page per API call; folder (id, name, parents) entries
                are under the `files` key.
        """
        async for folder in self.api_call_paged(
            resource="files",
            method="list",
            corpora="allDrives",
            fields="nextPageToken,files(id,name,parents)",
            q=f"mimeType='{FOLDER_MIME_TYPE}' and trashed=false",
            includeItemsFromAllDrives=True,
            supportsAllDrives=True,
            pageSize=1000,
        ):
            yield folder

    async def get_all_folders(self):
        """Retrieve all folders from Google Drive.

        Returns:
            dict: Mapping between folder id and {"name": ..., "parents": ...};
                "parents" is None when the API returned no parents.
        """
        folders = {}
        async for page in self.list_folders():
            folders_chunk = page.get("files", [])
            for folder in folders_chunk:
                folders[folder["id"]] = {
                    "name": folder["name"],
                    "parents": folder.get("parents", None),
                }

        return folders

    async def list_files(self, fetch_permissions=False, last_sync_time=None):
        """Page through files from all drives visible to the client. Files can have any type.

        Args:
            fetch_permissions (bool): When True, also request each file's `permissions` field.
            last_sync_time (str, optional): Time of the last sync. When None, only
                non-trashed files are listed. Otherwise, trashed files plus files created
                or modified after this time are listed (incremental sync).

        Yields:
            dict: One response page per API call, with `files`, `incompleteSearch` and
                `nextPageToken` as selected by the `fields` parameter.
        """
        files_fields = (
            DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
            if fetch_permissions
            else DRIVE_ITEMS_FIELDS
        )
        if last_sync_time is None:
            list_query = "trashed=false"
        else:
            list_query = f"trashed=true or modifiedTime > '{last_sync_time}' or createdTime > '{last_sync_time}'"
        async for file in self.api_call_paged(
            resource="files",
            method="list",
            corpora="allDrives",
            q=list_query,
            orderBy="modifiedTime desc",
            fields=f"files({files_fields}),incompleteSearch,nextPageToken",
            includeItemsFromAllDrives=True,
            supportsAllDrives=True,
            pageSize=100,
        ):
            yield file

    async def list_files_from_my_drive(
        self, fetch_permissions=False, last_sync_time=None
    ):
        """Page through files in the user's own drive (corpora="user"), optionally with permissions (DLS).

        If 'fetch_permissions' is True, the query is restricted to files the user can edit
        ("'me' in writers"), because the Drive API requires write access to return a file's
        permissions. If 'fetch_permissions' is False, no writer restriction is applied,
        allowing a broader file retrieval.

        Args:
            fetch_permissions (bool): When True, request `permissions` and restrict to
                files the user can write.
            last_sync_time (str, optional): Time of the last sync. When falsy, only
                non-trashed files are listed. Otherwise, trashed files plus files created
                or modified after this time are listed (incremental sync).

        Yields:
            dict: One response page per API call, with `files`, `incompleteSearch` and
                `nextPageToken` as selected by the `fields` parameter.
        """
        if fetch_permissions and last_sync_time:
            files_fields = DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
            list_query = f"(trashed=true or modifiedTime > '{last_sync_time}' or createdTime > '{last_sync_time}') and 'me' in writers"
        elif fetch_permissions and not last_sync_time:
            files_fields = DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
            # Google Drive API requires write access to fetch a file's permissions
            list_query = "trashed=false and 'me' in writers"
        elif not fetch_permissions and last_sync_time:
            files_fields = DRIVE_ITEMS_FIELDS
            list_query = f"trashed=true or modifiedTime > '{last_sync_time}' or createdTime > '{last_sync_time}'"
        else:
            files_fields = DRIVE_ITEMS_FIELDS
            list_query = "trashed=false"

        async for file in self.api_call_paged(
            resource="files",
            method="list",
            corpora="user",
            q=list_query,
            orderBy="modifiedTime desc",
            fields=f"files({files_fields}),incompleteSearch,nextPageToken",
            includeItemsFromAllDrives=False,
            supportsAllDrives=False,
            pageSize=100,
        ):
            yield file

    async def list_permissions(self, file_id):
        """Page through the permissions of a given file.

        Args:
            file_id (str): File ID.

        Yields:
            dict: One response page per API call; permission (type, emailAddress, domain)
                entries are under the `permissions` key.
        """
        async for permission in self.api_call_paged(
            resource="permissions",
            method="list",
            fileId=file_id,
            fields="permissions(type,emailAddress,domain),nextPageToken",
            supportsAllDrives=True,
            pageSize=100,
        ):
            yield permission
class GoogleAdminDirectoryClient(GoogleServiceAccountClient):
    """A Google Admin Directory client to handle API calls made to the Admin SDK Directory API."""

    def __init__(self, json_credentials, subject):
        """Initialize the GoogleServiceAccountClient superclass for the Directory API.

        Args:
            json_credentials (dict): Service account credentials json. This dict is
                modified in place: it is passed through `remove_universe_domain` and its
                "subject" key is set when `subject` is truthy.
            subject (str): For service accounts with domain-wide delegation enabled. A user
                account to impersonate - e.g "[email protected]". The text after its last
                "@" is stored as `self.domain` and used when listing users.
        """
        remove_universe_domain(json_credentials)
        if subject:
            json_credentials["subject"] = subject

        super().__init__(
            json_credentials=json_credentials,
            api="admin",
            api_version="directory_v1",
            scopes=[
                "https://www.googleapis.com/auth/admin.directory.group.readonly",
                "https://www.googleapis.com/auth/admin.directory.user.readonly",
            ],
            api_timeout=DRIVE_API_TIMEOUT,
        )
        self.domain = _get_domain_from_email(subject)

    async def list_users(self):
        """Page through the users of `self.domain`.

        Yields:
            dict: One response page per API call; users (id, name, primaryEmail) are
                under the `users` key.
        """
        async for user in self.api_call_paged(
            resource="users",
            method="list",
            domain=self.domain,
            fields="kind,users(id,name,primaryEmail),nextPageToken",
        ):
            yield user

    async def users(self):
        """Yield individual user resources from every page returned by `list_users()`."""
        async for users_page in self.list_users():
            for user in users_page.get("users", []):
                yield user

    async def list_groups_for_user(self, user_id):
        """Page through the groups a user is a member of.

        Args:
            user_id (str): Value passed as the API's `userKey`.

        Yields:
            dict: One response page per API call; group emails are under the `groups` key.
        """
        async for group in self.api_call_paged(
            resource="groups",
            method="list",
            userKey=user_id,
            fields="kind,groups(email),nextPageToken",
        ):
            yield group
def _prefix_group(group):
    """Tag a group identity (a group email in this module) with the "group" identity type."""
    identity_type = "group"
    return prefix_identity(identity_type, group)
def _prefix_user(user):
    """Tag a user identity (a user email in this module) with the "user" identity type."""
    identity_type = "user"
    return prefix_identity(identity_type, user)
def _prefix_domain(domain):
    """Tag a domain name with the "domain" identity type."""
    identity_type = "domain"
    return prefix_identity(identity_type, domain)
def _is_user_permission(permission_type):
return permission_type == "user"
def _is_group_permission(permission_type):
return permission_type == "group"
def _is_domain_permission(permission_type):
return permission_type == "domain"
def _is_anyone_permission(permission_type):
return permission_type == "anyone"
def _get_domain_from_email(email):
return email.split("@")[-1]
class GoogleDriveDataSource(BaseDataSource):
"""Google Drive"""
name = "Google Drive"
service_type = "google_drive"
dls_enabled = True
incremental_sync_enabled = True
def __init__(self, configuration):
    """Create the Google Drive data source.

    Args:
        configuration (DataSourceConfiguration): Connector configuration, handed
            unchanged to the BaseDataSource initializer.
    """
    super().__init__(configuration=configuration)
def _set_internal_logger(self):
    """Share this source's logger with the admin directory client, when that client is used.

    The admin client is only used with domain-wide delegation sync or DLS, so it is only
    touched (and therefore lazily built) in those cases.
    """
    admin_client_in_use = (
        self._domain_wide_delegation_sync_enabled() or self._dls_enabled()
    )
    if not admin_client_in_use:
        return
    self.google_admin_directory_client.set_logger(self._logger)
@classmethod
def get_default_configuration(cls):
    """Get the default configuration for Google Drive.

    Each key is a configuration field. Its dict describes how the field is rendered
    (display, label, order, tooltip, ui_restrictions), its type and default, and
    any validation. `depends_on` lists the field values that must hold for the field
    to apply.

    Returns:
        dict: Default configuration.
    """
    return {
        # NOTE(review): tooltip grammar "This connectors authenticates" — consider
        # "This connector authenticates" (user-facing string, left unchanged here).
        "service_account_credentials": {
            "display": "textarea",
            "label": "Google Drive service account JSON",
            "sensitive": True,
            "order": 1,
            "tooltip": "This connectors authenticates as a service account to synchronize content from Google Drive.",
            "type": "str",
        },
        "use_domain_wide_delegation_for_sync": {
            "display": "toggle",
            "label": "Use domain-wide delegation for data sync",
            "order": 2,
            "tooltip": "Enable domain-wide delegation to automatically sync content from all shared and personal drives in the Google workspace. This eliminates the need to manually share Google Drive data with your service account, though it may increase sync time. If disabled, only items and folders manually shared with the service account will be synced. Please refer to the connector documentation to ensure domain-wide delegation is correctly configured and has the appropriate scopes.",
            "type": "bool",
            "value": False,
        },
        # Admin email used for impersonation when domain-wide delegation sync is on;
        # _get_google_workspace_admin_email() also reuses it for DLS in that mode.
        "google_workspace_admin_email_for_data_sync": {
            "depends_on": [
                {"field": "use_domain_wide_delegation_for_sync", "value": True}
            ],
            "display": "text",
            "label": "Google Workspace admin email",
            "order": 3,
            "tooltip": "Provide the admin email to be used with domain-wide delegation for data sync. This email enables the connector to utilize the Admin Directory API for listing organization users. Please refer to the connector documentation to ensure domain-wide delegation is correctly configured and has the appropriate scopes.",
            "type": "str",
            "validations": [{"type": "regex", "constraint": EMAIL_REGEX_PATTERN}],
        },
        "google_workspace_email_for_shared_drives_sync": {
            "depends_on": [
                {"field": "use_domain_wide_delegation_for_sync", "value": True}
            ],
            "display": "text",
            "label": "Google Workspace email for syncing shared drives",
            "order": 4,
            "tooltip": "Provide the Google Workspace user email for discovery and syncing of shared drives. Only the shared drives this user has access to will be synced.",
            "type": "str",
            "validations": [{"type": "regex", "constraint": EMAIL_REGEX_PATTERN}],
        },
        "use_document_level_security": {
            "display": "toggle",
            "label": "Enable document level security",
            "order": 5,
            "tooltip": "Document level security ensures identities and permissions set in Google Drive are maintained in Elasticsearch. This enables you to restrict and personalize read-access users and groups have to documents in this index. Access control syncs ensure this metadata is kept up to date in your Elasticsearch documents.",
            "type": "bool",
            "value": False,
        },
        # Only shown for DLS without domain-wide delegation sync; with delegation on,
        # the data-sync admin email above is used instead.
        "google_workspace_admin_email": {
            "depends_on": [
                {"field": "use_document_level_security", "value": True},
                {"field": "use_domain_wide_delegation_for_sync", "value": False},
            ],
            "display": "text",
            "label": "Google Workspace admin email",
            "order": 6,
            "tooltip": "In order to use Document Level Security you need to enable Google Workspace domain-wide delegation of authority for your service account. A service account with delegated authority can impersonate admin user with sufficient permissions to fetch all users and their corresponding permissions. Please refer to the connector documentation to ensure domain-wide delegation is correctly configured and has the appropriate scopes.",
            "type": "str",
            "validations": [{"type": "regex", "constraint": EMAIL_REGEX_PATTERN}],
        },
        # Read by _max_concurrency(); falls back to GOOGLE_API_MAX_CONCURRENCY when unset.
        "max_concurrency": {
            "default_value": GOOGLE_API_MAX_CONCURRENCY,
            "display": "numeric",
            "label": "Maximum concurrent HTTP requests",
            "order": 7,
            "required": False,
            "tooltip": "This setting determines the maximum number of concurrent HTTP requests sent to the Google API to fetch data. Increasing this value can improve data retrieval speed, but it may also place higher demands on system resources and network bandwidth.",
            "type": "int",
            "ui_restrictions": ["advanced"],
            "validations": [{"type": "greater_than", "constraint": 0}],
        },
        "use_text_extraction_service": {
            "display": "toggle",
            "label": "Use text extraction service",
            "order": 8,
            "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.",
            "type": "bool",
            "ui_restrictions": ["advanced"],
            "value": False,
        },
    }
def google_drive_client(self, impersonate_email=None):
    """Build a fresh GoogleDriveClient from the configured service account.

    The credentials are validated and loaded on every call. When `impersonate_email`
    is truthy, the client is created for domain-wide delegation and impersonates
    that account within the Google Workspace domain. The result is not cached,
    because a different impersonated user needs a different client.

    Args:
        impersonate_email (str, optional): Email of the account to impersonate.
            Defaults to None, meaning no impersonation (domain-wide delegation off).

    Returns:
        GoogleDriveClient: A client wired to this data source's logger.
    """
    raw_credentials = self.configuration["service_account_credentials"]
    validate_service_account_json(raw_credentials, GOOGLE_DRIVE_SERVICE_NAME)
    credentials = load_service_account_json(
        raw_credentials, GOOGLE_DRIVE_SERVICE_NAME
    )

    if impersonate_email:
        client = GoogleDriveClient(
            json_credentials=credentials, subject=impersonate_email
        )
    else:
        client = GoogleDriveClient(json_credentials=credentials)

    client.set_logger(self._logger)
    return client
@cached_property
def google_admin_directory_client(self):
    """Lazily build (once per instance) the GoogleAdminDirectoryClient.

    Validates the service account JSON and the admin email first, then impersonates
    the admin email chosen by `_get_google_workspace_admin_email()`.

    Returns:
        GoogleAdminDirectoryClient: A client wired to this data source's logger.
    """
    raw_credentials = self.configuration["service_account_credentials"]
    validate_service_account_json(
        raw_credentials, GOOGLE_ADMIN_DIRECTORY_SERVICE_NAME
    )
    self._validate_google_workspace_admin_email()

    client = GoogleAdminDirectoryClient(
        json_credentials=load_service_account_json(
            raw_credentials, GOOGLE_ADMIN_DIRECTORY_SERVICE_NAME
        ),
        subject=self._get_google_workspace_admin_email(),
    )
    client.set_logger(self._logger)
    return client
async def validate_config(self):
    """Validate the user-supplied configuration.

    Runs the base-class checks, then checks the service account JSON, the admin
    email (when DLS applies) and the shared-drives email (when domain-wide
    delegation sync applies).

    Raises:
        Exception: The format of the service account JSON is invalid.
        ConfigurableFieldValueError: An email field is empty or malformed.
    """
    await super().validate_config()

    credentials = self.configuration["service_account_credentials"]
    validate_service_account_json(credentials, GOOGLE_DRIVE_SERVICE_NAME)

    self._validate_google_workspace_admin_email()
    self._validate_google_workspace_email_for_shared_drives_sync()
def _validate_google_workspace_admin_email(self):
    """Validate the Google Workspace admin email when Document Level Security (DLS) is enabled.

    The email is obtained through `_get_google_workspace_admin_email()`, so when
    domain-wide delegation sync is also enabled, the data-sync admin email is the one
    checked. Nothing is validated when DLS is disabled.

    Raises:
        ConfigurableFieldValueError: If DLS is enabled and the email is missing or empty,
            or if `validate_email_address` rejects it (malformed or contains whitespace).

    Note:
        `validate_email_address` may not accept every valid email format; for stricter
        validation consider a dedicated email validation library or service.
    """
    if not self._dls_enabled():
        return

    google_workspace_admin_email = self._get_google_workspace_admin_email()

    # An empty string is treated like a missing value. Otherwise it would be
    # reported as "malformed", which misleads the user.
    if not google_workspace_admin_email:
        msg = "Google Workspace admin email cannot be empty."
        raise ConfigurableFieldValueError(msg)

    if not validate_email_address(google_workspace_admin_email):
        msg = "Google Workspace admin email is malformed or contains whitespace characters."
        raise ConfigurableFieldValueError(msg)
def _validate_google_workspace_email_for_shared_drives_sync(self):
    """Validate the Google Workspace email used to discover and sync shared drives.

    Only checked when 'Use domain-wide delegation for data sync' is enabled. The email
    must be neither missing/empty nor malformed.

    Raises:
        ConfigurableFieldValueError:
            - If the email is missing or empty while domain-wide delegation sync is enabled.
            - If `validate_email_address` rejects it (malformed or contains whitespace).
    """
    if not self._domain_wide_delegation_sync_enabled():
        return

    google_workspace_email = self.configuration[
        "google_workspace_email_for_shared_drives_sync"
    ]

    # An empty string is treated like a missing value, so the user gets the "empty"
    # message rather than a misleading "malformed" one.
    if not google_workspace_email:
        # This field is a regular user email, not an admin email.
        msg = "Google Workspace email for shared drives sync cannot be empty when 'Use domain-wide delegation for data sync' is enabled."
        raise ConfigurableFieldValueError(msg)

    if not validate_email_address(google_workspace_email):
        msg = "Google Workspace email for shared drives sync is malformed or contains whitespace characters."
        raise ConfigurableFieldValueError(msg)
async def ping(self):
    """Verify the connection with Google Drive.

    With domain-wide delegation sync enabled, the ping is made while impersonating
    the configured admin email; otherwise no impersonation is used. Errors are
    logged and re-raised.
    """
    try:
        impersonate_email = (
            self._get_google_workspace_admin_email()
            if self._domain_wide_delegation_sync_enabled()
            else None
        )
        await self.google_drive_client(impersonate_email=impersonate_email).ping()
        self._logger.info("Successfully connected to the Google Drive.")
    except Exception:
        self._logger.exception("Error while connecting to the Google Drive.")
        raise
def _get_google_workspace_admin_email(self):
    """Return the Google Workspace admin email that applies to the current configuration.

    - Domain-wide delegation sync enabled: returns the
      `google_workspace_admin_email_for_data_sync` value.
    - Otherwise, if Document Level Security (DLS) is enabled: returns the
      `google_workspace_admin_email` value.
    - Otherwise: returns None.

    This way, an admin email entered for domain-wide delegation also serves DLS,
    so users do not have to enter the same email twice.

    Returns:
        str or None: The applicable admin email, or None if neither domain-wide
            delegation nor DLS is enabled.
    """
    if self._domain_wide_delegation_sync_enabled():
        return self.configuration["google_workspace_admin_email_for_data_sync"]
    elif self._dls_enabled():
        return self.configuration["google_workspace_admin_email"]
    else:
        return None
def _google_google_workspace_email_for_shared_drives_sync(self):
    """Return the configured email for shared-drives sync via `configuration.get`.

    NOTE(review): the doubled "google_" in the name looks like a typo; the name is
    kept as-is for compatibility with any callers.
    """
    config_key = "google_workspace_email_for_shared_drives_sync"
    return self.configuration.get(config_key)
def _dls_enabled(self):
    """Report whether Document Level Security is enabled.

    DLS is on only when feature flags are set, the DLS feature is enabled, and the
    `use_document_level_security` option is truthy.
    """
    features = self._features
    if features is None or not features.document_level_security_enabled():
        return False
    return bool(self.configuration.get("use_document_level_security", False))
def _domain_wide_delegation_sync_enabled(self):
    """Report whether the 'use domain-wide delegation for sync' option is truthy."""
    option = self.configuration.get("use_domain_wide_delegation_for_sync", False)
    return bool(option)
def _max_concurrency(self):
    """Return the configured max concurrent connections, or the module default if unset/falsy."""
    configured = self.configuration.get("max_concurrency")
    return configured if configured else GOOGLE_API_MAX_CONCURRENCY
def access_control_query(self, access_control):
    """Wrap a list of prefixed identities into an access-control query via es_access_control_query."""
    query = es_access_control_query(access_control)
    return query
async def _process_items_concurrently(self, items, process_item_func):
    """Run an async function over every item, bounded by the configured concurrency.

    A semaphore sized by `_max_concurrency()` limits how many calls are in flight.
    Results are gathered in input order.

    Args:
        items (list): Items to process.
        process_item_func (function): Async function taking one item and returning
            a coroutine result.

    Returns:
        list: The result for each item, in the same order as `items`.
    """
    limiter = asyncio.Semaphore(self._max_concurrency())

    async def _bounded(item):
        # Hold a semaphore slot for the whole duration of the call.
        async with limiter:
            return await process_item_func(item)

    return await asyncio.gather(*(_bounded(item) for item in items))
async def prepare_single_access_control_document(self, user):
    """Build the access control document for a single Google Workspace user.

    Fetches the user's group memberships through the admin directory client. It then
    builds an access control query from the user's email, the email's domain and
    every group email.

    Args:
        user (dict): User resource; reads "id", "primaryEmail" and "name"
            (the fields requested by `list_users`).

    Returns:
        dict: Access control document keyed by the user's email.
    """
    user_id = user.get("id")
    user_email = user.get("primaryEmail")
    user_domain = _get_domain_from_email(user_email)

    user_groups = [
        group.get("email")
        async for groups_page in self.google_admin_directory_client.list_groups_for_user(
            user_id
        )
        for group in groups_page.get("groups", [])
    ]

    user_access_control = [
        _prefix_user(user_email),
        _prefix_domain(user_domain),
    ] + [_prefix_group(group) for group in user_groups]

    # Tolerate a missing/None "name" object instead of failing the whole batch
    # with an AttributeError.
    full_name = (user.get("name") or {}).get("fullName")

    return {
        "_id": user_email,
        "identity": {
            "name": full_name,
            "email": user_email,
        },
    } | self.access_control_query(access_control=user_access_control)
async def prepare_access_control_documents(self, users_page):
    """Yield access control documents for every user in one users page.

    The users are processed concurrently (bounded by `_max_concurrency()`). Documents
    are yielded in the same order as the users in the page.

    Args:
        users_page (dict): One page from `list_users`; users are read from its
            "users" key.

    Yields:
        dict: Access control doc.
    """
    ac_docs = await self._process_items_concurrently(
        users_page.get("users", []), self.prepare_single_access_control_document
    )
    for doc in ac_docs:
        yield doc
async def get_access_control(self):
    """Yield one access control document per user of the Google Workspace organization.

    When DLS is disabled, a warning is logged and nothing is yielded.

    Yields:
        dict: A user access control document.
    """
    if self._dls_enabled():
        async for page in self.google_admin_directory_client.list_users():
            async for doc in self.prepare_access_control_documents(users_page=page):
                yield doc
        return

    self._logger.warning("DLS is not enabled. Skipping access controls sync.")
async def resolve_paths(self, google_drive_client=None):
    """Build a lookup from folder id to its absolute path in the Google Drive structure.

    Shared drives are added as top-level folders (empty parents), so folders inside a
    shared drive get paths starting with the drive name. Only the first parent of each
    folder is followed. The climb stops at a root, at a parent id not in the lookup,
    or at an ancestor whose path is already resolved.

    Args:
        google_drive_client (GoogleDriveClient, optional): Client to query. A new
            client without impersonation is created when not provided.

    Returns:
        dict: Mapping between folder/drive id and its {"name", "parents", "path"}.
    """
    client = google_drive_client or self.google_drive_client()

    folders = await client.get_all_folders()
    drives = await client.get_all_drives()

    # Shared drives act as root folders for path purposes.
    folders.update(
        {
            drive_id: {"name": drive_name, "parents": []}
            for drive_id, drive_name in drives.items()
        }
    )

    self._logger.info(f"Resolving folder paths for {len(folders)} folders")

    for folder in folders.values():
        # Collect path segments leaf-first; they are reversed when joined.
        leaf_first = [folder["name"]]
        parent_id = (folder["parents"] or [None])[0]

        while parent_id and parent_id in folders:
            parent = folders[parent_id]
            if "path" in parent:
                # The ancestor's full path is already known: reuse it and stop climbing.
                leaf_first.append(parent["path"])
                break
            leaf_first.append(parent["name"])
            parent_id = (parent["parents"] or [None])[0]

        folder["path"] = "/".join(reversed(leaf_first))

    return folders
async def _download_content(self, file, file_extension, download_func):
    """Download a Google Drive file into a temporary file and extract its content.

    Args:
        file (dict): Formatted file document; only its "name" is used here.
        file_extension (str): Suffix (e.g. ".pdf") for the temporary file.
        download_func (partial func): Partial of the Drive API call that fetches the
            content; it is invoked with `pipe_to` set to the temporary file buffer.

    Returns:
        tuple: (attachment, body, file_size). `attachment` and `body` are the
        "_attachment" and "body" values returned by `handle_file_content_extraction`
        (either may be None). `file_size` is the size in bytes of the downloaded file.
    """
    import os

    file_name = file["name"]
    attachment, body, file_size = None, None, 0

    async with self.create_temp_file(file_extension) as async_buffer:
        await download_func(
            pipe_to=async_buffer,
        )
        await async_buffer.close()

        # Record the real on-disk size. Callers rely on it to enforce size limits for
        # content whose size is only known after download (exported Google Workspace
        # documents report size 0). Previously this stayed 0, so that check never fired.
        file_size = os.path.getsize(async_buffer.name)

        doc = await self.handle_file_content_extraction(
            {}, file_name, async_buffer.name
        )
        attachment = doc.get("_attachment")
        body = doc.get("body")

    return attachment, body, file_size
async def get_google_workspace_content(self, client, file, timestamp=None):
    """Export a native Google Workspace document and extract its text content.

    Shared Google Workspace documents are not regular files. When shared from another
    account they do not count against the user's storage quota and report size 0.
    They must therefore be exported (files/export) to a supported type, chosen from
    GOOGLE_MIME_TYPES_MAPPING, before content extraction.

    Args:
        client (GoogleDriveClient): Client used for the export call.
        file (dict): Formatted file document.
        timestamp (timestamp, optional): Timestamp of file last modified (unused here).
            Defaults to None.

    Returns:
        dict or None: Content document with id, timestamp and extracted content, or
            None when the exported size exceeds the allowed limit.
    """
    file_name = file["name"]
    file_id = file["id"]
    file_mime_type = file["mime_type"]
    file_extension = f".{file['file_extension']}"

    document = {"_id": file_id, "_timestamp": file["_timestamp"]}

    export_call = partial(
        client.api_call,
        resource="files",
        method="export",
        fileId=file_id,
        mimeType=GOOGLE_MIME_TYPES_MAPPING[file_mime_type],
    )
    attachment, body, file_size = await self._download_content(
        file=file, file_extension=file_extension, download_func=export_call
    )

    # The size check has to happen after the download because:
    # 1. files/export converts large, media-rich slides/docs into (often tiny) text.
    # 2. Shared Google Workspace documents report size 0, since they do not count
    #    against the user's storage quota.
    if not self.is_file_size_within_limit(file_size, file_name):
        return None

    if attachment is not None:
        document["_attachment"] = attachment
    elif body is not None:
        document["body"] = body

    return document
async def get_generic_file_content(self, client, file, timestamp=None):
    """Download a regular (non-Workspace) file and extract its content.

    Empty files, and files rejected by `can_file_be_downloaded`, are skipped.

    Args:
        client (GoogleDriveClient): Client used for the media download.
        file (dict): Formatted file document.
        timestamp (timestamp, optional): Timestamp of file last modified (unused here).
            Defaults to None.

    Returns:
        dict or None: Content document with id, timestamp and extracted content, or
            None when the file is skipped.
    """
    size_in_bytes = int(file["size"])
    if not size_in_bytes:
        return None

    name = file["name"]
    drive_file_id = file["id"]
    extension = f".{file['file_extension']}"

    if not self.can_file_be_downloaded(extension, name, size_in_bytes):
        return None

    document = {"_id": drive_file_id, "_timestamp": file["_timestamp"]}

    media_download = partial(
        client.api_call,
        resource="files",
        method="get",
        fileId=drive_file_id,
        supportsAllDrives=True,
        alt="media",
    )
    attachment, body, _downloaded_size = await self._download_content(
        file=file, file_extension=extension, download_func=media_download
    )

    if attachment is not None:
        document["_attachment"] = attachment
    elif body is not None:
        document["body"] = body

    return document
async def get_content(self, client, file, timestamp=None, doit=None):
    """Extract the content of a file, dispatching on its MIME type.

    Native Google Workspace files (docs, slides, sheets) go through the export
    path; every other file type is downloaded as-is.

    Args:
        client (GoogleDriveClient): Client used for the download.
        file (dict): Formatted file document.
        timestamp (timestamp, optional): Timestamp of file last_modified. Defaults to None.
        doit (boolean, optional): Whether to get content at all. Defaults to None (skip).

    Returns:
        dict or None: Content document with id, timestamp and content, or None.
    """
    if not doit:
        return None

    extractor = (
        self.get_google_workspace_content
        if file["mime_type"] in GOOGLE_MIME_TYPES_MAPPING
        else self.get_generic_file_content
    )
    return await extractor(client, file, timestamp=timestamp)
async def _get_permissions_on_shared_drive(self, client, file_id):
    """Collect every permission of a file across all result pages.

    Args:
        client (GoogleDriveClient): Client used to list permissions.
        file_id (str): The ID of the file.

    Returns:
        list: All permission entries for the file, in API order.
    """
    return [
        permission
        async for page in client.list_permissions(file_id)
        for permission in page.get("permissions", [])
    ]
def _process_permissions(self, permissions):
"""Formats the access permission list for Google Drive object.
Args:
permissions (list): List of permissions of Google Drive file returned from API.
Returns:
list: A list of processed access permissions for a given file.
"""
processed_permissions = []
for permission in permissions:
permission_type = permission["type"]
access_permission = None
if _is_user_permission(permission_type):
access_permission = _prefix_user(permission.get("emailAddress"))
elif _is_group_permission(permission_type):
access_permission = _prefix_group(permission.get("emailAddress"))
elif _is_domain_permission(permission_type):
access_permission = _prefix_domain(permission.get("domain"))
elif _is_anyone_permission(permission_type):
access_permission = "anyone"
else:
self._logger.warning(