-
Notifications
You must be signed in to change notification settings - Fork 5
/
spark_rmse.py
67 lines (51 loc) · 2.43 KB
/
spark_rmse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# -*- coding: utf-8 -*-
"""spark_RMSE.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1fw5VrLGcJCbuGs2qvL1zl81BKApNJoyr
"""
from functools import reduce

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql import SparkSession
from pyspark.sql.types import NumericType
# Create (or reuse) the SparkSession for this job.
spark = SparkSession.builder.appName("NBA_Game_Prediction").getOrCreate()

# Regression target column.
LABEL_COL = "Win_Margin"
# Team-name string columns, encoded as numeric indices inside the pipeline.
CATEGORICAL_COLS = ["Home", "Away"]
# Columns excluded from the feature set entirely. "Home"/"Away" are NOT in
# this list because they must still exist when the StringIndexers run.
EXCLUDED_COLS = ["Date", "weekday", "overtime", "remarks"]

# One CSV per season, read from HDFS.
file_paths = [
    "hdfs:///user/maria_dev/Spark/nba_1819.csv",
    "hdfs:///user/maria_dev/Spark/nba_1920.csv",
    "hdfs:///user/maria_dev/Spark/nba_2021.csv",
    "hdfs:///user/maria_dev/Spark/nba_2122.csv",
    "hdfs:///user/maria_dev/Spark/nba_2223.csv",
    "hdfs:///user/maria_dev/Spark/nba_2324.csv"
]

# Read each season's CSV into its own DataFrame.
data_frames = [
    spark.read.csv(file_path, header=True, inferSchema=True)
    for file_path in file_paths
]
# Stack all seasons, matching columns by name rather than by position.
combined_data = reduce(lambda left, right: left.unionByName(right), data_frames)

# Drop unused columns (names that are absent are ignored, as before).
combined_data = combined_data.drop(*EXCLUDED_COLS)

# Feature columns: every numeric column except the label and the raw team
# columns, plus the indexed team columns the pipeline will produce.
# Selecting by name/type (instead of `columns[1:]`) keeps the label out of
# the features and avoids feeding string columns to VectorAssembler, which
# only accepts numeric/boolean/vector inputs.
# NOTE(review): confirm no remaining numeric column is derived from the final
# score (e.g. per-team points); otherwise the label still leaks into features.
numeric_cols = [
    field.name
    for field in combined_data.schema.fields
    if isinstance(field.dataType, NumericType)
    and field.name != LABEL_COL
    and field.name not in CATEGORICAL_COLS
]
index_cols = [column + "_index" for column in CATEGORICAL_COLS]
feature_cols = numeric_cols + index_cols

# Encode team names as numeric indices. handleInvalid="keep" maps a team that
# never appears in the training split to an extra index instead of failing.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
    for column in CATEGORICAL_COLS
]
# Combine the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Random-forest regressor predicting the label from the feature vector.
rf = RandomForestRegressor(featuresCol="features", labelCol=LABEL_COL)

# All preprocessing lives in the pipeline so it is fitted on the training
# data only and then applied identically to the test data.
pipeline = Pipeline(stages=indexers + [assembler, rf])

# 80/20 train/test split with a fixed seed for reproducibility.
(train_data, test_data) = combined_data.randomSplit([0.8, 0.2], seed=42)

# Fit the full pipeline, then predict on the held-out test data.
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

# Evaluate the model with root-mean-squared error on the test data.
evaluator = RegressionEvaluator(labelCol=LABEL_COL, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

# Release the Spark resources held by this job.
spark.stop()