generated from wbkd/webpack-starter
-
Notifications
You must be signed in to change notification settings - Fork 2
/
build_data.py
278 lines (234 loc) · 8.81 KB
/
build_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# flake8: noqa
import json
import logging
import os
import sys
from datetime import datetime
import pandas as pd
from normality import normalize
# setup logger: everything at DEBUG, emitted to stdout
root = logging.getLogger()
root.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
root.addHandler(stream_handler)
log = logging.getLogger(__name__)

# some defaults
NOW = datetime.now()  # reference time for the active-sanction check
DATABASE_URI = os.getenv("FTM_STORE_URI")
BASE_URL = "https://correctiv.github.io/ru-sanctions-dashboard/public"
COR_BASE_URL = "https://correctiv.org/wp-content/uploads/2022/02"

# entity schema -> German display label; unknown schemas are bucketed as "Other"
CATEGORIES = {
    "Person": "Person",
    "Company": "Unternehmen",
    "Airplane": "Flugzeug",
    "Vessel": "Schiff",
    "Other": "Sonstige",
}

# authority name -> origin slug used for icon lookup in clean_table()
AUTHORITIES = {
    "European External Action Service": "eu",
    "United Nations Security Council (UN SC)": "uno",
    "UN; Office of Financial Sanctions Implementation": "uno",
    "World Bank": "uno",
}
# helpers
def load_data():
    """Load all Sanction records joined onto their target entities.

    Returns one DataFrame row per (sanction, entity) pair, with the
    sanction's JSON properties exploded into columns.

    NOTE(review): the `->`/`->>`/`@>` JSON operators imply a Postgres
    backend behind FTM_STORE_URI — confirm. Postgres folds unquoted
    column aliases (sourceUrl, startDate, ...) to lowercase, which is
    why downstream code reads `sourceurl`, `startdate`, etc.
    The `->` extractions yield JSON arrays (multi-valued FTM properties),
    so these columns hold lists, not scalars.
    """
    return pd.read_sql(
        """
        select
            s.id as sanction_id,
            e.id as entity_id,
            s.entity -> 'properties' -> 'program' as program,
            s.entity -> 'properties' -> 'reason' as reason,
            s.entity -> 'properties' -> 'country' as origin,
            s.entity -> 'properties' -> 'authority' as authority,
            s.entity -> 'properties' -> 'sourceUrl' as sourceUrl,
            s.entity -> 'properties' -> 'startDate' as startDate,
            s.entity -> 'properties' -> 'listingDate' as listingDate,
            s.entity -> 'properties' -> 'endDate' as endDate,
            s.entity -> 'properties' -> 'date' as date,
            e.entity ->> 'schema' as schema,
            e.entity ->> 'caption' as name,
            e.entity -> 'properties' -> 'country' as countries
        from ftm_opensanctions s
        join ftm_opensanctions e on
            s.entity @> '{"schema": "Sanction"}' and
            s.entity -> 'properties' -> 'entity' ->> 0 = e.id
        """,
        DATABASE_URI,  # SQLAlchemy-compatible URI from FTM_STORE_URI
    )
def clean_data(df, now=None):
    """Derive start/end dates and an `active` flag for each sanction.

    Args:
        df: raw frame from `load_data()`; the date columns hold lists of
            ISO date strings (or None) as extracted from the FTM JSON.
        now: reference datetime for the activity check. Defaults to the
            current time (previously the hard-coded module-level `NOW`);
            passing it explicitly makes the function testable and
            reproducible.

    Returns:
        The frame with `start`, `end` (datetime, NaT if unknown),
        `active` (bool) and a flattened scalar `origin` column,
        sorted by `start` descending.
    """
    if now is None:
        now = datetime.now()

    def get_start(row):
        # Earliest known date wins; ISO date strings compare
        # chronologically. Truthiness (not `is not None`) also guards
        # against empty lists, which would make min() raise ValueError.
        if row["listingdate"]:
            return min(row["listingdate"])
        if row["startdate"]:
            return min(row["startdate"])
        if row["date"]:
            return min(row["date"])

    def get_end(row):
        # latest known end date, if any
        if row["enddate"]:
            return max(row["enddate"])

    def get_active(row):
        # active = started in the past and either open-ended or not yet ended
        if row["start"] and row["start"] < now:
            if not pd.isna(row["end"]):
                return row["end"] > now
            return True
        return False

    df["start"] = pd.to_datetime(df.apply(get_start, axis=1), errors="coerce")
    df["end"] = pd.to_datetime(df.apply(get_end, axis=1), errors="coerce")
    df["active"] = df.apply(get_active, axis=1)
    # origin arrives as a (usually 1-element) list; flatten to a scalar
    df["origin"] = df["origin"].map(lambda x: x[0] if x else x)
    return df.sort_values("start", ascending=False)
