Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow set self signed certs #59

Merged
merged 3 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ with Actual(
encryption_password=None, # Optional: Password for the file encryption. Will not use it if set to None.
file="<file_id_or_name>", # Set the file to work with. Can be either the file id or file name, if name is unique
data_dir="<path_to_data_directory>" # Optional: Directory to store downloaded files. Will use a temporary if not provided
cert="<path_to_cert_file>" # Optional: Path to the certificate file to use for the connection, can also be set as False to disable SSL verification
) as actual:
transactions = get_transactions(actual.session)
for t in transactions:
Expand Down
3 changes: 2 additions & 1 deletion actual/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(
file: str = None,
encryption_password: str = None,
data_dir: Union[str, pathlib.Path] = None,
cert: str | bool = False,
bootstrap: bool = False,
sa_kwargs: dict = None,
):
Expand All @@ -69,7 +70,7 @@ def __init__(
by default), `autocommit` (disabled by default). For a list of all parameters, check the SQLAlchemy
documentation: https://docs.sqlalchemy.org/en/20/orm/session_api.html#sqlalchemy.orm.Session.__init__
"""
super().__init__(base_url, token, password, bootstrap)
super().__init__(base_url, token, password, bootstrap, cert)
self._file: RemoteFileListDTO | None = None
self._data_dir = pathlib.Path(data_dir) if data_dir else None
self.engine = None
Expand Down
45 changes: 31 additions & 14 deletions actual/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
token: str = None,
password: str = None,
bootstrap: bool = False,
cert: str | bool = False,
):
self.api_url = base_url
self._token = token
self.cert = cert
if token is None and password is None:
raise ValueError("Either provide a valid token or a password.")
# already try to login if password was provided
Expand All @@ -62,12 +64,13 @@
if not password:
raise AuthorizationError("Trying to login but not password was provided.")
if method == "password":
response = requests.post(f"{self.api_url}/{Endpoints.LOGIN}", json={"password": password})
response = requests.post(f"{self.api_url}/{Endpoints.LOGIN}", json={"password": password}, verify=self.cert)
else:
response = requests.post(
f"{self.api_url}/{Endpoints.LOGIN}",
json={"loginMethod": method},
headers={"X-ACTUAL-PASSWORD": password},
verify=self.cert,
)
response_dict = response.json()
if response.status_code == 400 and "invalid-password" in response.text:
Expand Down Expand Up @@ -98,38 +101,40 @@

def info(self) -> InfoDTO:
"""Gets the information from the Actual server, like the name and version."""
response = requests.get(f"{self.api_url}/{Endpoints.INFO}")
response = requests.get(f"{self.api_url}/{Endpoints.INFO}", verify=self.cert)
response.raise_for_status()
return InfoDTO.model_validate(response.json())

def validate(self) -> ValidateDTO:
"""Validates if the user is valid and logged in, and if the token is also valid and bound to a session."""
response = requests.get(f"{self.api_url}/{Endpoints.ACCOUNT_VALIDATE}", headers=self.headers())
response = requests.get(
f"{self.api_url}/{Endpoints.ACCOUNT_VALIDATE}", headers=self.headers(), verify=self.cert
)
response.raise_for_status()
return ValidateDTO.model_validate(response.json())

def needs_bootstrap(self) -> BootstrapInfoDTO:
"""Checks if the Actual needs bootstrap, in other words, if it needs a master password for the server."""
response = requests.get(f"{self.api_url}/{Endpoints.NEEDS_BOOTSTRAP}")
response = requests.get(f"{self.api_url}/{Endpoints.NEEDS_BOOTSTRAP}", verify=self.cert)
response.raise_for_status()
return BootstrapInfoDTO.model_validate(response.json())

def bootstrap(self, password: str) -> LoginDTO:
response = requests.post(f"{self.api_url}/{Endpoints.BOOTSTRAP}", json={"password": password})
response = requests.post(f"{self.api_url}/{Endpoints.BOOTSTRAP}", json={"password": password}, verify=self.cert)
response.raise_for_status()
login_response = LoginDTO.model_validate(response.json())
self._token = login_response.data.token
return login_response

def data_file_index(self) -> List[str]:
"""Gets all the migration file references for the actual server."""
response = requests.get(f"{self.api_url}/{Endpoints.DATA_FILE_INDEX}")
response = requests.get(f"{self.api_url}/{Endpoints.DATA_FILE_INDEX}", verify=self.cert)
response.raise_for_status()
return response.content.decode().splitlines()

def data_file(self, file_path: str) -> bytes:
"""Gets the content of the individual migration file from server."""
response = requests.get(f"{self.api_url}/data/{file_path}")
response = requests.get(f"{self.api_url}/data/{file_path}", verify=self.cert)
response.raise_for_status()
return response.content

Expand All @@ -139,7 +144,9 @@
if file_id is None:
raise UnknownFileId("Could not reset the file without a valid 'file_id'")
request = requests.post(
f"{self.api_url}/{Endpoints.RESET_USER_FILE}", json={"fileId": file_id, "token": self._token}
f"{self.api_url}/{Endpoints.RESET_USER_FILE}",
json={"fileId": file_id, "token": self._token},
verify=self.cert,
)
request.raise_for_status()
return StatusDTO.model_validate(request.json())
Expand All @@ -148,7 +155,9 @@
"""Downloads the user file based on the file_id provided. Returns the `bytes` from the response, which is a
zipped folder of the database `db.sqlite` and the `metadata.json`. If the database is encrypted, the key id
has to be retrieved additionally using user_get_key()."""
db = requests.get(f"{self.api_url}/{Endpoints.DOWNLOAD_USER_FILE}", headers=self.headers(file_id))
db = requests.get(
f"{self.api_url}/{Endpoints.DOWNLOAD_USER_FILE}", headers=self.headers(file_id), verify=self.cert
)
db.raise_for_status()
return db.content

Expand All @@ -170,20 +179,23 @@
f"{self.api_url}/{Endpoints.UPLOAD_USER_FILE}",
data=binary_data,
headers=self.headers(extra_headers=base_headers),
verify=self.cert,
)
request.raise_for_status()
return UploadUserFileDTO.model_validate(request.json())

def list_user_files(self) -> ListUserFilesDTO:
"""Lists the user files. If the response item contains `encrypt_key_id` different from `None`, then the
file must be decrypted on retrieval."""
response = requests.get(f"{self.api_url}/{Endpoints.LIST_USER_FILES}", headers=self.headers())
response = requests.get(f"{self.api_url}/{Endpoints.LIST_USER_FILES}", headers=self.headers(), verify=self.cert)
response.raise_for_status()
return ListUserFilesDTO.model_validate(response.json())

def get_user_file_info(self, file_id: str) -> GetUserFileInfoDTO:
"""Gets the user file information, including the encryption metadata."""
response = requests.get(f"{self.api_url}/{Endpoints.GET_USER_FILE_INFO}", headers=self.headers(file_id))
response = requests.get(
f"{self.api_url}/{Endpoints.GET_USER_FILE_INFO}", headers=self.headers(file_id), verify=self.cert
)
response.raise_for_status()
return GetUserFileInfoDTO.model_validate(response.json())

Expand All @@ -193,6 +205,7 @@
f"{self.api_url}/{Endpoints.UPDATE_USER_FILE_NAME}",
json={"fileId": file_id, "name": file_name, "token": self._token},
headers=self.headers(),
verify=self.cert,
)
response.raise_for_status()
return StatusDTO.model_validate(response.json())
Expand All @@ -203,6 +216,7 @@
f"{self.api_url}/{Endpoints.DELETE_USER_FILE}",
json={"fileId": file_id, "token": self._token},
headers=self.headers(),
verify=self.cert,
)
return StatusDTO.model_validate(response.json())

Expand All @@ -215,6 +229,7 @@
"token": self._token,
},
headers=self.headers(file_id),
verify=self.cert,
)
response.raise_for_status()
return UserGetKeyDTO.model_validate(response.json())
Expand All @@ -234,6 +249,7 @@
"testContent": json.dumps(test_content),
"token": self._token,
},
verify=self.cert,
)
return StatusDTO.model_validate(response.json())

Expand All @@ -246,19 +262,20 @@
f"{self.api_url}/{Endpoints.SYNC}",
headers=self.headers(request.fileId, extra_headers={"Content-Type": "application/actual-sync"}),
data=SyncRequest.serialize(request),
verify=self.cert,
)
response.raise_for_status()
parsed_response = SyncResponse.deserialize(response.content)
return parsed_response # noqa

def bank_sync_status(self, bank_sync: Literal["gocardless", "simplefin"] | str) -> BankSyncStatusDTO:
endpoint = Endpoints.BANK_SYNC_STATUS.value.format(bank_sync=bank_sync)
response = requests.post(f"{self.api_url}/{endpoint}", headers=self.headers(), json={})
response = requests.post(f"{self.api_url}/{endpoint}", headers=self.headers(), json={}, verify=self.cert)
return BankSyncStatusDTO.model_validate(response.json())

def bank_sync_accounts(self, bank_sync: Literal["gocardless", "simplefin"]) -> BankSyncAccountResponseDTO:
endpoint = Endpoints.BANK_SYNC_ACCOUNTS.value.format(bank_sync=bank_sync)
response = requests.post(f"{self.api_url}/{endpoint}", headers=self.headers(), json={})
response = requests.post(f"{self.api_url}/{endpoint}", headers=self.headers(), json={}, verify=self.cert)

Check warning on line 278 in actual/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

actual/api/__init__.py#L278

Added line #L278 was not covered by tests
return BankSyncAccountResponseDTO.model_validate(response.json())

def bank_sync_transactions(
Expand All @@ -274,5 +291,5 @@
payload = {"accountId": account_id, "startDate": start_date.strftime("%Y-%m-%d")}
if requisition_id:
payload["requisitionId"] = requisition_id
response = requests.post(f"{self.api_url}/{endpoint}", headers=self.headers(), json=payload)
response = requests.post(f"{self.api_url}/{endpoint}", headers=self.headers(), json=payload, verify=self.cert)
return BankSyncTransactionResponseDTO.model_validate(response.json())
1 change: 1 addition & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ def test_rename_delete_budget_without_file():
def test_api_login_unknown_error(_post):
actual = Actual.__new__(Actual)
actual.api_url = "localhost"
actual.cert = False
with pytest.raises(AuthorizationError, match="Something went wrong on login"):
actual.login("foo")
Loading