-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
116 lines (99 loc) · 3.88 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import asyncio, os
from config import *
from decouple import RepositoryEnv
from aiohttp import ClientSession
from aiohttp.client_exceptions import ContentTypeError
from create import createVideo
from instagrapi import Client
from datetime import datetime
from instagrapi.types import StoryLink
iclient = Client()
if os.path.exists(SETTINGS_PATH):
iclient.load_settings(SETTINGS_PATH)
else:
iclient.login(IG_USERNAME, IG_PASSWORD)
iclient.dump_settings(SETTINGS_PATH)
def publishToInstagram(path, track_url):
iclient.video_upload_to_story(
path,
links=[
StoryLink(webUri=track_url),
StoryLink(webUri="https://github.com/New-dev0/SpotifyIG",
x=0.3, y=0.3, width=0.3, height=0.3),
],
)
dailyCache = {}
async def main():
data = RepositoryEnv(".env").data
access_token = data.get("ACCESS_TOKEN")
refresh_token = data.get("REFRESH_TOKEN")
sleepTime = 15
currentlyPlaying = None
oauth = {"Authorization": f"Bearer {access_token}"}
while True:
async with ClientSession() as session:
response = await session.get(
"https://api.spotify.com/v1/me/player/currently-playing", headers=oauth
)
status_code = response.status
if status_code == 401:
data = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}
res = await session.post(
"https://accounts.spotify.com/api/token", data=data
)
response = await res.json()
data = {
"ACCESS_TOKEN": response["access_token"],
}
if response.get("refresh_token"):
data["REFRESH_TOKEN"] = response["refresh_token"]
updateConfig(data)
return await main()
try:
videoInfo = await response.json()
if videoInfo.get("item"):
uri = videoInfo["item"]["uri"]
trackUrl = videoInfo["item"]["external_urls"]["spotify"]
if currentlyPlaying and currentlyPlaying == uri:
print("Already playing!")
await asyncio.sleep(sleepTime)
continue
if dailyCache.get(uri) and (
(datetime.now() - dailyCache[uri]).days < 1
):
print("Already in cache!")
await asyncio.sleep(sleepTime)
continue
if (
videoInfo.get("is_playing")
and videoInfo.get("progress_ms", 0) > 10000
) and videoInfo.get("item").get("preview_url"):
sleepTime = 15
currentlyPlaying = uri
videoPath, __ = await createVideo(videoInfo)
dailyCache[uri] = datetime.now()
try:
await publishToInstagram(videoPath, trackUrl)
except Exception as er:
print(er)
os.remove(videoPath)
thumbPath = f"{videoPath}.jpg"
if os.path.exists(thumbPath):
os.remove(thumbPath)
else:
sleepTime = 15
else:
sleepTime = 15
except ContentTypeError:
# Not Playing state
sleepTime = 15
except Exception as er:
print(er)
await asyncio.sleep(sleepTime)
print("Started Script!")
asyncio.run(main())