Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimizing Cloudflare undetection #144

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyaarlo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ def cameras(self):
:return: a list of cameras.
:rtype: list(ArloCamera)
"""
if not hasattr(self, '_cameras'):
self._cameras = []
return self._cameras

@property
Expand Down Expand Up @@ -555,6 +557,8 @@ def base_stations(self):
:return: a list of base stations.
:rtype: list(ArloBase)
"""
if not hasattr(self, '_bases'):
self._bases = []
return self._bases

@property
Expand Down
147 changes: 117 additions & 30 deletions pyaarlo/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from .constant import (
AUTH_FINISH_PATH,
AUTH_GET_FACTORID,
AUTH_GET_FACTORS,
AUTH_PATH,
AUTH_START_PATH,
Expand Down Expand Up @@ -74,6 +75,7 @@ def __init__(self, arlo):

# login
self._session = None
self._cookies = self._load_cookies()
self._logged_in = self._login()
if not self._logged_in:
self.debug("failed to log in")
Expand Down Expand Up @@ -133,6 +135,18 @@ def _save_session(self):
except Exception as e:
self._arlo.warning("session file not written" + str(e))

def _save_cookies(self, requests_cookiejar):
with open(self._arlo.cfg.cookies_file, 'wb') as f:
pickle.dump(requests_cookiejar, f)

def _load_cookies(self):
try:
with open(self._arlo.cfg.cookies_file, 'rb') as f:
return pickle.load(f)
except Exception as e:
self._arlo.info("cookies file not created yet: " + str(e))
return None

def _transaction_id(self):
return 'FE!' + str(uuid.uuid4())

Expand All @@ -152,6 +166,7 @@ def _request(
timeout=None,
host=None,
authpost=False,
cookies=None,
):
if params is None:
params = {}
Expand All @@ -173,30 +188,43 @@ def _request(
self.vdebug("request-url={}".format(url))
self.vdebug("request-params=\n{}".format(pprint.pformat(params)))
self.vdebug("request-headers=\n{}".format(pprint.pformat(headers)))

if method == "GET":
r = self._session.get(
url,
params=params,
headers=headers,
stream=stream,
timeout=timeout,
cookies=cookies,
)
self._save_cookies(r.cookies)
if stream is True:
return r
elif method == "PUT":
r = self._session.put(
url, json=params, headers=headers, timeout=timeout
url, json=params, headers=headers, timeout=timeout, cookies=cookies,
)
self._save_cookies(r.cookies)
elif method == "POST":
r = self._session.post(
url, json=params, headers=headers, timeout=timeout, cookies=cookies,
)
self._save_cookies(r.cookies)
elif method == "OPTIONS":
r = self._session.options(
url, json=params, headers=headers, timeout=timeout
)
self._save_cookies(r.cookies)
except Exception as e:
self._arlo.warning("request-error={}".format(type(e).__name__))
return None

try:
body = r.json()
if "application/json" in r.headers["Content-Type"]:
body = r.json()
else:
body = r.text
self.vdebug("request-body=\n{}".format(pprint.pformat(body)))
except Exception as e:
self._arlo.warning("body-error={}".format(type(e).__name__))
Expand Down Expand Up @@ -679,6 +707,8 @@ def _auth_headers(self):
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-GB,en;q=0.9,en-US;q=0.8",
"Cache-Control": "no-cache",
"Content-Type": "application/json",
"Dnt": "1",
"Origin": ORIGIN_HOST,
"Pragma": "no-cache",
"Referer": REFERER_HOST,
Expand All @@ -689,7 +719,7 @@ def _auth_headers(self):
# "Sec-Fetch-Mode": "cors",
# "Sec-Fetch-Site": "same-site",
"User-Agent": self._user_agent,
"X-User-Device-Automation-name": "QlJPV1NFUg==",
"X-User-Device-Automation-Name": "QlJPV1NFUg==",
"X-User-Device-Id": self._user_device_id,
"X-User-Device-Type": "BROWSER",
}
Expand All @@ -711,6 +741,7 @@ def _headers(self):
"Authorization": self._token,
"Cache-Control": "no-cache",
"Content-Type": "application/json; charset=utf-8;",
"Dnt": "1",
"Origin": ORIGIN_HOST,
"Pragma": "no-cache",
"Referer": REFERER_HOST,
Expand All @@ -733,6 +764,9 @@ def _auth(self):
while attempt < 3:
attempt += 1
self.debug("login attempt #{}".format(attempt))
self._options = self.auth_options(AUTH_PATH, headers)
self._cookies = self._load_cookies()

body = self.auth_post(
AUTH_PATH,
{
Expand All @@ -742,10 +776,11 @@ def _auth(self):
"EnvSource": "prod",
},
headers,
cookies=self._cookies,
)
if body is not None:
break
time.sleep(1)
time.sleep(3)
if body is None:
self._arlo.error("authentication failed")
return False
Expand All @@ -763,31 +798,48 @@ def _auth(self):

# get available 2fa choices,
self.debug("getting tfa choices")
factors = self.auth_get(
AUTH_GET_FACTORS + "?data = {}".format(int(time.time())), {}, headers
)
if factors is None:
self._arlo.error("2fa: no secondary choices available")
return False

self._options = self.auth_options(AUTH_GET_FACTORID, headers)
self._cookies = self._load_cookies()

# look for code source choice
self.debug(f"looking for {self._arlo.cfg.tfa_type}/{self._arlo.cfg.tfa_nickname}")
factors_of_type = []
factor_id = None

for factor in factors["items"]:
if factor["factorType"].lower() == self._arlo.cfg.tfa_type:
factors_of_type.append(factor)
payload = {
"factorType": "BROWSER",
"factorData": "",
"userId": self._user_id
}

r = self.auth_post(
AUTH_GET_FACTORID, payload, headers, cookies=self._cookies
)

if len(factors_of_type) > 0:
# Try to match the factorNickname with the tfa_nickname
for factor in factors_of_type:
if self._arlo.cfg.tfa_nickname == factor["factorNickname"]:
factor_id = factor["factorId"]
break
# Otherwise fallback to using the first option
else:
factor_id = factors_of_type[0]["factorId"]
if r != None:
factor_id = r["factorId"]
else:
factors = self.auth_get(
AUTH_GET_FACTORS + "?data = {}".format(int(time.time())), {}, headers
)
if factors is None:
self._arlo.error("2fa: no secondary choices available")
return False

for factor in factors["items"]:
if factor["factorType"].lower() == self._arlo.cfg.tfa_type:
factors_of_type.append(factor)

if len(factors_of_type) > 0:
# Try to match the factorNickname with the tfa_nickname
for factor in factors_of_type:
if self._arlo.cfg.tfa_nickname == factor["factorNickname"]:
factor_id = factor["factorId"]
break
# Otherwise fallback to using the first option
else:
factor_id = factors_of_type[0]["factorId"]

if factor_id is None:
self._arlo.error("2fa no suitable secondary choice available")
Expand All @@ -808,6 +860,7 @@ def _auth(self):
"factorType": "BROWSER",
"userId": self._user_id
}
self._options = self.auth_options(AUTH_START_PATH, headers)
body = self.auth_post(AUTH_START_PATH, payload, headers)
if body is None:
self._arlo.error("2fa startAuth failed")
Expand Down Expand Up @@ -900,6 +953,12 @@ def _login(self):
# pickup user configured user agent
self._user_agent = self.user_agent(self._arlo.cfg.user_agent)

