Skip to content

Commit

Permalink
fix: Catalog init introduces significant overhead (#1270)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav274 authored Oct 9, 2023
1 parent 3ea2f8a commit 18bc547
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 109 deletions.
1 change: 1 addition & 0 deletions evadb/executor/abstract_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self, db: EvaDBDatabase, node: AbstractPlan):
self._config: ConfigurationManager = db.config if db else None
self._children = []

# @lru_cache(maxsize=None)
def catalog(self) -> "CatalogManager":
"""The object is intentionally generated on demand to prevent serialization issues. Having a SQLAlchemy object as a member variable can cause problems with multiprocessing. See get_catalog_instance()"""
return self._db.catalog() if self._db else None
Expand Down
13 changes: 4 additions & 9 deletions evadb/executor/apply_and_merge_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import instrument_function_expression_cost
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.apply_and_merge_plan import ApplyAndMergePlan

Expand All @@ -42,19 +43,13 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
for batch in child_executor.exec(**kwargs):
func_result = self.func_expr.evaluate(batch)

# persist stats of function expression
if self.func_expr.function_obj and self.func_expr._stats:
function_id = self.func_expr.function_obj.row_id
self.catalog().upsert_function_cost_catalog_entry(
function_id,
self.func_expr.function_obj.name,
self.func_expr._stats.prev_cost,
)

output = Batch.merge_column_wise([batch, func_result])
if self.do_unnest:
output.unnest(func_result.columns)
# we reset the index as after unnest there can be duplicate index
output.reset_index()

yield output

# persist stats of function expression
instrument_function_expression_cost(self.func_expr, self.catalog())
50 changes: 27 additions & 23 deletions evadb/executor/executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import glob
import os
from pathlib import Path
from typing import TYPE_CHECKING, Generator, List
from typing import TYPE_CHECKING, Generator, List, Union

from evadb.catalog.catalog_utils import xform_column_definitions_to_catalog_entries
from evadb.catalog.models.utils import TableCatalogEntry
Expand All @@ -39,41 +39,45 @@ class ExecutorError(Exception):
pass


