Skip to content

Commit

Permalink
fixup! Add Java mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
potiuk committed Jul 9, 2019
1 parent 9ebe183 commit 9741a64
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
6 changes: 6 additions & 0 deletions o2a/converter/oozie_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from o2a.converter.renderers import BaseRenderer
from o2a.converter.task_group import TaskGroup
from o2a.converter.workflow import Workflow
from o2a.utils.file_utils import get_lib_files
from o2a.mappers.action_mapper import ActionMapper
from o2a.transformers.base_transformer import BaseWorkflowTransformer
from o2a.o2a_libs.property_utils import PropertySet
Expand Down Expand Up @@ -85,11 +86,16 @@ def __init__(
transformers=self.transformers,
)

def retrieve_lib_jar_libraries(self):
logging.info(f"Looking for jar libraries for the workflow in {self.workflow.library_folder}.")
self.workflow.jar_files = get_lib_files(self.workflow.library_folder, extension=".jar")

def recreate_output_directory(self):
shutil.rmtree(self.workflow.output_directory_path, ignore_errors=True)
os.makedirs(self.workflow.output_directory_path, exist_ok=True)

def convert(self, as_subworkflow=False):
self.retrieve_lib_jar_libraries()
self.property_parser.parse_property()
self.parser.parse_workflow()

Expand Down
12 changes: 3 additions & 9 deletions o2a/converter/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
"""Workflow"""
import os
from collections import OrderedDict
from typing import Set, Dict, Type, List
from typing import Set, Dict, Type

from o2a.converter.constants import HDFS_FOLDER, LIB_FOLDER
from o2a.converter.parsed_action_node import ParsedActionNode
from o2a.converter.relation import Relation
from o2a.converter.task_group import TaskGroup
from o2a.utils.file_utils import get_lib_files


class Workflow:
Expand Down Expand Up @@ -57,14 +58,7 @@ def __init__(
"from airflow.utils import dates",
}
self.library_folder = os.path.join(self.input_directory_path, HDFS_FOLDER, LIB_FOLDER)
self.jar_files = self.get_lib_files(extension=".jar")

def get_lib_files(self, extension: str) -> List[str]:
if os.path.exists(self.library_folder):
if os.path.isdir(self.library_folder):
return [file for file in os.listdir(self.library_folder) if file.endswith(extension)]
raise Exception(f"The {self.library_folder} exists but it is not a directory!")
return []
self.jar_files = get_lib_files(self.library_folder, extension=".jar")

def get_nodes_by_type(self, mapper_type: Type):
return [node for node in self.nodes.values() if isinstance(node.mapper, mapper_type)]
Expand Down
33 changes: 33 additions & 0 deletions o2a/utils/file_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities to deal with files in workflow folders"""
import os
from typing import List


def get_lib_files(library_folder_path: str, extension: str) -> List[str]:
"""Returns list of all lib files from the library folder matching the extension provided.
For example is you specify the '.jar' extension it will return names (not paths) of all
the *.jar files.
It returns empty list in case there are no files matching the extension and raises Exception
in case the lib folder does not exist or is not a directory.
"""
if os.path.exists(library_folder_path):
if os.path.isdir(library_folder_path):
return [file for file in os.listdir(library_folder_path) if file.endswith(extension)]
raise Exception(f"The {library_folder_path} exists but it is not a directory!")
return []

0 comments on commit 9741a64

Please sign in to comment.