# If we want to use cloudscraper's random user agent and set the cookies during a first call to the origin host
# Used during a debugging session about CloudFlare's 403.
# _cookies, self._user_agent = cloudscraper.get_tokens(ORIGIN_HOST)
# if self._cookies is None:
# self._cookies = _cookies

# If token looks invalid we'll try the whole process.
get_new_session = days_until(self._expires_in) < 2
if get_new_session:
Expand All @@ -908,7 +967,15 @@ def _login(self):
for curve in self._arlo.cfg.ecdh_curves:
self.debug(f"CloudFlare curve set to: {curve}")
self._session = cloudscraper.create_scraper(
ecdhCurve=curve
# browser={
# 'browser': 'chrome',
# 'platform': 'darwin',
# 'desktop': True,
# 'mobile': False,
# },
disableCloudflareV1=True,
ecdhCurve=curve,
debug=False,
)
if self._auth() and self._validate():
success = True
Expand All @@ -917,7 +984,17 @@ def _login(self):
if not success:
return False
else:
self._session = requests.session()
# self._session = requests.session()
# Use cloudscraper to initiate newer sessions
# https://github.com/VeNoMouS/cloudscraper?tab=readme-ov-file#note
self._session = cloudscraper.create_scraper(
# browser={
# 'browser': 'chrome',
# 'platform': 'darwin',
# 'desktop': True,
# 'mobile': False,
# }
)
self.debug("newish sessions, re-using")

