# credit_card_detection.py
# %%
# Importing modules
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib import gridspec
# %%
# read the dataset
dataset = pd.read_csv("creditcard.csv")
# show the first 5 and last 5 rows of the data
pd.concat([dataset.head(), dataset.tail()])
# %%
# check for relative proportion
print("Fraudulent Cases: " + str(len(dataset[dataset["Class"] == 1])))
print("Valid Transactions: " + str(len(dataset[dataset["Class"] == 0])))
print("Proportion of Fraudulent Cases: " + str(len(dataset[dataset["Class"] == 1])/ dataset.shape[0]))
# To see how small the share of fraud transactions is
data_p = dataset.copy()
data_p[" "] = np.where(data_p["Class"] == 1, "Fraud", "Genuine")
# plot a pie chart
data_p[" "].value_counts().plot(kind="pie")
# %%
# plot the distributions of the Amount and Time features
f, axes = plt.subplots(1, 2, figsize=(18, 4), sharex=True)
amount_value = dataset['Amount'].values
time_value = dataset['Time'].values
sns.kdeplot(amount_value, fill=True, color="m", ax=axes[0]).set_title('Distribution of Amount')
sns.kdeplot(time_value, fill=True, color="m", ax=axes[1]).set_title('Distribution of Time')
plt.show()
# %%
print("Average Amount in a Fraudulent Transaction: " + str(dataset[dataset["Class"] == 1]["Amount"].mean()))
print("Average Amount in a Valid Transaction: " + str(dataset[dataset["Class"] == 0]["Amount"].mean()))
# %%
print("Summary of the feature - Amount" + "\n-------------------------------")
print(dataset["Amount"].describe())
# %%
# Reorder the columns Amount, Time then the rest
data_plot = dataset.copy()
amount = data_plot['Amount']
data_plot.drop(labels=['Amount'], axis=1, inplace = True)
data_plot.insert(0, 'Amount', amount)
# Plot the distributions of the features
columns = data_plot.iloc[:,0:30].columns
plt.figure(figsize=(12,30*4))
grids = gridspec.GridSpec(30, 1)
for grid, index in enumerate(data_plot[columns]):
    ax = plt.subplot(grids[grid])
    sns.kdeplot(data_plot[index][data_plot.Class == 1], fill=True)
    sns.kdeplot(data_plot[index][data_plot.Class == 0], fill=True)
    ax.set_xlabel("")
    ax.set_title("Distribution of Column: " + str(index))
plt.show()
# %%
# check for null values
total_missing = dataset.isnull().sum().sum()
print("Non-missing values: " + str(dataset.size - total_missing))
print("Missing values: " + str(total_missing))
# %%
from sklearn.preprocessing import RobustScaler
scaler = RobustScaler().fit(dataset[["Time", "Amount"]])
dataset[["Time", "Amount"]] = scaler.transform(dataset[["Time", "Amount"]])
pd.concat([dataset.head(), dataset.tail()])
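# %%
# Optional sketch: RobustScaler centers on the median and scales by the IQR, so the many
# extreme Amount values do not dominate the scaling. The manual computation below is only
# for illustration and assumes the unscaled values are re-read from the same CSV.
raw_amount = pd.read_csv("creditcard.csv")["Amount"]
q1, q3 = raw_amount.quantile(0.25), raw_amount.quantile(0.75)
manual_scaled = (raw_amount - raw_amount.median()) / (q3 - q1)
print(manual_scaled.head())  # should match the scaled "Amount" column above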
# %%
# Separate the response and the features.
# Note: undersampling before cross-validation would leak information and lead to overfitting,
# so resampling is done inside the cross-validation pipeline later on.
y = dataset["Class"] # target
X = dataset.iloc[:,0:30]
# Use SKLEARN for the split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size = 0.2, random_state = 42)
X_train.shape, X_test.shape, y_train.shape, y_test.shape
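# %%
# Optional sketch: with such a skewed class distribution it can be worth stratifying the
# split so that train and test keep the same fraud ratio; this is an alternative to the
# plain split above, not what the original code does.
X_tr_s, X_te_s, y_tr_s, y_te_s = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y)
print("Fraud ratio in train:", y_tr_s.mean(), "in test:", y_te_s.mean())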
# %%
# Create the cross validation framework
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import GridSearchCV, cross_val_score, RandomizedSearchCV
kf = StratifiedKFold(n_splits=5, random_state = None, shuffle = False)
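# %%
# Optional sketch: StratifiedKFold keeps roughly the same fraud ratio in every fold, which
# matters when positive cases are this rare; the quick check below is illustrative only.
for fold, (train_idx, val_idx) in enumerate(kf.split(X_train, y_train)):
    print(f"Fold {fold}: fraud ratio in validation fold = {y_train.iloc[val_idx].mean():.5f}")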
# %%
# Import the imbalance Learn module
from imblearn.pipeline import make_pipeline   # create a pipeline from the provided estimators
from imblearn.under_sampling import NearMiss  # under-sampling based on the NearMiss methods
from imblearn.over_sampling import SMOTE      # over-sampling using SMOTE
# import the metrics
from sklearn.metrics import roc_curve, roc_auc_score, accuracy_score, recall_score, precision_score, f1_score
# Import the classifiers
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
# %%
# Using SKLEARN module for random forest
from sklearn.ensemble import RandomForestClassifier
# Fit and predict
rfc = RandomForestClassifier()
rfc.fit(X_train, y_train)
y_pred = rfc.predict(X_test)
# For the performance let's use some metrics from SKLEARN module
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
print("The accuracy is", accuracy_score(y_test, y_pred))
print("The precision is", precision_score(y_test, y_pred))
print("The recall is", recall_score(y_test, y_pred))
print("The F1 score is", f1_score(y_test, y_pred))
# %%
def get_model_best_estimator_and_metrics(estimator, params, kf=kf, X_train=X_train,
                                         y_train=y_train, X_test=X_test,
                                         y_test=y_test, is_grid_search=True,
                                         sampling=NearMiss(), scoring="f1",
                                         n_jobs=2):
    if sampling is None:
        # make the pipeline of only the estimator, just so the remaining code will work fine
        pipeline = make_pipeline(estimator)
    else:
        # make the pipeline of over/undersampling and estimator
        pipeline = make_pipeline(sampling, estimator)
    # get the estimator name
    estimator_name = estimator.__class__.__name__.lower()
    # construct the parameters for grid/random search cv
    new_params = {f'{estimator_name}__{key}': params[key] for key in params}
    if is_grid_search:
        # grid search instead of randomized search
        search = GridSearchCV(pipeline, param_grid=new_params, cv=kf, scoring=scoring,
                              return_train_score=True, n_jobs=n_jobs, verbose=2)
    else:
        # randomized search
        search = RandomizedSearchCV(pipeline, param_distributions=new_params,
                                    cv=kf, scoring=scoring, return_train_score=True,
                                    n_jobs=n_jobs, verbose=1)
    # fit the model
    search.fit(X_train, y_train)
    cv_score = cross_val_score(search, X_train, y_train, scoring=scoring, cv=kf)
    # make predictions on the test data
    y_pred = search.best_estimator_.named_steps[estimator_name].predict(X_test)
    # calculate the metrics: recall, accuracy, F1 score, etc.
    recall = recall_score(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    y_proba = search.best_estimator_.named_steps[estimator_name].predict_proba(X_test)[:, 1]
    fpr, tpr, _ = roc_curve(y_test, y_proba)
    auc = roc_auc_score(y_test, y_proba)
    # return the best estimator along with the metrics
    return {
        "best_estimator": search.best_estimator_,
        "estimator_name": estimator_name,
        "cv_score": cv_score,
        "recall": recall,
        "accuracy": accuracy,
        "f1_score": f1,
        "fpr": fpr,
        "tpr": tpr,
        "auc": auc,
    }
# %%
# Cumulatively create a table for the ROC curve
## Create the dataframe
res_table = pd.DataFrame(columns=['classifiers', 'fpr','tpr','auc'])
rfc_results = get_model_best_estimator_and_metrics(
estimator=RandomForestClassifier(),
params={
'n_estimators': [50, 100, 200],
'max_depth': [4, 6, 10, 12],
'random_state': [13]
},
sampling=None,
n_jobs=3,
)
res_table = pd.concat([res_table, pd.DataFrame([{
    'classifiers': rfc_results["estimator_name"],
    'fpr': rfc_results["fpr"],
    'tpr': rfc_results["tpr"],
    'auc': rfc_results["auc"],
}])], ignore_index=True)
# %%
print(f"==={rfc_results['estimator_name']}===")
print("Model:", rfc_results['best_estimator'])
print("Accuracy:", rfc_results['accuracy'])
print("Recall:", rfc_results['recall'])
print("F1 Score:", rfc_results['f1_score'])
# %%
logreg_us_results = get_model_best_estimator_and_metrics(
estimator=LogisticRegression(),
params={"penalty": ['l1', 'l2'],
'C': [ 0.01, 0.1, 1, 100],
'solver' : ['liblinear']},
sampling=NearMiss(),
n_jobs=3,
)
print(f"==={logreg_us_results['estimator_name']}===")
print("Model:", logreg_us_results['best_estimator'])
print("Accuracy:", logreg_us_results['accuracy'])
print("Recall:", logreg_us_results['recall'])
print("F1 Score:", logreg_us_results['f1_score'])
res_table = pd.concat([res_table, pd.DataFrame([{
    'classifiers': logreg_us_results["estimator_name"],
    'fpr': logreg_us_results["fpr"],
    'tpr': logreg_us_results["tpr"],
    'auc': logreg_us_results["auc"],
}])], ignore_index=True)
res_table
# %%
# Plot the ROC curve for undersampling
res_table.set_index('classifiers', inplace=True)
fig = plt.figure(figsize=(17,7))
for j in res_table.index:
    plt.plot(res_table.loc[j]['fpr'],
             res_table.loc[j]['tpr'],
             label="{}, AUC={:.3f}".format(j, res_table.loc[j]['auc']))
plt.plot([0,1], [0,1], color='orange', linestyle='--')
plt.xticks(np.arange(0.0, 1.1, step=0.1))
plt.xlabel("Positive Rate(False)", fontsize=15)
plt.yticks(np.arange(0.0, 1.1, step=0.1))
plt.ylabel("Positive Rate(True)", fontsize=15)
plt.title('Analysis for Oversampling', fontweight='bold', fontsize=15)
plt.legend(prop={'size':13}, loc='lower right')
plt.show()
# %%
# Cumulatively create a table for the ROC curve
res_table = pd.DataFrame(columns=['classifiers', 'fpr','tpr','auc'])
lin_reg_os_results = get_model_best_estimator_and_metrics(
estimator=LogisticRegression(),
params={"penalty": ['l1', 'l2'], 'C': [ 0.01, 0.1, 1, 100, 100],
'solver' : ['liblinear']},
sampling=SMOTE(random_state=42),
scoring="f1",
is_grid_search=False,
n_jobs=2,
)
print(f"==={lin_reg_os_results['estimator_name']}===")
print("Model:", lin_reg_os_results['best_estimator'])
print("Accuracy:", lin_reg_os_results['accuracy'])
print("Recall:", lin_reg_os_results['recall'])
print("F1 Score:", lin_reg_os_results['f1_score'])
res_table = pd.concat([res_table, pd.DataFrame([{
    'classifiers': lin_reg_os_results["estimator_name"],
    'fpr': lin_reg_os_results["fpr"],
    'tpr': lin_reg_os_results["tpr"],
    'auc': lin_reg_os_results["auc"],
}])], ignore_index=True)
# %%
# boxplot for two example variables in the dataset
f, axes = plt.subplots(1, 2, figsize=(18,4), sharex = True)
variable1 = dataset["V1"]
variable2 = dataset["V2"]
sns.boxplot(x=variable1, color="m", ax=axes[0]).set_title('Boxplot for V1')
sns.boxplot(x=variable2, color="m", ax=axes[1]).set_title('Boxplot for V2')
plt.show()
# %%
# Find the IQR for all the feature variables.
# Note that the Class variable is kept in this evaluation, although no observation
# will be removed based on it (it only takes the values 0 and 1).
quartile1 = dataset.quantile(0.25)
quartile3 = dataset.quantile(0.75)
IQR = quartile3 - quartile1
print(IQR)
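# %%
# Optional sketch: for a single column such as Amount, the outlier rule used below keeps
# values inside [Q1 - 3*IQR, Q3 + 3*IQR]; printing the bounds makes the cutoffs concrete
# (note that Amount and Time are already robust-scaled at this point).
amount_lower = quartile1["Amount"] - 3 * IQR["Amount"]
amount_upper = quartile3["Amount"] + 3 * IQR["Amount"]
print("Amount values kept within:", amount_lower, "to", amount_upper)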
# %%
# Remove the outliers
constant = 3
datavalid = dataset[~((dataset < (quartile1 - constant * IQR)) |(dataset > (quartile3 + constant * IQR))).any(axis=1)]
deletedrows = dataset.shape[0] - datavalid.shape[0]
print("We have removed " + str(deletedrows) + " rows from the data as outliers")