Skip to content

Commit

Permalink
Merge pull request #9714 from archesproject/9670_improve_bulk_load_performance
Browse files Browse the repository at this point in the history

Improve bulk load performance, #9670
  • Loading branch information
apeters authored Jul 20, 2023
2 parents 94abd5f + 8277166 commit 6203b04
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 13 deletions.
2 changes: 1 addition & 1 deletion arches/app/etl_modules/base_data_editor.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def run_load_task(self, loadid, graph_id, node_id, operation, language_code, old
return {"success": False, "data": {"title": _("Error"), "message": data_staged["message"]}}

if data_updated["success"]:
data_updated = self.save_to_tiles(loadid)
data_updated = self.save_to_tiles(loadid, finalize_import=False)
return {"success": True, "data": "done"}
else:
with connection.cursor() as cursor:
Expand Down
41 changes: 31 additions & 10 deletions arches/app/etl_modules/base_import_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ def reverse(self, request, **kwargs):
logger.warn(response)
return response

def save_to_tiles(self, loadid, multiprocessing=True):
def save_to_tiles(self, loadid, finalize_import=True, multiprocessing=True):
self.loadid = loadid
with connection.cursor() as cursor:
try:
cursor.execute("""CALL __arches_prepare_bulk_load();""")
cursor.execute("""SELECT * FROM __arches_staging_to_tile(%s)""", [self.loadid])
row = cursor.fetchall()
saved = cursor.fetchone()[0]
except (IntegrityError, ProgrammingError) as e:
logger.error(e)
cursor.execute(
Expand All @@ -66,19 +66,40 @@ def save_to_tiles(self, loadid, multiprocessing=True):
"message": _("Unable to insert record into staging table"),
}
finally:
cursor.execute("""CALL __arches_complete_bulk_load();""")
try:
cursor.execute("""CALL __arches_complete_bulk_load();""")

if row[0][0]:
if finalize_import:
cursor.execute("""SELECT __arches_refresh_spatial_views();""")
refresh_successful = cursor.fetchone()[0]
if not refresh_successful:
raise Exception('Unable to refresh spatial views')
except Exception as e:
logger.exception(e)
cursor.execute(
"""UPDATE load_event SET (status, indexed_time, complete, successful) = (%s, %s, %s, %s) WHERE loadid = %s""",
("unindexed", datetime.now(), True, True, loadid),
)

if saved:
cursor.execute(
"""UPDATE load_event SET (status, load_end_time) = (%s, %s) WHERE loadid = %s""",
("completed", datetime.now(), loadid),
)
index_resources_by_transaction(loadid, quiet=True, use_multiprocessing=False, recalculate_descriptors=True)
cursor.execute(
"""UPDATE load_event SET (status, indexed_time, complete, successful) = (%s, %s, %s, %s) WHERE loadid = %s""",
("indexed", datetime.now(), True, True, loadid),
)
return {"success": True, "data": "success"}
try:
index_resources_by_transaction(loadid, quiet=True, use_multiprocessing=False, recalculate_descriptors=True)
cursor.execute(
"""UPDATE load_event SET (status, indexed_time, complete, successful) = (%s, %s, %s, %s) WHERE loadid = %s""",
("indexed", datetime.now(), True, True, loadid),
)
return {"success": True, "data": "indexed"}
except Exception as e:
logger.exception(e)
cursor.execute(
"""UPDATE load_event SET (status, load_end_time) = (%s, %s) WHERE loadid = %s""",
("unindexed", datetime.now(), loadid),
)
return {"success": False, "data": "saved"}
else:
cursor.execute(
"""UPDATE load_event SET status = %s, load_end_time = %s WHERE loadid = %s""",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,8 @@ define([
}

self.addAllFormData();
self.loading(true);
params.activeTab("import");
self.submit('write').then(data => {
params.activeTab("import");
console.log(data.result);
}).fail( function(err) {
self.alert(
Expand Down
70 changes: 70 additions & 0 deletions arches/app/models/migrations/9670_improve_bulk_load_performance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from django.db import migrations

class Migration(migrations.Migration):
    """Slim down ``__arches_complete_bulk_load`` and index ``load_staging.tileid``.

    Forward:
      * Replaces the ``__arches_complete_bulk_load()`` procedure with a version
        that only re-enables the two ``tiles`` triggers. It no longer refreshes
        spatial views or scans for cardinality violations.
      * Creates the ``load_staging_tileid`` index on ``load_staging (tileid)``.

    Reverse:
      * Restores the previous procedure body, which re-enables the triggers,
        refreshes spatial views, and raises if any cardinality-1 nodegroup has
        more than one tile per resource/parent tile.
      * Drops the ``load_staging_tileid`` index.

    NOTE(review): after this migration the procedure no longer refreshes
    spatial views, so callers presumably have to invoke
    ``__arches_refresh_spatial_views()`` themselves -- confirm at call sites.
    """

    dependencies = [
        ("models", "9477_fix_for_spatial_view_dbf_function_edtf_displaying_null"),
    ]

    # Forward SQL: the procedure now only re-enables the triggers on ``tiles``.
    # The previously declared ``cardinality_violations`` variable is omitted
    # because nothing in this body reads or writes it.
    update_check_excess_tiles_trigger = """
        create or replace procedure __arches_complete_bulk_load() AS
        $$
        BEGIN
            alter table tiles enable trigger __arches_check_excess_tiles_trigger;
            alter table tiles enable trigger __arches_trg_update_spatial_attributes;
        END
        $$
        language plpgsql;
    """

    # Reverse SQL: the prior procedure definition, including the spatial view
    # refresh and the cardinality check that raises when violations exist.
    restore_check_excess_tiles_trigger = """
        create or replace procedure __arches_complete_bulk_load() as
        $$
        DECLARE
            cardinality_violations bigint;
        BEGIN
            alter table tiles enable trigger __arches_check_excess_tiles_trigger;
            alter table tiles enable trigger __arches_trg_update_spatial_attributes;
            if (not __arches_refresh_spatial_views()) then
                Raise EXCEPTION 'Unable to refresh spatial views';
            end if;
            with cardinality_violations as (SELECT t.resourceinstanceid,
                                                   t.nodegroupid,
                                                   COALESCE(t.parenttileid::text, '') parent_tileid,
                                                   count(*)
                                            FROM tiles t,
                                                 node_groups ng
                                            WHERE t.nodegroupid = ng.nodegroupid
                                              AND ng.cardinality = '1'
                                            group by t.resourceinstanceid, t.nodegroupid, parent_tileid
                                            having count(*) > 1)
            select count(*)
            into cardinality_violations
            from cardinality_violations;
            if (cardinality_violations > 0) then
                Raise Exception 'Cardinality violations found. Run `%` to list violations',
                    'select * from __arches_get_tile_cardinality_violations()';
            else
                Raise Notice 'No cardinality violations found';
            end if;
        END $$
        language plpgsql;
    """

    # IF NOT EXISTS / IF EXISTS keep both directions safe to re-run.
    create_index_on_load_staging_tileid = """
        CREATE INDEX IF NOT EXISTS load_staging_tileid ON load_staging (tileid);
    """

    drop_index_on_load_staging_tileid = """
        DROP INDEX IF EXISTS load_staging_tileid;
    """

    operations = [
        migrations.RunSQL(
            update_check_excess_tiles_trigger,
            reverse_sql=restore_check_excess_tiles_trigger,
        ),
        migrations.RunSQL(
            create_index_on_load_staging_tileid,
            reverse_sql=drop_index_on_load_staging_tileid,
        ),
    ]
1 change: 1 addition & 0 deletions arches/app/templates/javascript.htm
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@
remove-from-history='{% trans "remove from history" as removeFromHistory %} "{{ removeFromHistory|escapejs }}"'
number-of-resources-updated='{% trans "Number of Resources Updated" as numberOfResourcesUpdated %} "{{ numberOfResourcesUpdated|escapejs }}"'
indexing='{% trans "indexing" as indexing %} "{{ indexing|escapejs }}"'
loaded-but-unindexed='{% trans "loaded but unindexed" as loadedButUnindexed %} "{{ loadedButUnindexed|escapejs }}"'
validating='{% trans "validating" as validating %} "{{ validating|escapejs }}"'
completed='{% trans "completed" as completed %} "{{ completed|escapejs }}"'
failed='{% trans "failed" as failed %} "{{ failed|escapejs }}"'
Expand Down
6 changes: 6 additions & 0 deletions arches/app/templates/views/components/plugins/etl-manager.htm
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ <h4 class="workflow-select-title" data-bind="text: etlmodule.name"></h4>
<i class="fa fa-spin fa-spinner" data-bind="visible: event.loading()"></i>
</button>
<!-- /ko -->
<!-- ko if: event.status == "unindexed" -->
<button class="btn btn-success" style="width:150px;">
<span data-bind="text: $root.translations.loadedButUnindexed"></span>
<i class="fa fa-spin fa-spinner" data-bind="visible: event.loading()"></i>
</button>
<!-- /ko -->
<!-- ko if: event.status == "failed" -->
<button class="btn btn-danger" style="width:150px;">
<span data-bind="text: $root.translations.failed"></span>
Expand Down

0 comments on commit 6203b04

Please sign in to comment.