-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
155 lines (112 loc) · 4.09 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# -*- coding: utf-8 -*-
import time
import requests
import os
import gzip
from .config import config_dir
from sqlalchemy import (create_engine, Column, Integer, String, Table, Text,
ForeignKey)
from sqlalchemy.orm import sessionmaker, relationship, backref
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.associationproxy import association_proxy
import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Remote resources: the content of CRC_URL is compared with the locally
# stored copy to decide whether the gzip-compressed database at DB_URL
# has to be downloaded again.
CRC_URL = 'http://he3.magnatune.com/info/changed.txt'
DB_URL = 'http://he3.magnatune.com/info/sqlite_normalized.db.gz'
# Local counterparts of the two remote files, kept in the configuration
# directory. DB_FILE holds the decompressed database.
CRC_FILE = os.path.join(config_dir, 'changed.txt')
DB_FILE = os.path.join(config_dir, 'sqlite_normalized.db')
def download():
    """
    Download the latest version of the database file from the server.

    The gzip-compressed SQLite database at ``DB_URL`` is fetched,
    decompressed on the fly and written to ``DB_FILE``.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    import shutil  # local import: only this function needs it

    response = requests.get(DB_URL, stream=True)
    try:
        # Fail loudly on 4xx/5xx instead of trying to gunzip an error page.
        response.raise_for_status()
        with gzip.GzipFile(fileobj=response.raw) as gzfile, \
                open(DB_FILE, 'wb') as f:
            # Copy in chunks rather than loading the whole decompressed
            # database into memory at once.
            shutil.copyfileobj(gzfile, f)
    finally:
        # The response was opened in streaming mode, so the connection is
        # only released once it is closed explicitly.
        response.close()
def update_if_needed():
    """
    Check whether the database file needs an update and download it if so.

    The server is contacted at most once every 24 hours (measured with the
    modification time of ``CRC_FILE``), unless ``DB_FILE`` is missing. The
    database is downloaded again when the remote checksum differs from the
    stored one, or when no local database exists.

    Raises:
        requests.HTTPError: if the server answers the checksum request
            with an error status.
    """
    max_age = 60 * 60 * 24  # 24 hours
    # Without a local database there is nothing to keep, whatever the
    # checksum file says.
    db_present = os.path.exists(DB_FILE)

    crc = None
    if os.path.exists(CRC_FILE):
        updated = os.stat(CRC_FILE).st_mtime
        if db_present and time.time() - updated < max_age:
            logger.debug(
                'Database file updated less than 24 hours, not updating.')
            return
        with open(CRC_FILE, 'rb') as f:
            crc = f.read()

    logger.info('Updating CRC file')
    response = requests.get(CRC_URL)
    # Do not mistake the body of an error page for a new checksum.
    response.raise_for_status()
    new_crc = response.content

    if db_present and crc == new_crc:
        logger.debug('Database file up-to-date.')
        # Refresh the modification time so the server is not queried again
        # on every call once the checksum file is older than 24 hours.
        os.utime(CRC_FILE, None)
        return

    logger.info('Updating database file.')
    download()
    # Only record the new checksum once the download has succeeded.
    with open(CRC_FILE, 'wb') as f:
        f.write(new_crc)
def get_session():
    """
    Return a new session bound to the local database.

    The database file is refreshed first through ``update_if_needed()``.
    """
    update_if_needed()
    db_engine = create_engine('sqlite:///' + DB_FILE)
    session_factory = sessionmaker(bind=db_engine)
    return session_factory()
# Declarative base class shared by the ORM models of this module.
Base = declarative_base()
# Metadata of the base; the association tables are registered on it.
metadata = Base.metadata
# Association table linking genres and albums (many-to-many).
t_genres_albums = Table(
    'genres_albums',
    metadata,
    Column(
        'genre_id', Integer, ForeignKey('genres.genre_id'), index=True,
    ),
    Column(
        'album_id', Integer, ForeignKey('albums.album_id'), index=True,
    ),
)
# Association table linking playlists and songs; ``sort_order`` is the
# column used to order the songs of a playlist.
t_playlist_songs = Table(
    'playlist_songs',
    metadata,
    Column(
        'playlist_id', Integer, ForeignKey('playlists.playlist_id'),
        index=True,
    ),
    Column('sort_order', Integer),
    Column('song_id', Integer, ForeignKey('songs.song_id')),
)
class Album(Base):
    """ORM model mapped to the ``albums`` table."""

    __tablename__ = 'albums'

    album_id = Column(Integer, primary_key=True)
    artist_id = Column(
        Integer, ForeignKey('artists.artists_id'), index=True,
    )
    name = Column(Text, index=True)
    description = Column(Text)
    sku = Column(Text)
    upc = Column(String)
    release_date = Column(Integer)
    popularity = Column(Integer)
    itunes_buy_url = Column(Text)

    # Artist of the album; the backref exposes ``Artist.albums`` ordered
    # by release date.
    artist = relationship(
        'Artist',
        backref=backref('albums', order_by=release_date),
    )
    # Genre objects linked through the genres_albums association table.
    # NOTE(review): the backref is named 'songs' although it gives each
    # Genre its list of albums; renaming it would break existing callers,
    # so it is left untouched here.
    _genres = relationship(
        'Genre', secondary=t_genres_albums, backref='songs',
    )
    # Names of the linked genres, proxied from ``_genres``.
    genres = association_proxy('_genres', 'name')
class Artist(Base):
    """ORM model mapped to the ``artists`` table."""

    __tablename__ = 'artists'

    # The primary key column is spelled ``artists_id`` (plural).
    artists_id = Column(Integer, primary_key=True)
    name = Column(Text, index=True)
    description = Column(Text)
    homepage = Column(Text)
    bio = Column(Text)
    photo = Column(Text)
class Genre(Base):
    """ORM model mapped to the ``genres`` table."""

    __tablename__ = 'genres'

    genre_id = Column(Integer, primary_key=True)
    name = Column(Text)
class Playlist(Base):
    """ORM model mapped to the ``playlists`` table."""

    __tablename__ = 'playlists'

    playlist_id = Column(Integer, primary_key=True)
    sort_order = Column(Integer)
    owner_id = Column(Integer)
    name = Column(Text)

    # Songs of the playlist, linked through the playlist_songs association
    # table and ordered by its ``sort_order`` column.
    songs = relationship(
        'Song',
        secondary=t_playlist_songs,
        order_by=t_playlist_songs.c.sort_order,
    )
class Song(Base):
    """ORM model mapped to the ``songs`` table."""

    __tablename__ = 'songs'

    song_id = Column(Integer, primary_key=True)
    album_id = Column(Integer, ForeignKey('albums.album_id'), index=True)
    name = Column(Text)
    track_no = Column(Integer)
    duration = Column(Integer)
    mp3 = Column(Text)

    # Album the song belongs to; the backref exposes ``Album.songs``
    # ordered by track number.
    album = relationship(
        Album,
        backref=backref('songs', order_by=track_no),
    )