# save session in case we updated it
Expand Down Expand Up @@ -1044,11 +1121,12 @@ def get(
timeout=None,
host=None,
wait_for="response",
cookies=None,
):
if wait_for == "response":
self.vdebug("get+response running")
return self._request(
path, "GET", params, headers, stream, raw, timeout, host
path, "GET", params, headers, stream, raw, timeout, host, cookies
)
else:
self.vdebug("get sent")
Expand All @@ -1064,10 +1142,11 @@ def put(
raw=False,
timeout=None,
wait_for="response",
cookies=None,
):
if wait_for == "response":
self.vdebug("put+response running")
return self._request(path, "PUT", params, headers, False, raw, timeout)
return self._request(path, "PUT", params, headers, False, raw, timeout, cookies)
else:
self.vdebug("put sent")
self._arlo.bg.run(
Expand All @@ -1083,6 +1162,7 @@ def post(
timeout=None,
tid=None,
wait_for="response",
cookies=None,
):
"""Post a request to the Arlo servers.

Expand Down Expand Up @@ -1114,16 +1194,23 @@ def post(
self._request, path, "POST", params, headers, False, raw, timeout
)

def auth_post(self, path, params=None, headers=None, raw=False, timeout=None):
def auth_post(self, path, params=None, headers=None, raw=False, timeout=None, cookies=None):
return self._request(
path, "POST", params, headers, False, raw, timeout, self._arlo.cfg.auth_host, authpost=True
path, "POST", params, headers, False, raw, timeout, self._arlo.cfg.auth_host, authpost=True, cookies=cookies
)

def auth_get(
self, path, params=None, headers=None, stream=False, raw=False, timeout=None
self, path, params=None, headers=None, stream=False, raw=False, timeout=None, cookies=None
):
return self._request(
path, "GET", params, headers, stream, raw, timeout, self._arlo.cfg.auth_host, authpost=True
path, "GET", params, headers, stream, raw, timeout, self._arlo.cfg.auth_host, authpost=True, cookies=cookies
)

def auth_options(
self, path, headers=None, timeout=None
):
return self._request(
path, "OPTIONS", headers, timeout, self._arlo.cfg.auth_host, authpost=True
)

@property
Expand Down
4 changes: 4 additions & 0 deletions pyaarlo/cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ def session_file(self):
def save_session(self):
return self._kw.get("save_session", True)

@property
def cookies_file(self):
return self.storage_dir + "/cookies.txt"

@property
def dump_file(self):
if self.dump:
Expand Down
7 changes: 4 additions & 3 deletions pyaarlo/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
AUTH_START_PATH = "/api/startAuth"
AUTH_FINISH_PATH = "/api/finishAuth"
AUTH_GET_FACTORS = "/api/getFactors"
AUTH_GET_FACTORID = "/api/getFactorId"
AUTH_VALIDATE_PATH = "/api/validateAccessToken"

TFA_CONSOLE_SOURCE = "console"
Expand Down Expand Up @@ -296,8 +297,8 @@
"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Mobile/15E148 Safari/604.1",
"mac":
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.1.2 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"firefox":
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:85.0) "
"Gecko/20100101 Firefox/85.0",
Expand All @@ -309,6 +310,6 @@
CERT_BEGIN = '-----BEGIN CERTIFICATE-----\n'
CERT_END = '-----END CERTIFICATE-----\n'

ECDH_CURVES = ['prime256v1', 'secp384r1']
ECDH_CURVES = ['secp384r1', 'prime256v1']

VALID_DEVICE_STATES = ["provisioned", "synced"]
2 changes: 2 additions & 0 deletions pyaarlo/tfa.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def start(self):
ctx = None

self._imap = imaplib.IMAP4_SSL(self._arlo.cfg.tfa_host, port=self._arlo.cfg.tfa_port, ssl_context=ctx)
if self._arlo._cfg.verbose:
self._imap.debug = 4
res, status = self._imap.login(
self._arlo.cfg.tfa_username, self._arlo.cfg.tfa_password
)
Expand Down