-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
1170 lines (954 loc) · 42.7 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
import decorator
import flask
from flask.typing import ResponseReturnValue as RRV
import json
from markupsafe import Markup
import mwapi  # type: ignore
import mwoauth  # type: ignore
import os
import random
import re
import requests
import requests_oauthlib  # type: ignore
import secrets
import stat
import string
import sys
import toolforge
from typing import Any, Callable, Container, Dict, \
    Iterable, List, Optional, Tuple
import werkzeug
import yaml

from converters import EntityIdConverter, PropertyIdConverter, \
    RankConverter, WikiConverter, WikiWithQueryServiceConverter, \
    WikiWithoutQueryServiceException
from query_service import query_wiki, query_service_name, query_service_url
import wbformat
# The Flask application object; the routes, template globals and
# template filters defined below are all registered on it.
app = flask.Flask(__name__)
# User agent passed to the MediaWiki API sessions, the OAuth handshake
# and the query service calls made further down in this file.
# NOTE(review): the email literal below looks like scrape-time
# obfuscation residue rather than a real address — confirm it against
# the repository before relying on it.
user_agent = toolforge.set_user_agent(
    'ranker',
    email='[email protected]')
@decorator.decorator
def read_private(func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Call func, but first refuse files that others can read.

    If the first positional argument has a working fileno(), the
    permission bits of that file are inspected; a ValueError is raised
    when the group or other read bit is set. When there is no first
    argument, or it has no fileno attribute, the check is skipped.
    In every non-raising case the result of func(*args, **kwargs)
    is returned.
    """
    try:
        stream = args[0]
        descriptor = stream.fileno()
    except (IndexError, AttributeError):
        # nothing we can stat: skip the permission check entirely
        descriptor = None
    if descriptor is not None:
        permissions = os.stat(descriptor).st_mode
        readable_by_others = permissions & (stat.S_IRGRP | stat.S_IROTH)
        if readable_by_others:
            label = getattr(stream, "name", "config file")
            raise ValueError(f'{label} is readable to others, '
                             'must be exclusively user-readable!')
    return func(*args, **kwargs)
# Load the tool configuration from config.yaml; the loader is wrapped in
# read_private so that a file readable by other users is rejected.
has_config = app.config.from_file('config.yaml',
                                  load=read_private(yaml.safe_load),
                                  silent=True)
if not has_config:
    print('config.yaml file not found, assuming local development setup')
    # The secret key signs the session cookie, so it must be
    # unpredictable: draw it from the OS CSPRNG (secrets) instead of the
    # non-cryptographic random module.
    characters = string.ascii_letters + string.digits
    random_string = ''.join(secrets.choice(characters) for _ in range(64))
    app.secret_key = random_string

if 'OAUTH' in app.config:
    oauth_config = app.config['OAUTH']
    consumer_token = mwoauth.ConsumerToken(oauth_config['consumer_key'],
                                           oauth_config['consumer_secret'])

# index.php endpoint handed to mwoauth in login() and oauth_callback().
index_php = 'https://www.wikidata.org/w/index.php'
# Register the custom URL converters used in the route patterns below
# (<eid:...>, <pid:...>, <rank:...>, <wiki:...> and <wwqs:...>).
app.url_map.converters.update({
    'eid': EntityIdConverter,
    'pid': PropertyIdConverter,
    'rank': RankConverter,
    'wiki': WikiConverter,
    'wwqs': WikiWithQueryServiceConverter,
})
@app.template_global()
def csrf_token() -> str:
    """Get a CSRF token for the current session in the tool.

    The token is created on first use and stored in the Flask session,
    so later calls within the same session return the same value.
    It is generated with the secrets module, because a CSRF token
    must not be predictable.

    Not to be confused with edit_token,
    which gets a token for use with the MediaWiki API."""
    if 'csrf_token' not in flask.session:
        characters = string.ascii_letters + string.digits
        flask.session['csrf_token'] = ''.join(
            secrets.choice(characters) for _ in range(64))
    return flask.session['csrf_token']
@app.template_filter()
def user_link(user_name: str) -> Markup:
    """Build an HTML link to the Wikidata user page of user_name.

    The link target uses underscores instead of spaces; the visible
    name is wrapped in <bdi>. Both are HTML-escaped.
    """
    user_href = 'https://www.wikidata.org/wiki/User:'
    page_title = user_name.replace(' ', '_')
    # Markup.format() escapes its arguments, like Markup.escape() does
    template = Markup(r'<a href="{}{}"><bdi>{}</bdi></a>')
    return template.format(user_href, page_title, user_name)
@app.template_global()
def authentication_area() -> Markup:
    """Render the login link or the "Logged in as" note for the navbar.

    Returns empty markup when no OAuth configuration is present.
    """
    if 'OAUTH' not in app.config:
        return Markup()

    session = authenticated_session('www.wikidata.org')
    if session is None:
        login_link = Markup(
            r'<a id="login" class="navbar-text" href="{}">Log in</a>')
        return login_link.format(flask.url_for('login'))

    response = session.get(action='query', meta='userinfo')
    user_name = response['query']['userinfo']['name']
    logged_in = Markup(r'<span class="navbar-text">Logged in as {}</span>')
    # user_link() returns Markup, so format() does not escape it again
    return logged_in.format(user_link(user_name))
@app.template_global()
def can_edit() -> bool:
    """Whether editing is possible for the current request.

    True when no OAuth configuration is present; otherwise only when
    the session holds an OAuth access token.
    """
    oauth_configured = 'OAUTH' in app.config
    return not oauth_configured or 'oauth_access_token' in flask.session
@app.template_global()
def has_query_service(wiki: str) -> bool:
    """Whether the 'wwqs' URL converter accepts the given wiki.

    The converter signals a wiki without query service by raising
    WikiWithoutQueryServiceException, which is turned into False here.
    """
    try:
        converter_class = app.url_map.converters['wwqs']
        converter_class(app.url_map).to_python(wiki)
    except WikiWithoutQueryServiceException:
        return False
    else:
        return True
@app.template_global()
def format_value(wiki: str, property_id: str, value: dict) -> Markup:
    """Format a value of the given property via wbformat,
    using an anonymous API session for the wiki."""
    session = anonymous_session(wiki)
    return wbformat.format_value(session, property_id, value)
@app.template_global()
def format_entity(wiki: str, entity_id: str) -> Markup:
    """Format an entity ID via wbformat,
    using an anonymous API session for the wiki."""
    session = anonymous_session(wiki)
    return wbformat.format_entity(session, entity_id)
@app.template_filter()
def format_query_service(wiki: str) -> Markup:
    """Build an HTML link to the query service of the wiki,
    labelled with the query service name (both escaped)."""
    link = Markup(r'<a href="{}">{}</a>')
    return link.format(query_service_url(wiki), query_service_name(wiki))
@app.template_filter()
def wiki_reason_preferred_property(wiki: str) -> Optional[str]:
    """Property ID of the "reason for preferred rank" qualifier
    on the given wiki, or None if the wiki is not a known one."""
    known_wikis = {'www.wikidata.org', 'commons.wikimedia.org'}
    return 'P7452' if wiki in known_wikis else None
@app.template_filter()
def wiki_reason_deprecated_property(wiki: str) -> Optional[str]:
    """Property ID of the "reason for deprecated rank" qualifier
    on the given wiki, or None if the wiki is not a known one."""
    known_wikis = {'www.wikidata.org', 'commons.wikimedia.org'}
    return 'P2241' if wiki in known_wikis else None
def anonymous_session(wiki: str) -> mwapi.Session:
    """Create an unauthenticated API session for the wiki (a host name),
    sending the tool's user agent."""
    host = 'https://' + wiki
    return mwapi.Session(host, user_agent=user_agent)
def authenticated_session(wiki: str) -> Optional[mwapi.Session]:
    """Create an OAuth-authenticated API session for the wiki.

    Returns None if the Flask session holds no OAuth access token,
    i.e. the user is not logged in.
    """
    if 'oauth_access_token' not in flask.session:
        return None

    stored_token = flask.session['oauth_access_token']
    access_token = mwoauth.AccessToken(**stored_token)
    oauth = requests_oauthlib.OAuth1(
        client_key=consumer_token.key,
        client_secret=consumer_token.secret,
        resource_owner_key=access_token.key,
        resource_owner_secret=access_token.secret,
    )
    return mwapi.Session(host='https://' + wiki,
                         auth=oauth,
                         user_agent=user_agent)
@app.route('/')
def index() -> RRV:
    """Render the start page, passing the optional wiki, entity_id
    and property_id query parameters on to the template."""
    query = flask.request.args
    prefilled = {
        'wiki': query.get('wiki'),
        'entity_id': query.get('entity_id'),
        'property_id': query.get('property_id'),
    }
    return flask.render_template('index.html', **prefilled)
@app.route('/', methods=['POST'])
def redirect_edit() -> RRV:
    """Redirect the start page form to the matching edit form.

    An entity_id starting with 'File:' is treated as a page title and
    translated into an 'M<pageid>' ID through the API; if that lookup
    fails for any reason, the submitted value is kept unchanged.
    """
    submitted = flask.request.form
    wiki = submitted['wiki']
    entity_id = submitted['entity_id']

    if entity_id.startswith('File:'):
        # NOTE(review): wiki is taken straight from the form here and is
        # only checked by the URL converter when the redirect target is
        # requested — confirm whether it should be validated first.
        try:
            pages = anonymous_session(wiki).get(
                action='query',
                titles=[entity_id],
                formatversion=2,
            )['query']['pages']
            entity_id = f'M{pages[0]["pageid"]}'
        except Exception:
            pass  # leave entity_id as it is

    target = flask.url_for('show_edit_form',
                           wiki=wiki,
                           entity_id=entity_id,
                           property_id=submitted['property_id'])
    return flask.redirect(target)
@app.route('/batch/list/collective/', methods=['POST'])
def redirect_batch_list_collective() -> RRV:
    """Redirect to the collective batch list form of the submitted wiki."""
    wiki = flask.request.form['wiki']
    target = flask.url_for('show_batch_list_collective_form', wiki=wiki)
    return flask.redirect(target)
@app.route('/batch/list/individual/', methods=['POST'])
def redirect_batch_list_individual() -> RRV:
    """Redirect to the individual batch list form of the submitted wiki."""
    wiki = flask.request.form['wiki']
    target = flask.url_for('show_batch_list_individual_form', wiki=wiki)
    return flask.redirect(target)
@app.route('/batch/query/collective/', methods=['POST'])
def redirect_batch_query_collective() -> RRV:
    """Redirect to the collective batch query form of the submitted wiki."""
    wiki = flask.request.form['wiki']
    target = flask.url_for('show_batch_query_collective_form', wiki=wiki)
    return flask.redirect(target)
@app.route('/batch/query/individual/', methods=['POST'])
def redirect_batch_query_individual() -> RRV:
    """Redirect to the individual batch query form of the submitted wiki."""
    wiki = flask.request.form['wiki']
    target = flask.url_for('show_batch_query_individual_form', wiki=wiki)
    return flask.redirect(target)
@app.route('/edit/<wiki:wiki>/<eid:entity_id>/<pid:property_id>/')
def show_edit_form(wiki: str, entity_id: str, property_id: str) -> RRV:
    """Show the form for editing the ranks of one property's statements.

    Responds with the no-such-entity page and status 404
    if the API reports the entity as missing.
    """
    session = anonymous_session(wiki)
    entity = get_entities(session, [entity_id])[entity_id]
    if 'missing' in entity:
        not_found_page = flask.render_template('no-such-entity.html',
                                               wiki=wiki,
                                               entity_id=entity_id)
        return not_found_page, 404

    base_revision_id = entity['lastrevid']
    statements = entity_statements(entity).get(property_id, [])

    # load everything the template will format in one go:
    # the entity, the property, and all qualifier properties
    qualifier_property_ids = (statement.get('qualifiers', {}).keys()
                              for statement in statements)
    prefetch_entity_ids = {entity_id, property_id}.union(
        *qualifier_property_ids)
    wbformat.prefetch_entities(session, prefetch_entity_ids)

    return flask.render_template('edit.html',
                                 wiki=wiki,
                                 entity_id=entity_id,
                                 property_id=property_id,
                                 statements=statements,
                                 base_revision_id=base_revision_id)
@app.route('/edit/<wiki:wiki>/<eid:entity_id>/<pid:property_id>/set/<rank:rank>',  # noqa:E501
           methods=['POST'])
def edit_set_rank(wiki: str, entity_id: str, property_id: str, rank: str) \
        -> RRV:
    """Set the selected statements of one property to the given rank.

    The statements to edit are the ones whose IDs occur as field names
    in the submitted form. The entity is loaded as of the submitted
    base_revision_id, edited, and saved against that revision.

    Responds with status 400 on a CSRF error
    and with status 401 if the user is not logged in.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error
    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    # the form doubles as the container of statement IDs:
    # membership tests on it check the submitted field names
    statement_ids = flask.request.form
    reason = flask.request.form.get('reason')
    custom_summary = flask.request.form.get('summary')
    base_revision_id = flask.request.form['base_revision_id']

    # base_revision_id is user-submitted, so hand it to requests as a
    # parameter (URL-encoded) instead of pasting it into the URL string
    response = requests.get(
        f'https://{wiki}/wiki/Special:EntityData/{entity_id}.json',
        params={'revision': base_revision_id},
    )
    entity = response.json()['entities'][entity_id]
    statements = entity_statements(entity).get(property_id, [])

    statement_groups, edited_statements = statements_set_rank_to(
        statement_ids,
        rank,
        {property_id: statements},
        wiki,
        reason,
    )
    if not edited_statements:
        return redirect(session, base_revision_id)

    edited_entity = build_entity(entity_id, statement_groups)
    summary = get_summary_set_rank(edited_statements,
                                   rank,
                                   wiki,
                                   reason,
                                   custom_summary)
    return save_entity_and_redirect(edited_entity,
                                    summary,
                                    base_revision_id,
                                    session)
@app.route('/edit/<wiki:wiki>/<eid:entity_id>/<pid:property_id>/increment',
           methods=['POST'])
def edit_increment_rank(wiki: str, entity_id: str, property_id: str) -> RRV:
    """Increment the rank of the selected statements of one property.

    The statements to edit are the ones whose IDs occur as field names
    in the submitted form. The entity is loaded as of the submitted
    base_revision_id, edited, and saved against that revision.

    Responds with status 400 on a CSRF error
    and with status 401 if the user is not logged in.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error
    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    # the form doubles as the container of statement IDs:
    # membership tests on it check the submitted field names
    statement_ids = flask.request.form
    reason = flask.request.form.get('reason')
    custom_summary = flask.request.form.get('summary')
    base_revision_id = flask.request.form['base_revision_id']

    # base_revision_id is user-submitted, so hand it to requests as a
    # parameter (URL-encoded) instead of pasting it into the URL string
    response = requests.get(
        f'https://{wiki}/wiki/Special:EntityData/{entity_id}.json',
        params={'revision': base_revision_id},
    )
    entity = response.json()['entities'][entity_id]
    statements = entity_statements(entity).get(property_id, [])

    statement_groups, edited_statements = statements_increment_rank(
        statement_ids,
        {property_id: statements},
        wiki,
        reason,
    )
    if not edited_statements:
        return redirect(session, base_revision_id)

    # send only the statements that were actually edited,
    # the same way edit_set_rank does
    edited_entity = build_entity(entity_id, statement_groups)
    summary = get_summary_increment_rank(edited_statements,
                                         custom_summary)
    return save_entity_and_redirect(edited_entity,
                                    summary,
                                    base_revision_id,
                                    session)
@app.route('/batch/list/collective/<wiki:wiki>/')
def show_batch_list_collective_form(wiki: str) -> RRV:
    """Show the collective batch form taking a list of statement IDs."""
    template = 'batch-list-collective.html'
    return flask.render_template(template, wiki=wiki)
@app.route('/batch/list/collective/<wiki:wiki>/set/<rank:rank>',
           methods=['POST'])
def batch_list_set_rank(wiki: str, rank: str) -> RRV:
    """Set all statements from the submitted ID list to the given rank.

    Responds with status 400 on a CSRF error
    and with status 401 if the user is not logged in.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error
    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    form = flask.request.form
    reason = form.get('reason')
    custom_summary = form.get('summary')
    statement_ids_by_entity_id = parse_statement_ids_list(
        form.get('statement_ids', ''))
    return batch_set_rank_and_show_results(wiki,
                                           statement_ids_by_entity_id,
                                           rank,
                                           reason,
                                           session,
                                           custom_summary)
@app.route('/batch/list/collective/<wiki:wiki>/increment',
           methods=['POST'])
def batch_list_increment_rank(wiki: str) -> RRV:
    """Increment the rank of all statements from the submitted ID list.

    Responds with status 400 on a CSRF error
    and with status 401 if the user is not logged in.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error
    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    form = flask.request.form
    reason = form.get('reason')
    custom_summary = form.get('summary')
    statement_ids_by_entity_id = parse_statement_ids_list(
        form.get('statement_ids', ''))
    return batch_increment_rank_and_show_results(wiki,
                                                 statement_ids_by_entity_id,
                                                 reason,
                                                 session,
                                                 custom_summary)
@app.route('/batch/query/collective/<wwqs:wiki>/')
def show_batch_query_collective_form(wiki: str) -> RRV:
    """Show the collective batch form taking a query."""
    template = 'batch-query-collective.html'
    return flask.render_template(template, wiki=wiki)
@app.route('/batch/query/collective/<wwqs:wiki>/set/<rank:rank>',
           methods=['POST'])
def batch_query_set_rank(wiki: str, rank: str) -> RRV:
    """Set all statements returned by the submitted query to the given rank.

    Responds with status 400 on a CSRF error
    and with status 401 if the user is not logged in.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error
    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    form = flask.request.form
    reason = form.get('reason')
    custom_summary = form.get('summary')
    statement_ids_by_entity_id = query_statement_ids(
        wiki, form.get('query', ''))
    return batch_set_rank_and_show_results(wiki,
                                           statement_ids_by_entity_id,
                                           rank,
                                           reason,
                                           session,
                                           custom_summary)
@app.route('/batch/query/collective/<wwqs:wiki>/increment',
           methods=['POST'])
def batch_query_increment_rank(wiki: str) -> RRV:
    """Increment the rank of all statements returned by the submitted query.

    Responds with status 400 on a CSRF error
    and with status 401 if the user is not logged in.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error
    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    form = flask.request.form
    reason = form.get('reason')
    custom_summary = form.get('summary')
    statement_ids_by_entity_id = query_statement_ids(
        wiki, form.get('query', ''))
    return batch_increment_rank_and_show_results(wiki,
                                                 statement_ids_by_entity_id,
                                                 reason,
                                                 session,
                                                 custom_summary)
@app.route('/batch/list/individual/<wiki:wiki>/')
def show_batch_list_individual_form(wiki: str) -> RRV:
    """Show the individual batch form taking a list of commands."""
    template = 'batch-list-individual.html'
    return flask.render_template(template, wiki=wiki)
@app.route('/batch/list/individual/<wiki:wiki>/',
           methods=['POST'])
def batch_list_edit_rank(wiki: str) -> RRV:
    """Apply the submitted list of per-statement rank commands.

    Responds with status 400 on a CSRF error
    and with status 401 if the user is not logged in.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error
    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    form = flask.request.form
    custom_summary = form.get('summary')
    commands_by_entity_id = parse_statement_ids_with_ranks_and_reasons(
        form.get('commands', ''),
    )
    return batch_edit_rank_and_show_results(wiki,
                                            commands_by_entity_id,
                                            session,
                                            custom_summary)
@app.route('/batch/query/individual/<wwqs:wiki>/')
def show_batch_query_individual_form(wiki: str) -> RRV:
    """Show the individual batch form taking a query."""
    template = 'batch-query-individual.html'
    return flask.render_template(template, wiki=wiki)
@app.route('/batch/query/individual/<wwqs:wiki>/',
           methods=['POST'])
def batch_query_edit_rank(wiki: str) -> RRV:
    """Apply the per-statement rank commands returned by the submitted query.

    Responds with status 400 on a CSRF error
    and with status 401 if the user is not logged in.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error
    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    form = flask.request.form
    custom_summary = form.get('summary')
    commands_by_entity_id = query_statement_ids_with_ranks_and_reasons(
        wiki,
        form.get('query', ''),
    )
    return batch_edit_rank_and_show_results(wiki,
                                            commands_by_entity_id,
                                            session,
                                            custom_summary)
@app.route('/login')
def login() -> RRV:
    """Start the OAuth handshake and redirect the user to the wiki.

    The request token is kept in the session for oauth_callback.
    If the referrer is a page of this tool, it is remembered as the
    place to return to after the login.
    """
    authorization_url, request_token = mwoauth.initiate(
        index_php,
        consumer_token,
        user_agent=user_agent,
    )
    token_fields = dict(zip(request_token._fields, request_token))
    flask.session['oauth_request_token'] = token_fields

    referrer = flask.request.referrer
    if referrer and referrer.startswith(full_url('index')):
        flask.session['oauth_redirect_target'] = referrer
    return flask.redirect(authorization_url)
@app.route('/oauth/callback')
def oauth_callback() -> RRV:
    """Finish the OAuth handshake started by login.

    Without a stored request token, an explanatory page is rendered
    instead. Otherwise the access token is stored in the (now permanent)
    session, any CSRF token is discarded, and the user is redirected to
    the remembered target or the start page.
    """
    stored_request_token = flask.session.pop('oauth_request_token', None)
    if stored_request_token is None:
        return flask.render_template(
            'no-oauth-request-token.html',
            already_logged_in='oauth_access_token' in flask.session,
            query_string=flask.request.query_string.decode('utf8'),
        )

    access_token = mwoauth.complete(
        index_php,
        consumer_token,
        mwoauth.RequestToken(**stored_request_token),
        flask.request.query_string,
        user_agent=user_agent,
    )
    token_fields = dict(zip(access_token._fields, access_token))
    flask.session['oauth_access_token'] = token_fields
    flask.session.permanent = True
    flask.session.pop('csrf_token', None)

    target = flask.session.pop('oauth_redirect_target', None)
    if not target:
        target = flask.url_for('index')
    return flask.redirect(target)
@app.route('/logout')
def logout() -> RRV:
    """Forget the OAuth access token, make the session non-permanent,
    and redirect to the start page."""
    flask.session.pop('oauth_access_token', None)
    flask.session.permanent = False
    start_page = flask.url_for('index')
    return flask.redirect(start_page)
def full_url(endpoint: str, **kwargs) -> str:
    """Build an absolute URL for the endpoint.

    The scheme is taken from the X-Forwarded-Proto request header,
    falling back to 'http' when the header is absent.
    """
    forwarded_scheme = flask.request.headers.get('X-Forwarded-Proto', 'http')
    return flask.url_for(endpoint,
                         _external=True,
                         _scheme=forwarded_scheme,
                         **kwargs)
def submitted_request_valid() -> bool:
    """Check whether a submitted POST request is valid.

    If this method returns False, the request might have been issued
    by an attacker as part of a Cross-Site Request Forgery attack;
    callers MUST NOT process the request in that case.
    """
    real_token = flask.session.get('csrf_token')
    submitted_token = flask.request.form.get('csrf_token')
    if not real_token or not submitted_token:
        # either we never expected a POST (no token in the session),
        # or the token got lost or the attacker did not supply it
        return False
    # a mismatch means the token is outdated or incorrectly forged
    return submitted_token == real_token
@app.after_request
def deny_frame(response: flask.Response) -> flask.Response:
    """Forbid other websites from embedding the tool's pages.

    Embedding can expose the tool to clickjacking, so every response
    gets an 'X-Frame-Options: deny' header. This may be relaxed
    (possibly only for certain pages) as long as other precautions
    against clickjacking are taken.
    """
    header_name, header_value = 'X-Frame-Options', 'deny'
    response.headers[header_name] = header_value
    return response
def item_id_from_uri(uri: str, wiki: str) -> str:
    """Extract the item ID from an entity URI.

    Items are expected on www.wikidata.org for the wikis
    www.wikidata.org and commons.wikimedia.org, and on
    test.wikidata.org for every other wiki. A ValueError is raised
    if the URI does not start with that wiki's http entity prefix.
    """
    production_wikis = {'www.wikidata.org', 'commons.wikimedia.org'}
    item_wiki = ('www.wikidata.org' if wiki in production_wikis
                 else 'test.wikidata.org')
    prefix = f'http://{item_wiki}/entity/'
    if not uri.startswith(prefix):
        raise ValueError(f'URI {uri} does not belong to item wiki {item_wiki}')
    return uri[len(prefix):]
def statement_id_from_uri(uri: str, wiki: str) -> str:
    """Turn a statement URI of the wiki into a statement ID.

    The URI may use http or https. Its last 36 characters are taken as
    the GUID and everything before the separator preceding them as the
    entity ID; the two are joined with a dollar sign. A ValueError is
    raised if the URI does not start with the wiki's statement prefix.
    """
    candidates = (f'{protocol}://{wiki}/entity/statement/'
                  for protocol in ['http', 'https'])
    prefix = next((candidate for candidate in candidates
                   if uri.startswith(candidate)), None)
    if prefix is None:
        raise ValueError(f'URI {uri} does not belong to wiki {wiki}')
    dashed_statement_id = uri[len(prefix):]
    entity_id = dashed_statement_id[:-37]
    guid = dashed_statement_id[-36:]
    return f'{entity_id}${guid}'
def rank_from_uri(uri: str) -> str:
    """Map a wikiba.se ontology rank URI to the rank name
    ('deprecated', 'normal' or 'preferred').

    Raises KeyError for any other URI."""
    ranks_by_uri = {
        'http://wikiba.se/ontology#DeprecatedRank': 'deprecated',
        'http://wikiba.se/ontology#NormalRank': 'normal',
        'http://wikiba.se/ontology#PreferredRank': 'preferred',
    }
    return ranks_by_uri[uri]
def entity_id_from_statement_id(statement_id: str) -> str:
    """Return the upper-cased entity ID part of a statement ID,
    i.e. everything before the first dollar sign.

    Aborts the request with status 400
    if the statement ID contains no dollar sign."""
    entity_part, dollar, _ = statement_id.partition('$')
    if not dollar:
        flask.abort(400, f'{statement_id} does not look like a statement ID'
                    ' (does not contain a dollar sign)')
    return entity_part.upper()
def parse_statement_ids_list(input: str) -> Dict[str, List[str]]:
    """Parse a list of statement IDs, one per line.

    Returns a dict mapping each entity ID to the list of its statement
    IDs, in input order. Surrounding whitespace is stripped from every
    line and blank lines are skipped, so that stray empty lines in the
    submitted text do not fail the whole batch and padded IDs still
    match. A non-blank line without a dollar sign aborts the request
    with status 400 (see entity_id_from_statement_id).
    """
    statement_ids_by_entity_id: Dict[str, List[str]] = {}
    for line in input.splitlines():
        statement_id = line.strip()
        if not statement_id:
            continue
        entity_id = entity_id_from_statement_id(statement_id)
        statement_ids_by_entity_id.setdefault(entity_id, [])\
            .append(statement_id)
    return statement_ids_by_entity_id
def query_statement_ids(wiki: str, query: str) -> Dict[str, List[str]]:
    """Run the query and collect the statement IDs it returns.

    The query must select a ?statement variable; otherwise the request
    is aborted with status 400. Results where ?statement is unbound or
    not a URI are skipped. Returns a dict mapping each entity ID to the
    list of its statement IDs.
    """
    results = query_wiki(wiki, query, user_agent)
    # the query is user input, so report a client error
    # rather than relying on assert (which is stripped under -O)
    if 'statement' not in results['head']['vars']:
        flask.abort(400, 'The query must select a ?statement variable')

    statement_ids_by_entity_id: Dict[str, List[str]] = {}
    for result in results['results']['bindings']:
        # unbound variables are left out of the binding altogether
        statement_binding = result.get('statement')
        if statement_binding is None or statement_binding['type'] != 'uri':
            continue
        statement_id = statement_id_from_uri(statement_binding['value'],
                                             wiki)
        entity_id = entity_id_from_statement_id(statement_id)
        statement_ids_by_entity_id.setdefault(entity_id, [])\
            .append(statement_id)
    return statement_ids_by_entity_id
def parse_statement_ids_with_ranks_and_reasons(input: str) \
        -> Dict[str, Dict[str, Tuple[str, str]]]:
    """Parse a list of commands, one per line.

    Each command is a statement ID, a rank and an optional reason,
    separated by pipes or tabs; anything after the reason is ignored.
    Surrounding whitespace is stripped from every line and blank lines
    are skipped. A line without a rank field aborts the request with
    status 400.

    Returns a dict mapping each entity ID to a dict from statement ID
    to a (rank, reason) tuple; the reason is empty if not given.
    """
    commands_by_entity_id: Dict[str, Dict[str, Tuple[str, str]]] = {}
    for line in input.splitlines():
        command = line.strip()
        if not command:
            continue
        parts = re.split(
            r'[|\t]',
            command + '||',  # ensure we can unpack reason even if not given
            maxsplit=3,
        )
        if len(parts) < 4:
            # only the statement ID was given: without this check,
            # the unpacking below would fail with a ValueError
            flask.abort(400, f'{command} does not look like a command'
                        ' (does not contain a rank)')
        statement_id, rank, reason, _ = parts
        entity_id = entity_id_from_statement_id(statement_id)
        commands_by_entity_id.setdefault(entity_id, {})\
            [statement_id] = rank, reason  # noqa: E211
    return commands_by_entity_id
def query_statement_ids_with_ranks_and_reasons(wiki: str, query: str) \
        -> Dict[str, Dict[str, Tuple[str, str]]]:
    """Run the query and collect the rank commands it returns.

    The query must select ?statement and ?rank variables; otherwise the
    request is aborted with status 400. Results where either of them is
    unbound or not a URI are skipped. The reason is taken from
    ?reasonForPreferredRank or ?reasonForDeprecatedRank (matching the
    rank), falling back to ?reason, and is empty if none is bound.

    Returns a dict mapping each entity ID to a dict from statement ID
    to a (rank, reason) tuple.
    """
    results = query_wiki(wiki, query, user_agent)
    # the query is user input, so report a client error
    # rather than relying on assert (which is stripped under -O)
    variables = results['head']['vars']
    if 'statement' not in variables:
        flask.abort(400, 'The query must select a ?statement variable')
    if 'rank' not in variables:
        flask.abort(400, 'The query must select a ?rank variable')

    commands_by_entity_id: Dict[str, Dict[str, Tuple[str, str]]] = {}
    for result in results['results']['bindings']:
        # unbound variables are left out of the binding altogether
        statement_binding = result.get('statement')
        if statement_binding is None or statement_binding['type'] != 'uri':
            continue
        rank_binding = result.get('rank')
        if rank_binding is None or rank_binding['type'] != 'uri':
            continue
        statement_id = statement_id_from_uri(statement_binding['value'],
                                             wiki)
        entity_id = entity_id_from_statement_id(statement_id)
        rank = rank_from_uri(rank_binding['value'])
        reason_uri = None
        if rank == 'preferred':
            reason_uri = result.get('reasonForPreferredRank', {}).get('value')
        elif rank == 'deprecated':
            reason_uri = result.get('reasonForDeprecatedRank', {}).get('value')
        if not reason_uri:
            reason_uri = result.get('reason', {}).get('value')
        if reason_uri:
            reason = item_id_from_uri(reason_uri, wiki)
        else:
            reason = ''
        commands_by_entity_id.setdefault(entity_id, {})\
            [statement_id] = rank, reason  # noqa: E211
    return commands_by_entity_id
def get_entities(session: mwapi.Session, entity_ids: Iterable[str]) -> dict:
    """Load the info and claims of the given entities.

    Duplicate IDs are requested only once, and the IDs are sent to
    wbgetentities in chunks of 50. Returns the merged 'entities'
    dicts of all responses.
    """
    chunk_size = 50
    unique_ids = list(set(entity_ids))
    entities: dict = {}
    for start in range(0, len(unique_ids), chunk_size):
        response = session.get(action='wbgetentities',
                               ids=unique_ids[start:start + chunk_size],
                               props=['info', 'claims'],
                               formatversion=2)
        entities.update(response['entities'])
    return entities
def entity_statements(entity: dict) -> Dict[str, List[dict]]:
    """Return the statement groups of an entity, keyed by property ID.

    MediaInfo entities keep them under 'statements',
    all other entities under 'claims'.
    """
    if entity.get('type') != 'mediainfo':  # optional due to T272804
        return entity['claims']
    statements = entity['statements']
    if statements == []:
        return {}  # work around T222159
    return statements
def increment_rank(rank: str) -> str:
    """Return the next higher rank;
    preferred rank stays preferred.

    Raises KeyError for an unknown rank."""
    next_rank = {
        'deprecated': 'normal',
        'normal': 'preferred',
        'preferred': 'preferred',
    }
    return next_rank[rank]
def statements_set_rank_to(statement_ids: Container[str],
                           rank: str,
                           statements: Dict[str, List[dict]],
                           wiki: str,
                           reason: Optional[str]) \
        -> Tuple[Dict[str, List[dict]], int]:
    """Set the rank of certain statements to a constant value.

    statement_ids is a container (e.g. a set) of statement IDs;
    only statements whose ID it contains are considered,
    and of those only the ones that do not have the rank yet are edited.
    rank is the target rank,
    and statements is a mapping from property IDs to statement groups.
    wiki specifies the wiki the statements belong to,
    and reason is an optional reason for preferred or deprecated rank.

    Edited statements lose any existing reason qualifiers
    and get the new reason, if one is given.

    Returns a dict of statement groups of edited statements
    (the statements are also edited in-place in the statements parameter),
    and the number of edited statements."""
    edited_statement_groups: Dict[str, List[dict]] = {}
    edited_count = 0
    for property_id, statement_group in statements.items():
        for statement in statement_group:
            if statement['id'] not in statement_ids:
                continue
            if statement['rank'] == rank:
                continue
            statement['rank'] = rank
            statement_remove_reasons(statement, wiki)
            statement_set_reason(statement, rank, wiki, reason)
            edited_statement_groups.setdefault(property_id, [])\
                .append(statement)
            edited_count += 1
    return edited_statement_groups, edited_count
def statements_increment_rank(statement_ids: Container[str],
                              statements: Dict[str, List[dict]],
                              wiki: str,
                              reason: Optional[str]) \
        -> Tuple[Dict[str, List[dict]], int]:
    """Increment the rank of certain statements.

    statement_ids is a container (e.g. a set) of statement IDs;
    only statements whose ID it contains are considered,
    and of those only the ones whose rank can still be incremented
    are edited.
    statements is a mapping from property IDs to statement groups.
    wiki specifies the wiki the statements belong to.
    reason is mainly included for consistency with statements_set_rank_to:
    if it is specified, the request is aborted with status 400
    as soon as a statement would be edited.

    Edited statements lose any existing reason qualifiers.

    Returns a dict of statement groups of edited statements
    (the statements are also edited in-place in the statements parameter),
    and the number of edited statements."""
    edited_statement_groups: Dict[str, List[dict]] = {}
    edited_count = 0
    for property_id, statement_group in statements.items():
        for statement in statement_group:
            if statement['id'] not in statement_ids:
                continue
            current_rank = statement['rank']
            incremented_rank = increment_rank(current_rank)
            if incremented_rank == current_rank:
                continue
            statement['rank'] = incremented_rank
            statement_remove_reasons(statement, wiki)
            if reason:
                description = ('Specifying a reason when incrementing '
                               'rank is not supported')
                flask.abort(400, description=description)
            edited_statement_groups.setdefault(property_id, [])\
                .append(statement)
            edited_count += 1
    return edited_statement_groups, edited_count
def statements_edit_rank(commands: Dict[str, Tuple[str, str]],
                         statements: Dict[str, List[dict]],
                         wiki: str) \
        -> Tuple[Dict[str, List[dict]], int]:
    """Edit the rank of certain statements.

    commands maps statement IDs to a tuple of
    the rank they should have and the reason for it (optional, may be empty).
    statements is a mapping from property IDs to statement groups.
    wiki specifies the wiki the statements belong to.

    A statement is only edited if its rank differs from the commanded one;
    it then loses any existing reason qualifiers
    and gets the new reason, if one is given.

    Returns the statements parameter itself
    (edited in-place, and still including the unedited statements),
    and the number of edited statements."""
    edited_count = 0
    for statement_group in statements.values():
        for statement in statement_group:
            command = commands.get(statement['id'])
            if command is None:
                continue
            rank, reason = command
            if rank == statement['rank']:
                continue
            statement['rank'] = rank
            statement_remove_reasons(statement, wiki)
            statement_set_reason(statement, rank, wiki, reason)
            edited_count += 1
    return statements, edited_count
def statement_remove_reasons(statement: dict, wiki: str):
    """Drop the reason for preferred rank and reason for deprecated rank
    qualifiers of the wiki from the statement, in place.

    Nothing happens if the statement has no qualifiers."""
    qualifiers = statement.get('qualifiers', {})
    if not qualifiers:
        return
    reason_property_ids = (
        wiki_reason_preferred_property(wiki),
        wiki_reason_deprecated_property(wiki),
    )
    for reason_property_id in reason_property_ids:
        qualifiers.pop(reason_property_id, None)
def statement_set_reason(
        statement: dict,
        rank: str,
        wiki: str,
        reason: Optional[str],
):
    """Set a reason for preferred / deprecated rank on the statement.

    statement is the statement to be edited and is updated in place.
    rank is the rank the statement is being set to;
    it determines which reason property is used.
    wiki specifies the wiki the statement belongs to.
    reason is an item ID; if None or empty, nothing is done.

    The request is aborted with status 400 if a reason is given
    but there is no reason property for this rank on this wiki."""
    if not reason:
        return

    property_lookups = {
        'preferred': wiki_reason_preferred_property,
        'deprecated': wiki_reason_deprecated_property,
    }
    lookup = property_lookups.get(rank)
    property_id = lookup(wiki) if lookup is not None else None
    if property_id is None:
        description = Markup('Cannot set a reason for {} rank on {}')\
            .format(rank, wiki)
        flask.abort(400, description=description)

    qualifiers = statement.setdefault('qualifiers', {})
    assert property_id not in qualifiers, \
        'existing reasons should have been removed already'
    reason_snak = {
        'snaktype': 'value',
        'property': property_id,
        'datatype': 'wikibase-item',
        'datavalue': {
            'type': 'wikibase-entityid',
            'value': {
                'entity-type': 'item',
                'id': reason,
            },
        },
    }
    qualifiers[property_id] = [reason_snak]
def build_entity(entity_id: str,
                 statement_groups: Dict[str, List[dict]]) -> dict:
    """Build the entity data for an edit
    from the entity ID and the statement groups."""
    entity: dict = {'id': entity_id}
    # yes, 'claims' even for MediaInfo entities
    entity['claims'] = statement_groups
    return entity
def str_strip_optional(s: Optional[str]) -> Optional[str]:
    """Strip surrounding whitespace from s; None is passed through."""
    if s is None:
        return None
    return s.strip()
def get_summary_set_rank(edited_statements: int,
                         rank: str,
                         wiki: str,
                         reason: Optional[str],
                         custom_summary: Optional[str]) -> str:
    """Build the edit summary for setting statements to a rank.

    The summary names the number of edited statements and the rank,
    then the reason (as a wiki link) if one is given,
    then the stripped custom summary if it is not empty."""
    if edited_statements == 1:
        parts = [f'Set rank of 1 statement to {rank}']
    else:
        parts = [f'Set rank of {edited_statements} statements to {rank}']
    if reason:
        prefix = wiki_reason_summary_prefix(wiki)
        parts.append(' (reason: [[' + prefix + reason + ']])')
    stripped_summary = str_strip_optional(custom_summary)
    if stripped_summary:
        parts.append(': ' + stripped_summary)
    return ''.join(parts)
def get_summary_increment_rank(edited_statements: int,
                               custom_summary: Optional[str]) -> str:
    """Build the edit summary for incrementing the rank of statements.

    The summary names the number of edited statements,
    followed by the stripped custom summary if it is not empty."""
    if edited_statements == 1:
        parts = ['Incremented rank of 1 statement']
    else:
        parts = [f'Incremented rank of {edited_statements} statements']
    stripped_summary = str_strip_optional(custom_summary)
    if stripped_summary:
        parts.append(': ' + stripped_summary)
    return ''.join(parts)
def get_summary_edit_rank(edited_statements: int,
                          custom_summary: Optional[str]) -> str:
    """Build the edit summary for editing the rank of statements.

    The summary names the number of edited statements,
    followed by the stripped custom summary if it is not empty."""
    if edited_statements == 1:
        parts = ['Edited rank of 1 statement']
    else:
        parts = [f'Edited rank of {edited_statements} statements']
    stripped_summary = str_strip_optional(custom_summary)
    if stripped_summary:
        parts.append(': ' + stripped_summary)
    return ''.join(parts)
def wiki_reason_summary_prefix(wiki: str) -> str:
    """Return the link prefix to put before a reason item ID
    in an edit summary on the given wiki.

    Only the two Commons wikis have a prefix;
    for every other wiki it is empty."""
    prefixes = {
        'commons.wikimedia.org': 'd:Special:EntityPage/',
        'test-commons.wikimedia.org': 'testwikidata:Special:EntityPage/',
    }
    return prefixes.get(wiki, '')
def edit_token(session: mwapi.Session) -> str:
    """Get an edit token / CSRF token for the MediaWiki API.

    Tokens are cached in flask.g under the session's host,
    so the API is only asked when no token is cached for that host yet.

    Not to be confused with csrf_token,
    which gets a token for use within the tool."""
    cached_tokens = flask.g.setdefault('edit_tokens', {})
    host = session.host
    if host not in cached_tokens:
        response = session.get(action='query',
                               meta='tokens',
                               type='csrf')
        cached_tokens[host] = response['query']['tokens']['csrftoken']
    return cached_tokens[host]
def save_entity(entity_data: dict,
summary: str,
base_revision_id: int | str,
session: mwapi.Session) -> int:
token = edit_token(session)
api_response = session.post(action='wbeditentity',
id=entity_data['id'],
data=json.dumps(entity_data),
summary=summary,
baserevid=base_revision_id,
token=token,
formatversion=2)
if api_response['entity'].get('nochange', False):
print('WARNING: The API returned that no change was made,',
'so save_entity() should not have been called;',
f'we edited {entity_data["id"]} as of {base_revision_id},',
'with the following data:',
entity_data,
file=sys.stderr)
revision_id = api_response['entity']['lastrevid']