Skip to content

Commit

Permalink
Merge pull request #579 from yajiwa/main
Browse files Browse the repository at this point in the history
更新"微博热搜"接口
  • Loading branch information
HibiKier authored May 24, 2022
2 parents 3bcef18 + 570b5b1 commit 72ace2d
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 95 deletions.
25 changes: 0 additions & 25 deletions plugins/alapi/_data_source.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
from nonebot.adapters.onebot.v11 import MessageSegment
from utils.image_utils import BuildImage
from utils.message_builder import image
from configs.path_config import IMAGE_PATH
from typing import Optional, Tuple, Union
from configs.config import Config
from utils.http_utils import AsyncHttpx
Expand All @@ -28,24 +24,3 @@ async def get_data(url: str, params: Optional[dict] = None) -> Tuple[Union[dict,
return f'发生了错误...code:{data["code"]}', 999
except TimeoutError:
return "超时了....", 998


def gen_wbtop_pic(data: dict) -> MessageSegment:
"""
生成微博热搜图片
:param data: 微博热搜数据
"""
bk = BuildImage(700, 32 * 50 + 280, 700, 32, color="#797979")
wbtop_bk = BuildImage(700, 280, background=f"{IMAGE_PATH}/other/webtop.png")
bk.paste(wbtop_bk)
text_bk = BuildImage(700, 32 * 50, 700, 32, color="#797979")
for i, data in enumerate(data):
title = f"{i+1}. {data['hot_word']}"
hot = data["hot_word_num"]
img = BuildImage(700, 30, font_size=20)
w, h = img.getsize(title)
img.text((10, int((30 - h) / 2)), title)
img.text((580, int((30 - h) / 2)), hot)
text_bk.paste(img)
bk.paste(text_bk, (0, 280))
return image(b64=bk.pic2bs4())
148 changes: 78 additions & 70 deletions plugins/alapi/wbtop.py → plugins/wbtop/__init__.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,70 +1,78 @@
from nonebot import on_command
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent, Message
from nonebot.params import CommandArg
from services.log import logger
from ._data_source import get_data, gen_wbtop_pic
from utils.utils import is_number
from configs.path_config import IMAGE_PATH
from utils.http_utils import AsyncPlaywright
import asyncio

__zx_plugin_name__ = "微博热搜"
__plugin_usage__ = """
usage:
在QQ上吃个瓜
指令:
微博热搜:发送实时热搜
微博热搜 [id]:截图该热搜页面
示例:微博热搜 5
""".strip()
__plugin_des__ = "刚买完瓜,在吃瓜现场"
__plugin_cmd__ = ["微博热搜", "微博热搜 [id]"]
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_settings__ = {
"level": 5,
"default_status": True,
"limit_superuser": False,
"cmd": ["微博热搜"],
}

wbtop = on_command("wbtop", aliases={"微博热搜"}, priority=5, block=True)


wbtop_url = "https://v2.alapi.cn/api/new/wbtop"

wbtop_data = []


@wbtop.handle()
async def _(event: MessageEvent, arg: Message = CommandArg()):
global wbtop_data
msg = arg.extract_plain_text().strip()
if not wbtop_data or not msg:
data, code = await get_data(wbtop_url)
if code != 200:
await wbtop.finish(data, at_sender=True)
wbtop_data = data["data"]
if not msg:
img = await asyncio.get_event_loop().run_in_executor(
None, gen_wbtop_pic, wbtop_data
)
await wbtop.send(img)
logger.info(
f"(USER {event.user_id}, GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 查询微博热搜"
)
if is_number(msg) and 0 < int(msg) <= 50:
url = wbtop_data[int(msg) - 1]["url"]
await wbtop.send("开始截取数据...")
img = await AsyncPlaywright.screenshot(
url,
f"{IMAGE_PATH}/temp/wbtop_{event.user_id}.png",
"#pl_feed_main",
wait_time=12
)
if img:
await wbtop.send(img)
else:
await wbtop.send("发生了一些错误.....")

from nonebot import on_command
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent, Message
from nonebot.params import CommandArg
from services.log import logger
from .data_source import gen_wbtop_pic,get_wbtop
from utils.utils import is_number
from configs.path_config import IMAGE_PATH
from utils.http_utils import AsyncPlaywright
import asyncio
import datetime
__zx_plugin_name__ = "微博热搜"
__plugin_usage__ = """
usage:
在QQ上吃个瓜
指令:
微博热搜:发送实时热搜
微博热搜 [id]:截图该热搜页面
示例:微博热搜 5
""".strip()
__plugin_des__ = "刚买完瓜,在吃瓜现场"
__plugin_cmd__ = ["微博热搜", "微博热搜 [id]"]
__plugin_version__ = 0.2
__plugin_author__ = "HibiKier & yajiwa"
__plugin_settings__ = {
"level": 5,
"default_status": True,
"limit_superuser": False,
"cmd": ["微博热搜"],
}

wbtop = on_command("wbtop", aliases={"微博热搜"}, priority=5, block=True)


wbtop_url = "https://weibo.com/ajax/side/hotSearch"

wbtop_data = []


@wbtop.handle()
async def _(event: MessageEvent, arg: Message = CommandArg()):
global wbtop_data
msg = arg.extract_plain_text().strip()
if not wbtop_data or not msg:
if wbtop_data:
now_time = datetime.datetime.now()
if now_time > wbtop_data["time"] + datetime.timedelta(minutes=5):
data, code = await get_wbtop(wbtop_url)
if code != 200:
await wbtop.finish(data, at_sender=True)
wbtop_data = data
else:
data, code = await get_wbtop(wbtop_url)
if code != 200:
await wbtop.finish(data, at_sender=True)
wbtop_data = data
if not msg:
img = await asyncio.get_event_loop().run_in_executor(
None, gen_wbtop_pic, wbtop_data["data"]
)
await wbtop.send(img)
logger.info(
f"(USER {event.user_id}, GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 查询微博热搜"
)
if is_number(msg) and 0 < int(msg) <= 50:
url = wbtop_data["data"][int(msg) - 1]["url"]
await wbtop.send("开始截取数据...")
img = await AsyncPlaywright.screenshot(
url,
f"{IMAGE_PATH}/temp/wbtop_{event.user_id}.png",
"#pl_feed_main",
wait_time=12
)
if img:
await wbtop.send(img)
else:
await wbtop.send("发生了一些错误.....")

62 changes: 62 additions & 0 deletions plugins/wbtop/data_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from nonebot.adapters.onebot.v11 import MessageSegment
from utils.image_utils import BuildImage
from utils.message_builder import image
from configs.path_config import IMAGE_PATH
from typing import Tuple, Union
from utils.http_utils import AsyncHttpx
import datetime


async def get_wbtop(url: str) -> Tuple[Union[dict, str], int]:
"""
:param url: 请求链接
"""
n = 0
while True:
try:
data = []
get_response = (await AsyncHttpx.get(url, timeout=20))
if get_response.status_code == 200:
data_json = get_response.json()['data']['realtime']
for data_item in data_json:
# 如果是广告,则不添加
if 'is_ad' in data_item:
continue
dic = {
'hot_word': data_item['note'],
'hot_word_num': str(data_item['num']),
'url': 'https://s.weibo.com/weibo?q=%23' + data_item['word'] + '%23',
}
data.append(dic)
if not data:
return "没有搜索到...", 997
return {'data': data, 'time': datetime.datetime.now()}, 200
else:
if n > 2:
return f'获取失败,请十分钟后再试', 999
else:
n += 1
continue
except TimeoutError:
return "超时了....", 998


def gen_wbtop_pic(data: dict) -> MessageSegment:
"""
生成微博热搜图片
:param data: 微博热搜数据
"""
bk = BuildImage(700, 32 * 50 + 280, 700, 32, color="#797979")
wbtop_bk = BuildImage(700, 280, background=f"{IMAGE_PATH}/other/webtop.png")
bk.paste(wbtop_bk)
text_bk = BuildImage(700, 32 * 50, 700, 32, color="#797979")
for i, data in enumerate(data):
title = f"{i + 1}. {data['hot_word']}"
hot = data["hot_word_num"]
img = BuildImage(700, 30, font_size=20)
w, h = img.getsize(title)
img.text((10, int((30 - h) / 2)), title)
img.text((580, int((30 - h) / 2)), hot)
text_bk.paste(img)
bk.paste(text_bk, (0, 280))
return image(b64=bk.pic2bs4())

0 comments on commit 72ace2d

Please sign in to comment.