Moving function indexing to init request (#1446)
* Moved indexing to init

* Updated handle init request

* Added e2e tests

* Added unit tests

* Added more unit tests

* Linter fixes

* Added consumption tests

* Minor Refactoring

* Fixed comments

* Throwing ex in load response

* Updated app setting name

* Addressed comments

* Addressed comments

* Addressed comments
gavin-aguiar authored Mar 25, 2024
1 parent c68f88b commit 8ebd94e
Showing 12 changed files with 442 additions and 68 deletions.
5 changes: 5 additions & 0 deletions azure_functions_worker/constants.py
@@ -55,3 +55,8 @@

# Paths
CUSTOMER_PACKAGES_PATH = "/home/site/wwwroot/.python_packages/lib/site-packages"

# Flag to index functions in handle init request
PYTHON_ENABLE_INIT_INDEXING = "PYTHON_ENABLE_INIT_INDEXING"

METADATA_PROPERTIES_WORKER_INDEXED = "worker_indexed"
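
These two constants drive the new behavior. As a minimal sketch of how the flag would be read, assuming the worker's existing is_envvar_true helper lives in utils/common.py (that import path is an assumption; only the constants themselves are part of this diff):

# Sketch only: reading the new setting the way other worker flags are read.
# The helper's module path is assumed from the worker's existing utils.
from azure_functions_worker.constants import PYTHON_ENABLE_INIT_INDEXING
from azure_functions_worker.utils.common import is_envvar_true


def index_at_init_enabled() -> bool:
    # True when PYTHON_ENABLE_INIT_INDEXING is set to a truthy value,
    # i.e. indexing should happen during the worker init request.
    return is_envvar_true(PYTHON_ENABLE_INIT_INDEXING)
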
133 changes: 92 additions & 41 deletions azure_functions_worker/dispatcher.py
@@ -30,7 +30,8 @@
PYTHON_ENABLE_DEBUG_LOGGING,
PYTHON_SCRIPT_FILE_NAME,
PYTHON_SCRIPT_FILE_NAME_DEFAULT,
PYTHON_LANGUAGE_RUNTIME)
PYTHON_LANGUAGE_RUNTIME, PYTHON_ENABLE_INIT_INDEXING,
METADATA_PROPERTIES_WORKER_INDEXED)
from .extension import ExtensionManager
from .logging import disable_console_logging, enable_console_logging
from .logging import (logger, error_logger, is_system_log_category,
@@ -72,9 +72,12 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
self._function_data_cache_enabled = False
self._functions = functions.Registry()
self._shmem_mgr = SharedMemoryManager()

self._old_task_factory = None

# Used to store metadata returns
self._function_metadata_result = None
self._function_metadata_exception = None

# We allow the customer to change synchronous thread pool max worker
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
# For 3.[6|7|8] The default value is 1.
@@ -297,6 +301,14 @@ async def _handle__worker_init_request(self, request):
# dictionary which will be later used in the invocation request
bindings.load_binding_registry()

if is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
try:
self.load_function_metadata(
worker_init_request.function_app_directory,
caller_info="worker_init_request")
except Exception as ex:
self._function_metadata_exception = ex

return protos.StreamingMessage(
request_id=self.request_id,
worker_init_response=protos.WorkerInitResponse(
@@ -313,82 +325,114 @@ async def _handle__worker_status_request(self, request):
request_id=request.request_id,
worker_status_response=protos.WorkerStatusResponse())

def load_function_metadata(self, function_app_directory, caller_info):
"""
This method is called to index the functions in the function app
directory and save the results in function_metadata_result or
function_metadata_exception in case of an exception.
"""
script_file_name = get_app_setting(
setting=PYTHON_SCRIPT_FILE_NAME,
default_value=f'{PYTHON_SCRIPT_FILE_NAME_DEFAULT}')

logger.debug(
'Received load metadata request from %s, request ID %s, '
'script_file_name: %s',
caller_info, self.request_id, script_file_name)

validate_script_file_name(script_file_name)
function_path = os.path.join(function_app_directory,
script_file_name)

self._function_metadata_result = (
self.index_functions(function_path)) \
if os.path.exists(function_path) else None

async def _handle__functions_metadata_request(self, request):
metadata_request = request.functions_metadata_request
directory = metadata_request.function_app_directory
function_app_directory = metadata_request.function_app_directory

script_file_name = get_app_setting(
setting=PYTHON_SCRIPT_FILE_NAME,
default_value=f'{PYTHON_SCRIPT_FILE_NAME_DEFAULT}')
function_path = os.path.join(directory, script_file_name)
function_path = os.path.join(function_app_directory,
script_file_name)

logger.info(
'Received WorkerMetadataRequest, request ID %s, function_path: %s',
'Received WorkerMetadataRequest, request ID %s, '
'function_path: %s',
self.request_id, function_path)

try:
validate_script_file_name(script_file_name)

if not os.path.exists(function_path):
# Fallback to legacy model
return protos.StreamingMessage(
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
use_default_metadata_indexing=True,
result=protos.StatusResult(
status=protos.StatusResult.Success)))

fx_metadata_results = self.index_functions(function_path)
if not is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
try:
self.load_function_metadata(
function_app_directory,
caller_info="functions_metadata_request")
except Exception as ex:
self._function_metadata_exception = ex

if self._function_metadata_exception:
return protos.StreamingMessage(
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
function_metadata_results=fx_metadata_results,
result=protos.StatusResult(
status=protos.StatusResult.Success)))
status=protos.StatusResult.Failure,
exception=self._serialize_exception(
self._function_metadata_exception))))
else:
metadata_result = self._function_metadata_result

except Exception as ex:
return protos.StreamingMessage(
request_id=self.request_id,
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
use_default_metadata_indexing=False if metadata_result else
True,
function_metadata_results=metadata_result,
result=protos.StatusResult(
status=protos.StatusResult.Failure,
exception=self._serialize_exception(ex))))
status=protos.StatusResult.Success)))

async def _handle__function_load_request(self, request):
func_request = request.function_load_request
function_id = func_request.function_id
function_metadata = func_request.metadata
function_name = function_metadata.name
function_app_directory = function_metadata.directory

logger.info(
'Received WorkerLoadRequest, request ID %s, function_id: %s,'
'function_name: %s,', self.request_id, function_id, function_name)
'function_name: %s',
self.request_id, function_id, function_name)

programming_model = "V2"
try:
if not self._functions.get_function(function_id):
script_file_name = get_app_setting(
setting=PYTHON_SCRIPT_FILE_NAME,
default_value=f'{PYTHON_SCRIPT_FILE_NAME_DEFAULT}')
validate_script_file_name(script_file_name)
function_path = os.path.join(
function_metadata.directory,
script_file_name)

if function_metadata.properties.get("worker_indexed", False) \
or os.path.exists(function_path):

if function_metadata.properties.get(
METADATA_PROPERTIES_WORKER_INDEXED, False):
# This is for the second worker and above where the worker
# indexing is enabled and load request is called without
# calling the metadata request. In this case we index the
# function and update the workers registry
_ = self.index_functions(function_path)

try:
self.load_function_metadata(
function_app_directory,
caller_info="functions_load_request")
except Exception as ex:
self._function_metadata_exception = ex

# For the second worker, if there was an exception in
# indexing, we raise it here
if self._function_metadata_exception:
raise Exception(self._function_metadata_exception)

else:
# legacy function
programming_model = "V1"

func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
function_name,
function_app_directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

@@ -562,6 +606,7 @@ async def _handle__function_environment_reload_request(self, request):

func_env_reload_request = \
request.function_environment_reload_request
directory = func_env_reload_request.function_app_directory

# Append function project root to module finding sys.path
if func_env_reload_request.function_app_directory:
@@ -587,14 +632,20 @@
root_logger.setLevel(logging.DEBUG)

# Reload azure google namespaces
DependencyManager.reload_customer_libraries(
func_env_reload_request.function_app_directory
)
DependencyManager.reload_customer_libraries(directory)

# calling load_binding_registry again since the
# reload_customer_libraries call clears the registry
bindings.load_binding_registry()

if is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
try:
self.load_function_metadata(
directory,
caller_info="environment_reload_request")
except Exception as ex:
self._function_metadata_exception = ex

# Change function app directory
if getattr(func_env_reload_request,
'function_app_directory', None):
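
Taken together, the dispatcher changes amount to an index-once-and-cache pattern: index at init or environment reload when the flag is on, remember either the result or the exception, and replay that outcome on the later metadata and load requests. A hypothetical condensation of the pattern (illustrative only, not the worker's actual API):

# Hypothetical condensation of the cache-and-replay pattern used above;
# the class and method names are illustrative.
class IndexingCache:
    """Index the function app once, then replay the result or the failure."""

    def __init__(self):
        self._result = None
        self._exception = None

    def load(self, index_functions, function_app_directory):
        # Called from init / environment reload (flag on) or lazily from the
        # metadata / load handlers (flag off).
        try:
            self._result = index_functions(function_app_directory)
        except Exception as ex:
            # Do not fail the init request; remember the failure instead.
            self._exception = ex

    def get(self):
        # Surface a cached indexing failure where the host can report it
        # (metadata or load response); otherwise return the cached metadata.
        if self._exception is not None:
            raise self._exception
        return self._result

The diff applies this pattern directly on the dispatcher via _function_metadata_result and _function_metadata_exception rather than a separate object, which keeps a failed init from crashing the worker outright while still reporting the error to the host.
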
4 changes: 2 additions & 2 deletions azure_functions_worker/loader.py
@@ -19,7 +19,7 @@
from .utils.common import get_app_setting
from .constants import MODULE_NOT_FOUND_TS_URL, PYTHON_SCRIPT_FILE_NAME, \
PYTHON_SCRIPT_FILE_NAME_DEFAULT, PYTHON_LANGUAGE_RUNTIME, \
CUSTOMER_PACKAGES_PATH, RETRY_POLICY
CUSTOMER_PACKAGES_PATH, RETRY_POLICY, METADATA_PROPERTIES_WORKER_INDEXED
from .logging import logger
from .utils.wrappers import attach_message_to_exception

@@ -142,7 +142,7 @@ def process_indexed_function(functions_registry: functions.Registry,
bindings=binding_protos,
raw_bindings=indexed_function.get_raw_bindings(),
retry_options=retry_protos,
properties={"worker_indexed": "True"})
properties={METADATA_PROPERTIES_WORKER_INDEXED: "True"})

fx_metadata_results.append(function_metadata)

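
The constant replaces the string literal "worker_indexed" used when tagging indexed functions. A short sketch of how that property is read back, mirroring the dispatcher check in this commit (function_metadata stands in for the protobuf metadata object):

# Sketch: reading back the property written by process_indexed_function.
from azure_functions_worker.constants import METADATA_PROPERTIES_WORKER_INDEXED


def is_worker_indexed(function_metadata) -> bool:
    # A missing property means the function was not indexed by the worker,
    # i.e. it is a legacy (V1) function.
    return bool(function_metadata.properties.get(
        METADATA_PROPERTIES_WORKER_INDEXED, False))
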
5 changes: 3 additions & 2 deletions azure_functions_worker/utils/app_setting_manager.py
@@ -11,7 +11,7 @@
PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT_39,
PYTHON_ENABLE_DEBUG_LOGGING,
FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED,
PYTHON_SCRIPT_FILE_NAME)
PYTHON_SCRIPT_FILE_NAME, PYTHON_ENABLE_INIT_INDEXING)


def get_python_appsetting_state():
@@ -23,7 +23,8 @@ def get_python_appsetting_state():
PYTHON_ENABLE_DEBUG_LOGGING,
PYTHON_ENABLE_WORKER_EXTENSIONS,
FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED,
PYTHON_SCRIPT_FILE_NAME]
PYTHON_SCRIPT_FILE_NAME,
PYTHON_ENABLE_INIT_INDEXING]

app_setting_states = "".join(
f"{app_setting}: {current_vars[app_setting]} | "
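
With the setting present in the environment, the state string assembled by get_python_appsetting_state above gains an entry in the same "NAME: value | " format as the other flags; a small, self-contained illustration of that join (value is hypothetical):

# Illustrative only: the entry format produced by the join above for the
# new setting, given a hypothetical value.
current_vars = {"PYTHON_ENABLE_INIT_INDEXING": "true"}
entry = "".join(f"{k}: {v} | " for k, v in current_vars.items())
print(entry)  # -> "PYTHON_ENABLE_INIT_INDEXING: true | "
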
1 change: 0 additions & 1 deletion azure_functions_worker/utils/common.py
@@ -153,4 +153,3 @@ def validate_script_file_name(file_name: str):
pattern = re.compile(r'^[a-zA-Z0-9_][a-zA-Z0-9_\-]*\.py$')
if not pattern.match(file_name):
raise InvalidFileNameError(file_name)
return True
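
With the trailing return True removed, validate_script_file_name signals problems only by raising, so callers invoke it purely for its side effect. A hedged usage sketch (the broad except is for illustration; the worker raises InvalidFileNameError, whose import path is not shown in this diff):

# Usage sketch for the updated validator; it raises on an invalid name and
# returns None otherwise, so there is no boolean result to check.
from azure_functions_worker.utils.common import validate_script_file_name

script_file_name = "function_app.py"
try:
    validate_script_file_name(script_file_name)
except Exception as ex:
    raise RuntimeError(
        f"Invalid script file name: {script_file_name}") from ex
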
41 changes: 29 additions & 12 deletions tests/consumption_tests/test_linux_consumption.py
@@ -7,6 +7,9 @@

from requests import Request

from azure_functions_worker.constants import PYTHON_ENABLE_INIT_INDEXING, \
PYTHON_ENABLE_WORKER_EXTENSIONS, PYTHON_ISOLATE_WORKER_DEPENDENCIES, \
PYTHON_ENABLE_DEBUG_LOGGING
from tests.utils.testutils_lc import (
LinuxConsumptionWebHostController
)
@@ -107,7 +110,7 @@ def test_new_protobuf(self):
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("NewProtobuf"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger')
resp = ctrl.send_request(req)
@@ -137,7 +140,7 @@ def test_old_protobuf(self):
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("OldProtobuf"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger')
resp = ctrl.send_request(req)
@@ -189,7 +192,7 @@ def test_debug_logging_enabled(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"EnableDebugLogging"),
"PYTHON_ENABLE_DEBUG_LOGGING": "1"
PYTHON_ENABLE_DEBUG_LOGGING: "1"
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger1')
resp = ctrl.send_request(req)
@@ -218,7 +221,7 @@ def test_pinning_functions_to_older_version(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"PinningFunctions"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1",
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger1')
resp = ctrl.send_request(req)
@@ -232,8 +235,7 @@ def test_opencensus_with_extensions_enabled(self):
"""A function app with extensions enabled containing the
following libraries:
azure-functions, azure-eventhub, azure-storage-blob, numpy,
cryptography, pyodbc, requests
azure-functions, opencensus
should return 200 after importing all libraries.
"""
@@ -242,8 +244,25 @@
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("Opencensus"),
"PYTHON_ENABLE_WORKER_EXTENSIONS": "1",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
PYTHON_ENABLE_WORKER_EXTENSIONS: "1"
})
req = Request('GET', f'{ctrl.url}/api/opencensus')
resp = ctrl.send_request(req)
self.assertEqual(resp.status_code, 200)

@skipIf(sys.version_info.minor != 10,
"This is testing only for python310")
def test_opencensus_with_extensions_enabled_init_indexing(self):
"""
A function app with init indexing enabled
"""
with LinuxConsumptionWebHostController(_DEFAULT_HOST_VERSION,
self._py_version) as ctrl:
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("Opencensus"),
PYTHON_ENABLE_WORKER_EXTENSIONS: "1",
PYTHON_ENABLE_INIT_INDEXING: "true"
})
req = Request('GET', f'{ctrl.url}/api/opencensus')
resp = ctrl.send_request(req)
@@ -263,8 +282,7 @@ def test_reload_variables_after_timeout_error(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"TimeoutError"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/hello')
resp = ctrl.send_request(req)
@@ -297,8 +315,7 @@ def test_reload_variables_after_oom_error(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"OOMError"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/httptrigger')
resp = ctrl.send_request(req)
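
Beyond the consumption tests above, the commit message lists added unit and e2e tests that are not included in the portion of the diff shown here. A minimal, hypothetical sketch of how a unit test could toggle the new setting per test case (not the commit's actual test code):

# Hypothetical per-test toggle of the new app setting; everything other than
# the constant name is illustrative.
import os
import unittest
from unittest.mock import patch

from azure_functions_worker.constants import PYTHON_ENABLE_INIT_INDEXING


class TestInitIndexingSetting(unittest.TestCase):
    @patch.dict(os.environ, {PYTHON_ENABLE_INIT_INDEXING: "true"})
    def test_setting_is_visible_to_the_worker(self):
        # The dispatcher gates init-time indexing on this environment value.
        self.assertEqual(os.environ[PYTHON_ENABLE_INIT_INDEXING], "true")
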