EQ_ykv.py
from __future__ import print_function
import math
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('EQ_ykv').getOrCreate()
# Load data ('csv' is shorthand for the built-in CSVFileFormat data source;
# inferSchema makes Latitude/Longitude numeric for the distance math below)
sample = spark.read\
    .format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('gs://tester012018-192800/EQ/DataSample.csv')
poi = spark.read\
    .format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('gs://tester012018-192800/EQ/POIList.csv')
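# Optional sanity check (a minimal sketch, not part of the original flow):
# verify that header parsing and type inference produced the expected columns
# before the cleanup below relies on them.
sample.printSchema()
poi.printSchema()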
# Stage 0 - CLEANUP
# Some CSV headers carry a leading space; strip it from the affected columns
sample = sample.withColumnRenamed(" TimeSt", "TimeSt")
poi = poi.withColumnRenamed(" Latitude", "Latitude")
# Data dimensions
print("data size:")
print("sample :", sample.columns)
print((sample.count(), len(sample.columns)))
print("pois :", poi.columns)
print((poi.count(), len(poi.columns)))
# Temp views
sample.createOrReplaceTempView("sample0")
poi.createOrReplaceTempView("pois0")
# Duplicate detection: flag every row that shares a timestamp and location
# with an earlier (lower _ID) row
query = """SELECT *
           FROM sample0 A
           WHERE _ID > (SELECT MIN(_ID) FROM sample0 B
                        WHERE A.TimeSt = B.TimeSt AND A.Latitude = B.Latitude AND A.Longitude = B.Longitude)"""
duplicates = spark.sql(query)
print("Duplicated data - time stamp and location : \n")
print(duplicates.count())
clean = sample.subtract(duplicates)
# Temp view
clean.createOrReplaceTempView("clean0")
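# Cross-check sketch for the dedup above: dropDuplicates keeps one arbitrary
# row per (TimeSt, Latitude, Longitude) group, so the surviving rows may
# differ from the MIN(_ID) rule, but the count should match clean0.
print("dropDuplicates count:", sample.dropDuplicates(["TimeSt", "Latitude", "Longitude"]).count())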
# Remove mislabeled records that fall outside Canadian boundaries
query = """SELECT *
           FROM clean0
           WHERE Latitude > 40 AND Longitude > -130 AND Longitude < -60
        """
clean2 = spark.sql(query)
print("Cleaned records:", clean2.count())
# Temp view
clean2.createOrReplaceTempView("clean2")
# Two of the four POIs were identical / convert POI locations to radians
poi = poi.withColumn("lat_rad", poi.Latitude * math.pi / 180)
poi = poi.withColumn("lon_rad", poi.Longitude * math.pi / 180)
# Stage 1 - minimum distance and POI assignment
# Distance calculation; convert request locations to radians
lat0 = F.toRadians("Latitude").alias("lat0")
lon0 = F.toRadians("Longitude").alias("lon0")
# POI coordinates in radians (POI1 Edmonton, POI2 Montreal, POI3 Nova Scotia)
lat1 = 0.9345569159727344
lon1 = -1.9806997123424743
lat2 = 0.7945023069213337
lon2 = -1.2839693364011688
lat3 = 0.7893221871547071
lon3 = -1.1036193160713015
dlat1 = lat1 - lat0
dlon1 = lon1 - lon0
dlat2 = lat2 - lat0
dlon2 = lon2 - lon0
dlat3 = lat3 - lat0
dlon3 = lon3 - lon0
# Haversine: a = sin^2(dlat/2) + cos(lat) * cos(lat_poi) * sin^2(dlon/2)
a1 = F.sin(dlat1 / 2) ** 2 + F.cos(lat0) * math.cos(lat1) * F.sin(dlon1 / 2) ** 2
a2 = F.sin(dlat2 / 2) ** 2 + F.cos(lat0) * math.cos(lat2) * F.sin(dlon2 / 2) ** 2
a3 = F.sin(dlat3 / 2) ** 2 + F.cos(lat0) * math.cos(lat3) * F.sin(dlon3 / 2) ** 2
c1 = F.lit(2) * F.asin(F.sqrt(a1))
c2 = F.lit(2) * F.asin(F.sqrt(a2))
c3 = F.lit(2) * F.asin(F.sqrt(a3))
r = F.lit(6371)  # mean Earth radius, km
dist1 = (c1 * r).alias('dist1')
dist2 = (c2 * r).alias('dist2')
dist3 = (c3 * r).alias('dist3')
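# A generic helper sketch equivalent to the per-POI expressions above:
# haversine distance in km between each record and one POI given in radians
# (same spherical-Earth assumption, R = 6371 km).
def haversine_km(lat_col, lon_col, poi_lat, poi_lon):
    a = (F.sin((poi_lat - lat_col) / 2) ** 2
         + F.cos(lat_col) * math.cos(poi_lat) * F.sin((poi_lon - lon_col) / 2) ** 2)
    return F.lit(2 * 6371) * F.asin(F.sqrt(a))
# e.g. dist1 could equally be written as haversine_km(lat0, lon0, lat1, lon1).alias('dist1')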
distances = clean2.select("_ID", "TimeSt", "City", "Province", "Latitude", "Longitude", dist1, dist2, dist3)
distances.createOrReplaceTempView("dist0")
# POI assignment and minimum distance to POI
query = """SELECT _ID, TimeSt, City, Province, dist1, dist2, dist3,
CASE WHEN (dist1 < dist2) AND (dist1 < dist3) THEN "POI1 - EDMONTON"
WHEN (dist2 < dist1) AND (dist2 < dist3) THEN "POI2 - MONTREAL"
ELSE "POI3 - NOVA SCOTIA"
END AS POI
FROM dist0
"""
distPOI = spark.sql(query)
distPOI.createOrReplaceTempView("distPOI0")
query = """SELECT _ID, TimeSt, City, POI,
CASE WHEN (dist1 < dist2) AND (dist1 < dist3) THEN dist1
WHEN (dist2 < dist1) AND (dist2 < dist3) THEN dist2
ELSE dist3
END AS minDist
FROM distPOI0
"""
distPOI2 = spark.sql(query)
distPOI2.createOrReplaceTempView("distPOI2")
distPOI2.show()
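# Alternative sketch: F.least picks the row-wise minimum directly, avoiding
# the duplicated CASE logic (tie-breaking may differ from the SQL above):
# distPOI_alt = distances.withColumn("minDist", F.least("dist1", "dist2", "dist3"))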
# Stage 2 - ANALYSIS
# Group data by POI
by_POI = distPOI2.groupBy("POI")
by_POI.avg("minDist").show()
by_POI.agg(F.stddev("minDist")).show()
by_POI.min("minDist").show()
by_POI.max("minDist").show()
by_POI.agg(F.skewness("minDist")).show()
by_POI.agg(F.kurtosis("minDist")).show()
query = """SELECT COUNT(_ID) Requests, POI, AVG(minDist) AS Mean, percentile_approx(minDist, 0.5) AS Median,
MAX(minDist) AS poiRadius_km, COUNT(_ID)/(3.14159*POWER(MAX(minDist),2)) AS Density_Requests_by_km2
FROM distPOI2
GROUP BY POI
"""
spark.sql(query).show()
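# Optional cross-check sketch: recompute one density figure in plain Python
# from the collected aggregates (column names as aliased in the query above):
# for row in spark.sql(query).collect():
#     print(row.POI, row.Requests / (math.pi * row.poiRadius_km ** 2))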