From 13abc332c7565baec565b48c2c25587ca3edd581 Mon Sep 17 00:00:00 2001 From: heroicNeZha <25311962+heroicNeZha@users.noreply.github.com> Date: Tue, 25 Jan 2022 14:33:14 +0800 Subject: [PATCH 1/6] clean data for ldbc --- scripts/clean-data.py | 95 +++++++++++++++++++++++++++++++++++++++++++ scripts/copy-data.py | 27 ++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 scripts/clean-data.py create mode 100644 scripts/copy-data.py diff --git a/scripts/clean-data.py b/scripts/clean-data.py new file mode 100644 index 0000000..e5cde55 --- /dev/null +++ b/scripts/clean-data.py @@ -0,0 +1,95 @@ +import getopt +import os +import sys +import threading +import pandas as pd + +_csv_dir = "../target/data/test_data/social_network/" +_thread_count = 10 +_all_csv_files = [] +lock = threading.Lock() + + +def handler_data_once(): + lock.acquire() + file = _all_csv_files.pop() + lock.release() + print('%s handler %s.' % (threading.current_thread().name, file)) + if(os.path.isfile(_csv_dir+file[:-4]+'_header.csv')): + pd_header_file = pd.read_csv(_csv_dir+file[:-4]+'_header.csv', sep='|') + df_header = pd.DataFrame(pd_header_file) + if os.path.exists(_csv_dir+file+'.copy'): + os.remove(_csv_dir+file+'.copy') + pd_file = pd.read_csv(_csv_dir+file, sep='|', + header=None, chunksize=100000) + # handler header csv + name_map = {} + date_list = [] + for i in range(len(df_header.columns)): + if df_header.columns[i].endswith('.id'): + name_map[i] = df_header.columns[i][:-3].lower() + elif df_header.columns[i] == 'id': + name_map[i] = os.path.splitext(file)[0].split('/')[-1] + elif df_header.columns[i].endswith('Date'): + date_list.append(i) + + for key in name_map: + df_header[df_header.columns[key] + ] = df_header[df_header.columns[key]].astype(str) + df_header[df_header.columns[key]] = df_header[df_header.columns[key]].apply( + lambda x: name_map[key]+'-'+x) + for key in date_list: + df_header[df_header.columns[key]] = df_header[df_header.columns[key]].apply( + lambda x: x[:-5]) + df_header.to_csv(_csv_dir+file[:-4] + + '_header.csv.copy', index=False, sep='|') + # handler data csv in chunk + for df in pd_file: + for key in name_map: + df[key] = df[key].astype(str) + df[key] = df[key].apply(lambda x: name_map[key]+'-'+x) + for key in date_list: + df[key] = df[key].apply(lambda x: x[:-5]) + df.to_csv(_csv_dir+file+'.copy', index=False, + sep='|', header=None, mode='a') + + +def handler_data(): + while len(_all_csv_files) > 0: + lock.acquire() + if len(_all_csv_files) <= 0: + break + lock.release() + handler_data_once() + + +if __name__ == "__main__": + argv = sys.argv[1:] + try: + opts, args = getopt.getopt(argv, "i:j:", []) + except getopt.GetoptError: + print('clean-data.py -i -j ') + sys.exit(2) + for opt, arg in opts: + if opt == "-i": + _csv_dir = arg + elif opt == "-j": + _thread_count = int(arg) + all_dir_list = os.listdir(_csv_dir) + for dir in all_dir_list: + if os.path.isdir(_csv_dir+'/'+dir): + dir_list = os.listdir(_csv_dir+'/'+dir) + for file in dir_list: + if file.endswith('.csv') and not file.endswith('header.csv'): + _all_csv_files.append(dir+'/'+file) + thread_group = [] + n = 0 + while n < _thread_count: + n = n+1 + t = threading.Thread(target=handler_data, + name='handler-Thread-'+str(n)) + t.start() + thread_group.append(t) + for th in thread_group: + th.join() + print('all task done! please run copy-data.py to recover csv file') diff --git a/scripts/copy-data.py b/scripts/copy-data.py new file mode 100644 index 0000000..ba430a9 --- /dev/null +++ b/scripts/copy-data.py @@ -0,0 +1,27 @@ +import getopt +import os +import sys + +_csv_dir = "../target/data/test_data/social_network/" +_all_csv_files = [] + +if __name__ == "__main__": + argv = sys.argv[1:] + try: + opts, args = getopt.getopt(argv, "i:j:", []) + except getopt.GetoptError: + print('copy-data.py -i ') + sys.exit(2) + for opt, arg in opts: + if opt == "-i": + _csv_dir = arg + all_dir_list = os.listdir(_csv_dir) + for dir in all_dir_list: + if os.path.isdir(_csv_dir+'/'+dir): + dir_list = os.listdir(_csv_dir+'/'+dir) + for file in dir_list: + if file.endswith('.csv'): + _all_csv_files.append(dir+'/'+file) + for dir in _all_csv_files: + os.remove(_csv_dir+dir) + os.rename(_csv_dir+dir+'.copy', _csv_dir+dir) From 389bf6edbb92a7c7ab4a697ed8023e1240ff5d25 Mon Sep 17 00:00:00 2001 From: heroicNeZha <25311962+heroicNeZha@users.noreply.github.com> Date: Wed, 26 Jan 2022 11:16:15 +0800 Subject: [PATCH 2/6] fix some bugs --- scripts/clean-data.py | 43 ++++++++++++++++++++++++++++++++++++++----- scripts/copy-data.py | 10 +++++++++- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/scripts/clean-data.py b/scripts/clean-data.py index e5cde55..56fdb08 100644 --- a/scripts/clean-data.py +++ b/scripts/clean-data.py @@ -1,5 +1,6 @@ import getopt import os +import re import sys import threading import pandas as pd @@ -16,18 +17,20 @@ def handler_data_once(): lock.release() print('%s handler %s.' % (threading.current_thread().name, file)) if(os.path.isfile(_csv_dir+file[:-4]+'_header.csv')): - pd_header_file = pd.read_csv(_csv_dir+file[:-4]+'_header.csv', sep='|') - df_header = pd.DataFrame(pd_header_file) + pd_header_csv = pd.read_csv(_csv_dir+file[:-4]+'_header.csv', sep='|') + df_header = pd.DataFrame(pd_header_csv) if os.path.exists(_csv_dir+file+'.copy'): os.remove(_csv_dir+file+'.copy') - pd_file = pd.read_csv(_csv_dir+file, sep='|', - header=None, chunksize=100000) + pd_csv = pd.read_csv(_csv_dir+file, sep='|', + header=None, chunksize=100000) # handler header csv name_map = {} date_list = [] for i in range(len(df_header.columns)): if df_header.columns[i].endswith('.id'): name_map[i] = df_header.columns[i][:-3].lower() + elif df_header.columns[i].endswith('.id.1'): + name_map[i] = df_header.columns[i][:-5].lower() elif df_header.columns[i] == 'id': name_map[i] = os.path.splitext(file)[0].split('/')[-1] elif df_header.columns[i].endswith('Date'): @@ -44,7 +47,7 @@ def handler_data_once(): df_header.to_csv(_csv_dir+file[:-4] + '_header.csv.copy', index=False, sep='|') # handler data csv in chunk - for df in pd_file: + for df in pd_csv: for key in name_map: df[key] = df[key].astype(str) df[key] = df[key].apply(lambda x: name_map[key]+'-'+x) @@ -63,6 +66,35 @@ def handler_data(): handler_data_once() +def back_handler(): + print('start back handler') + target_need_fix_title = ['static/place_isPartOf_place_header.csv.copy', + 'dynamic/person_knows_person_header.csv.copy', + 'dynamic/comment_replyOf_comment_header.csv'] + # remove duplicate columns .1 + for dir in target_need_fix_title: + with open(_csv_dir+dir, "r+") as fr: + with open(_csv_dir+dir+'1', "w") as fw: + fw.writelines(re.sub('id\.1', 'id', fr.readline())) + fw.writelines(fr.readlines()) + os.remove(_csv_dir+dir) + os.rename(_csv_dir+dir+'1', _csv_dir+dir) + # split place + place_header_csv = pd.read_csv( + _csv_dir+'static/place_header.csv.copy', sep='|') + place_csv = pd.read_csv(_csv_dir+'static/place.csv.copy', sep='|', + header=None, names=place_header_csv.columns) + df = pd.DataFrame(place_csv) + grouped = df.groupby('type') + for name, group in grouped: + group[0:10].to_csv(_csv_dir+'static/'+name + + '_header.csv.copy', sep='|', index=False) + group.to_csv(_csv_dir+'static/'+name+'.csv.copy', index=False, + sep='|', header=None, mode='a') + os.remove(_csv_dir+'static/place.csv.copy') + os.remove(_csv_dir+'static/place_header.csv.copy') + + if __name__ == "__main__": argv = sys.argv[1:] try: @@ -92,4 +124,5 @@ def handler_data(): thread_group.append(t) for th in thread_group: th.join() + back_handler() print('all task done! please run copy-data.py to recover csv file') diff --git a/scripts/copy-data.py b/scripts/copy-data.py index ba430a9..fb07bea 100644 --- a/scripts/copy-data.py +++ b/scripts/copy-data.py @@ -1,9 +1,14 @@ import getopt import os +import re import sys _csv_dir = "../target/data/test_data/social_network/" _all_csv_files = [] +_all_csv_files_copy = [] +_all_csv_files_need_fix_title = [ + 'static/place_isPartOf_place_header.csv.copy', + 'dynamic/person_knows_person_header.csv.copy'] if __name__ == "__main__": argv = sys.argv[1:] @@ -22,6 +27,9 @@ for file in dir_list: if file.endswith('.csv'): _all_csv_files.append(dir+'/'+file) + elif file.endswith('.copy'): + _all_csv_files_copy.append(dir+'/'+file) for dir in _all_csv_files: os.remove(_csv_dir+dir) - os.rename(_csv_dir+dir+'.copy', _csv_dir+dir) + for dir in _all_csv_files_copy: + os.rename(_csv_dir+dir, _csv_dir+dir[:-5]) From 64641b2e27509f3b93315986540bdc180470e1cc Mon Sep 17 00:00:00 2001 From: heroicNeZha <25311962+heroicNeZha@users.noreply.github.com> Date: Wed, 26 Jan 2022 11:51:50 +0800 Subject: [PATCH 3/6] fix some bugs --- scripts/clean-data.py | 7 ++++--- temp/nebula-importer | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) create mode 160000 temp/nebula-importer diff --git a/scripts/clean-data.py b/scripts/clean-data.py index 56fdb08..481df81 100644 --- a/scripts/clean-data.py +++ b/scripts/clean-data.py @@ -70,7 +70,8 @@ def back_handler(): print('start back handler') target_need_fix_title = ['static/place_isPartOf_place_header.csv.copy', 'dynamic/person_knows_person_header.csv.copy', - 'dynamic/comment_replyOf_comment_header.csv'] + 'dynamic/comment_replyOf_comment_header.csv.copy', + 'static/tagclass_isSubclassOf_tagclass_header.csv.copy'] # remove duplicate columns .1 for dir in target_need_fix_title: with open(_csv_dir+dir, "r+") as fr: @@ -87,8 +88,8 @@ def back_handler(): df = pd.DataFrame(place_csv) grouped = df.groupby('type') for name, group in grouped: - group[0:10].to_csv(_csv_dir+'static/'+name + - '_header.csv.copy', sep='|', index=False) + group[0:9].to_csv(_csv_dir+'static/'+name + + '_header.csv.copy', sep='|', index=False) group.to_csv(_csv_dir+'static/'+name+'.csv.copy', index=False, sep='|', header=None, mode='a') os.remove(_csv_dir+'static/place.csv.copy') diff --git a/temp/nebula-importer b/temp/nebula-importer new file mode 160000 index 0000000..1c83ca8 --- /dev/null +++ b/temp/nebula-importer @@ -0,0 +1 @@ +Subproject commit 1c83ca83aa4f0a25ad9a810136afbda377e3b57b From f8f9ff8381751d52b01490addd8668bc824570bc Mon Sep 17 00:00:00 2001 From: heroicNeZha <25311962+heroicNeZha@users.noreply.github.com> Date: Wed, 26 Jan 2022 14:52:43 +0800 Subject: [PATCH 4/6] fix --- temp/nebula-importer | 1 - 1 file changed, 1 deletion(-) delete mode 160000 temp/nebula-importer diff --git a/temp/nebula-importer b/temp/nebula-importer deleted file mode 160000 index 1c83ca8..0000000 --- a/temp/nebula-importer +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 1c83ca83aa4f0a25ad9a810136afbda377e3b57b From 4d505f25f5a480d05c5bf56849d467cb3f5249df Mon Sep 17 00:00:00 2001 From: heroicNeZha <25311962+heroicNeZha@users.noreply.github.com> Date: Wed, 26 Jan 2022 17:45:18 +0800 Subject: [PATCH 5/6] update --- scripts/env.sh | 2 +- templates/nebula-import-vid-string.yaml.j2 | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/env.sh b/scripts/env.sh index ba85e32..5e8ba1d 100755 --- a/scripts/env.sh +++ b/scripts/env.sh @@ -5,7 +5,7 @@ HADOOP_VERSION=3.2.1 scaleFactor=${scaleFactor:-1} -NEBULA_IMPORTER_VERSION=${NEBULA_IMPORTER_VERSION:-v2.5.1} +NEBULA_IMPORTER_VERSION=${NEBULA_IMPORTER_VERSION:-v3.0.0} NEBULA_XK6_VERSION=${NEBULA_XK6_VERSION:-v0.0.8} GOLANG_VERSION=${GOLANG_VERSION:-1.16.6} \ No newline at end of file diff --git a/templates/nebula-import-vid-string.yaml.j2 b/templates/nebula-import-vid-string.yaml.j2 index 02a8aea..ecff1cd 100644 --- a/templates/nebula-import-vid-string.yaml.j2 +++ b/templates/nebula-import-vid-string.yaml.j2 @@ -12,7 +12,7 @@ clientSettings: address: {{ address }} postStart: commands: | - CREATE SPACE IF NOT EXISTS {{ space }}(PARTITION_NUM = 24, REPLICA_FACTOR = 3, vid_type = fixed_string(20)); + CREATE SPACE IF NOT EXISTS {{ space }}(PARTITION_NUM = 24, REPLICA_FACTOR = 3, vid_type = fixed_string(32)); USE {{ space }}; {% for vertex in vertex_set -%} CREATE TAG IF NOT EXISTS `{{ vertex.name }}`( From 49db31be83eb725e5318d655d3d2d47c0b2751c9 Mon Sep 17 00:00:00 2001 From: "endy.li" <25311962+heroicNeZha@users.noreply.github.com> Date: Wed, 26 Jan 2022 17:52:07 +0800 Subject: [PATCH 6/6] Update env.sh --- scripts/env.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/env.sh b/scripts/env.sh index 5e8ba1d..783be70 100755 --- a/scripts/env.sh +++ b/scripts/env.sh @@ -5,7 +5,7 @@ HADOOP_VERSION=3.2.1 scaleFactor=${scaleFactor:-1} -NEBULA_IMPORTER_VERSION=${NEBULA_IMPORTER_VERSION:-v3.0.0} +NEBULA_IMPORTER_VERSION=${NEBULA_IMPORTER_VERSION:-v2.6.0} NEBULA_XK6_VERSION=${NEBULA_XK6_VERSION:-v0.0.8} -GOLANG_VERSION=${GOLANG_VERSION:-1.16.6} \ No newline at end of file +GOLANG_VERSION=${GOLANG_VERSION:-1.16.6}