def apply_project(
batch: Batch, project_list: List[AbstractExpression], catalog: "CatalogManager"
def instrument_function_expression_cost(
expr: Union[AbstractExpression, List[AbstractExpression]],
catalog: "CatalogManager",
):
"""We are expecting an instance of a catalog. An optimization can be to avoid creating a catalog instance if there is no function expression. An easy fix is to pass the function handler and create the catalog instance only if there is a function expression. In the past, this was problematic because of Ray. We can revisit it again."""

if expr is None:
return

list_expr = expr
if not isinstance(expr, list):
list_expr = [expr]

# persist stats of function expression
for expr in list_expr:
for func_expr in expr.find_all(FunctionExpression):
if func_expr.function_obj and func_expr._stats:
function_id = func_expr.function_obj.row_id
catalog.upsert_function_cost_catalog_entry(
function_id,
func_expr.function_obj.name,
func_expr._stats.prev_cost,
)


def apply_project(batch: Batch, project_list: List[AbstractExpression]):
if not batch.empty() and project_list:
batches = [expr.evaluate(batch) for expr in project_list]
batch = Batch.merge_column_wise(batches)

# persist stats of function expression
for expr in project_list:
for func_expr in expr.find_all(FunctionExpression):
if func_expr.function_obj and func_expr._stats:
function_id = func_expr.function_obj.row_id
catalog.upsert_function_cost_catalog_entry(
function_id,
func_expr.function_obj.name,
func_expr._stats.prev_cost,
)
return batch


def apply_predicate(
batch: Batch, predicate: AbstractExpression, catalog: "CatalogManager"
) -> Batch:
def apply_predicate(batch: Batch, predicate: AbstractExpression) -> Batch:
if not batch.empty() and predicate is not None:
outcomes = predicate.evaluate(batch)
batch.drop_zero(outcomes)
batch.reset_index()

# persist stats of function expression
for func_expr in predicate.find_all(FunctionExpression):
if func_expr.function_obj and func_expr._stats:
function_id = func_expr.function_obj.row_id
catalog.upsert_function_cost_catalog_entry(
function_id, func_expr.function_obj.name, func_expr._stats.prev_cost
)
return batch


Expand Down
13 changes: 4 additions & 9 deletions evadb/executor/function_scan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import instrument_function_expression_cost
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.function_scan_plan import FunctionScanPlan

Expand All @@ -41,17 +42,11 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
if not lateral_input.empty():
res = self.func_expr.evaluate(lateral_input)

# persist stats of function expression
if self.func_expr.function_obj and self.func_expr._stats:
function_id = self.func_expr.function_obj.row_id
self.catalog().upsert_function_cost_catalog_entry(
function_id,
self.func_expr.function_obj.name,
self.func_expr._stats.prev_cost,
)

if not res.empty():
if self.do_unnest:
res.unnest(res.columns)

yield res

# persist stats of function expression
instrument_function_expression_cost(self.func_expr, self.catalog())
18 changes: 13 additions & 5 deletions evadb/executor/hash_join_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import apply_predicate, apply_project
from evadb.executor.executor_utils import (
apply_predicate,
apply_project,
instrument_function_expression_cost,
)
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.hash_join_probe_plan import HashJoinProbePlan

Expand All @@ -38,8 +42,12 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
probe_batch.reassign_indices_to_hash(hash_keys)
join_batch = Batch.join(probe_batch, build_batch)
join_batch.reset_index()
join_batch = apply_predicate(join_batch, self.predicate, self.catalog())
join_batch = apply_project(
join_batch, self.join_project, self.catalog()
)
join_batch = apply_predicate(join_batch, self.predicate)
join_batch = apply_project(join_batch, self.join_project)
yield join_batch

# instrument required stats
if self.predicate or self.join_project:
catalog = self.catalog()
instrument_function_expression_cost(self.predicate, catalog)
instrument_function_expression_cost(self.join_project, catalog)
44 changes: 0 additions & 44 deletions evadb/executor/lateral_join_executor.py

This file was deleted.

13 changes: 9 additions & 4 deletions evadb/executor/nested_loop_join_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import apply_predicate
from evadb.executor.executor_utils import (
apply_predicate,
instrument_function_expression_cost,
)
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.nested_loop_join_plan import NestedLoopJoinPlan

Expand All @@ -33,8 +36,10 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
for row2 in inner.exec(**kwargs):
result_batch = Batch.join(row1, row2)
result_batch.reset_index()
result_batch = apply_predicate(
result_batch, self.predicate, self.catalog()
)
result_batch = apply_predicate(result_batch, self.predicate)
if not result_batch.empty():
yield result_batch

# instrument required stats
if self.predicate:
instrument_function_expression_cost(self.predicate, self.catalog())
7 changes: 0 additions & 7 deletions evadb/executor/plan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from evadb.executor.hash_join_executor import HashJoinExecutor
from evadb.executor.insert_executor import InsertExecutor
from evadb.executor.join_build_executor import BuildJoinExecutor
from evadb.executor.lateral_join_executor import LateralJoinExecutor
from evadb.executor.limit_executor import LimitExecutor
from evadb.executor.load_executor import LoadDataExecutor
from evadb.executor.nested_loop_join_executor import NestedLoopJoinExecutor
Expand Down Expand Up @@ -128,12 +127,6 @@ def _build_execution_tree(
executor_node = SampleExecutor(db=self._db, node=plan)
elif plan_opr_type == PlanOprType.NESTED_LOOP_JOIN:
executor_node = NestedLoopJoinExecutor(db=self._db, node=plan)
elif plan_opr_type == PlanOprType.LATERAL_JOIN:
logger.warn(
"LateralJoin Executor should not be part of the execution plan."
"Please raise an issue with the current query. Thanks!"
)
executor_node = LateralJoinExecutor(db=self._db, node=plan)
elif plan_opr_type == PlanOprType.HASH_JOIN:
executor_node = HashJoinExecutor(db=self._db, node=plan)
elif plan_opr_type == PlanOprType.HASH_BUILD:
Expand Down
10 changes: 8 additions & 2 deletions evadb/executor/predicate_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import apply_predicate
from evadb.executor.executor_utils import (
apply_predicate,
instrument_function_expression_cost,
)
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.predicate_plan import PredicatePlan

Expand All @@ -31,6 +34,9 @@ def __init__(self, db: EvaDBDatabase, node: PredicatePlan):
def exec(self, *args, **kwargs) -> Iterator[Batch]:
child_executor = self.children[0]
for batch in child_executor.exec(**kwargs):
batch = apply_predicate(batch, self.predicate, self.catalog())
batch = apply_predicate(batch, self.predicate)
if not batch.empty():
yield batch

# perform any required instrumentation before we return
instrument_function_expression_cost(self.predicate, self.catalog())
13 changes: 10 additions & 3 deletions evadb/executor/project_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import ExecutorError, apply_project
from evadb.executor.executor_utils import (
ExecutorError,
apply_project,
instrument_function_expression_cost,
)
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.project_plan import ProjectPlan

Expand All @@ -35,15 +39,18 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
if len(self.children) == 0:
# Create a dummy batch with size 1
dummy_batch = Batch(pd.DataFrame([0]))
batch = apply_project(dummy_batch, self.target_list, self.catalog())
batch = apply_project(dummy_batch, self.target_list)
if not batch.empty():
yield batch
# SELECT expr FROM table;
elif len(self.children) == 1:
child_executor = self.children[0]
for batch in child_executor.exec(**kwargs):
batch = apply_project(batch, self.target_list, self.catalog())
batch = apply_project(batch, self.target_list)
if not batch.empty():
yield batch
else:
raise ExecutorError("ProjectExecutor has more than 1 children.")

# instrument required stats
instrument_function_expression_cost(self.target_list, self.catalog())
16 changes: 13 additions & 3 deletions evadb/executor/seq_scan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import apply_predicate, apply_project
from evadb.executor.executor_utils import (
apply_predicate,
apply_project,
instrument_function_expression_cost,
)
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.seq_scan_plan import SeqScanPlan

Expand Down Expand Up @@ -44,9 +48,15 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
batch.modify_column_alias(self.alias)

# We do the predicate first
batch = apply_predicate(batch, self.predicate, self.catalog())
batch = apply_predicate(batch, self.predicate)
# Then do project
batch = apply_project(batch, self.project_expr, self.catalog())
batch = apply_project(batch, self.project_expr)

if not batch.empty():
yield batch

# instrument required stats
if self.predicate or self.project_expr:
catalog = self.catalog()
instrument_function_expression_cost(self.predicate, catalog)
instrument_function_expression_cost(self.project_expr, catalog)

0 comments on commit 18bc547

Please sign in to comment.