-
Notifications
You must be signed in to change notification settings - Fork 0
/
opening_to_middle.py
93 lines (79 loc) · 3.14 KB
/
opening_to_middle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import pyspark.sql.functions as F
from spark_session_builder import create_spark_session
# Start Spark and load the raw game records from the "opening" zone of the
# data lake.
spark = create_spark_session()
SOURCE_TABLE = "s3a://chess-data-lake-opening/app/games/"
# PERMISSIVE mode: malformed JSON records are kept (with null fields, per the
# Spark JSON reader docs) instead of failing the whole read.
df = spark.read.json(SOURCE_TABLE, mode="PERMISSIVE")
# Schema inferred from the raw JSON (df.printSchema() snapshot taken when this
# script was written; regenerate it if the source changes):
# root
# |-- black: string (nullable = true)
# |-- black_elo: long (nullable = true)
# |-- black_rating_diff: long (nullable = true)
# |-- eco: string (nullable = true)
# |-- event: string (nullable = true)
# |-- moves: string (nullable = true)
# |-- opening: string (nullable = true)
# |-- result: string (nullable = true)
# |-- site: string (nullable = true)
# |-- termination: string (nullable = true)
# |-- time_control: string (nullable = true)
# |-- utc_date: string (nullable = true)
# |-- utc_time: string (nullable = true)
# |-- white: string (nullable = true)
# |-- white_elo: long (nullable = true)
# |-- white_rating_diff: long (nullable = true)
# |-- extracted_at: date (nullable = true)

# Ratings and rating diffs are inferred as long; narrow each one to int in
# place (withColumn on an existing name keeps the column's position).
RATING_COLUMNS = ("white_elo", "black_elo", "white_rating_diff", "black_rating_diff")
for rating_column in RATING_COLUMNS:
    df = df.withColumn(rating_column, F.col(rating_column).cast("int"))
# Turn the PGN movetext (e.g. "1. e4 c5 2. Nf3 d6 3. O-O") into an array of
# SAN moves (["e4", "c5", "Nf3", "d6", "O-O"]).
# Each match is either:
#   * castling: "O-O-O" or "O-O" (long form listed first so it wins), or
#   * a piece/pawn move: optional piece letter, optional file and/or rank
#     disambiguation, optional capture "x", destination square, optional
#     promotion suffix -- e.g. "e4", "Nf3", "exd5", "Nbd7", "R1e2", "e8=Q".
# Move numbers ("1.", "1..."), check/mate markers ("+", "#") and the game
# result token ("1-0", "0-1", "1/2-1/2") never match, so they are dropped.
df = df.withColumn(
    "moves",
    F.expr(
        "regexp_extract_all(moves, "
        "'O-O-O|O-O|[KQRBN]?[a-h]?[1-8]?x?[a-h][1-8](?:=[QRBN])?', 0)"
    ),
)
# Merge the separate date ("yyyy.MM.dd") and time ("HH:mm:ss") text fields
# into one timestamp column, derive the first day of its month (used as the
# output partition key), then drop the two source fields.
# NOTE(review): to_timestamp interprets the text in the Spark session time
# zone; the values are true UTC instants only if that zone is UTC -- confirm.
datetime_text = F.concat(F.col("utc_date"), F.lit(" "), F.col("utc_time"))
df = df.withColumn(
    "utc_datetime", F.to_timestamp(datetime_text, "yyyy.MM.dd HH:mm:ss")
)
first_of_month = F.to_date(F.date_format("utc_datetime", "yyyy-MM-01"))
df = df.withColumn("utc_month", first_of_month).drop("utc_date", "utc_time")
# Replace the PGN result token with the outcome of the game:
#   "1-0" -> "white", "0-1" -> "black", "1/2-1/2" -> "draw".
# Any other value -- e.g. "*" (game not finished) or null (a record the
# PERMISSIVE reader could not parse) -- becomes null rather than being
# miscounted as a draw.
df = df.withColumn(
    "result",
    F.when(F.col("result") == "1-0", F.lit("white"))
    .when(F.col("result") == "0-1", F.lit("black"))
    .when(F.col("result") == "1/2-1/2", F.lit("draw")),
)
# Derive the game's time format as the lower-cased second space-separated
# word of the event name (presumably e.g. "Rated Blitz game" -> "blitz").
# The set of values depends on the source's event names: bullet, blitz and
# classical, and possibly others -- confirm against the data.
# (utc_month is derived together with utc_datetime, so it is not recomputed
# here.)
df = df.withColumn("time_format", F.lower(F.split(F.col("event"), " ").getItem(1)))
# Schema of the transformed frame (df.printSchema() snapshot taken when this
# script was written; regenerate it after changing any transformation):
# root
# |-- black: string (nullable = true)
# |-- black_elo: integer (nullable = true)
# |-- black_rating_diff: integer (nullable = true)
# |-- eco: string (nullable = true)
# |-- event: string (nullable = true)
# |-- moves: array (nullable = true)
# | |-- element: string (containsNull = true)
# |-- opening: string (nullable = true)
# |-- result: string (nullable = false)
# |-- site: string (nullable = true)
# |-- termination: string (nullable = true)
# |-- time_control: string (nullable = true)
# |-- white: string (nullable = true)
# |-- white_elo: integer (nullable = true)
# |-- white_rating_diff: integer (nullable = true)
# |-- extracted_at: date (nullable = true)
# |-- utc_datetime: timestamp (nullable = true)
# |-- time_format: string (nullable = true)
# |-- utc_month: date (nullable = true)

# Persist to the "middle" zone as Parquet, one directory per utc_month.
# mode="overwrite" replaces existing data at the destination (under Spark's
# default static partition-overwrite mode, all of it).
DESTINATION_TABLE = "s3a://chess-data-lake-middle/app/games/"
df.write.parquet(DESTINATION_TABLE, mode="overwrite", partitionBy="utc_month")