-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathserver_update.py
1844 lines (1126 loc) · 52.2 KB
/
server_update.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import sys
# Before we do ANYTHING, we check to make sure python is the correct version!
if sys.version_info < (3,7,0):
sys.stdout.write("\n--== [ Invalid python version! ] ==--\n")
sys.stdout.write("Current version: {}\n".format(version_info))
sys.stdout.write("Expected version: 3.7+\n")
sys.stdout.write("\nPlease install the correct version of python before continuing!\n")
sys.exit()
import tempfile
import urllib.request
import os
import shutil
import json
import traceback
import argparse
from urllib.error import URLError
from http.client import HTTPResponse
from hashlib import sha256
from typing import Any, Callable, List, Sequence, Tuple, Union
from json.decoder import JSONDecodeError
from math import ceil
"""
A Set of tools to automate the server update process.
"""
__version__ = '2.2.3'
# These variables contain links for the script updating process.
GITHUB = 'https://github.com/Owen-Cochell/PaperMC-Update'
GITHUB_RELEASE = 'https://api.github.com/repos/Owen-Cochell/PaperMC-Update/releases/latest'
GITHUB_RAW = 'https://raw.githubusercontent.com/Owen-Cochell/PaperMC-Update/master/server_update.py'
def load_config_old(config: dict) -> Tuple[str, int]:
"""
Loads configuration data from the given file.
We only load version info if it's in the official format!
We return the version and build number found in the configuration file.
This function looks for config data in the old format,
which at this time seems to be pre 1.21 (TODO: Confirm)
We preform a check to see if the data looks correct,
the current version string MUST start with 'git-Paper',
otherwise we will raise a value error.
:param config: JSON data to consider
:type config: dict
:return: Tuple containing version and build data respectively
:rtype: Tuple[str, int]
"""
OLD_PREFIX = 'git-Paper' # Old prefix
# Read the data, and attempt to pull some info out of it
current = config['currentVersion']
# Ensure string prefix is correct:
if not OLD_PREFIX == current[:len(OLD_PREFIX)]:
# Does not match! Must be invalid ...
raise ValueError("Invalid old config data format!")
# Splitting the data in two so we can pull some content out:
build, version = current.split(" ", 1)
# Getting build information:
build = int(build.split("-")[-1])
# Getting version information:
version = version[5:-1]
# Returning version information:
return version, build
def load_config_new(config: dict) -> Tuple[str, int]:
"""
Loads configuration data from the given file.
We only load version info if it's in the official format!
We return the version and build number found in the configuration file.
This function looks for config data in the new format,
which at this time seems to be post 1.21 (TODO: Confirm)
:param config: JSON data to consider
:type config: dict
:return: Tuple containing version and build data respectively
:rtype: Tuple[str, int]
"""
# Read the data, and attempt to pull some info out of it
current = config['currentVersion']
# Splitting the data in two so we can pull some content out:
split = current.split(" ")[0].split("-")
# Getting build information:
build = int(split[1])
# Getting version information:
version = split[0]
# Returning version information:
return version, build
def upgrade_script(serv: ServerUpdater):
"""
Upgrades this script.
We do this by checking github for any new releases,
comparing them to our version,
and then updating if necessary.
We use the ServerUpdater to do this operation for us,
so you will need to provide it for this function
to work correctly.
:param serv: ServerUpdater to use
:type serv: ServerUpdater
"""
output("# Checking for update ...")
# Creating request here:
req = urllib.request.Request(GITHUB_RELEASE, headers={'Accept': 'application/vnd.github.v3+json'})
# Getting data:
data = json.loads(urllib.request.urlopen(req).read())
# Checking if the version is new:
if data['tag_name'] == __version__:
# No update necessary, lets log and exit:
output("# No update necessary!\n")
return
output("# New version available!")
url = GITHUB_RAW
path = os.path.realpath(__file__)
# Determine if we are working in a frozen environment:
if getattr(sys, 'frozen', False):
print("# Can't upgrade frozen files!")
return
serv.fileutil.path = path
# Getting data:
data = urllib.request.urlopen(urllib.request.Request(url))
# Write the data:
serv.fileutil.create_temp_dir()
temp_path = os.path.realpath(serv.fileutil.temp.name + '/temp')
file = open(temp_path, mode='wb')
file.write(data.read())
# Install the new script:
serv.fileutil.install(temp_path, path)
# We are done!
output("# Script update complete!\n")
def output(text: str):
"""
Outputs text to the terminal via print,
will not print content if we are in quiet mode.
"""
if not args.quiet:
# We are not quieted, print the content
print(text)
def error_report(exc, net: bool=False):
"""
Function for displaying error information to the terminal.
:param exc: Exception object
:param net: Whether to include network information
:type net: bool
"""
print("+==================================================+")
print(" [ --== The Following Error Has Occurred: ==-- ]")
print("+==================================================+")
# Print error name
print("Error Name: {}".format(exc))
print("+==================================================+")
# Print full traceback:
print("Full Traceback:")
traceback.print_exc()
if net:
# Include extra network information
print("+==================================================+")
print("Extra Network Information:")
if hasattr(exc, 'url'):
print("Attempted URL: {}".format(exc.url))
if hasattr(exc, 'reason'):
print("We failed to reach the server.")
print("Reason: {}".format(exc.reason))
if hasattr(exc, 'code'):
print("The server could not fulfill the request.")
print("Error code: {}".format(exc.code))
print("+==================================================+")
print("(Can you make anything of this?)")
print("Please check the github page for more info: https://github.com/Owen-Cochell/PaperMC-Update.")
return
def progress_bar(length: int, stepsize: int, total_steps: int, step: int, prefix: str="Downloading:", size: int=60, prog_char: str="#", empty_char: str="."):
"""
Outputs a simple progress bar to stdout.
We act as a generator, continuing to iterate and add to the bar progress
as we download more information.
:param legnth: Length of data to download
:type length: int
:param stepsize: Size of each step
:type stepsize: int
:param total_steps: Total number of steps
:type total_steps: int
:param step: Step number we are on
:type step: int
:param prefix: Prefix to use for the progress bar
:type prefix: str
:param size: Number of characters on the progress bar
:type size: int
:param prog_char: Character to use for progress
:type prog_char: str
:param empty_char: Character to use for empty progress
:type empty_char: str
"""
# Calculate number of '#' to render:
x = int(size*(step+1)/total_steps)
# Rendering progress bar:
if not args.quiet:
sys.stdout.write("{}[{}{}] {}/{}\r".format(prefix, prog_char*x, empty_char*(size-x),
(step*stepsize if step < total_steps - 1 else length), length))
sys.stdout.flush()
if not args.quiet and step >= total_steps - 1 :
sys.stdout.write("\n")
sys.stdout.flush()
class Update:
"""
Server updater, handles checking, downloading, and installing.
This class facilitates communication between this script and the Paper v2 API.
We offer methods to retrieve available versions, builds,
and other information about downloads.
Users can download the final jar file using this class as well.
We also offer the ability to generate download URLs,
so the user can download the files in any way they see fit.
"""
def __init__(self):
self._base = 'https://api.papermc.io/v2/projects/paper' # Base URL to build of off
self._headers = {
'Content-Type': 'application/json;charset=UTF-8',
'Accept': 'application/json, text/plain, */*',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.10; rv:43.0) Gecko/20100101 Firefox/43.0',
'Accept-Language': 'en-US,en;q=0.5',
'DNT': '1',
} # Request headers for contacting Paper Download API, emulating a Firefox client
self.download_path = '' # Path the file was downloaded to
self.cache = {} # A basic cache for saving responses
def _none_function(self, length, blocksize, total, step, *args, **kwargs):
"""
Dummy function that does nothing.
"""
pass
def version_convert(self, ver: str) -> Tuple[int,int,int]:
"""
Converts the version string into a tuple that can be used for comparison.
This tuple contains three numbers, each of which can be used
in equality operations.
This can be used to determine if a given version is greater or lessor than another.
:param ver: Version string to convert
:type ver: str
:return: Tuple contaning version information
:rtype: Tuple[int,int,int]
"""
# Convert and return the tuple:
temp: List[int] = []
for item in ver.split('.'):
# Convert and add the item:
temp.append(int(item))
return (temp[0], temp[1], temp[2])
def build_data_url(self, version: str=None, build_num: int=None) -> str:
"""
Builds a valid URL for retrieving version data.
The user can use this URL to retrieve various versioning data.
If version and build_num are not specified,
then general paper info is returned:
https://papermc.io/api/v2/projects/paper
If build_num is not specified,
then general data about the specified data is returned:
https://papermc.io/api/v2/projects/paper/versions/[version]
If both arguments are provided,
then data about the specific version and build is returned:
https://papermc.io/api/v2/projects/paper/versions/[version]/builds/[build_num]
:param version: Version to fetch info for, defaults to None
:type version: str, optional
:param build_num: Build number to get info for, defaults to None
:type build_num: int, optional
:return: URL of the data
:rtype: str
"""
# Building url:
final = self._base
if version is not None:
# Specific version requested:
final = final + '/versions/' + str(version)
if build_num is not None:
# Specific build num requested:
final = final + '/builds/' + str(build_num)
# Return the URL:
return final
def build_download_url(self, version: str, build_num:int):
"""
Builds a valid download URL that can be used to download a file.
We use the version and build number to generate this URL.
The user can use this URL to download the file
using any method of their choice.
:param version: Version to download
:type version: str
:param build_num: Build number to download, defaults to 'latest'
:type build_num: str, optional
"""
# Get download name
download_name = self.get(version, build_num)['downloads']['application']['name']
# Build and return the URL:
return self._base + '/versions/' + str(version) + '/builds/' + str(build_num) + '/downloads/' + str(download_name)
def download_response(self, version: str, build_num:int) -> HTTPResponse:
"""
Calls the underlying urllib library and return the object generated.
This object is usually a HTTPResponse object.
The user can use this object in any way they see fit.
We automatically generate the URL using the version and build_num given.
:param url: URL of file to download
:type url: str
:return: Object returned by urllib
:rtype: HTTPResponse
"""
# Build the URL
url = self.build_download_url(version, build_num)
# Creating request here:
req = urllib.request.Request(url, headers=self._headers)
# Sending request to Paper API
return urllib.request.urlopen(req)
def download_file(self, path: str, version: str, build_num:int, check:bool=True, call: Callable=None, args: List=None, blocksize: int=4608) -> str:
"""
Downloads the content to the given external file.
We handle all file operations,
and automatically work with the URLResponse objects
to write the file contents to an external file.
If a directory is provided,
then we will use the recommended name of the file,
and save it to the directory provided.
Users can also pass a function to be called upon each block of the download.
This can be useful to visualize or track downloads.
We will pass the total length, stepsize, total steps, and step number.
The args provided in the args parameters will be passed to the function as well.
:param path: Path to directory to write to
:type path: str
:param version: Version to download
:type version: str
:param build_num: Build to download
:type build_num: int
:param check: Boolean determining if we should check the integrity of the file
:type check: bool
:param call: Method to call upon each download iteration
:type call: Callable
:param args: Args to pass to the callable
:type args: List
:param blocksize: Number of bytes to read per copy operation
:type blocksize: int
:return: Path the file was saved to
:raises: ValueError: If file integrity check fails
"""
if args is None:
args = []
if call is None:
call = self._none_function
args = []
# Get the data:
data = self.download_response(version, build_num)
# Get filename for download:
if os.path.isdir(path):
# Use the default name:
path = os.path.join(path, data.getheader('content-disposition', default='').split("''")[1])
# Get length of file:
length = data.getheader('content-length')
# Ensure result is not None:
if length is None:
# Raise an error:
raise ValueError("Content length not present in HTTP headers!")
# Otherwise, set the length:
length = int(length)
total = ceil(length/blocksize) + 1
# Open the file:
file = open(path, mode='ba')
# Copy the downloaded data to the file:
for i in range(total):
# Call the method:
call(length, blocksize, total, i, *args)
# Get the data:
file.write(data.read(blocksize))
# Close the file:
file.close()
# Re-open the file for reading:
file = open(path, mode='rb')
if check:
# Get the ideal SHA256 hash for the file:
hash = self.get(version, build_num)['downloads']['application']['sha256']
# Checking integrity:
if not sha256(file.read()).hexdigest() == hash:
# File integrity failed! Do something...
raise ValueError("File integrity check failed!")
# Closing file:
file.close()
return path
def get_versions(self) -> Tuple[str,...]:
"""
Gets available versions of the server.
The list of versions is a tuple of strings.
Each version follows the Minecraft game version conventions.
:return: List of available versions
"""
# Returning version info
return self.get()['versions']
def get_buildnums(self, version: str) -> Tuple[int,...]:
"""
Gets available build for a particular version.
The builds are a tuple of ints,
which follow PaperMC build number conventions.
:param version: Version to get builds for
:type version: str
:return: List of builds
:rtype: Tuple[int,...]
"""
# Returning build info:
return self.get(version)['builds']
def get(self, version: str=None, build_num: int=None) -> dict:
"""
Gets RAW data from the Paper API, version info only.
We utilize some basic caching to remember responses
instead of calling the PaperMC API multiple times.
You can use this to get a list of valid versions,
list of valid builds per version,
as well as general data related to the selected version.
You should check out:
https://paper.readthedocs.io/en/latest/site/api.html
https://papermc.io/api/docs/swagger-ui/index.html?configUrl=/api/openapi/swagger-config#/
For more information on PaperMC API formatting.
We return the data in a dictionary format.
:param version: Version to include in the URL
:type version: str
:param build_num: Build number to include in the URL
:type build_num: int
:return: Dictionary of request data
:rtype: dict
"""
# Generate URL:
url = self.build_data_url(version, build_num)
# Check if we have saved content:
if url in self.cache.keys():
# We have cached content:
return self.cache[url]
# Get the data and return:
data = json.loads(self._get(version, build_num).read())
# Cache it:
self.cache[url] = data
# Return the final data:
return data
def _get(self, version: str=None, build_num: int=None) -> HTTPResponse:
"""
Gets raw data from the PaperMC download API.
This method generates the relevant URLs and returns
the HTTPResponse object representing the request.
:param version: Version to get info for, defaults to None
:type version: str, optional
:param build_num: Build to get info for, defaults to None
:type build_num: int, optional
:return: HTTPResponse representing the request
:rtype: HTTPResponse
"""
final = self.build_data_url(version, build_num)
# Check if the URL is present in the cache:
if final in self.cache.keys():
# Cached content is found, return THAT:
return self.cache[final]
# Creating request here:
req = urllib.request.Request(final, headers=self._headers)
# Getting data:
data = urllib.request.urlopen(req)
# Saving data:
self.cache[final] = data
return data
class FileUtil:
"""
Class for managing the creating/deleting/moving of server files.
"""
def __init__(self, path):
self.path: str = os.path.abspath(path) # Path to working directory
self.temp: tempfile.TemporaryDirectory # Tempdir instance
self.config_default = 'version_history.json' # Default name of paper versioning file
self.target_path = '' # Path the new file will be moved to
def create_temp_dir(self):
"""
Creates a temporary directory.
:return: Temp file instance
"""
self.temp = tempfile.TemporaryDirectory()
return self.temp
def close_temp_dir(self):
"""
Closes created temporary directory.
"""
self.temp.cleanup()
def load_config(self, config: str) -> Tuple[str, int]:
"""
Loads configuration info from 'version.json' in the server directory
We only load version info if it's in the official format!
:param config: Path to config file
:type config: str
:return: Configuration info from file
:rtype: Tuple[str, int]
"""
if config is None:
# Need to determine our config path:
if os.path.isdir(self.path):
config = os.path.join(self.path, self.config_default)
else:
config = os.path.join(os.path.dirname(self.path), self.config_default)
output("# Loading configuration data from file [{}] ...".format(config))
if not os.path.isfile(config):
print("# Unable to load config data from file at [{}] - Not found/Not a file!".format(config))
return '0', 0
# Load file content
try:
# Open file
with open(config, 'r') as file:
# Decode file
data = json.load(file)
except JSONDecodeError:
# Data not in valid JSON format.
output("# Failed to load config data - Not in JSON format!")
return '0', 0
try:
# First, try old format...
return load_config_old(data)
except Exception:
# Did not work, try something else ...
pass
# Try new format:
try:
return load_config_new(data)
except Exception:
# Extra weird errors due to formatting issues:
output("# Failed to load config data - Strange format, we support official builds only!")
return '0', 0
def install(self, file_path: str, new_path: str, target_copy: str=None, backup=True, new=False):
"""
"Installs" the contents of the temporary file into the target in the root server directory.
The new file should exist in the temporary directory before this method is invoked!
We backup the old jar file by default to the temporary directory,
and we will attempt to recover the old jar file in the event of any failures.
This feature can be disabled.
:param file_path: The path to the new file to install
:type new_file: str
:param new_path: Path to install the file to
:type new_path: str
:param target_copy: Where to copy the old file to
:type target_copy: str
:param file_name: What to rename the new file to, None for no change
:type file_name: str
:param backup: Value determining if we should back up the old file
:type backup: bool
:param new: Determines if we are doing a new install, aka if we care about file operation errors
:type new: bool
"""
output("\n[ --== Installation: ==-- ]\n")
# Checking if we should copy the old file:
if target_copy is not None and os.path.isfile(self.path):
# Copy the old file:
output("# Copying old file ...")
output("# ({} > {})".format(self.path, target_copy))
try:
shutil.copy(self.path, target_copy)
except Exception as e:
# Copy error:
self._fail_install("Old File Copy")
# Report the error:
error_report(e)
# Exit, install failed!
return False
# Creating backup of old file:
if backup and os.path.isfile(self.path) and not new:
output("# Creating backup of previous installation ...")
try:
shutil.copyfile(self.path, os.path.join(self.temp.name, 'backup'))
except Exception as e:
# Show install error
self._fail_install("File Backup")
# Show error info
error_report(e)
return False
self.target_path = new_path
output("# Backup created at: {}".format(os.path.join(self.temp.name, 'backup')))
# Determine if we should delete the original file:
if os.path.isfile(self.path) and not new:
# Removing current file:
output("# Deleting current file at {} ...".format(self.path))
try:
os.remove(self.path)
except Exception as e:
if not new:
self._fail_install("Old File Deletion")
# Showing error
error_report(e)
# Recovering backup
if backup:
self._recover_backup()
return False
output("# Removed original file!")
# Copying downloaded file to root:
try:
output("# Copying download data to root directory ...")
output("# ({} > {})".format(file_path, os.path.join(os.path.dirname(self.path), new_path)))
# Copy to the new directory with the given name:
shutil.copyfile(file_path, os.path.join(os.path.dirname(self.path), new_path))
except Exception as e:
# Install error
self._fail_install("File Copy")
# Show error
error_report(e)
# Recover backup
if backup and os.path.isfile(self.path) and not new:
self._recover_backup()
return False
return False
output("# Done copying download data to root directory!")
output("\n[ --== Installation complete! ==-- ]")
return True
def _recover_backup(self):
"""
Attempts to recover the backup of the old server jar file.
"""
print("+==================================================+")
print("\n> !ATTENTION! <")
print("A failure has occurred during the installation process.")
print("I'm sure you can see the error information above.")
print("This script will attempt to recover your old installation.")
print("If this operation fails, check the github page for more info: "
"https://github.com/Owen-Cochell/PaperMC-Update")
# Deleting file in root directory:
print("# Deleting Corrupted temporary File...")
try:
os.remove(self.target_path)
except FileNotFoundError:
# File was not found. Continuing...
print("# File not found. Continuing operation...")
except Exception as e:
print("# Critical error during recovery process!")
print("# Displaying error information:")
error_report(e)