Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

更新"微博热搜"接口 #579

Merged
merged 1 commit into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions plugins/alapi/_data_source.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
from nonebot.adapters.onebot.v11 import MessageSegment
from utils.image_utils import BuildImage
from utils.message_builder import image
from configs.path_config import IMAGE_PATH
from typing import Optional, Tuple, Union
from configs.config import Config
from utils.http_utils import AsyncHttpx
Expand All @@ -28,24 +24,3 @@ async def get_data(url: str, params: Optional[dict] = None) -> Tuple[Union[dict,
return f'发生了错误...code:{data["code"]}', 999
except TimeoutError:
return "超时了....", 998


def gen_wbtop_pic(data: dict) -> MessageSegment:
"""
生成微博热搜图片
:param data: 微博热搜数据
"""
bk = BuildImage(700, 32 * 50 + 280, 700, 32, color="#797979")
wbtop_bk = BuildImage(700, 280, background=f"{IMAGE_PATH}/other/webtop.png")
bk.paste(wbtop_bk)
text_bk = BuildImage(700, 32 * 50, 700, 32, color="#797979")
for i, data in enumerate(data):
title = f"{i+1}. {data['hot_word']}"
hot = data["hot_word_num"]
img = BuildImage(700, 30, font_size=20)
w, h = img.getsize(title)
img.text((10, int((30 - h) / 2)), title)
img.text((580, int((30 - h) / 2)), hot)
text_bk.paste(img)
bk.paste(text_bk, (0, 280))
return image(b64=bk.pic2bs4())
148 changes: 78 additions & 70 deletions plugins/alapi/wbtop.py → plugins/wbtop/__init__.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,70 +1,78 @@
from nonebot import on_command
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent, Message
from nonebot.params import CommandArg
from services.log import logger
from ._data_source import get_data, gen_wbtop_pic
from utils.utils import is_number
from configs.path_config import IMAGE_PATH
from utils.http_utils import AsyncPlaywright
import asyncio

__zx_plugin_name__ = "微博热搜"
__plugin_usage__ = """
usage:
在QQ上吃个瓜
指令:
微博热搜:发送实时热搜
微博热搜 [id]:截图该热搜页面
示例:微博热搜 5
""".strip()
__plugin_des__ = "刚买完瓜,在吃瓜现场"
__plugin_cmd__ = ["微博热搜", "微博热搜 [id]"]
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_settings__ = {
"level": 5,
"default_status": True,
"limit_superuser": False,
"cmd": ["微博热搜"],
}

wbtop = on_command("wbtop", aliases={"微博热搜"}, priority=5, block=True)


wbtop_url = "https://v2.alapi.cn/api/new/wbtop"

wbtop_data = []


@wbtop.handle()
async def _(event: MessageEvent, arg: Message = CommandArg()):
global wbtop_data
msg = arg.extract_plain_text().strip()
if not wbtop_data or not msg:
data, code = await get_data(wbtop_url)
if code != 200:
await wbtop.finish(data, at_sender=True)
wbtop_data = data["data"]
if not msg:
img = await asyncio.get_event_loop().run_in_executor(
None, gen_wbtop_pic, wbtop_data
)
await wbtop.send(img)
logger.info(
f"(USER {event.user_id}, GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 查询微博热搜"
)
if is_number(msg) and 0 < int(msg) <= 50:
url = wbtop_data[int(msg) - 1]["url"]
await wbtop.send("开始截取数据...")
img = await AsyncPlaywright.screenshot(
url,
f"{IMAGE_PATH}/temp/wbtop_{event.user_id}.png",
"#pl_feed_main",
wait_time=12
)
if img:
await wbtop.send(img)
else:
await wbtop.send("发生了一些错误.....")

from nonebot import on_command
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent, Message
from nonebot.params import CommandArg
from services.log import logger
from .data_source import gen_wbtop_pic,get_wbtop
from utils.utils import is_number
from configs.path_config import IMAGE_PATH
from utils.http_utils import AsyncPlaywright
import asyncio
import datetime
__zx_plugin_name__ = "微博热搜"
__plugin_usage__ = """
usage:
在QQ上吃个瓜
指令:
微博热搜:发送实时热搜
微博热搜 [id]:截图该热搜页面
示例:微博热搜 5
""".strip()
__plugin_des__ = "刚买完瓜,在吃瓜现场"
__plugin_cmd__ = ["微博热搜", "微博热搜 [id]"]
__plugin_version__ = 0.2
__plugin_author__ = "HibiKier & yajiwa"
__plugin_settings__ = {
"level": 5,
"default_status": True,
"limit_superuser": False,
"cmd": ["微博热搜"],
}

wbtop = on_command("wbtop", aliases={"微博热搜"}, priority=5, block=True)


wbtop_url = "https://weibo.com/ajax/side/hotSearch"

wbtop_data = []


@wbtop.handle()
async def _(event: MessageEvent, arg: Message = CommandArg()):
global wbtop_data
msg = arg.extract_plain_text().strip()
if not wbtop_data or not msg:
if wbtop_data:
now_time = datetime.datetime.now()
if now_time > wbtop_data["time"] + datetime.timedelta(minutes=5):
data, code = await get_wbtop(wbtop_url)
if code != 200:
await wbtop.finish(data, at_sender=True)
wbtop_data = data
else:
data, code = await get_wbtop(wbtop_url)
if code != 200:
await wbtop.finish(data, at_sender=True)
wbtop_data = data
if not msg:
img = await asyncio.get_event_loop().run_in_executor(
None, gen_wbtop_pic, wbtop_data["data"]
)
await wbtop.send(img)
logger.info(
f"(USER {event.user_id}, GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 查询微博热搜"
)
if is_number(msg) and 0 < int(msg) <= 50:
url = wbtop_data["data"][int(msg) - 1]["url"]
await wbtop.send("开始截取数据...")
img = await AsyncPlaywright.screenshot(
url,
f"{IMAGE_PATH}/temp/wbtop_{event.user_id}.png",
"#pl_feed_main",
wait_time=12
)
if img:
await wbtop.send(img)
else:
await wbtop.send("发生了一些错误.....")

62 changes: 62 additions & 0 deletions plugins/wbtop/data_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from nonebot.adapters.onebot.v11 import MessageSegment
from utils.image_utils import BuildImage
from utils.message_builder import image
from configs.path_config import IMAGE_PATH
from typing import Tuple, Union
from utils.http_utils import AsyncHttpx
import datetime


async def get_wbtop(url: str) -> Tuple[Union[dict, str], int]:
"""
:param url: 请求链接
"""
n = 0
while True:
try:
data = []
get_response = (await AsyncHttpx.get(url, timeout=20))
if get_response.status_code == 200:
data_json = get_response.json()['data']['realtime']
for data_item in data_json:
# 如果是广告,则不添加
if 'is_ad' in data_item:
continue
dic = {
'hot_word': data_item['note'],
'hot_word_num': str(data_item['num']),
'url': 'https://s.weibo.com/weibo?q=%23' + data_item['word'] + '%23',
}
data.append(dic)
if not data:
return "没有搜索到...", 997
return {'data': data, 'time': datetime.datetime.now()}, 200
else:
if n > 2:
return f'获取失败,请十分钟后再试', 999
else:
n += 1
continue
except TimeoutError:
return "超时了....", 998


def gen_wbtop_pic(data: dict) -> MessageSegment:
"""
生成微博热搜图片
:param data: 微博热搜数据
"""
bk = BuildImage(700, 32 * 50 + 280, 700, 32, color="#797979")
wbtop_bk = BuildImage(700, 280, background=f"{IMAGE_PATH}/other/webtop.png")
bk.paste(wbtop_bk)
text_bk = BuildImage(700, 32 * 50, 700, 32, color="#797979")
for i, data in enumerate(data):
title = f"{i + 1}. {data['hot_word']}"
hot = data["hot_word_num"]
img = BuildImage(700, 30, font_size=20)
w, h = img.getsize(title)
img.text((10, int((30 - h) / 2)), title)
img.text((580, int((30 - h) / 2)), hot)
text_bk.paste(img)
bk.paste(text_bk, (0, 280))
return image(b64=bk.pic2bs4())