Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Onedrive] Add logs #2553

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions connectors/sources/onedrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def session(self):
Returns:
ClientSession: Base client session
"""
self._logger.debug("Creating a client session")
connector = aiohttp.TCPConnector(limit=DEFAULT_PARALLEL_CONNECTION_COUNT)
timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
return aiohttp.ClientSession(
Expand All @@ -270,6 +271,7 @@ async def close_session(self):
skipped_exceptions=NotFound,
)
async def get(self, url, header=None):
self._logger.debug(f"Calling GET {url}")
access_token = await self.token.get()
headers = {"authorization": f"Bearer {access_token}"}
if header:
Expand All @@ -290,6 +292,7 @@ async def get(self, url, header=None):
skipped_exceptions=NotFound,
)
async def post(self, url, payload=None):
self._logger.debug(f"Calling POST {url} with payload: {payload}")
access_token = await self.token.get()
headers = {
"authorization": f"Bearer {access_token}",
Expand Down Expand Up @@ -348,6 +351,9 @@ async def paginated_api_call(
params["$top"] = fetch_size
params = "&".join(f"{key}={val}" for key, val in params.items())

self._logger.debug(
f"Started pagination for url: {url} with parameters: {params}"
)
url = f"{url}?{params}"
while True:
try:
Expand Down Expand Up @@ -379,7 +385,7 @@ async def list_users(self, include_groups=False):
if include_groups:
params["$expand"] = "transitiveMemberOf($select=id)"

# This condtion is executed during content sync where connector fetches only licensed accounts as unlicensed users won't have any files.
# This condition is executed during content sync where connector fetches only licensed accounts as unlicensed users won't have any files.
else:
params["$filter"] += " and assignedLicenses/$count ne 0&$count=true"
header = {"ConsistencyLevel": "eventual"}
Expand Down Expand Up @@ -530,9 +536,10 @@ async def ping(self):
try:
url = parse.urljoin(BASE_URL, ENDPOINTS[PING])
await anext(self.client.get(url=url))
self._logger.info("Successfully connected to OneDrive")
except Exception:
self._logger.exception("Error while connecting to OneDrive")
self._logger.warning(
"Error while connecting to OneDrive. Please check the configurations"
)
raise

async def get_content(self, file, download_url, timestamp=None, doit=False):
Expand Down Expand Up @@ -562,6 +569,7 @@ async def get_content(self, file, download_url, timestamp=None, doit=False):
"_id": file["_id"],
"_timestamp": file["_timestamp"],
}
self._logger.debug(f"Downloading file content from url: {download_url}")
return await self.download_and_extract_file(
document,
filename,
Expand Down Expand Up @@ -606,6 +614,7 @@ async def _decorate_with_access_control(self, document, user_id):
return document

async def _user_access_control_doc(self, user):
self._logger.info("Fetching users for access control sync")
email = user.get("mail")
username = user.get("userPrincipalName")

Expand Down Expand Up @@ -648,6 +657,9 @@ async def get_entity_permission(self, user_id, file_id):
if not self._dls_enabled():
return []

self._logger.debug(
f"Fetching entity permissions for user_id: {user_id} and file_id: {file_id}"
)
permissions = []
async for permission in self.client.list_file_permission(
user_id=user_id, file_id=file_id
Expand Down Expand Up @@ -723,7 +735,7 @@ def send_document_to_es(self, entity, download_url):
else:
return entity, None

async def _bounbed_concurrent_tasks(
async def _bounded_concurrent_tasks(
self, items, max_concurrency, calling_func, **kwargs
):
async def process_item(item, semaphore):
Expand Down Expand Up @@ -751,9 +763,13 @@ async def get_docs(self, filtering=None):
Yields:
dictionary: dictionary containing meta-data of the files.
"""
self._logger.debug("Successfully connected to OneDrive")

if filtering and filtering.has_advanced_rules():
advanced_rules = filtering.get_advanced_rules()
self._logger.info(
f"Retrieving documents from OneDrive using advanced sync rules: {advanced_rules}"
)

user_mail_id_map = {}
async for user in self.client.list_users():
Expand Down Expand Up @@ -782,7 +798,7 @@ async def get_docs(self, filtering=None):
files = response.get("body", {}).get("value", [])
if entities := [file for file in files if file.get("name") != "root"]:
if self._dls_enabled():
entities = await self._bounbed_concurrent_tasks(
entities = await self._bounded_concurrent_tasks(
entities,
self.concurrent_downloads,
self._decorate_with_access_control,
Expand Down