Skip to content

Commit

Permalink
use onnx for captcha
Browse files Browse the repository at this point in the history
  • Loading branch information
ARC-MX committed May 12, 2024
1 parent e773a2b commit 1d287b7
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 88 deletions.
5 changes: 3 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
"type": "debugpy",
"request": "launch",
// "program": "${file}",
"program": "scripts\\main.py",
"console": "integratedTerminal"
"program": "main.py",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}\\scripts"
}
]
}
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ selenium==4.5.0
schedule==1.1.0
Pillow==9.2.0
undetected_chromedriver==3.4.7
# pypushdeer==0.0.3
pymongo~=3.12.0
python-dotenv
opencv-python
onnxruntime
Binary file added scripts/captcha.onnx
Binary file not shown.
178 changes: 96 additions & 82 deletions scripts/data_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,61 +22,70 @@

from const import *

import cv2
from PIL import Image
import numpy as np
import cv2
from onnx import ONNX
import platform

def __ease_out_expo(sep):
if sep == 1:
return 1
else:
return 1 - pow(2, -10 * sep)

def _get_tracks(distance):
# ---- 拖拽轨迹计算 start ----
"""
根据偏移量获取移动轨迹3
:param distance: 偏移量
:return: 移动轨迹
拿到移动轨迹,模仿人的滑动行为,先匀加速后匀减速
匀变速运动基本公式:
①v = v0+at
②s = v0t+1/2at^2
"""
# print('------------------------ 进度4:计算图片拖拽轨迹')
track = []
mid1 = round(distance * random.uniform(0.1, 0.2))
mid2 = round(distance * random.uniform(0.65, 0.76))
mid3 = round(distance * random.uniform(0.84, 0.88))
if distance == 0:
return [0]
#初速度
v = 0
#单位时间为0.3s来统计轨迹,轨迹即0.3内的位移
t = 0.31
#位置/轨迹列表,列表内的一个元素代表0.3s的位移
tracks = []
#当前位移
current = 0
#到达mid值开始减速
mid = distance*4/5

# 设置初始位置、初始速度、时间间隔
current, v, t = 0, 0, 0.2
distance = round(distance)

# 四段加速度
while current < distance:
if current < mid1:
a = random.randint(10, 15)
elif current < mid2:
a = random.randint(30, 40)
elif current < mid3:
a = -70
if current < mid: #加速度越小,单位时间内的位移越小,模拟的轨迹就越多越详细
a = 20
else:
a = random.randint(-25, -18)

v0 = v # 初速度 v0
v = v0 + a * t # 当前速度 v = v0 + at
v = v if v >= 0 else 0
move = v0 * t + 1 / 2 * a * (t ** 2)
move = round(move if move >= 0 else 1)
current += move # 当前位移
track.append(move) # 加入轨迹

# 超出范围
back_tracks = []
out_range = distance - current
# print("当前= {}, 距离= {}, 超出范围 = {}".format(current, distance, out_range))

if out_range < -8:
sub = int(out_range + 8)
back_tracks = [-1, sub, -3, -1, -1, -1, -1]
elif out_range < -2:
sub = int(out_range + 3)
back_tracks = [-1, -1, sub]

# print("向前轨道= {}, 返回轨道={}".format(track, back_tracks))
return {'forward_tracks': track, 'back_tracks': back_tracks}
a = -30
#初速度
v0 = v
#0.3秒内的位移
s = v0*t+0.5*a*(t**2)
#当前的位置
current += s
#添加到轨迹列表
tracks.append(round(s))
#速度已经到达v,该速度作为下次的初速度
v = v0+a*t
print("sum(tracks) is {}, sum(tracks) - distance is {}",sum(tracks),sum(tracks)-round(distance*1.02))
tracks.append(sum(tracks)-distance)
logging.info(f"image tracks distance is {sum(tracks)}")
return tracks

# cv2转base64
def cv2_to_base64(img):
img = cv2.imencode('.jpg', img)[1]
image_code = str(base64.b64encode(img))[2:-1]

