🎉 Source Freshdesk: Migrated to latest CDK #12334
Conversation
/test connector=connectors/source-freshdesk
I'll start code reviewing this.
Thanks @itaseskii, feel free to ping me if you need an additional review.
Force-pushed from fda8157 to 1f0cdbf.
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    authenticator = self._create_authenticator(config["api_key"])
    return [
        Agents(authenticator=authenticator, config=config),
Aren't you missing the Surveys stream from the original implementation?
Good catch! I must have missed it when migrating the individual streams. Added back in 4e2f437.
alive = True
error_msg = None
try:
    url = f"https://{config['domain'].rstrip('/')}/api/v2/settings/helpdesk"
Using urljoin for URL concatenation is a bit more readable.
Nice. I didn't know about that function. Changed in 4e2f437.
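To illustrate the suggestion above, here is a minimal sketch of what the urljoin-based version might look like. The helper name and the example domain are hypothetical, not from the connector:

```python
from urllib.parse import urljoin


def helpdesk_settings_url(domain: str) -> str:
    # urljoin normalizes the slash between base and path,
    # so no manual rstrip("/") is needed on the domain
    return urljoin(f"https://{domain}", "/api/v2/settings/helpdesk")
```

Both `example.freshdesk.com` and `example.freshdesk.com/` produce the same URL, which is exactly the edge case the original f-string handled with `rstrip`.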
requests_mock.register_uri("GET", "/api/v2/settings/helpdesk", responses)
ok, error_msg = SourceFreshdesk().check_connection(logger, config=config)

assert not ok
Use single-line statements/assertions: assert not ok and error_msg == ""
Updated in 4e2f437.
    self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
    return {
        "Content-Type": "application/json",
Why is "Content-Type": "application/json" needed when sending a request? Aren't all the requests of GET type and without a body?
You're right. Turns out it works without having to send these headers. I simplified the code by removing this method in 4e2f437.
return "time_entries"


class Tickets(IncrementalFreshdeskStream):
Given that tickets will be retrieved in ascending order, maybe we can set state_checkpoint_interval = 100 to persist state every 100 records and avoid re-sending previously sent records if the replication crashes midway?
Awesome. I added this in 4e2f437. Thank you for the suggestion!
I suggest you set state_checkpoint_interval on all the incremental streams, or you can implement stream_slices. If you implement stream slicing, the state will be checkpointed once a slice has been properly consumed.
Added in abcae41.
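To make the checkpointing idea concrete, here is a self-contained sketch of what state_checkpoint_interval effectively does, without depending on the Airbyte CDK. The function name, message shapes, and the updated_at cursor field are illustrative assumptions:

```python
from typing import Any, Dict, Iterable, List


def read_with_checkpoints(records: Iterable[Dict[str, Any]],
                          checkpoint_interval: int = 100) -> List[Dict[str, Any]]:
    """Emit a STATE message after every `checkpoint_interval` records,
    mimicking what state_checkpoint_interval triggers in the CDK."""
    messages = []
    for i, record in enumerate(records, start=1):
        messages.append({"type": "RECORD", "data": record})
        if i % checkpoint_interval == 0:
            # checkpoint the cursor of the latest record seen so far;
            # on a crash, the sync can resume from this value
            messages.append({"type": "STATE", "cursor": record["updated_at"]})
    return messages
```

Because tickets arrive in ascending cursor order, each checkpoint is a safe resume point: everything before it has already been delivered.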
pagination_complete = False

next_page_token = None
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
Would you mind explaining why copying large amounts of the parent stream's read_records() is needed instead of invoking super().read_records()?
You made me re-think how I migrated this particular stream, and I ended up overriding the next_page_token method instead of read_records. In the end, I had to intervene in next_page_token when the next page to fetch was above 300. The method now has a comment that was ported from the original implementation. See b0bcf0e.
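As I read the thread, the cap exists because the Freshdesk tickets listing refuses pages beyond 300, so the original implementation restarted pagination from page 1 with updated_since advanced to the last cursor seen. A hedged sketch of that idea (function name, field names, and the restart trick are my reconstruction, not the PR's exact code):

```python
from typing import Any, Mapping, Optional

PAGE_CAP = 300  # the tickets listing stops serving results past this page


def next_page_token(current_page: int,
                    last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
    """Advance to the next page normally; at the cap, restart from page 1
    with updated_since moved forward instead of requesting a rejected page."""
    if current_page < PAGE_CAP:
        return {"page": current_page + 1}
    return {"page": 1, "updated_since": last_record["updated_at"]}
```

Since tickets are sorted ascending by the cursor, restarting with updated_since set to the last seen value loses no records.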
Will do another round of code reviewing tomorrow.
yield element


class IncrementalFreshdeskStream(FreshdeskStream, ABC):
Suggested change:
- class IncrementalFreshdeskStream(FreshdeskStream, ABC):
+ class IncrementalFreshdeskStream(FreshdeskStream, IncrementalMixin):
The latest CDK introduces the IncrementalMixin class. I suggest you use it and set the state value in read_records as suggested here. get_updated_state is now deprecated.
Added in abcae41.
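For readers unfamiliar with the IncrementalMixin contract, here is a minimal stand-in that shows the pattern without importing the CDK: state becomes a property with a getter and setter, and the cursor is advanced while records are read. Class and field names are illustrative assumptions:

```python
from typing import Any, Dict, Iterable, Mapping


class IncrementalStreamSketch:
    """Minimal stand-in for the CDK's IncrementalMixin contract:
    expose `state` as a property and update it while reading records."""

    cursor_field = "updated_at"

    def __init__(self) -> None:
        self._cursor_value = None

    @property
    def state(self) -> Mapping[str, Any]:
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        self._cursor_value = value[self.cursor_field]

    def read_records(self, records: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
        for record in records:
            # keep the cursor at the max value seen, so `state` is always current
            if self._cursor_value is None or record[self.cursor_field] > self._cursor_value:
                self._cursor_value = record[self.cursor_field]
            yield record
```

With this shape the framework can read `stream.state` at any time, which is why get_updated_state becomes unnecessary.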
alive = True
error_msg = None
try:
    url = f"https://{config['domain'].rstrip('/')}/api/v2/settings/helpdesk"
nit: a more robust way of checking the connection would be to request several actual stream endpoints.
try:
    url = f"https://{config['domain'].rstrip('/')}/api/v2/settings/helpdesk"
    r = requests.get(url=url, auth=self._create_authenticator(config["api_key"]))
    if not r.ok:
I would suggest leveraging r.raise_for_status() and handling the error if it's raised.
Sure. Updated in 24d1581.
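A hedged sketch of what the raise_for_status-based error handling might look like. The helper name and the (alive, error_msg) tuple shape follow the check_connection fragments quoted above; the exact wiring in the PR may differ:

```python
from typing import Optional, Tuple

import requests


def check_response(response: requests.Response) -> Tuple[bool, Optional[str]]:
    """Translate an HTTP error into the (alive, error_msg) tuple that
    check_connection returns, instead of branching on `r.ok`."""
    try:
        # raises requests.exceptions.HTTPError for 4xx/5xx responses
        response.raise_for_status()
        return True, None
    except requests.exceptions.HTTPError as error:
        return False, str(error)
```

The upside over `if not r.ok` is that the error message comes pre-built with the status code and URL, and non-HTTP failures still propagate to the outer handler.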
Hi @alafanechere. Thank you for reviewing. I've updated this PR as per your comments. Please take a look again when you get a chance.
I went for another partial review but need to switch context. I'll review the rest asap.
except ValueError:
    error_msg = "Invalid credentials"
In which situation is ValueError raised here? If a non-HTTPError is raised, I don't think the cause would be invalid credentials. I think this try/except block is redundant, as non-HTTPErrors are caught by the higher-level except Exception as error:
You're right. I've removed the handling of this kind of error.
return alive, error_msg


def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    authenticator = self._create_authenticator(config["api_key"])
Suggested change:
- authenticator = self._create_authenticator(config["api_key"])
+ authenticator = HTTPBasicAuthNoPassword(config["api_key"])
Renamed the authenticator class to FreshdeskAuth and used it here.
class HTTPBasicAuthNoPassword(HTTPBasicAuth):
nit:
Suggested change:
- class HTTPBasicAuthNoPassword(HTTPBasicAuth):
+ class FreshdeskAuth(HTTPBasicAuth):
Done.
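For context, a sketch of what the renamed class might look like. Freshdesk's API key goes in the basic-auth username slot and the password is ignored, so the placeholder value here is an assumption, not the PR's exact string:

```python
from requests.auth import HTTPBasicAuth


class FreshdeskAuth(HTTPBasicAuth):
    """Freshdesk authenticates with the API key as the basic-auth username;
    the password field is unused, so any placeholder works."""

    def __init__(self, api_key: str) -> None:
        super().__init__(username=api_key, password="unused_with_api_key")
```

Subclassing HTTPBasicAuth means the object can be passed straight to `requests.get(..., auth=FreshdeskAuth(api_key))`.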
# Since this logic does not rely on updated tickets, it can break ticket-dependent streams (conversations).
# So the updated_since parameter will always be used in the tickets streams, and start_date will be used too,
# with a default 30-day lookback.
self.start_date = pendulum.parse(start_date) if start_date else pendulum.now() - pendulum.duration(days=30)
Suggested change:
- self.start_date = pendulum.parse(start_date) if start_date else pendulum.now() - pendulum.duration(days=30)
+ self.start_date = pendulum.parse(config["start_date"]) if config["start_date"] else pendulum.now() - pendulum.duration(days=30)
class FreshdeskStream(HttpStream, ABC):
    """Basic stream API that allows to iterate over entities"""

    call_credit = 1  # see https://developers.freshdesk.com/api/#embedding
This value never changes; if it never changes, I'd prefer to decrement from the literal 1 rather than from this variable.
I double-checked, and it was actually being used with a different value in the Tickets stream. I've updated it accordingly.
def backoff_time(self, response: requests.Response) -> Optional[float]:
    if response.status_code == requests.codes.too_many_requests:
        return float(response.headers.get("Retry-After"))
Suggested change:
- return float(response.headers.get("Retry-After"))
+ return float(response.headers.get("Retry-After", 0))

You will get a TypeError if Retry-After is not in the headers: float(None) -> TypeError.
Good catch! Done.
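The fix is easy to verify in isolation. A minimal sketch of the corrected backoff logic, with a plain dict standing in for `response.headers` (the function signature here is simplified for illustration):

```python
from typing import Mapping, Optional

TOO_MANY_REQUESTS = 429


def backoff_time(headers: Mapping[str, str], status_code: int) -> Optional[float]:
    """Default Retry-After to 0 so a missing header doesn't
    turn into float(None) -> TypeError."""
    if status_code == TOO_MANY_REQUESTS:
        return float(headers.get("Retry-After", 0))
    # None tells the retry machinery to fall back to its default backoff
    return None
```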
    params = parse.parse_qs(parse.urlparse(next_url).query)
    return self.parse_link_params(link_query_params=params)
except Exception as e:
    raise KeyError(f"error parsing next_page token: {e}")
Why did you choose to raise a KeyError here?
Could you please explain what kind of error you expect in this function?
Yeah, I updated this method and removed the exception handling.
match = self.link_regex.search(link_header)
next_url = match.group(1)
params = parse.parse_qs(parse.urlparse(next_url).query)
return self.parse_link_params(link_query_params=params)
Suggested change:
- return self.parse_link_params(link_query_params=params)
+ return {"per_page": params['per_page'][0], "page": params['page'][0]}
Done.
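Putting the suggestion together, a self-contained sketch of Link-header pagination with the dict built inline rather than delegated to a helper. The regex and function name are assumptions; the stream code quoted above uses its own `self.link_regex`:

```python
import re
from typing import Dict, Optional
from urllib.parse import parse_qs, urlparse

# matches: <https://.../tickets?per_page=100&page=3>; rel="next"
LINK_REGEX = re.compile(r'<(.*)>;\s*rel="next"')


def next_page_from_link(link_header: str) -> Optional[Dict[str, str]]:
    """Pull per_page/page out of an RFC 5988-style Link header."""
    match = LINK_REGEX.search(link_header)
    if not match:
        return None  # no "next" link means pagination is complete
    params = parse_qs(urlparse(match.group(1)).query)
    return {"per_page": params["per_page"][0], "page": params["page"][0]}
```

Guarding on `match` also answers the KeyError question above: with an explicit None return, no exception handling is needed at all.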
def parse_link_params(self, link_query_params: Mapping[str, List[str]]) -> Mapping[str, Any]:
    return {"per_page": link_query_params['per_page'][0], "page": link_query_params['page'][0]}
Suggested change (remove the now-unused helper):
- def parse_link_params(self, link_query_params: Mapping[str, List[str]]) -> Mapping[str, Any]:
-     return {"per_page": link_query_params['per_page'][0], "page": link_query_params['page'][0]}
Done.
    self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
    params = {"per_page": self.result_return_limit}
    if next_page_token and "page" in next_page_token:
How could page not be in next_page_token if next_page_token is not None?
I'm done reviewing source.py and streams.py :) I made a lot of suggestions to simplify, increase readability, and match CDK best practices.
Force-pushed from 24d1581 to 5751b4a.
@alafanechere I've addressed your comments. Please take a look again when you get a chance and let me know if there's anything else I should update. Thanks!
/test connector=connectors/source-freshdesk
Thank you for the changes @lgomezm. I'm running the test now and will go for a final review ASAP.
/test connector=connectors/source-freshdesk
@lgomezm the acceptance tests are not passing. Could you please provide the required fixes? Feel free to ask if you need help understanding why the tests are not passing.
Hi @alafanechere. I've pushed a change that fixes the acceptance tests. Please give it a try again when you get a chance. Here's the result of running them locally:
/test connector=connectors/source-freshdesk
We are facing an error while running the acceptance test with our Freshdesk account:
I'm trying to remove the
/test connector=connectors/source-freshdesk
Force-pushed from 77ea41b to 9cf2ba1.
Hi @alafanechere. It looks like the test account that's used to run integration tests is not allowed to get skills. I have overwritten the configured catalog file so that the
/test connector=connectors/source-freshdesk
Build Passed. Test summary info:
/publish connector=connectors/source-freshdesk
Thank you for the contribution @lgomezm !
What
Closes #11456.
How
Recommended reading order
1. source.py
2. streams.py
3. unit_tests

🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with airbyte_secret
- Unit & integration tests added and passing: ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- Connector's README.md
- Connector's bootstrap.md. See description and examples
- docs/SUMMARY.md
- docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
- docs/integrations/README.md
- airbyte-integrations/builds.md

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- The /test connector=connectors/<name> command is passing
- The new connector version is released by running the /publish command described here

Updating a connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with airbyte_secret
- Unit & integration tests added and passing: ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- Connector's README.md
- Connector's bootstrap.md. See description and examples
- docs/integrations/<source or destination>/<name>.md including changelog. See changelog example

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- The /test connector=connectors/<name> command is passing
- The new connector version is released by running the /publish command described here

Connector Generator
- Connector templates (those with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates, then checking in your changes

Tests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.