-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathread_load_data.py
107 lines (89 loc) · 3.01 KB
/
read_load_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from modules.countries import Country_Dict
from modules.genres import Genre_Dict
from modules.schema import Country, Genre, Movie, Director, Actor, Writer
from modules.base import Session, engine, Base
import pandas as pd
from tqdm import tqdm
import os
import json
from glob import glob
import numpy as np
from multiprocessing import cpu_count
from multiprocessing import Pool
# 2 - generate database schema
Base.metadata.create_all(engine)

# 3 - create a new session
# Module-level session shared by create_catalog() and load_data().
session = Session()

# Number of chunks the input file list is split into for parallel parsing.
# One core is left free, but the value is clamped to at least 1: on a
# single-core host cpu_count() - 1 would be 0 and np.array_split() rejects
# a section count of zero.
num_cores = max(1, cpu_count() - 1)
def read_json(json_files):
    """Parse every JSON file in *json_files* and collect the results.

    Args:
        json_files: Iterable of paths to JSON documents.

    Returns:
        A ``pandas.DataFrame`` built from the list of parsed documents,
        in the same order as *json_files*.
    """
    records = []
    for path in tqdm(json_files, desc='Creating DataFrame'):
        # Binary mode lets json.load() detect UTF-8/UTF-16/UTF-32 itself
        # instead of relying on the platform's default text encoding.
        with open(path, 'rb') as handle:
            records.append(json.load(handle))
    return pd.DataFrame(records)
def create_dataframe(filepath):
    """Load every ``*.json`` file directly under *filepath* into one DataFrame.

    The file list is split into equally sized chunks and each chunk is
    parsed by ``read_json`` in a worker process.

    Args:
        filepath: Directory that contains the JSON files.

    Returns:
        The concatenation of the per-chunk DataFrames returned by
        ``read_json``. Row indices are kept as produced by each chunk, so
        they are not unique across the result.
    """
    json_pattern = os.path.join(filepath, '*.json')
    file_list = glob(json_pattern)

    # Use one worker per chunk. Clamp to 1 because num_cores is derived from
    # cpu_count() - 1 and np.array_split() raises ValueError for 0 sections.
    workers = max(1, num_cores)
    chunks = np.array_split(file_list, workers)

    with Pool(workers) as pool:
        frames = pool.map(read_json, chunks)
    return pd.concat(frames)
def create_catalog():
    """Add one Country per Country_Dict entry and one Genre per Genre_Dict entry.

    Each object is constructed as ``Model(value, key)`` from the dictionary
    item and added to the module-level session; a single commit follows
    once both catalogs have been added.
    """
    catalogs = ((Country, Country_Dict), (Genre, Genre_Dict))
    for model, entries in catalogs:
        for key in entries:
            session.add(model(entries[key], key))
    session.commit()
def _as_list(value):
    """Return *value* when it is a list or tuple, otherwise an empty list."""
    # Columns missing from a JSON document surface as NaN (a float) or None,
    # neither of which can be iterated.
    return value if isinstance(value, (list, tuple)) else []


def _unique_people(entries):
    """Return the entries of *entries* with distinct 'id', first one kept."""
    seen = set()
    unique = []
    for entry in _as_list(entries):
        # Skip empty placeholders such as the None in a [None] list.
        if not entry or entry['id'] in seen:
            continue
        seen.add(entry['id'])
        unique.append(entry)
    return unique


def _attach_people(collection, model, entries):
    """Merge one *model* object per unique entry and append it to *collection*."""
    for person in _unique_people(entries):
        record = model(person['id'], person['name'])
        session.merge(record)
        session.flush()
        collection.append(record)


def load_data(df):
    """Store every row of *df* as a Movie with its genres and people.

    For each row a ``Movie`` is built from ``imdb_id``, ``title`` and the
    Country_Dict value for ``country`` (``None`` when the country is not in
    the dictionary). Genres found in Genre_Dict are appended to
    ``movie.genres``; entries of ``director``, ``cast`` and ``writer`` are
    de-duplicated by their ``'id'`` key, merged into the session and
    appended to ``movie.directors``, ``movie.actors`` and ``movie.writers``.
    The movie itself is merged and flushed per row, and everything is
    committed once after the last row.

    Missing or non-list values in ``genres``, ``director``, ``cast`` and
    ``writer`` are treated as empty, and empty entries inside those lists
    are skipped.

    Args:
        df: DataFrame with the columns ``imdb_id``, ``title``, ``country``,
            ``genres``, ``director``, ``cast`` and ``writer``. The people
            columns are expected to hold lists of dicts with ``'id'`` and
            ``'name'`` keys.
    """
    for row in tqdm(df.itertuples(), desc='Loading Data', total=len(df)):
        movie = Movie(row.imdb_id, row.title, Country_Dict.get(row.country))

        for label in _as_list(row.genres):
            if label in Genre_Dict:
                movie.genres.append(Genre(Genre_Dict[label], label))

        _attach_people(movie.directors, Director, row.director)
        # Cast is de-duplicated like directors and writers so that a person
        # listed twice is attached to the movie only once.
        _attach_people(movie.actors, Actor, row.cast)
        _attach_people(movie.writers, Writer, row.writer)

        session.merge(movie)
        session.flush()
    session.commit()
if __name__ == "__main__":
    # Pipeline: parse the JSON dumps, seed the country/genre catalogs, then
    # load the movies.
    try:
        df = create_dataframe('./datasets/')
        create_catalog()
        load_data(df)
    finally:
        # Close the session even when one of the steps above raises.
        session.close()