forked from dompling/PagerMaid_Plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
/
neteasemusic.py
542 lines (526 loc) · 29.1 KB
/
neteasemusic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
# -*- coding: utf-8 -*-
import json
import requests
import re
import base64
import codecs
import random
import math
from time import sleep
from pagermaid.listener import listener
from pagermaid import bot
from pagermaid.utils import alias_command
from os import remove, path, mkdir, getcwd
from os.path import exists
from collections import defaultdict
from telethon.tl.types import DocumentAttributeAudio
class RetryError(Exception): # 重试错误,用于再次重试
pass
@listener(is_plugin=True, outgoing=True, command=alias_command("nem"),
description="网易云搜/点歌。\n指令s为搜索,p为点歌,id为歌曲ID点歌,r为随机热歌(无关键词)\n搜索在s后添加数字如`-nem` `s8` "
"`<关键词>`调整结果数量\n搜索灰色歌曲请尽量**指定歌手**\n可回复搜索结果消息`-nem` `p` `<歌曲数字序号>`点歌",
parameters="<指令> <关键词>")
async def nem(context):
proxies = {}
proxynum = 0
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/52.0.2743.116 Safari/537.36 Edge/15.15063',
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,"
"application/signed-exchange;v=b3;q=0.9", "X-Real-IP": "223.252.199.66"}
proxy = [{'http': 'http://music.lolico.me:39000', 'https': 'http://music.lolico.me:39000'},
{'http': 'http://netease.unlock.feiwuis.me:6958', 'https': 'https://netease.unlock.feiwuis.me:6958'}]
helptext = "**使用方法:** `-nem` `<指令>` `<关键词>`\n\n指令s为搜索,p为点歌,id为歌曲ID点歌,r为随机热歌(无关键词)\n搜索在s后添加数字如`-nem` `s8` " \
"`<关键词>`调整结果数量\n搜索灰色歌曲请尽量**指定歌手**\n可回复搜索结果消息`-nem` `p` `<歌曲数字序号>`点歌 "
apifailtext = "出错了呜呜呜 ~ 试了好多好多次都无法访问到 API 服务器 。"
if len(context.parameter) < 2:
if len(context.parameter) == 0:
await context.edit(helptext)
return
elif context.parameter[0] == "r": # 随机热歌
await context.edit("随机中 . . .")
for _ in range(20): # 最多重试20次
apinum = random.randint(0, 1)
apitype = random.randint(0, 1)
if apitype == 0:
apitype = "飙升榜"
else:
apitype = "热歌榜"
if apinum == 0:
url = "https://api.vvhan.com/api/rand.music?type=json&sort=" + apitype
else:
url = "https://api.uomg.com/api/rand.music?sort=" + apitype + "&format=json"
status = False
try:
randsong = requests.get(url, headers=headers)
if randsong.status_code == 200:
randsong = json.loads(randsong.content)
if apinum == 0 and randsong['success'] is True:
context.parameter[0] = "id"
context.parameter.append(
str(randsong['info']['id']))
status = True
break
elif apinum == 1 and randsong['code'] == 1:
context.parameter[0] = "id"
context.parameter.append(
str(randsong['data']['url'][45:]))
status = True
break
except:
continue
if status is False:
await context.edit(apifailtext)
sleep(3)
await context.delete()
return
else: # 错误输入
await context.edit(helptext)
return
# 整理关键词
keyword = ''
for i in range(1, len(context.parameter)):
keyword += context.parameter[i] + " "
keyword = keyword[:-1]
idplay = False
if context.parameter[0] == "id": # ID点歌功能
if len(context.parameter) > 2:
await context.edit(helptext)
return
idplay = keyword
context.parameter[0] = "p"
if context.parameter[0][0] == "s": # 搜索功能
await context.edit(f"【{keyword}】搜索中 . . .")
if len(context.parameter[0]) > 1:
limit = str(context.parameter[0][1:])
else:
limit = "5"
url = "http://music.163.com/api/search/pc?&s=" + \
keyword + "&offset=0&limit=" + limit + "&type=1"
for _ in range(20): # 最多尝试20次
status = False
req = requests.request("GET", url, headers=headers)
if req.status_code == 200:
req = json.loads(req.content)
if req['code'] == 200:
result = req['result']
if req['result']['songCount'] == 0:
result = False
else:
result = False
if result:
info = defaultdict()
for i in range(len(req['result']['songs'])):
info[i] = {'id': '', 'title': '', 'alias': '',
'album': '', 'albumpic': '', 'artist': ''}
info[i]['id'] = req['result']['songs'][i]['id']
info[i]['title'] = req['result']['songs'][i]['name']
info[i]['alias'] = req['result']['songs'][i]['alias']
info[i]['album'] = req['result']['songs'][i]['album']['name']
info[i]['albumpic'] = req['result']['songs'][i]['album']['picUrl']
for j in range(len(req['result']['songs'][i]['artists'])):
info[i]['artist'] += req['result']['songs'][i]['artists'][j]['name'] + " "
text = f"<strong>关于【{keyword}】的结果如下</strong> \n"
for i in range(len(info)):
text += f"#{i+1}: \n<strong>歌名</strong>: {info[i]['title']}\n"
if info[i]['alias']:
text += f"<strong>别名</strong>: <i>{info[i]['alias'][0]}</i>\n"
if info[i]['album']:
res = '<a href="' + \
info[i]['albumpic'] + '">' + \
info[i]['album'] + '</a>'
text += f"<strong>专辑</strong>: {res} \n"
text += f"<strong>作者</strong>: {info[i]['artist']}\n<strong>歌曲ID</strong>: <code>{info[i]['id']}</code>\n————————\n"
text += "\n<strong>回复此消息</strong><code>-nem p <歌曲序号></code><strong>即可点歌</strong>"
await context.edit(text, parse_mode='html', link_preview=True)
status = True
break
else:
await context.edit("**未搜索到结果**")
sleep(3)
await context.delete()
status = True
break
else:
continue
if status is False:
await context.edit(apifailtext)
sleep(3)
await context.delete()
return
elif context.parameter[0] == "p": # 点歌功能
try:
reply = await context.get_reply_message()
except ValueError:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
search = ""
title = ""
if reply:
msg = reply.message
search = re.findall(".*【(.*)】.*", msg)
if search:
try:
start = "#" + context.parameter[1] + ":"
search = ".*" + start + "(.*?)" + '————————' + ".*"
msg = re.findall(search, msg, re.S)[0]
search = ".*歌曲ID: (.*)\n.*"
title = ".*歌名: (.*?)\n.*"
title = "【"+re.findall(title, msg, re.S)[0]+"】"
idplay = re.findall(search, msg, re.S)[0]
if reply.sender.is_self:
await reply.edit(f"{title}点歌完成")
except:
await context.edit("出错了呜呜呜 ~ 无效的歌曲序号。")
return
else:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
await context.edit("获取中 . . .")
try:
import eyed3
imported = True
except ImportError:
imported = False
await bot.send_message(context.chat_id, '(`eyeD3`支持库未安装,歌曲文件信息将无法导入\n请使用 `-sh` `pip3` `install` `eyed3` '
'安装,或自行ssh安装)')
url = "http://music.163.com/api/search/pc?&s=" + \
keyword + "&offset=0&limit=1&type=1"
for _ in range(20): # 最多尝试20次
status = False
if proxynum > (len(proxy) - 1): # 代理自动切换至下一个
proxynum = 0
proxies = proxy[proxynum]
proxynum += 1
if idplay: # 指定ID播放
url = "http://music.163.com/api/song/detail?id=" + \
idplay + "&ids=[" + idplay + "]"
# 搜索后播放
req = requests.request("GET", url, headers=headers)
if req.status_code == 200:
req = json.loads(req.content)
if req['code'] == 200:
if idplay:
req['result'] = req
result = req['result']
if not idplay:
if req['result']['songCount'] == 0:
result = False
else:
result = False
if result:
info = {'id': req['result']['songs'][0]['id'], 'title': req['result']['songs'][0]['name'],
'alias': req['result']['songs'][0]['alias'],
'album': req['result']['songs'][0]['album']['name'],
'albumpic': req['result']['songs'][0]['album']['picUrl'], 'artist': '', 'br': ''}
if req['result']['songs'][0]['hMusic']:
info['br'] = req['result']['songs'][0]['hMusic']['bitrate']
elif req['result']['songs'][0]['mMusic']:
info['br'] = req['result']['songs'][0]['mMusic']['bitrate']
elif req['result']['songs'][0]['lMusic']:
info['br'] = req['result']['songs'][0]['lMusic']['bitrate']
for j in range(len(req['result']['songs'][0]['artists'])):
info['artist'] += req['result']['songs'][0]['artists'][j]['name'] + "; "
info['artist'] = info['artist'][:-2]
if title:
title = ""
else:
title = f"【{info['title']}】"
await context.edit(f"{title}下载中 . . .")
try:
from Crypto.Cipher import AES
AES.new("0CoJUm6Qyw8W8jud".encode('utf-8'),
AES.MODE_CBC, "0102030405060708".encode('utf-8'))
ccimported = True
except ImportError:
ccimported = False
await bot.send_message(context.chat_id, '(`PyCryptodome`支持库未安装,音乐曲库/音质受限\n请使用 `-sh` `pip3` '
'`install` `pycryptodome` 安装,或自行ssh安装)')
name = info['title'].replace('/', " ") + ".mp3"
name = name.encode('utf-8').decode('utf-8')
if ccimported: # 尝试使用高清音质下载
songid = str(info['id'])
class WangyiyunDownload(object):
def __init__(self):
self.key = '0CoJUm6Qyw8W8jud'
self.public_key = "010001"
self.modulus = '00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b72515' \
'2b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92' \
'557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d' \
'3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7 '
# 偏移量
self.iv = "0102030405060708"
# 请求头
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ('
'KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36',
# 传入登录cookie,
'Cookie': 'MUSIC_U=f52f220df171da480dbf33ce899479615'
'85a7fdf08c89a2a4bdd6efebd86544233a649814e309366;',
"X-Real-IP": "223.252.199.66",
}
# 请求url
self.url = 'https://music.163.com/weapi/song/enhance/player/url/v1?csrf_token='
# 生成16位随机数字符串
def set_random_num(self):
random_num = ''
# 随机取16个字符
string = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
for ___ in range(16):
n = math.floor(
random.uniform(0, 1) * len(string))
random_num += string[n]
# 返回16位随机数字符串
return random_num
# 生成encSecKey
# 通过public_key和modulus对random_num进行RSA加密
def RSA_encrypt(self, random_num):
# 将16位随机数字符串倒序并以utf-8编码
random_num = random_num[::-1].encode('utf-8')
# 将其以hex(16进制)编码
random_num = codecs.encode(
random_num, 'hex_codec')
# 加密(三者均要从16进制转换为10进制)
# int(n, 16) --> 将16进制字符串n转换为10进制
encryption = int(
random_num, 16) ** int(self.public_key, 16) % int(self.modulus, 16)
# 将加密后的数据转换为16进制字符串
encryption = format(encryption, 'x')
# 返回加密后的字符串
return encryption
# 生成params
# 根据key和iv对msg进行AES加密,需调用两次
# key:
# 第一次: key
# 第二次: random_num
# iv: 偏移量iv
def AES_encrypt(self, msg, key, iv):
# 先将msg按需补全至16的倍数
# 需补全的位数
pad = (16 - len(msg) % 16)
# 补全
msg = msg + pad * chr(pad)
# 将key,iv和msg均以utf-8编码
key = key.encode('utf-8')
iv = iv.encode('utf-8')
msg = msg.encode('utf-8')
# 根据key和iv生成密钥,模式为CBC模式
encryptor = AES.new(key, AES.MODE_CBC, iv)
# 加密
encrypt_aes = encryptor.encrypt(msg)
# 先将加密后的值进行base64编码
encrypt_text = base64.encodebytes(encrypt_aes)
# 将其转换为utf-8字符串
encrypt_text = str(encrypt_text, 'utf-8')
# 返回加密后的字符串
return encrypt_text
# 根据歌曲song_id,生成需要传输的data
# 其中包括params和encSecKey
def construct_data(self, song_id):
# 生成16位随机数字符串
random_num = self.set_random_num()
# 生成encSecKey
encSecKey = self.RSA_encrypt(
random_num=random_num)
# 调用两次AES加密生成params
# 初始化歌曲song_info
song_info = '{"ids":"[%s]","level":"exhigh","encodeType":"mp3",' \
'"csrf_token":"477c1bd99fddedb3adc074f47fee2d35"}' % song_id
# 第一次加密,传入encText, key和iv
first_encryption = self.AES_encrypt(
msg=song_info, key=self.key, iv=self.iv)
# 第二次加密, 传入first_encryption, random_num和iv
encText = self.AES_encrypt(
msg=first_encryption, key=random_num, iv=self.iv)
# 生成data
data = {
'params': encText,
'encSecKey': encSecKey
}
# 返回data
return data
# 发送请求,获取下载链接
def get_real_url(self):
# 输入歌曲song_id
self.song_id = songid
# 获取data
data = self.construct_data(
song_id=self.song_id)
# 发送请求
request = requests.post(
url=self.url, headers=self.headers, data=data, proxies=proxies, verify=False)
# 初始化real_url
real_url = ''
# 处理返回信息
try:
js_text = json.loads(request.text)
data = js_text['data']
if len(data) != 0:
code = data[0]['code']
# 获取成功
if code == 200:
# 歌曲真实地址
real_url = data[0]['url']
else:
raise RetryError
except:
print('生成的params和encSecKey有误!重试中!')
raise RetryError
# 返回real_url
return real_url
def download(self):
# 获取下载链接
real_url = self.get_real_url()
if real_url == '':
print('链接获取失败!')
raise RetryError
else:
file = name
# 开始下载
try:
content = requests.get(
url=real_url, headers=self.headers).content
with open(file, 'wb') as fp:
fp.write(content)
except:
print('服务器连接出错')
raise RetryError
for __ in range(6): # 最多尝试6次
if proxynum > (len(proxy) - 1): # 代理自动切换至下一个
proxynum = 0
proxies = proxy[proxynum]
proxynum += 1
try:
WangyiyunDownload().download()
ccimported = True
break
except:
ccimported = False
if not exists(name):
ccimported = False
if ccimported is False: # 下载(普通音质)
music = requests.request(
"GET", "http://music.163.com/api/song/enhance/download/url?&br=" + str(info['br']) + "&id=" + str(info['id']), headers=headers, proxies=proxies, verify=False)
if music.status_code == 200:
music = json.loads(music.content)
if not music['data']['url']:
music = requests.request(
"GET", "https://music.163.com/song/media/outer/url?id=" + str(info['id']) + ".mp3", headers=headers, verify=False)
if music.status_code != 200:
continue
else:
music = requests.request(
"GET", music['data']['url'], headers=headers)
else:
continue
performers = info['artist'].replace(';', ',')
cap = performers + " - " + "**" + info['title'] + "**"
if ccimported is False:
with open(name, 'wb') as f:
f.write(music.content)
if (path.getsize(name) / 1024) < 100:
remove(name)
try:
if reply.sender.is_self:
await reply.delete()
except:
pass
await context.delete()
res = '或者你可以点击<a href="https://music.163.com/#/song?id=' + \
str(info['id']) + '">' + \
' <strong>这里</strong> ' + '</a>' + '前往网页版收听'
await bot.send_message(context.chat_id, f"<strong>【{info['title']}】</strong>\n" +
"歌曲获取失败,资源获取可能受限,你可以再次尝试。\n" + res, parse_mode='html', link_preview=True)
return
duration = 0
imagedata = requests.get(
info['albumpic'], headers=headers).content
if imported is True:
await context.edit(f"{title}信息导入中 . . .")
tag = eyed3.load(name)
duration = int(tag.info.time_secs)
tag.initTag()
tag = tag.tag
tag.artist = info['artist']
tag.title = info['title']
tag.album = info['album']
tag.images.remove('')
tag.images.set(6, imagedata, "image/jpeg", u"Media")
tag.save(version=eyed3.id3.ID3_DEFAULT_VERSION,
encoding='utf-8')
br = ""
if imported is True:
br = "#" + \
str(eyed3.mp3.Mp3AudioFile(
name).info.bit_rate[1]) + "kbps "
alias = ""
if info['alias']:
alias = "\n\n__" + info['alias'][0] + "__"
cap += "\n#NeteaseMusic " + br + alias
await context.edit(f"{title}上传中 . . .")
if not exists("plugins/NeteaseMusicExtra/FastTelethon.py"):
if not exists("plugins/NeteaseMusicExtra"):
mkdir("plugins/NeteaseMusicExtra")
for ____ in range(6): # 最多尝试6次
faster = requests.request(
"GET", "https://gist.githubusercontent.com/TNTcraftHIM"
"/ca2e6066ed5892f67947eb2289dd6439/raw"
"/86244b02c7824a3ca32ce01b2649f5d9badd2e49/FastTelethon.py")
if faster.status_code == 200:
with open("plugins/NeteaseMusicExtra/FastTelethon.py", "wb") as f:
f.write(faster.content)
break
else:
if exists("plugins/NeteaseMusicExtra/NoFastTelethon.txt"):
break
try:
from NeteaseMusicExtra.FastTelethon import upload_file
file = await upload_file(context.client, open(name, 'rb'), name)
except:
file = name
if not exists("plugins/NeteaseMusicExtra/NoFastTelethon.txt"):
with open("plugins/NeteaseMusicExtra/NoFastTelethon.txt", "w") as f:
f.write("此文件出现表示FastTelethon支持文件在首次运行NeteaseMusic插件时导入失败\n这可能是因为Github"
"服务器暂时性的访问出错导致的\nFastTelethon可以提升低网络性能机型在上传文件时的效率,但是正常情况提升并不明显\n"
"如想要手动导入,可以手动下载:\nhttps://gist.githubusercontent.com/TNTcraftHIM"
"/ca2e6066ed5892f67947eb2289dd6439/raw"
"/86244b02c7824a3ca32ce01b2649f5d9badd2e49/FastTelethon.py\n并放入当前文件夹")
await bot.send_message(context.chat_id, '`FastTelethon`支持文件导入失败,上传速度可能受到影响\n'
'此提示仅出现**一次**,手动导入可参考:\n`' + getcwd() +
'/plugins/NeteaseMusicExtra/NoFastTelethon.txt`')
await context.client.send_file(
context.chat_id,
file,
caption=cap,
link_preview=False,
force_document=False,
thumb=imagedata,
attributes=(DocumentAttributeAudio(
duration, False, info['title'], performers),)
)
try:
if reply.sender.is_self:
await reply.delete()
except:
pass
try:
remove(name)
except:
pass
await context.delete()
status = True
break
else:
await context.edit("**未搜索到结果**")
sleep(3)
await context.delete()
status = True
break
else:
continue
if status is False:
await context.edit(apifailtext)
sleep(3)
await context.delete()
else: # 错误输入
await context.edit(helptext)
return