return image_code

# base64转cv2
def base64_to_cv2(base64_code):
img_data = base64.b64decode(base64_code)
img_array = np.fromstring(img_data, np.uint8)
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
return img

def bytes2cv(img):
'''二进制图片转cv2
Expand Down Expand Up @@ -104,7 +113,6 @@ def cv2_crop(im, box):
'''
return im.copy()[box[1]:box[3], box[0]:box[2], :]


def get_transparency_location(image):
'''获取基于透明元素裁切图片的左上角、右下角坐标
Expand Down Expand Up @@ -152,12 +160,11 @@ def get_transparency_location(image):

class DataFetcher:

def __init__(self, username: str, password: str, tujian_uname: str, tujian_password: str):
def __init__(self, username: str, password: str):
dotenv.load_dotenv()
self._username = username
self._password = password
self._tujian_uname = tujian_uname
self._tujian_passwd = tujian_password
self.onnx = ONNX("./captcha.onnx")
if platform.system() == 'Windows':
pass
else:
Expand Down Expand Up @@ -248,21 +255,22 @@ def _fetch(self):
driver = self._get_webdriver()

driver.maximize_window()
time.sleep(2)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
logging.info("Webdriver initialized.")

try:
self._login(driver)
if self._login(driver):
raise Exception("_login unsuccessed !")
logging.info(f"Login successfully on {LOGIN_URL}")
time.sleep(2)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
user_id_list = self._get_user_ids(driver)
logging.info(f"将获取{len(user_id_list)}户数据,user_id: {user_id_list}")
time.sleep(2)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
balance_list = self._get_electric_balances(driver, user_id_list) #
time.sleep(2)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
### get data except electricity charge balance
last_daily_date_list, last_daily_usage_list, yearly_charge_list, yearly_usage_list = self._get_other_data(driver, user_id_list)
time.sleep(2)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
driver.quit()

logging.info("Webdriver quit after fetching data successfully.")
Expand All @@ -287,21 +295,20 @@ def _get_webdriver(self):
def _login(self, driver):

driver.get(LOGIN_URL)
time.sleep(2)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
# swtich to username-password login page
driver.find_element(By.CLASS_NAME, "user").click()
# time.sleep(1)

time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
# input username and password
input_elements = driver.find_elements(By.CLASS_NAME, "el-input__inner")
input_elements[0].send_keys(self._username)
input_elements[1].send_keys(self._password)
# click agree button
driver.find_element(By.XPATH, '//*[@id="login_box"]/div[2]/div[1]/form/div[1]/div[3]/div/span[2]').click()

self._click_button(driver, By.XPATH, '//*[@id="login_box"]/div[2]/div[1]/form/div[1]/div[3]/div/span[2]')
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
# click login button
self._click_button(driver, By.CLASS_NAME, "el-button.el-button--primary")
time.sleep(10)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT*2)
logging.info("Click login button.\r")
# sometimes ddddOCR may fail, so add retry logic)
for retry_times in range(1, self.RETRY_TIMES_LIMIT + 1):
Expand All @@ -311,27 +318,35 @@ def _login(self, driver):
targe_JS = 'return document.getElementsByClassName("slide-verify-block")[0].toDataURL("image/png");'
# get base64 image data
im_info = driver.execute_script(background_JS)
im_base64 = im_info.split(',')[1]
background = im_info.split(',')[1]
background_image = base64_to_cv2(background)
logging.info(f"Get electricity canvas image successfully.\r")
distance = int(self.base64_api(im_base64))
distance = self.onnx.get_distance(background_image)
logging.info(f"Image CaptCHA distance is {distance}.\r")
self._sliding_track(driver, distance)
time.sleep(2)

# slider = driver.find_element(By.CLASS_NAME, "slide-verify-slider-mask-item")
# ActionChains(driver).click_and_hold(slider).perform()
# ActionChains(driver).move_by_offset(xoffset=round(distance*1.06), yoffset=0).perform()
# ActionChains(driver).release().perform()

self._sliding_track(driver, round(distance*1.06)) #1.06是补偿
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
if (driver.current_url == LOGIN_URL): # if login not success
try:
self._click_button(driver, By.CLASS_NAME, "el-button.el-button--primary")
time.sleep(12)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT*2)
logging.info(f"Sliding CAPTCHA recognition failed and reloaded.\r")
continue
except:
logging.debug(
f"Login failed, maybe caused by invalid captcha, {self.RETRY_TIMES_LIMIT - retry_times} retry times left.")
else:
return
return False

logging.error(f"Login failed, maybe caused by Sliding CAPTCHA recognition failed")
raise Exception(
"Login failed, maybe caused by 1.incorrect phone_number and password, please double check. or 2. network, please mnodify LOGIN_EXPECTED_TIME in .env and run docker compose up --build.")
return True


def _get_electric_balances(self, driver, user_id_list):
Expand All @@ -340,7 +355,7 @@ def _get_electric_balances(self, driver, user_id_list):

# switch to electricity charge balance page
driver.get(BALANCE_URL)
time.sleep(2)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
# get electricity charge balance for each user id
for i in range(1, len(user_id_list) + 1):
balance = self._get_eletric_balance(driver)
Expand All @@ -354,6 +369,7 @@ def _get_electric_balances(self, driver, user_id_list):
# swtich to next userid
if (i != len(user_id_list)):
self._click_button(driver, By.CLASS_NAME, "el-input__suffix")
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
self._click_button(driver, By.XPATH,
f"//ul[@class='el-scrollbar__view el-select-dropdown__list']/li[{i + 1}]")

Expand Down Expand Up @@ -405,6 +421,7 @@ def _get_other_data(self, driver, user_id_list):
# switch to next user id
if i != len(user_id_list):
self._click_button(driver, By.CLASS_NAME, "el-input__suffix")
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
self._click_button(driver, By.XPATH,
f"//body/div[@class='el-select-dropdown el-popper']//ul[@class='el-scrollbar__view el-select-dropdown__list']/li[{i + 1}]")

Expand All @@ -414,6 +431,7 @@ def _get_user_ids(self, driver):

# click roll down button for user id
self._click_button(driver, By.XPATH, "//div[@class='el-dropdown']/span")
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
# wait for roll down menu displayed
target = driver.find_element(By.CLASS_NAME, "el-dropdown-menu.el-popper").find_element(By.TAG_NAME, "li")
WebDriverWait(driver, self.DRIVER_IMPLICITY_WAIT_TIME).until(EC.visibility_of(target))
Expand All @@ -438,7 +456,7 @@ def _get_yearly_data(self, driver):

try:
self._click_button(driver, By.XPATH, "//div[@class='el-tabs__nav is-top']/div[@id='tab-first']")
time.sleep(1)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
# wait for data displayed
target = driver.find_element(By.CLASS_NAME, "total")
WebDriverWait(driver, self.DRIVER_IMPLICITY_WAIT_TIME).until(EC.visibility_of(target))
Expand All @@ -464,15 +482,15 @@ def _get_yesterday_usage(self, driver):
try:
# 点击日用电量
self._click_button(driver, By.XPATH, "//div[@class='el-tabs__nav is-top']/div[@id='tab-second']")
time.sleep(1)
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
# wait for data displayed
usage_element = driver.find_element(By.XPATH,
"//div[@class='el-tab-pane dayd']//div[@class='el-table__body-wrapper is-scrolling-none']/table/tbody/tr[1]/td[2]/div")
WebDriverWait(driver, self.DRIVER_IMPLICITY_WAIT_TIME).until(EC.visibility_of(usage_element)) # 等待用电量出现

# 增加是哪一天
date_element = driver.find_element(By.XPATH,
"//div[@class='el-tab-pane dayd']//div[@class='el-table__body-wrapper is-scrolling-none']/table/tbody/tr[1]/td[1]/div")
"//div[@class='el-tab-pane dayd']//div[@class='el-table__body-wrapper is-scrolling-none']/table/tbody/tr[1]/td[1]/div")
last_daily_date = date_element.text # 获取最近一次用电量的日期
return last_daily_date, float(usage_element.text)
except:
Expand All @@ -482,6 +500,7 @@ def _get_yesterday_usage(self, driver):
def save_30_days_usage(self, driver, user_id):
"""储存30天用电量"""
self._click_button(driver, By.XPATH, "//*[@id='pane-second']/div[1]/label[2]/span[2]")
time.sleep(self.RETRY_WAIT_TIME_OFFSET_UNIT)
# 等待30天用电量的数据出现
usage_element = driver.find_element(By.XPATH,
"//div[@class='el-tab-pane dayd']//div[@class='el-table__body-wrapper is-scrolling-none']/table/tbody/tr[1]/td[2]/div")
Expand All @@ -506,7 +525,6 @@ def save_30_days_usage(self, driver, user_id):
except:
logging.debug(f"{day}的用电量存入数据库失败,可能已经存在")


@staticmethod
def _click_button(driver, button_search_type, button_search_key):
'''wrapped click function, click only when the element is clickable'''
Expand All @@ -530,22 +548,18 @@ def _get_chromium_version():
version = re.findall(r"(\d*)\.", result)[0]
logging.info(f"chromium-driver version is {version}")
return int(version)


@staticmethod
def _sliding_track(driver, distance):# 机器模拟人工滑动轨迹
# 获取按钮
slider = driver.find_element(By.CLASS_NAME, "slide-verify-slider-mask-item")
ActionChains(driver).click_and_hold(slider).perform()
# 获取轨迹
tracks = _get_tracks(distance)
for t in tracks['forward_tracks']:
yoffset_random = random.uniform(-2, 4)
ActionChains(driver).move_by_offset(xoffset=t, yoffset=yoffset_random).perform()

for t in tracks['back_tracks']:
yoffset_random = random.uniform(-2, 4)
ActionChains(driver).move_by_offset(xoffset=t, yoffset=yoffset_random).perform()
# tracks = _get_tracks(distance)
# for t in tracks:
yoffset_random = random.uniform(-2, 4)
ActionChains(driver).move_by_offset(xoffset=distance, yoffset=yoffset_random).perform()
# time.sleep(0.2)
ActionChains(driver).release().perform()


Expand Down
4 changes: 1 addition & 3 deletions scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ def main():
JOB_START_TIME = os.getenv("JOB_START_TIME")
FIRST_SLEEP_TIME = int(os.getenv("FIRST_SLEEP_TIME"))
LOG_LEVEL = os.getenv("LOG_LEVEL")
TUJIAN_UNAME = os.getenv("TUJIAN_UNAME")
TUJIAN_PASSWORD = os.getenv("TUJIAN_PASSWORD")

except Exception as e:
logging.error(f"读取.env文件失败,程序将退出,错误信息为{e}")
Expand All @@ -36,7 +34,7 @@ def main():
logger_init(LOG_LEVEL)
logging.info("程序开始,当前仓库版本为1.3.3,仓库地址为https://github.com/ARC-MX/sgcc_electricity_new.git")

fetcher = DataFetcher(PHONE_NUMBER, PASSWORD, TUJIAN_UNAME, TUJIAN_PASSWORD)
fetcher = DataFetcher(PHONE_NUMBER, PASSWORD)
updator = SensorUpdator(HASS_URL, HASS_TOKEN)
logging.info(f"当前登录的用户名为: {PHONE_NUMBER},homeassistant地址为{HASS_URL},程序将在每天{JOB_START_TIME}执行")
schedule.every().day.at(JOB_START_TIME).do(run_task, fetcher, updator)
Expand Down
Loading

0 comments on commit 1d287b7

Please sign in to comment.