def clean_table(df):
    """Build the display table for the dashboard CSV.

    Joins multi-valued cells, transliterates names to latin script,
    formats source links and authority labels as markdown, and reduces
    the frame to the display columns, deduplicated.
    """

    def join_values(values):
        # join multi-valued cells into one string, shortest item first
        if values is None:
            return
        return "; ".join(sorted(values, key=len))

    def to_date(value):
        # datetime -> plain date for display
        if value is None:
            return
        return value.date()

    def format_authority(row):
        authority = row.get("authority")
        if authority is None:
            return
        origin = row.get("origin")
        if origin is not None:
            # emoji flag shortcode plus the upper-cased country code
            return f":{origin}: {origin.upper()}"
        origin = AUTHORITIES.get(authority)
        if origin == "uno":
            # no flag shortcode for the UN — use a hosted svg icon
            return f"![{origin}]({BASE_URL}/img/{origin}.svg) {origin[:2].upper()}"
        return authority

    def as_link(url):
        if url is None:
            return
        return f"[URL]({url})"

    def latinize(name):
        # brute force transliteration, then Title Case each token
        latin = normalize(name, latinize=True, lowercase=False)
        return " ".join(token.title() for token in latin.split())

    table = df.copy()
    for column in ("program", "authority", "countries", "reason"):
        table[column] = table[column].map(join_values)
    table["name"] = table["name"].map(latinize)
    # first source url only, rendered as a markdown link
    table["sourceurl"] = table["sourceurl"].map(
        lambda urls: as_link(urls[0] if urls is not None else urls)
    )
    table["start"] = table["start"].map(to_date)
    table["end"] = table["end"].map(to_date)
    table["category_en"] = table["schema"].map(
        lambda schema: schema if schema in CATEGORIES else "Other"
    )
    table["category_de"] = table["category_en"].map(lambda cat: CATEGORIES[cat])
    # must run after the join_values pass: AUTHORITIES keys are joined strings
    table["authority"] = table.apply(format_authority, axis=1)
    columns = [
        "name", "category_en", "category_de", "start", "end", "authority", "sourceurl"
    ]
    return table[columns].drop_duplicates()
