# rarbg.py
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import time
from pytesseract import pytesseract
import cv2
import numpy as np
from typing import Optional
from dateutil.parser import parse
import torrent_parser as tp
import unicodedata
import random
from expiringdict import ExpiringDict


class IPBanException(Exception):
    def __init__(self, message):
        self.message = message

    def __str__(self):
        return self.message


def convert_size(size) -> float:
    """Convert a human-readable size string to bytes using decimal units, e.g. "1.4 GB" -> 1400000000.0."""
    if "kb" in size.lower():
        return float(size.split(" ")[0]) * 1000
    elif "mb" in size.lower():
        return float(size.split(" ")[0]) * 1000000
    elif "gb" in size.lower():
        return float(size.split(" ")[0]) * 1000000000
    else:
        return float(0)


class Rarbg:
    def __init__(self,
                 max_retries: int = 25,
                 proxies: Optional[str] = None
                 ):
        self.ipbans = ExpiringDict(max_len=10000, max_age_seconds=60 * 60 * 2)
        self.url: str = "https://rarbgto.org"
        self.headers: dict = {"sec-ch-ua": '"Google Chrome";v="107", "Chromium";v="107", "Not=A?Brand";v="24"',
                              "sec-ch-ua-mobile": "?0",
                              "sec-ch-ua-platform": "Windows",
                              "Upgrade-Insecure-Requests": "1",
                              "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                                            "(KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
                              "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,"
                                        "image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
                              "Sec-Fetch-Site": "same-origin",
                              "Sec-Fetch-Mode": "navigate",
                              "Sec-Fetch-Dest": "document",
                              }
        self.retries = max_retries
        self.s = requests.Session()
        self.s.headers.update(self.headers)
        self.proxies = []
        self.delay = 0.1
        if proxies:
            # Proxy file is expected to contain one proxy per line in host:port:user:pass format
            print("Loading proxies from {}".format(proxies))
            with open(proxies, "r") as f:
                for line in f.readlines():
                    data = line.rstrip().split(":")
                    self.proxies.append(f"{data[2]}:{data[3]}@{data[0]}:{data[1]}")
            self.next_proxy()

    def tvdb_to_imdb(self, tvdb: str):
        url = "http://www.thetvdb.com/?tab=series&id=" + tvdb
        resp = self.s.get(url)
        # <span><a href="https://www.imdb.com/title/tt8111088/">IMDB</a></span>
        for line in resp.text.splitlines():
            if "imdb" in line:
                return line.split('href="')[1].split('">')[0].replace("https://www.imdb.com/title/", "") \
                    .replace("/", "")

    def themoviedb_to_imdb(self, tmdb: str):
        url = f"https://api.themoviedb.org/3/movie/{tmdb}" \
              "?api_key=8d6d91941230817f7807d643736e8a49&append_to_response=external_ids"
        # random guy's api key from stackoverflow lol
        resp = self.s.get(url).json()
        return resp["external_ids"]["imdb_id"]

    def next_proxy(self):
        if self.proxies:
            random.shuffle(self.proxies)
            for proxy in self.proxies:
                if proxy.split("@")[1] not in self.ipbans:
                    self.s.proxies = {
                        'http': 'http://' + proxy,
                        'https': 'http://' + proxy,
                    }
                    return
            exit("No proxies left, all ips banned")

    def ban_proxy(self):
        self.ipbans[self.s.proxies["https"].split("@")[1]] = True

    def renew_session(self, next_proxy=False):
        """
        Renew the session: rotate the proxy if requested and solve the
        threat_defence browser check (captcha OCR via tesseract)
        """
        print("Renewing session")
        if next_proxy:
            self.ban_proxy()
        self.s = requests.Session()
        self.s.headers.update(self.headers)
        self.next_proxy()
        date = datetime.now() + timedelta(days=7)  # Wed, 30 Nov 2022 20:38:54 GMT
        date = date.strftime("%a, %d %b %Y %H:%M:%S GMT")
        r1 = self.s.get('https://rarbgto.org/threat_defence.php')
        soup1 = BeautifulSoup(r1.text, 'html.parser')
        scripts = soup1.findAll("script")
        data: str = ""
        for script in scripts:
            script = script.text
            if "value_sk" in script:
                data = script
        value_sk = data.split("value_sk = '")[1].split("'")[0]
        value_c = data.split("value_c = '")[1].split("'")[0]
        value_i = data.split("value_i = '")[1].split("'")[0]
        value_r_1 = data.split("value_i+'&r=")[1].split("'")[0]
        value_r_2 = data.split("""&ref_cookie="+ref_cookie+"&r=""")[1].split('"')[0]
        # Mirror the cookie string the page's JS would set (sk value plus expiry/path/domain attributes)
        cookies = {"sk": value_sk + ";expires=" + date + ";path=/;domain=.rarbgto.org"}
        self.s.cookies.update(cookies)
        self.s.get(self.url + f"/threat_defence_ajax.php?sk={value_sk}&cid={value_c}&i={value_i}&r={value_r_1}",
                   headers={'Content-type': 'text/plain'})
        time.sleep(4)
        try:
            r2 = self.s.get(
                self.url + "/threat_defence.php?defence=2&sk=" + value_sk + "&cid=" + value_c + "&i=" + value_i +
                "&ref_cookie=rarbgto.org&r=" + value_r_2)
        except requests.exceptions.ProxyError:
            self.renew_session(next_proxy=True)
            return
        soup2 = BeautifulSoup(r2.text, 'html.parser')
        captcha_id = soup2.find('input', {'name': 'captcha_id'})["value"]
        captcha_r = soup2.find('input', {'name': 'r'})["value"]
        captcha_img = ""
        imgs = soup2.findAll('img')
        for img in imgs:
            if "captcha" in img["src"]:
                captcha_img = self.url + img["src"]
                break
        r3 = self.s.get(captcha_img)
        arr = np.asarray(bytearray(r3.content), dtype=np.uint8)
        # Providing the tesseract executable
        # location to pytesseract library
        pytesseract.tesseract_cmd = "/usr/bin/tesseract"
        # Passing the image object to image_to_string() function
        # This function will extract the text from the image
        image = cv2.imdecode(arr, -1)
        results = pytesseract.image_to_string(image).rstrip()
        self.s.get(
            self.url + "/threat_defence.php?defence=2&sk=" + value_sk + "&cid=" + value_c + "&i=" + value_i +
            "&ref_cookie=rarbgto.org&r=" + captcha_r + "&solve_string=" + results + "&captcha_id=" +
            captcha_id + "&submitted_bot_captcha=1")

    def get(self, *args, **kwargs) -> str:
        return self.get_resp(*args, **kwargs).text

    def get_resp(self, url: str, params=None, attempts: int = 0) -> requests.Response:
        if params is None:
            params = {}
        if attempts == self.retries:
            raise LookupError("Maximum Retries Reached")
        resp = self.s.get(self.url + url, params=params)
        # print(resp.status_code)
        if "Please wait while we try to verify your browser..." in resp.text:
            self.renew_session()
            return self.get_resp(url, params, attempts + 1)
        elif "pure flooding so you are limited to downloading only using magnets" in resp.text \
                or "We have too many requests from your ip in the past 24h." in resp.text:
            self.renew_session(next_proxy=True)
            return self.get_resp(url, params, attempts + 1)
        else:
            return resp

    def get_content(self, *args, **kwargs) -> bytes:
        return self.get_resp(*args, **kwargs).content

    def search(self, search: str, categories=None, page: int = 1) -> list["Torrent"]:
        """
        Search torrents on RARBG
        Returns list of Torrent objects
        """
        if categories is None:
            categories = []
        torrents: list[Torrent] = []
        params = {"search": search, "category[]": categories, "page": page}
        data = self.get("/torrents.php", params=params)
        soup = BeautifulSoup(data, "html.parser")
        rows = soup.findAll('tr', {'class': "lista2"})
        for row in rows:
            items = row.findAll('td')
            torrents.append(Torrent(
                indexer=self,
                name=items[1].find('a').text,
                size=convert_size(items[3].text),
                seeders=int(items[4].text),
                leechers=int(items[5].text),
                date=parse(items[2].text),
                _id=items[1].find('a')['href'].split("/")[-1]
            ))
        time.sleep(self.delay)
        return torrents

    def search_all(self, search: str, categories=None, before: Optional[datetime] = None,
                   after: Optional[datetime] = None, limit: int = 9999999) -> list["Torrent"]:
        """
        Search all pages for torrents
        Results are assumed to be ordered newest first, so paging stops once a torrent is older than `after`
        """
        if categories is None:
            categories = []
        if not before:
            before = datetime.now()
        if not after:
            after = datetime(1970, 1, 1)
        torrents: list[Torrent] = []
        page = 1
        while True:
            results = self.search(search, categories, page)
            if len(results) == 0:
                break
            for torrent in results:
                if len(torrents) >= limit:
                    return torrents
                if torrent.date > before:
                    continue
                if torrent.date < after:
                    return torrents
                torrents.append(torrent)
            if len(results) < 25:
                break
            page += 1
        return torrents


class Torrent:
    def __init__(self,
                 indexer: "Rarbg",
                 name: str,
                 size: float,
                 seeders: int,
                 leechers: int,
                 date: datetime,
                 _id: str):
        self.indexer = indexer
        self.name: str = name
        self.size: float = size  # Size in bytes
        self.seeders: int = seeders
        self.leechers: int = leechers
        self.date: datetime = date
        self.id: str = _id
        self.page = None

    def __getattr__(self, name):
        # Lazily fetch expensive attributes (magnet link, .torrent file, decoded metadata, file list) on first access
        if name == "magnet":
            self.magnet = self.get_magnet()
            return self.magnet
        elif name == "torrent":
            self.torrent = self.get_torrent_file()
            return self.torrent
        elif name == "data":
            self.data = tp.BDecoder(self.torrent).hash_field('pieces').decode()
            return self.data
        elif name == "files":
            self.files = self.get_files()
            return self.files
        else:
            try:
                return self[name]
            except KeyError:
                raise AttributeError(f"'Torrent' object has no attribute '{name}'")

    def get_magnet(self) -> str:
        """
        Get magnet link for torrent
        """
        if not self.page:
            self.page = self.indexer.get("/torrent/" + self.id)
        soup = BeautifulSoup(self.page, "html.parser")
        magnet = soup.select_one("a[href^=magnet]")
        return magnet["href"]

    def get_torrent_url(self, full: bool = True) -> str:
        """
        Get torrent url for torrent
        """
        if not self.page:
            self.page = self.indexer.get("/torrent/" + self.id)
        soup = BeautifulSoup(self.page, "html.parser")
        torrent_url = soup.select_one('a[href^="/download.php"]')
        if full:
            return self.indexer.url + torrent_url["href"]
        else:
            return torrent_url["href"]

    def get_torrent_file(self) -> bytes:
        """
        Get torrent
        """
        return self.indexer.get_content(self.get_torrent_url(full=False))

    def get_files(self) -> list[dict]:
        """
        Get files in torrent
        """
        if not self.page:
            self.page = self.indexer.get("/torrent/" + self.id)
        soup = BeautifulSoup(self.page, "html.parser")
        rows = soup.select("div#files table.lista tr")
        files = []
        for row in rows:
            items = row.findAll('td')
            if "File Name" in items[0].text:
                # Skip the header row
                continue
            files.append({
                "path": unicodedata.normalize("NFKD", items[0].text).strip().split("/"),
                "size": convert_size(items[1].text.strip())
            })
        if len(files) == 0:
            # Fall back to the file list embedded in the .torrent metadata
            try:
                files = [{"path": x["path"], "size": x["length"]} for x in self.data["info"]["files"]]
            except KeyError:
                # Single-file torrents have no "files" key in the info dict
                files = [{"path": [self.data["info"]["name"]], "size": self.data["info"]["length"]}]
        return sorted(files, key=lambda d: d['size'], reverse=True)

    def __getitem__(self, key):
        return self.data[key]

    def __str__(self):
        return f"Name: {self.name} Size: {self.size} Seeders: {self.seeders} Leechers: {self.leechers} " \
               f"Date: {self.date} ID: {self.id}"
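

# A minimal usage sketch of the classes above: run a search, print the results,
# then read the lazily fetched magnet link and file list of the first hit.
# The search term "ubuntu", max_retries=5, and the optional proxies.txt file
# are illustrative assumptions, not values from the original module.
if __name__ == "__main__":
    rarbg = Rarbg(max_retries=5)  # pass proxies="proxies.txt" to rotate a host:port:user:pass proxy list
    results = rarbg.search("ubuntu")  # returns a list of Torrent objects
    for t in results[:5]:
        print(t)
    if results:
        first = results[0]
        print(first.magnet)  # magnet link, fetched lazily from the torrent page
        for f in first.files:  # file list, parsed from the page or the .torrent metadata
            print("/".join(f["path"]), f["size"])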