-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnotion_backup.py
131 lines (114 loc) · 4.22 KB
/
notion_backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import json
import shutil
import time
import arrow
import zipfile
from pathlib import Path
from typing import List
import requests
from slugify import slugify
from utils.config import Config
from utils.utils import FileUtils
class NotionUp:
def __init__(self) -> None:
super().__init__()
@staticmethod
def exportTask(spaceId):
return {
'task': {
'eventName': "exportSpace",
'request': {
'spaceId': spaceId,
'exportOptions': {
'exportType': 'markdown',
'timeZone': Config.notion_timezone(),
'locale': Config.notion_locale(),
}
}
}
}
@staticmethod
def requestPost(endpoint: str, params: object):
response = requests.post(
f'{Config.notion_api()}/{endpoint}',
data=json.dumps(params).encode('utf8'),
headers={
'content-type': 'application/json',
'cookie': f'token_v2={Config.token_v2()}; '
},
)
return response.json()
@staticmethod
def getUserContent():
return NotionUp.requestPost("loadUserContent", {})["recordMap"]
@staticmethod
def waitForExportedUrl(taskId):
print('Polling for export task: {}'.format(taskId))
t_0 = arrow.now()
wait_nums = 0
while True:
res = NotionUp.requestPost('getTasks', {'taskIds': [taskId]})
tasks = res.get('results')
task = next(t for t in tasks if t['id'] == taskId)
if task['state'] == 'success':
url = task['status']['exportURL']
print('\n' + url)
break
elif task['state'] == 'in_progress':
print(f'\rDuration: {arrow.now() - t_0}', end="", flush=False)
time.sleep(10)
elif task['state'] == 'not_started':
wait_nums += 1
print(f'not started, wait {wait_nums} times.')
time.sleep(10)
else:
raise Exception('\n' + 'Error! Here is the return message:\n' + str(res))
print('export state success\n')
return url
@staticmethod
def downloadFile(url, filename) -> str:
file = FileUtils.new_file(Config.output(), filename)
FileUtils.create_file(file)
with requests.get(url, stream=True) as r:
with open(file, 'wb') as f:
shutil.copyfileobj(r.raw, f)
return file
@staticmethod
def backup() -> List[str]:
zips = []
# tokens
userContent = NotionUp.getUserContent()
userId = list(userContent["notion_user"].keys())[0]
print(f"User id: {userId}")
# list spaces
spaces = [(space_id, space_details["value"]["name"]) for (space_id, space_details) in userContent["space"].items()]
print("Available spaces total: {}".format(len(spaces)))
for (spaceId, spaceName) in spaces:
print(f"\nexport space: {spaceId}, {spaceName}")
# request export task
taskId = NotionUp.requestPost('enqueueTask', NotionUp.exportTask(spaceId)).get('taskId')
# get exported file url and download
url = NotionUp.waitForExportedUrl(taskId)
filename = slugify(f'{spaceName}-{spaceId}') + '.zip'
print('download exported zip: {}, {}'.format(url, filename))
filePath = NotionUp.downloadFile(url, filename)
zips.append(filePath)
break
return zips
@staticmethod
def unzipFile(filePath: str, saveDir: str = None):
try:
if not saveDir:
saveDir = FileUtils.new_file(Config.output(), Path(filePath).name.replace('.zip', ''))
FileUtils.clean_dir(saveDir)
file = zipfile.ZipFile(filePath)
file.extractall(saveDir)
file.close()
return saveDir
except Exception as e:
print(f'{filePath} unzip fail,{str(e)}')
@staticmethod
def unzip():
for file in Config.zip_files():
print('unzip exported zip: {}'.format(file))
NotionUp.unzipFile(file)