#!/usr/bin/env python3
"""
twitter-archive-parser - Python code to parse a Twitter archive and output in various ways
Copyright (C) 2022 Tim Hutton - https://github.com/timhutton/twitter-archive-parser
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from collections import defaultdict
from typing import Optional
from urllib.parse import urlparse
import datetime
import glob
import importlib
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import time
from prompt import write_json
# hot-loaded if needed, see import_module():
# imagesize
# requests
# Print a compile-time error in Python < 3.6. This line does nothing in Python 3.6+ but is reported to the user
# as an error (because it is the first line that fails to compile) in older versions.
f' Error: This script requires Python 3.6 or later. Use `python --version` to check your version.'
class UserData:
def __init__(self, user_id: str, handle: str):
if user_id is None:
raise ValueError('ID "None" is not allowed in UserData.')
self.user_id = user_id
if handle is None:
raise ValueError('handle "None" is not allowed in UserData.')
self.handle = handle
class PathConfig:
"""
Helper class containing constants for various directories and files.
The script will only add / change / delete content in its own directories, which start with `parser-`.
Files within `parser-output` are the end result that the user is probably interested in.
Files within `parser-cache` are temporary working files, which improve the efficiency if you run
this script multiple times. They can safely be removed without harming the consistency of the
files within `parser-output`.
"""
def __init__(self, dir_archive):
self.dir_archive = dir_archive
self.dir_input_data = os.path.join(dir_archive, 'data')
self.file_account_js = os.path.join(self.dir_input_data, 'account.js')
# check if user is in correct folder
if not os.path.isfile(self.file_account_js):
print(f'Error: Failed to load {self.file_account_js}. ')
exit()
self.dir_input_media = find_dir_input_media(self.dir_input_data)
self.dir_output = os.path.join(self.dir_archive, 'parser-output')
self.dir_output_media = os.path.join(self.dir_output, 'media')
self.dir_output_cache = os.path.join(self.dir_archive, 'parser-cache')
self.file_output_following = os.path.join(self.dir_output, 'following.txt')
self.file_output_followers = os.path.join(self.dir_output, 'followers.txt')
self.file_download_log = os.path.join(self.dir_output_media, 'download_log.txt')
self.file_tweet_icon = os.path.join(self.dir_output_media, 'tweet.ico')
self.files_input_tweets = find_files_input_tweets(self.dir_input_data)
# structured like an actual tweet output file, can be used to compute relative urls to a media file
self.example_file_output_tweets = self.create_path_for_file_output_tweets(year=2020, month=12)
def create_path_for_file_output_tweets(self, year, month, format="html", kind="tweets") -> str:
"""Builds the path for a tweet-archive file based on some properties."""
# Previously the filename was f'{dt.year}-{dt.month:02}-01-Tweet-Archive-{dt.year}-{dt.month:02}'
return os.path.join(self.dir_output, f"{kind}-{format}", f"{year:04}", f"{year:04}-{month:02}-01-{kind}.{format}")
def create_path_for_file_output_dms(self, name: str, index: Optional[int]=None, format: str="html", kind: str="DMs") -> str:
"""Builds the path for a dm-archive file based on some properties."""
index_suffix = ""
if (index):
index_suffix = f"-part{index:03}"
return os.path.join(self.dir_output, kind, f"{kind}-{name}{index_suffix}.{format}")
def create_path_for_file_output_single(self, format: str, kind: str)->str:
"""Builds the path for a single output file which, i.e. one that is not part of a larger group or sequence."""
return os.path.join(self.dir_output, f"{kind}.{format}")
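# Usage sketch (illustrative, not part of the script; assumes the archive was unzipped to ./twitter-archive,
# and note that constructing PathConfig probes the data/ folder, so it needs a real archive):
#   paths = PathConfig(dir_archive='twitter-archive')
#   paths.create_path_for_file_output_tweets(year=2021, month=3)
#   # -> 'twitter-archive/parser-output/tweets-html/2021/2021-03-01-tweets.html' (POSIX path separators)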
def get_consent(prompt: str, default_to_yes: bool = False):
"""Asks the user for consent, using the given prompt. Accepts various versions of yes/no, or
an empty answer to accept the default. The default is 'no' unless default_to_yes is passed as
True. The default will be indicated automatically. For unacceptable answers, the user will
be asked again."""
if default_to_yes:
suffix = " [Y/n]"
default_answer = "yes"
else:
suffix = " [y/N]"
default_answer = "no"
    # temporary hack to make the script work in a non-interactive environment: always answer 'no'.
    # NOTE: the interactive prompt below is unreachable while this early return is in place.
    return False
while True:
user_input = input(prompt + suffix)
if user_input == "":
print (f"Your empty response was assumed to mean '{default_answer}' (the default for this question).")
return default_to_yes
if user_input.lower() in ('y', 'yes'):
return True
if user_input.lower() in ('n', 'no'):
return False
print (f"Sorry, did not understand. Please answer with y, n, yes, no, or press enter to accept "
f"the default (which is '{default_answer}' in this case, as indicated by the uppercase "
f"'{default_answer.upper()[0]}'.)")
def import_module(module):
"""Imports a module specified by a string. Example: requests = import_module('requests')"""
try:
return importlib.import_module(module)
except ImportError:
print(f'\nError: This script uses the "{module}" module which is not installed.\n')
if not get_consent('OK to install using pip?'):
exit()
subprocess.run([sys.executable, '-m', 'pip', 'install', module], check=True)
return importlib.import_module(module)
def open_and_mkdirs(path_file):
"""Opens a file for writing. If the parent directory does not exist yet, it is created first."""
mkdirs_for_file(path_file)
return open(path_file, 'w', encoding='utf-8')
def mkdirs_for_file(path_file):
"""Creates the parent directory of the given file, if it does not exist yet."""
path_dir = os.path.split(path_file)[0]
os.makedirs(path_dir, exist_ok=True)
def rel_url(media_path, document_path):
"""Computes the relative URL needed to link from `document_path` to `media_path`.
Assumes that `document_path` points to a file (e.g. `.md` or `.html`), not a directory."""
return os.path.relpath(media_path, os.path.split(document_path)[0]).replace("\\", "/")
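# Example (illustrative paths): linking from a monthly tweet page back to the shared media folder:
#   rel_url('parser-output/media/pic.jpg', 'parser-output/tweets-md/2020/2020-12-01-tweets.md')
#   # -> '../../media/pic.jpg'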
def get_twitter_api_guest_token(session, bearer_token):
"""Returns a Twitter API guest token for the current session."""
guest_token_response = session.post("https://api.twitter.com/1.1/guest/activate.json",
headers={'authorization': f'Bearer {bearer_token}'},
timeout=2,
)
guest_token = json.loads(guest_token_response.content)['guest_token']
if not guest_token:
raise Exception(f"Failed to retrieve guest token")
return guest_token
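# Note: the activate.json endpoint is expected to answer with a small JSON body roughly like
# {"guest_token": "1234567890"}; that token is then sent as the 'x-guest-token' header below.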
def get_twitter_users(session, bearer_token, guest_token, user_ids):
"""Asks Twitter for all metadata associated with user_ids."""
users = {}
while user_ids:
max_batch = 100
user_id_batch = user_ids[:max_batch]
user_ids = user_ids[max_batch:]
user_id_list = ",".join(user_id_batch)
query_url = f"https://api.twitter.com/1.1/users/lookup.json?user_id={user_id_list}"
response = session.get(query_url,
headers={'authorization': f'Bearer {bearer_token}', 'x-guest-token': guest_token},
timeout=2,
)
if not response.status_code == 200:
raise Exception(f'Failed to get user handle: {response}')
response_json = json.loads(response.content)
for user in response_json:
users[user["id_str"]] = user
return users
def lookup_users(user_ids, users):
"""Fill the users dictionary with data from Twitter"""
# Filter out any users already known
filtered_user_ids = [id for id in user_ids if id not in users]
if not filtered_user_ids:
# Don't bother opening a session if there's nothing to get
return
# Account metadata observed at ~2.1KB on average.
estimated_size = int(2.1 * len(filtered_user_ids))
print(f'{len(filtered_user_ids)} users are unknown.')
if not get_consent(f'Download user data from Twitter (approx {estimated_size:,} KB)?'):
return
requests = import_module('requests')
try:
with requests.Session() as session:
bearer_token = 'AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
guest_token = get_twitter_api_guest_token(session, bearer_token)
retrieved_users = get_twitter_users(session, bearer_token, guest_token, filtered_user_ids)
for user_id, user in retrieved_users.items():
if user["screen_name"] is not None:
users[user_id] = UserData(user_id=user_id, handle=user["screen_name"])
print() # empty line for better readability of output
except Exception as err:
print(f'Failed to download user data: {err}')
def read_json_from_js_file(filename):
"""Reads the contents of a Twitter-produced .js file into a dictionary."""
print(f'Parsing {filename}...')
with open(filename, 'r', encoding='utf8') as f:
data = f.readlines()
# if the JSON has no real content, it can happen that the file is only one line long.
# in this case, return an empty dict to avoid errors while trying to read non-existing lines.
if len(data) <= 1:
return {}
# convert js file to JSON: replace first line with just '[', squash lines into a single string
prefix = '['
if '{' in data[0]:
prefix += ' {'
data = prefix + ''.join(data[1:])
# parse the resulting JSON and return as a dict
return json.loads(data)
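# For reference: the archive's .js files are not plain JSON; they start with an assignment roughly like
#   window.YTD.tweets.part0 = [ { "tweet": { ... } }, ... ]
# The function above drops that first line (keeping '[' and, if present, the opening '{') so that the
# remainder parses as a regular JSON array.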
def extract_username(paths: PathConfig):
"""Returns the user's Twitter username from account.js."""
account = read_json_from_js_file(paths.file_account_js)
return account[0]['account']['username']
def escape_markdown(input_text: str) -> str:
"""
Escape markdown control characters from input text so that the text will not break in rendered markdown.
(Only use on unformatted text parts that do not yet have any markdown control characters added on purpose!)
"""
    # markdown escaping is disabled permanently in this fork: return the text unchanged.
    # The code below is kept for reference but is unreachable.
    return input_text
characters_to_escape: str = r"\_*[]()~`>#+-=|{}.!"
output_text: str = ''
for char in input_text:
if char in characters_to_escape:
# add backslash before control char
output_text = output_text + "\\" + char
elif char == '\n':
# add double space before line break
output_text = output_text + " " + char
else:
output_text = output_text + char
return output_text
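# Example (illustrative): with the early return above, escape_markdown('a_b*c') returns 'a_b*c' unchanged;
# if the escaping code were re-enabled, it would return 'a\_b\*c' for this input.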
def collect_tweet_references(tweet):
if 'tweet' in tweet.keys():
tweet = tweet['tweet']
tweet_ids = set()
in_reply_to = False
quote = False
retweet = False
# Collect quoted tweets
if 'entities' in tweet and 'urls' in tweet['entities']:
for url in tweet['entities']['urls']:
if 'url' in url and 'expanded_url' in url:
expanded_url = url['expanded_url']
matches = re.match(r'^https://twitter.com/([0-9A-Za-z_]*)/status/(\d+)$', expanded_url)
if (matches):
quote = True
# Collect previous tweets in conversation
if 'in_reply_to_status_id_str' in tweet:
in_reply_to = True
# Collect RT retweets
if 'full_text' in tweet and tweet['full_text'].startswith('RT @'):
retweet = True
    # TODO: actually extract the referenced tweet IDs rather than just booleans;
    # that would need three separate sets, one per reference type.
return in_reply_to, quote, retweet
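# Example (illustrative): for a tweet whose full_text starts with 'RT @someone:' and whose entities contain
# a https://twitter.com/<user>/status/<id> URL, but which has no 'in_reply_to_status_id_str', this
# returns (False, True, True), i.e. (in_reply_to, quote, retweet).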
def convert_tweet(tweet, username, media_sources, users: dict, paths: PathConfig):
"""Converts a JSON-format tweet. Returns tuple of timestamp, markdown and HTML."""
if 'tweet' in tweet.keys():
tweet = tweet['tweet']
    if 'created_at' not in tweet:
        # some tweets lack 'created_at' (apparently an archive bug): skip them
return None, None, None, None, None, None, None
timestamp_str = tweet['created_at']
timestamp = int(round(datetime.datetime.strptime(timestamp_str, '%a %b %d %X %z %Y').timestamp()))
# Example: Tue Mar 19 14:05:17 +0000 2019
body_markdown = tweet['full_text']
body_html = tweet['full_text']
tweet_id_str = tweet['id_str']
# for old tweets before embedded t.co redirects were added, ensure the links are
# added to the urls entities list so that we can build correct links later on.
if 'entities' in tweet and 'media' not in tweet['entities'] and len(tweet['entities'].get("urls", [])) == 0:
for word in tweet['full_text'].split():
try:
url = urlparse(word)
except ValueError:
pass # don't crash when trying to parse something that looks like a URL but actually isn't
else:
if url.scheme != '' and url.netloc != '' and not word.endswith('\u2026'):
# Shorten links similar to twitter
netloc_short = url.netloc[4:] if url.netloc.startswith("www.") else url.netloc
path_short = url.path if len(url.path + '?' + url.query) < 15 \
else (url.path + '?' + url.query)[:15] + '\u2026'
tweet['entities']['urls'].append({
'url': word,
'expanded_url': word,
'display_url': netloc_short + path_short,
'indices': [tweet['full_text'].index(word), tweet['full_text'].index(word) + len(word)],
})
# replace t.co URLs with their original versions
if 'entities' in tweet and 'urls' in tweet['entities']:
for url in tweet['entities']['urls']:
if 'url' in url and 'expanded_url' in url:
expanded_url = url['expanded_url']
#body_markdown = body_markdown.replace(url['url'], expanded_url)
body_markdown = body_markdown.replace(url['url'], '(link)')
expanded_url_html = f'<a href="{expanded_url}">{expanded_url}</a>'
body_html = body_html.replace(url['url'], expanded_url_html)
# if the tweet is a reply, construct a header that links the names
# of the accounts being replied to the tweet being replied to
header_markdown = ''
header_html = ''
if 'in_reply_to_status_id' in tweet:
# match and remove all occurrences of '@username ' at the start of the body
replying_to = re.match(r'^(@[0-9A-Za-z_]* )*', body_markdown)[0]
if replying_to:
body_markdown = body_markdown[len(replying_to):]
body_html = body_html[len(replying_to):]
else:
# no '@username ' in the body: we're replying to self
replying_to = f'@{username}'
names = replying_to.split()
# some old tweets lack 'in_reply_to_screen_name': use it if present, otherwise fall back to names[0]
in_reply_to_screen_name = tweet['in_reply_to_screen_name'] if 'in_reply_to_screen_name' in tweet else names[0]
# create a list of names of the form '@name1, @name2 and @name3' - or just '@name1' if there is only one name
name_list = ', '.join(names[:-1]) + (f' and {names[-1]}' if len(names) > 1 else names[0])
in_reply_to_status_id = tweet['in_reply_to_status_id']
replying_to_url = f'https://twitter.com/{in_reply_to_screen_name}/status/{in_reply_to_status_id}'
header_markdown += f'Replying to [{escape_markdown(name_list)}]({replying_to_url})\n\n'
header_html += f'Replying to <a href="{replying_to_url}">{name_list}</a><br>'
# escape tweet body for markdown rendering:
# no need to escape here as it will be tokenized again.
body_markdown = escape_markdown(body_markdown)
#body_markdown = body_markdown.strip().replace("\n", r"\n")
# replace image URLs with image links to local files
if 'entities' in tweet and 'media' in tweet['entities'] and 'extended_entities' in tweet \
and 'media' in tweet['extended_entities']:
original_url = tweet['entities']['media'][0]['url']
markdown = ''
html = ''
for media in tweet['extended_entities']['media']:
if 'url' in media and 'media_url' in media:
original_expanded_url = media['media_url']
original_filename = os.path.split(original_expanded_url)[1]
archive_media_filename = tweet_id_str + '-' + original_filename
archive_media_path = os.path.join(paths.dir_input_media, archive_media_filename)
file_output_media = os.path.join(paths.dir_output_media, archive_media_filename)
media_url = rel_url(file_output_media, paths.example_file_output_tweets)
markdown += '' if not markdown and body_markdown == escape_markdown(original_url) else '\n\n'
#markdown += '' if not markdown and body_markdown == original_url else '\n\n'
html += '' if not html and body_html == original_url else '<br>'
if os.path.isfile(archive_media_path):
# Found a matching image, use this one
if not os.path.isfile(file_output_media):
shutil.copy(archive_media_path, file_output_media)
#markdown += f'![]({media_url})'
# ignore media at this moment
markdown += '(media)'
html += f'<img src="{media_url}"/>'
# Save the online location of the best-quality version of this file, for later upgrading if wanted
best_quality_url = f'https://pbs.twimg.com/media/{original_filename}:orig'
media_sources.append(
(os.path.join(paths.dir_output_media, archive_media_filename), best_quality_url)
)
else:
# Is there any other file that includes the tweet_id in its filename?
archive_media_paths = glob.glob(os.path.join(paths.dir_input_media, tweet_id_str + '*'))
if len(archive_media_paths) > 0:
for archive_media_path in archive_media_paths:
archive_media_filename = os.path.split(archive_media_path)[-1]
file_output_media = os.path.join(paths.dir_output_media, archive_media_filename)
media_url = rel_url(file_output_media, paths.example_file_output_tweets)
if not os.path.isfile(file_output_media):
shutil.copy(archive_media_path, file_output_media)
#markdown += f'<video controls><source src="{media_url}">Your browser ' \
# f'does not support the video tag.</video>\n'
markdown += '(video)'
html += f'<video controls><source src="{media_url}">Your browser ' \
f'does not support the video tag.</video>\n'
# Save the online location of the best-quality version of this file,
# for later upgrading if wanted
if 'video_info' in media and 'variants' in media['video_info']:
best_quality_url = ''
best_bitrate = -1 # some valid videos are marked with bitrate=0 in the JSON
for variant in media['video_info']['variants']:
if 'bitrate' in variant:
bitrate = int(variant['bitrate'])
if bitrate > best_bitrate:
best_quality_url = variant['url']
best_bitrate = bitrate
if best_bitrate == -1:
print(f"Warning No URL found for {original_url} {original_expanded_url} "
f"{archive_media_path} {media_url}")
print(f"JSON: {tweet}")
else:
media_sources.append(
(os.path.join(paths.dir_output_media, archive_media_filename),
best_quality_url)
)
else:
print(f'Warning: missing local file: {archive_media_path}. Using original link instead: '
f'{original_url} (expands to {original_expanded_url})')
#markdown += f'![]({original_url})'
markdown += "(media)"
html += f'<a href="{original_url}">{original_url}</a>'
body_markdown = body_markdown.replace(escape_markdown(original_url), markdown)
#body_markdown = body_markdown.replace(original_url, markdown)
body_html = body_html.replace(original_url, html)
    # make the body a quote (markdown quoting is disabled in this fork; only the HTML version is wrapped)
body_html = '<p><blockquote>' + '<br>\n'.join(body_html.splitlines()) + '</blockquote>'
# append the original Twitter URL as a link
original_tweet_url = f'https://twitter.com/{username}/status/{tweet_id_str}'
icon_url = rel_url(paths.file_tweet_icon, paths.example_file_output_tweets)
    # the reply header is omitted from the markdown output because it is slightly confusing
    # TODO: consider whether images are needed in this markdown output
    # the timestamp is also omitted, since the markdown is intended as a plain-text corpus
body_markdown = body_markdown #+ f'\n\n<img src="{icon_url}" width="12" /> ' \
#f'[{timestamp_str}]({original_tweet_url})'
body_html = header_html + body_html + f'<a href="{original_tweet_url}"><img src="{icon_url}" ' \
f'width="12" /> {timestamp_str}</a></p>'
# extract user_id:handle connections
if 'in_reply_to_user_id' in tweet and 'in_reply_to_screen_name' in tweet and \
tweet['in_reply_to_screen_name'] is not None:
reply_to_id = tweet['in_reply_to_user_id']
if int(reply_to_id) >= 0: # some ids are -1, not sure why
handle = tweet['in_reply_to_screen_name']
users[reply_to_id] = UserData(user_id=reply_to_id, handle=handle)
if 'entities' in tweet and 'user_mentions' in tweet['entities'] and tweet['entities']['user_mentions'] is not None:
for mention in tweet['entities']['user_mentions']:
if mention is not None and 'id' in mention and 'screen_name' in mention:
mentioned_id = mention['id']
if int(mentioned_id) >= 0: # some ids are -1, not sure why
handle = mention['screen_name']
if handle is not None:
users[mentioned_id] = UserData(user_id=mentioned_id, handle=handle)
# extract the type of tweet
in_reply_to, quote, retweet = collect_tweet_references(tweet)
return timestamp, tweet_id_str, body_markdown, body_html, in_reply_to, quote, retweet
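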
def find_files_input_tweets(dir_path_input_data):
"""Identify the tweet archive's file and folder names -
    they seem to change slightly depending on the archive size."""
input_tweets_file_templates = ['tweet.js', 'tweets.js', 'tweets-part*.js']
files_paths_input_tweets = []
for input_tweets_file_template in input_tweets_file_templates:
files_paths_input_tweets += glob.glob(os.path.join(dir_path_input_data, input_tweets_file_template))
if len(files_paths_input_tweets)==0:
print(f'Error: no files matching {input_tweets_file_templates} in {dir_path_input_data}')
exit()
return files_paths_input_tweets
def find_dir_input_media(dir_path_input_data):
input_media_dir_templates = ['tweet_media', 'tweets_media']
input_media_dirs = []
for input_media_dir_template in input_media_dir_templates:
input_media_dirs += glob.glob(os.path.join(dir_path_input_data, input_media_dir_template))
if len(input_media_dirs) == 0:
print(f'Error: no folders matching {input_media_dir_templates} in {dir_path_input_data}')
exit()
if len(input_media_dirs) > 1:
print(f'Error: multiple folders matching {input_media_dir_templates} in {dir_path_input_data}')
exit()
return input_media_dirs[0]
def download_file_if_larger(url, filename, index, count, sleep_time):
"""Attempts to download from the specified URL. Overwrites file if larger.
Returns whether the file is now known to be the largest available, and the number of bytes downloaded.
"""
requests = import_module('requests')
imagesize = import_module('imagesize')
pref = f'{index:3d}/{count:3d} {filename}: '
    # Sleep briefly, in an attempt to minimize the possibility of triggering some auto-cutoff mechanism
if index > 1:
print(f'{pref}Sleeping...', end='\r')
time.sleep(sleep_time)
# Request the URL (in stream mode so that we can conditionally abort depending on the headers)
print(f'{pref}Requesting headers for {url}...', end='\r')
byte_size_before = os.path.getsize(filename)
try:
with requests.get(url, stream=True, timeout=2) as res:
if not res.status_code == 200:
# Try to get content of response as `res.text`.
# For twitter.com, this will be empty in most (all?) cases.
# It is successfully tested with error responses from other domains.
raise Exception(f'Download failed with status "{res.status_code} {res.reason}". '
f'Response content: "{res.text}"')
byte_size_after = int(res.headers['content-length'])
if byte_size_after != byte_size_before:
# Proceed with the full download
tmp_filename = filename+'.tmp'
print(f'{pref}Downloading {url}... ', end='\r')
with open(tmp_filename,'wb') as f:
shutil.copyfileobj(res.raw, f)
post = f'{byte_size_after/2**20:.1f}MB downloaded'
width_before, height_before = imagesize.get(filename)
width_after, height_after = imagesize.get(tmp_filename)
pixels_before, pixels_after = width_before * height_before, width_after * height_after
pixels_percentage_increase = 100.0 * (pixels_after - pixels_before) / pixels_before
if width_before == -1 and height_before == -1 and width_after == -1 and height_after == -1:
# could not check size of both versions, probably a video or unsupported image format
os.replace(tmp_filename, filename)
bytes_percentage_increase = 100.0 * (byte_size_after - byte_size_before) / byte_size_before
logging.info(f'{pref}SUCCESS. New version is {bytes_percentage_increase:3.0f}% '
f'larger in bytes (pixel comparison not possible). {post}')
return True, byte_size_after
elif width_before == -1 or height_before == -1 or width_after == -1 or height_after == -1:
# could not check size of one version, this should not happen (corrupted download?)
logging.info(f'{pref}SKIPPED. Pixel size comparison inconclusive: '
f'{width_before}*{height_before}px vs. {width_after}*{height_after}px. {post}')
return False, byte_size_after
elif pixels_after >= pixels_before:
os.replace(tmp_filename, filename)
bytes_percentage_increase = 100.0 * (byte_size_after - byte_size_before) / byte_size_before
if bytes_percentage_increase >= 0:
logging.info(f'{pref}SUCCESS. New version is {bytes_percentage_increase:3.0f}% larger in bytes '
f'and {pixels_percentage_increase:3.0f}% larger in pixels. {post}')
else:
logging.info(f'{pref}SUCCESS. New version is actually {-bytes_percentage_increase:3.0f}% '
f'smaller in bytes but {pixels_percentage_increase:3.0f}% '
f'larger in pixels. {post}')
return True, byte_size_after
else:
logging.info(f'{pref}SKIPPED. Online version has {-pixels_percentage_increase:3.0f}% '
f'smaller pixel size. {post}')
return True, byte_size_after
else:
logging.info(f'{pref}SKIPPED. Online version is same byte size, assuming same content. Not downloaded.')
return True, 0
except Exception as err:
logging.error(f"{pref}FAIL. Media couldn't be retrieved from {url} because of exception: {err}")
return False, 0
def download_larger_media(media_sources, paths: PathConfig):
"""Uses (filename, URL) tuples in media_sources to download files from remote storage.
Aborts downloads if the remote file is the same size or smaller than the existing local version.
Retries the failed downloads several times, with increasing pauses between each to avoid being blocked.
"""
# Log to file as well as the console
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(message)s')
mkdirs_for_file(paths.file_download_log)
logfile_handler = logging.FileHandler(filename=paths.file_download_log, mode='w')
logfile_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(logfile_handler)
# Download new versions
start_time = time.time()
total_bytes_downloaded = 0
sleep_time = 0.25
remaining_tries = 5
while remaining_tries > 0:
number_of_files = len(media_sources)
success_count = 0
retries = []
for index, (local_media_path, media_url) in enumerate(media_sources):
success, bytes_downloaded = download_file_if_larger(
media_url, local_media_path, index + 1, number_of_files, sleep_time
)
if success:
success_count += 1
else:
retries.append((local_media_path, media_url))
total_bytes_downloaded += bytes_downloaded
# show % done and estimated remaining time:
time_elapsed: float = time.time() - start_time
estimated_time_per_file: float = time_elapsed / (index + 1)
estimated_time_remaining: datetime.datetime = \
datetime.datetime.fromtimestamp(
(number_of_files - (index + 1)) * estimated_time_per_file,
tz=datetime.timezone.utc
)
if estimated_time_remaining.hour >= 1:
time_remaining_string: str = \
f"{estimated_time_remaining.hour} hour{'' if estimated_time_remaining.hour == 1 else 's'} " \
f"{estimated_time_remaining.minute} minute{'' if estimated_time_remaining.minute == 1 else 's'}"
elif estimated_time_remaining.minute >= 1:
time_remaining_string: str = \
f"{estimated_time_remaining.minute} minute{'' if estimated_time_remaining.minute == 1 else 's'} " \
f"{estimated_time_remaining.second} second{'' if estimated_time_remaining.second == 1 else 's'}"
else:
time_remaining_string: str = \
f"{estimated_time_remaining.second} second{'' if estimated_time_remaining.second == 1 else 's'}"
if index + 1 == number_of_files:
print(' 100 % done.')
else:
print(f' {(100*(index+1)/number_of_files):.1f} % done, about {time_remaining_string} remaining...')
media_sources = retries
remaining_tries -= 1
sleep_time += 2
logging.info(f'\n{success_count} of {number_of_files} tested media files '
f'are known to be the best-quality available.\n')
if len(retries) == 0:
break
if remaining_tries > 0:
print(f'----------------------\n\nRetrying the ones that failed, with a longer sleep. '
f'{remaining_tries} tries remaining.\n')
end_time = time.time()
logging.info(f'Total downloaded: {total_bytes_downloaded/2**20:.1f}MB = {total_bytes_downloaded/2**30:.2f}GB')
logging.info(f'Time taken: {end_time-start_time:.0f}s')
print(f'Wrote log to {paths.file_download_log}')
def parse_tweets(username, users, html_template, paths: PathConfig, lang):
"""Read tweets from paths.files_input_tweets, write to *.md and *.html.
Copy the media used to paths.dir_output_media.
Collect user_id:user_handle mappings for later use, in 'users'.
Returns the mapping from media filename to best-quality URL.
"""
tweets = []
media_sources = []
for tweets_js_filename in paths.files_input_tweets:
jsons = read_json_from_js_file(tweets_js_filename)
for tweet in jsons:
result = convert_tweet(tweet, username, media_sources, users, paths)
if result[0]:
# check valid tweet by timestamp
tweets.append(result)
tweets.sort(key=lambda tup: tup[0]) # oldest first
# Group tweets by month
grouped_tweets = defaultdict(list)
for timestamp, ids, md, html, in_reply_to, quote, retweet in tweets:
# Use a (markdown) filename that can be imported into Jekyll: YYYY-MM-DD-your-title-here.md
dt = datetime.datetime.fromtimestamp(timestamp)
grouped_tweets[(dt.year, dt.month)].append((ids, md, in_reply_to, quote, retweet))
final_md = []
for (year, month), content in grouped_tweets.items():
# Write into *.md files
for id, md, in_reply_to, quote, retweet in content:
final_md.append((id, md, in_reply_to, quote, retweet))
#md_path = paths.create_path_for_file_output_tweets(year, month, format="md")
md_path = "tweets.md"
write_json(md_path, final_md, lang)
# Write into *.html files
    # not needed, since this fork outputs markdown only
    print(f'Wrote {len(tweets)} tweets to {md_path}, '
          f'with media files copied to {paths.dir_output_media}')
return media_sources
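# Note: write_json comes from the local `prompt` module (not shown in this file). Based on the call above
# it is assumed to take the output path, the list of (id, markdown, in_reply_to, quote, retweet) tuples
# and the language code, and to serialize them to tweets.md.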
def collect_user_ids_from_followings(paths) -> list:
"""
Collect all user ids that appear in the followings archive data.
(For use in bulk online lookup from Twitter.)
"""
# read JSON file from archive
following_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'following.js'))
# collect all user ids in a list
following_ids = []
for follow in following_json:
if 'following' in follow and 'accountId' in follow['following']:
following_ids.append(follow['following']['accountId'])
return following_ids
def parse_followings(users, user_id_url_template, paths: PathConfig):
"""Parse paths.dir_input_data/following.js, write to paths.file_output_following.
"""
following = []
following_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'following.js'))
following_ids = []
for follow in following_json:
if 'following' in follow and 'accountId' in follow['following']:
following_ids.append(follow['following']['accountId'])
for following_id in following_ids:
handle = users[following_id].handle if following_id in users else '~unknown~handle~'
following.append(handle + ' ' + user_id_url_template.format(following_id))
following.sort()
following_output_path = paths.create_path_for_file_output_single(format="txt", kind="following")
with open_and_mkdirs(following_output_path) as f:
f.write('\n'.join(following))
print(f"Wrote {len(following)} accounts to {following_output_path}")
def collect_user_ids_from_followers(paths) -> list:
"""
Collect all user ids that appear in the followers archive data.
(For use in bulk online lookup from Twitter.)
"""
# read JSON file from archive
follower_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'follower.js'))
# collect all user ids in a list
follower_ids = []
for follower in follower_json:
if 'follower' in follower and 'accountId' in follower['follower']:
follower_ids.append(follower['follower']['accountId'])
return follower_ids
def parse_followers(users, user_id_url_template, paths: PathConfig):
"""Parse paths.dir_input_data/followers.js, write to paths.file_output_followers.
"""
followers = []
follower_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'follower.js'))
follower_ids = []
for follower in follower_json:
if 'follower' in follower and 'accountId' in follower['follower']:
follower_ids.append(follower['follower']['accountId'])
for follower_id in follower_ids:
handle = users[follower_id].handle if follower_id in users else '~unknown~handle~'
followers.append(handle + ' ' + user_id_url_template.format(follower_id))
followers.sort()
followers_output_path = paths.create_path_for_file_output_single(format="txt", kind="followers")
with open_and_mkdirs(followers_output_path) as f:
f.write('\n'.join(followers))
print(f"Wrote {len(followers)} accounts to {followers_output_path}")
def chunks(lst: list, n: int):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
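# Example: list(chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]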
def collect_user_ids_from_direct_messages(paths) -> list:
"""
Collect all user ids that appear in the direct messages archive data.
(For use in bulk online lookup from Twitter.)
"""
# read JSON file from archive
dms_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'direct-messages.js'))
# collect all user ids in a set
dms_user_ids = set()
for conversation in dms_json:
if 'dmConversation' in conversation and 'conversationId' in conversation['dmConversation']:
dm_conversation = conversation['dmConversation']
conversation_id = dm_conversation['conversationId']
user1_id, user2_id = conversation_id.split('-')
dms_user_ids.add(user1_id)
dms_user_ids.add(user2_id)
return list(dms_user_ids)
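# For reference: one-to-one conversation IDs in the archive look like '123456-789012' (the two numeric
# user IDs joined by '-'), which is why split('-') above yields both participants.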
def parse_direct_messages(username, users, user_id_url_template, paths: PathConfig):
"""Parse paths.dir_input_data/direct-messages.js, write to one markdown file per conversation.
"""
# read JSON file
dms_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'direct-messages.js'))
# Parse the DMs and store the messages in a dict
conversations_messages = defaultdict(list)
for conversation in dms_json:
if 'dmConversation' in conversation and 'conversationId' in conversation['dmConversation']:
dm_conversation = conversation['dmConversation']
conversation_id = dm_conversation['conversationId']
user1_id, user2_id = conversation_id.split('-')
messages = []
if 'messages' in dm_conversation:
for message in dm_conversation['messages']:
if 'messageCreate' in message:
message_create = message['messageCreate']
if all(tag in message_create for tag in ['senderId', 'recipientId', 'text', 'createdAt']):
from_id = message_create['senderId']
to_id = message_create['recipientId']
body = message_create['text']
# replace t.co URLs with their original versions
if 'urls' in message_create and len(message_create['urls']) > 0:
for url in message_create['urls']:
if 'url' in url and 'expanded' in url:
expanded_url = url['expanded']
body = body.replace(url['url'], expanded_url)
# escape message body for markdown rendering:
body_markdown = escape_markdown(body)
# replace image URLs with image links to local files
if 'mediaUrls' in message_create \
and len(message_create['mediaUrls']) == 1 \
and 'urls' in message_create:
original_expanded_url = message_create['urls'][0]['expanded']
message_id = message_create['id']
media_hash_and_type = message_create['mediaUrls'][0].split('/')[-1]
media_id = message_create['mediaUrls'][0].split('/')[-2]
archive_media_filename = f'{message_id}-{media_hash_and_type}'
new_url = os.path.join(paths.dir_output_media, archive_media_filename)
archive_media_path = \
os.path.join(paths.dir_input_data, 'direct_messages_media', archive_media_filename)
if os.path.isfile(archive_media_path):
# found a matching image, use this one
if not os.path.isfile(new_url):
shutil.copy(archive_media_path, new_url)
image_markdown = f'\n![]({new_url})\n'
body_markdown = body_markdown.replace(
escape_markdown(original_expanded_url), image_markdown
)
# Save the online location of the best-quality version of this file,
# for later upgrading if wanted
best_quality_url = \
f'https://ton.twitter.com/i//ton/data/dm/' \
f'{message_id}/{media_id}/{media_hash_and_type}'
# there is no ':orig' here, the url without any suffix has the original size
# TODO: a cookie (and a 'Referer: https://twitter.com' header)
# is needed to retrieve it, so the url might be useless anyway...
# WARNING: Do not uncomment the statement below until the cookie problem is solved!
# media_sources.append(
# (
# os.path.join(output_media_folder_name, archive_media_filename),
# best_quality_url
# )
# )
else:
archive_media_paths = glob.glob(
os.path.join(paths.dir_input_data, 'direct_messages_media', message_id + '*'))
if len(archive_media_paths) > 0:
for archive_media_path in archive_media_paths:
archive_media_filename = os.path.split(archive_media_path)[-1]
media_url = os.path.join(paths.dir_output_media, archive_media_filename)
if not os.path.isfile(media_url):
shutil.copy(archive_media_path, media_url)
video_markdown = f'\n<video controls><source src="{media_url}">' \
f'Your browser does not support the video tag.</video>\n'
body_markdown = body_markdown.replace(
escape_markdown(original_expanded_url), video_markdown
)
# TODO: maybe also save the online location of the best-quality version for videos?
# (see above)
else:
print(f'Warning: missing local file: {archive_media_path}. '
f'Using original link instead: {original_expanded_url})')
created_at = message_create['createdAt'] # example: 2022-01-27T15:58:52.744Z
timestamp = \
int(round(datetime.datetime.strptime(created_at, '%Y-%m-%dT%X.%fZ').timestamp()))
from_handle = escape_markdown(users[from_id].handle) if from_id in users \
else user_id_url_template.format(from_id)
to_handle = escape_markdown(users[to_id].handle) if to_id in users \
else user_id_url_template.format(to_id)
# make the body a quote
body_markdown = '> ' + '\n> '.join(body_markdown.splitlines())
message_markdown = f'{from_handle} -> {to_handle}: ({created_at}) \n\n' \
f'{body_markdown}'
messages.append((timestamp, message_markdown))
# find identifier for the conversation
other_user_id = user2_id if (user1_id in users and users[user1_id].handle == username) else user1_id
# collect messages per identifying user in conversations_messages dict
conversations_messages[other_user_id].extend(messages)
# output as one file per conversation (or part of long conversation)
num_written_messages = 0
num_written_files = 0
for other_user_id, messages in conversations_messages.items():
# sort messages by timestamp
messages.sort(key=lambda tup: tup[0])
other_user_name = escape_markdown(users[other_user_id].handle) if other_user_id in users \
else user_id_url_template.format(other_user_id)
other_user_short_name: str = users[other_user_id].handle if other_user_id in users else other_user_id
escaped_username = escape_markdown(username)
# if there are more than 1000 messages, the conversation was split up in the twitter archive.
# following this standard, also split up longer conversations in the output files:
if len(messages) > 1000:
for chunk_index, chunk in enumerate(chunks(messages, 1000)):
markdown = ''
markdown += f'### Conversation between {escaped_username} and {other_user_name}, ' \
f'part {chunk_index+1}: ###\n\n----\n\n'
markdown += '\n\n----\n\n'.join(md for _, md in chunk)
conversation_output_path = paths.create_path_for_file_output_dms(name=other_user_short_name, index=(chunk_index + 1), format="md")
# write part to a markdown file
with open_and_mkdirs(conversation_output_path) as f:
f.write(markdown)
print(f'Wrote {len(chunk)} messages to {conversation_output_path}')
num_written_files += 1
else:
markdown = ''
markdown += f'### Conversation between {escaped_username} and {other_user_name}: ###\n\n----\n\n'
markdown += '\n\n----\n\n'.join(md for _, md in messages)
conversation_output_path = paths.create_path_for_file_output_dms(name=other_user_short_name, format="md")
with open_and_mkdirs(conversation_output_path) as f:
f.write(markdown)
print(f'Wrote {len(messages)} messages to {conversation_output_path}')
num_written_files += 1
num_written_messages += len(messages)
print(f"\nWrote {len(conversations_messages)} direct message conversations "
f"({num_written_messages} total messages) to {num_written_files} markdown files\n")
def make_conversation_name_safe_for_filename(conversation_name: str) -> str:
"""
Remove/replace characters that could be unsafe in filenames
"""
forbidden_chars = \
['"', "'", '*', '/', '\\', ':', '<', '>', '?', '|', '!', '@', ';', ',', '=', '.', '\n', '\r', '\t']
new_conversation_name = ''
for char in conversation_name:
if char in forbidden_chars:
new_conversation_name = new_conversation_name + '_'
elif char.isspace():
# replace spaces with underscores
new_conversation_name = new_conversation_name + '_'
        elif ord(char) == 0x7F or (0x1F >= ord(char) >= 0x00):
            # 0x00 - 0x1F and 0x7F are also forbidden, just discard them
continue
else:
new_conversation_name = new_conversation_name + char
return new_conversation_name
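# Example (illustrative): make_conversation_name_safe_for_filename('Alice & Bob?!') -> 'Alice_&_Bob__'
# (spaces and forbidden characters become underscores; control characters are dropped).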
def find_group_dm_conversation_participant_ids(conversation: dict) -> set:
"""
Find IDs of all participating Users in a group direct message conversation
"""
group_user_ids = set()
if 'dmConversation' in conversation and 'conversationId' in conversation['dmConversation']:
dm_conversation = conversation['dmConversation']
if 'messages' in dm_conversation:
for message in dm_conversation['messages']:
if 'messageCreate' in message:
group_user_ids.add(message['messageCreate']['senderId'])
elif 'joinConversation' in message:
group_user_ids.add(message['joinConversation']['initiatingUserId'])
for participant_id in message['joinConversation']['participantsSnapshot']:
group_user_ids.add(participant_id)
elif "participantsJoin" in message:
group_user_ids.add(message['participantsJoin']['initiatingUserId'])
for participant_id in message['participantsJoin']['userIds']:
group_user_ids.add(participant_id)
return group_user_ids
def collect_user_ids_from_group_direct_messages(paths) -> list:
"""
Collect all user ids that appear in the group direct messages archive data.
(For use in bulk online lookup from Twitter.)
"""
# read JSON file from archive
group_dms_json = read_json_from_js_file(os.path.join(paths.dir_input_data, 'direct-messages-group.js'))
# collect all user ids in a set
group_dms_user_ids = set()