if __name__ == "__main__":
    # load data from the FTM store and derive start/end/active columns
    df = load_data()
    df = clean_data(df)
    log.info(f"Total entries: `{len(df)}`")
    # filter for active sanctions
    df = df[df["active"]]
    log.info(f"Active entries: `{len(df)}`")
    # filter for russia and >= 2014
    # countries holds lists of country codes; fillna("") keeps the
    # membership test safe for missing values ("ru" in "" is False)
    df = df[df["countries"].fillna("").map(lambda x: "ru" in x)]
    df = df[df["start"] > "2014"]
    log.info(f"Entries against russian targets since 2014: `{len(df)}`")
    # index by sanction start date so resample() below buckets by time
    df.index = pd.DatetimeIndex(df["start"])
    # generate table csv
    df_table = clean_table(df)
    df_table.fillna("").to_csv(
        "./src/data/sanctions_2014-2022.csv", index=False
    )  # noqa
    # generate timeline csv: monthly sanction counts
    pd.DataFrame(df.resample("1M")["sanction_id"].count()).to_csv(
        "./src/data/sanctions_timeline_2014-2022.csv"
    )
    # generate recent aggregations (invasion-era entries only)
    df_recent = df[df["start"] > "2022-02-21"]
    log.info(f"Entries since 2022-02-22: `{len(df_recent)}`")
    # per schema aggregation: daily counts per target category
    SCHEMA = {"Person": "Person", "Company": "Unternehmen", "Other": "Sonstige Ziele"}
    df_recent_schema = df_recent.copy()
    df_recent_schema["schema"] = df_recent_schema["schema"].map(
        lambda x: x if x in SCHEMA else "Other"
    )
    df_recent_schema = (
        df_recent_schema.groupby("schema")
        .resample("1D")["sanction_id"]
        .count()
        .reset_index()
    )
    # wide format: one row per day, one column per schema
    df_recent_schema = df_recent_schema.pivot(
        index="start", columns="schema", values="sanction_id"
    )
    # drop days without any sanctions (0 -> None, then drop all-None rows)
    df_recent_schema = df_recent_schema.applymap(
        lambda x: None if x == 0 else x
    ).dropna(how="all")
    df_recent_schema.index = df_recent_schema.index.map(lambda x: x.date())
    # keep only the 10 most recent days
    df_recent_schema = df_recent_schema.sort_values("start", ascending=False).head(10)
    df_recent_schema.fillna(0).to_csv("./src/data/recent_schema_aggregation_en.csv")
    # same data with German column headers
    df_recent_schema.columns = df_recent_schema.columns.map(lambda x: SCHEMA[x])
    df_recent_schema.fillna(0).to_csv("./src/data/recent_schema_aggregation_de.csv")

    # per origin - table with 1st row as flag icon header
    def get_icon(origin):
        # "uno" gets a hosted svg; other origins presumably render as
        # emoji flag shortcodes (":eu:") — confirm against the frontend
        if origin == "uno":
            return f"![{origin}]({BASE_URL}/img/{origin}.svg)"
        return f":{origin}:"

    df_recent_origin = (
        df_recent.groupby("origin").resample("1D")["sanction_id"].count().reset_index()
    )
    df_recent_origin = df_recent_origin.pivot(
        index="start", columns="origin", values="sanction_id"
    )
    # blank out zero days, drop empty rows, then render counts as strings
    df_recent_origin = (
        df_recent_origin.applymap(lambda x: None if x == 0 else x)
        .dropna(how="all")
        .applymap(lambda x: "" if pd.isna(x) else str(int(x)))
    )
    df_recent_origin.index = df_recent_origin.index.map(lambda x: x.date()).map(str)
    # append the icon row under an empty index label; the iloc[::-1]
    # reversal below moves it to the top of the written csv
    df_recent_origin.loc[""] = df_recent_origin.columns.map(get_icon)
    df_recent_origin.iloc[::-1].fillna("").to_csv(
        "./src/data/recent_origin_aggregation_table.csv"
    )
    # meta data to inject into page via js: counts split into
    # pre-invasion ("old") vs post-invasion ("recent") vs total ("all")
    df["old"] = (df["start"] < "2022-02-22").map(int)
    df["recent"] = (df["start"] > "2022-02-21").map(int)
    df["all"] = 1
    old_sanctions = df["old"].sum()
    old_entities = len(df[df["old"].map(bool)]["entity_id"].unique())
    new_sanctions = df["recent"].sum()
    new_entities = len(df[df["recent"].map(bool)]["entity_id"].unique())
    all_sanctions = len(df)
    all_entities = len(df["entity_id"].unique())
    df_meta = pd.DataFrame(
        (
            (old_sanctions, new_sanctions, all_sanctions),
            (old_entities, new_entities, all_entities),
        ),
        columns=("old", "recent", "all"),
        index=("sanctions", "entities"),
    )
    # append per-origin and per-schema breakdowns below the totals
    df_meta = pd.concat(
        (
            df_meta,
            df.groupby("origin")[["old", "recent", "all"]].sum(),
            df.groupby("schema")[["old", "recent", "all"]].sum(),
        )
    )
    meta = df_meta.to_dict()
    meta["last_updated"] = NOW.isoformat()
    with open("./src/data/meta.json", "w") as f:
        json.dump(meta, f)