Java mapper (#300)
Add Java mapper
potiuk authored Jul 9, 2019
1 parent e52e2f7 commit 670c8eb
Showing 17 changed files with 662 additions and 13 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -75,6 +75,7 @@ repos:
    hooks:
      - id: mypy
        name: Checks typing annotations consistency with mypy
        require_serial: true
  - repo: https://github.com/pre-commit/mirrors-pylint
    rev: "v2.3.1"
    hooks:
42 changes: 32 additions & 10 deletions README.md
@@ -67,34 +67,37 @@ If you want to contribute to the project, please take a look at [CONTRIBUTING.md
- [FS Example](#fs-example)
- [Output](#output-4)
- [Known limitations](#known-limitations-4)
- [Pig Example](#pig-example)
- [Java Example](#java-example)
- [Output](#output-5)
- [Known limitations](#known-limitations-5)
- [Shell Example](#shell-example)
- [Pig Example](#pig-example)
- [Output](#output-6)
- [Known limitations](#known-limitations-6)
- [Spark Example](#spark-example)
- [Shell Example](#shell-example)
- [Output](#output-7)
- [Known limitations](#known-limitations-7)
- [Sub-workflow Example](#sub-workflow-example)
- [Spark Example](#spark-example)
- [Output](#output-8)
- [Known limitations](#known-limitations-8)
- [DistCp Example](#distcp-example)
- [Sub-workflow Example](#sub-workflow-example)
- [Output](#output-9)
- [Known limitations](#known-limitations-9)
- [Decision Example](#decision-example)
- [DistCp Example](#distcp-example)
- [Output](#output-10)
- [Known limitations](#known-limitations-10)
- [EL Example](#el-example)
- [Decision Example](#decision-example)
- [Output](#output-11)
- [Known limitations](#known-limitations-11)
- [Hive/Hive2 Example](#hivehive2-example)
- [EL Example](#el-example)
- [Output](#output-12)
- [Known limitations](#known-limitations-12)
- [Email Example](#email-example)
- [Hive/Hive2 Example](#hivehive2-example)
- [Output](#output-13)
- [Prerequisites](#prerequisites)
- [Known limitations](#known-limitations-13)
- [Email Example](#email-example)
- [Output](#output-14)
- [Prerequisites](#prerequisites)
- [Known limitations](#known-limitations-14)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

@@ -485,6 +488,25 @@ The converted DAG uses the `BashOperator` in Airflow.

Not all FS operations are currently idempotent. This will be fixed.

## Java Example

The Java example can be run as:

`o2a -i examples/java -o output/java`

Make sure to first copy `examples/java/configuration.template.properties`, rename the copy to
`configuration.properties`, and fill it in with your configuration data.

### Output
In this example the output will appear in `output/java/test_java_dag.py`.

The converted DAG uses the `DataProcHadoopOperator` in Airflow.

### Known limitations

1. Overriding the action's main class via `oozie.launcher.action.main.class` is not implemented.


## Pig Example

The Pig example can be run as:
2 changes: 1 addition & 1 deletion bin/o2a-run-sys-test-complete
@@ -19,7 +19,7 @@ h a: p: C: L: b: c: m: r: z: d: v A S W K

_ALLOWED_PHASES=" prepare-configuration convert prepare-dataproc test-composer test-oozie "
# shellcheck disable=SC2011
-_ALLOWED_APPLICATIONS=" advancedflow childwf decision demo distcp el email fs git hive mapreduce pig shell spark ssh subwf "
+_ALLOWED_APPLICATIONS=" advancedflow childwf decision demo distcp el email fs git hive java mapreduce pig shell spark ssh subwf "

_LONG_OPTIONS="
help application: composer-name: composer-location: phase: bucket: cluster:
36 changes: 36 additions & 0 deletions examples/java/configuration.template.properties
@@ -0,0 +1,36 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

dataproc_cluster={{DATAPROC_CLUSTER_NAME}}
gcp_conn_id=google_cloud_default
gcp_region={{GCP_REGION}}
gcp_uri_prefix=gs://{{COMPOSER_DAG_BUCKET}}/dags
Binary file added examples/java/hdfs/lib/demo-java-main.jar
Binary file not shown.
59 changes: 59 additions & 0 deletions examples/java/hdfs/workflow.xml
@@ -0,0 +1,59 @@
<!--
Copyright 2019 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<workflow-app xmlns="uri:oozie:workflow:1.0" name="java-main-wf">
    <start to="java-node"/>
    <action name="java-node">
        <java>
            <resource-manager>${resourceManager}</resource-manager>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>org.apache.oozie.example.DemoJavaMain</main-class>
            <java-opt>-Dtest1=val1</java-opt>
            <java-opt>-Dtest2=val2</java-opt>
            <arg>Hello</arg>
            <arg>Oozie!</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
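
The `JavaMapper` source is not visible in this part of the commit, so the following is only a rough sketch of the kind of information a mapper would need to read from the `<java>` action above: the main class, the JVM options, and the arguments. The helper name `read_java_action` and the `NS` prefix mapping are assumptions made for illustration and are not taken from o2a.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the <java> action from examples/java/hdfs/workflow.xml.
WORKFLOW_XML = """
<workflow-app xmlns="uri:oozie:workflow:1.0" name="java-main-wf">
    <action name="java-node">
        <java>
            <main-class>org.apache.oozie.example.DemoJavaMain</main-class>
            <java-opt>-Dtest1=val1</java-opt>
            <java-opt>-Dtest2=val2</java-opt>
            <arg>Hello</arg>
            <arg>Oozie!</arg>
        </java>
    </action>
</workflow-app>
"""

# The workflow declares a default namespace, so every lookup has to be qualified.
NS = {"wf": "uri:oozie:workflow:1.0"}


def read_java_action(xml_text):
    """Hypothetical helper: collect the fields of the first <java> action."""
    root = ET.fromstring(xml_text)
    java = root.find("wf:action/wf:java", NS)
    return {
        "main_class": java.findtext("wf:main-class", namespaces=NS),
        "java_opts": [node.text for node in java.findall("wf:java-opt", NS)],
        "args": [node.text for node in java.findall("wf:arg", NS)],
    }


java_action = read_java_action(WORKFLOW_XML)
```

Here `java_action["args"]` holds the two `<arg>` values in document order, which matters because they are positional arguments to the main class.
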
40 changes: 40 additions & 0 deletions examples/java/job.properties
@@ -0,0 +1,40 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

nameNode=hdfs://
resourceManager=localhost:8032
queueName=default
examplesRoot=examples
# TODO: replace it with wf:user once we implement it
userName=${user.name}
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/java
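
To make the `${...}` references in this file concrete, here is a minimal sketch of how `oozie.wf.application.path` expands once the other properties are known. It is not how Oozie or o2a resolve properties; the `resolve` helper is hypothetical, and the value given for `user.name` is an assumed placeholder.

```python
import re

_PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def resolve(value, properties, depth=10):
    """Hypothetical helper: expand ${name} references, leaving unknown names untouched."""
    for _ in range(depth):
        expanded = _PLACEHOLDER.sub(
            lambda match: properties.get(match.group(1), match.group(0)), value
        )
        if expanded == value:
            break  # nothing left to expand
        value = expanded
    return value


job_properties = {
    "nameNode": "hdfs://",
    "examplesRoot": "examples",
    "user.name": "alice",  # assumed value; normally supplied by the environment
}

app_path = resolve(
    "${nameNode}/user/${user.name}/${examplesRoot}/apps/java", job_properties
)
unresolved = resolve("${queueName}", {})
```

With these assumed values, `app_path` is `hdfs:///user/alice/examples/apps/java`; the triple slash comes from `nameNode` being set to a bare `hdfs://`.
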
2 changes: 2 additions & 0 deletions o2a/converter/constants.py
@@ -16,3 +16,5 @@

# Workflow.xml and all HDFS files should be located in this subfolder
HDFS_FOLDER = "hdfs"
# Lib files are added by default to the path of the mapper specified
LIB_FOLDER = "lib"
2 changes: 2 additions & 0 deletions o2a/converter/mappers.py
@@ -23,6 +23,7 @@

from o2a.mappers.distcp_mapper import DistCpMapper
from o2a.mappers.action_mapper import ActionMapper
from o2a.mappers.java_mapper import JavaMapper
from o2a.mappers.email_mapper import EmailMapper
from o2a.mappers.fs_mapper import FsMapper
from o2a.mappers.git_mapper import GitMapper
@@ -39,6 +40,7 @@
    "spark": SparkMapper,
    "pig": PigMapper,
    "fs": FsMapper,
    "java": JavaMapper,
    "sub-workflow": SubworkflowMapper,
    "shell": ShellMapper,
    "map-reduce": MapReduceMapper,
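
The hunk above registers `JavaMapper` under the `java` action tag. As a sketch of how such a tag-to-class registry can be used, the snippet below looks up a mapper class by tag and instantiates it. The stub classes, the dictionary name `ACTION_MAP`, and the `mapper_for` function are assumptions for illustration; the real constructor signatures are not shown in this diff.

```python
class ActionMapper:
    """Hypothetical minimal base class standing in for o2a's ActionMapper."""

    def __init__(self, name):
        self.name = name


class FsMapper(ActionMapper):
    """Stub for the mapper registered under the "fs" tag."""


class JavaMapper(ActionMapper):
    """Stub for the mapper registered under the "java" tag."""


# Assumed name for the registry; keys are Oozie action tag names.
ACTION_MAP = {
    "fs": FsMapper,
    "java": JavaMapper,
}


def mapper_for(tag, name):
    """Instantiate the mapper registered for an action tag, or fail loudly."""
    try:
        mapper_cls = ACTION_MAP[tag]
    except KeyError:
        raise ValueError(f"No mapper registered for action <{tag}>") from None
    return mapper_cls(name)


java_mapper = mapper_for("java", "java-node")
```

Keeping the lookup in one dictionary means a new action type needs only an import and one new entry, which is what this commit does for `java`.
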
6 changes: 6 additions & 0 deletions o2a/converter/oozie_converter.py
@@ -30,6 +30,7 @@
from o2a.converter.renderers import BaseRenderer
from o2a.converter.task_group import TaskGroup
from o2a.converter.workflow import Workflow
from o2a.utils.file_utils import get_lib_files
from o2a.mappers.action_mapper import ActionMapper
from o2a.transformers.base_transformer import BaseWorkflowTransformer
from o2a.o2a_libs.property_utils import PropertySet
@@ -85,11 +86,16 @@ def __init__(
            transformers=self.transformers,
        )

    def retrieve_lib_jar_libraries(self):
        logging.info(f"Looking for jar libraries for the workflow in {self.workflow.library_folder}.")
        self.workflow.jar_files = get_lib_files(self.workflow.library_folder, extension=".jar")

    def recreate_output_directory(self):
        shutil.rmtree(self.workflow.output_directory_path, ignore_errors=True)
        os.makedirs(self.workflow.output_directory_path, exist_ok=True)

    def convert(self, as_subworkflow=False):
        self.retrieve_lib_jar_libraries()
        self.property_parser.parse_property()
        self.parser.parse_workflow()
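
The body of `get_lib_files` in `o2a/utils/file_utils.py` is not shown in this part of the commit. The sketch below is one plausible reading of the call `get_lib_files(folder, extension=".jar")`, assuming it returns the files in the folder that end with the given extension and tolerates a missing folder. The name `list_lib_files` is deliberately different to make clear that it is a stand-in and not the actual implementation.

```python
import os
import tempfile


def list_lib_files(folder, extension=None):
    """Hypothetical stand-in for get_lib_files: sorted paths of matching files in `folder`."""
    if not os.path.isdir(folder):
        return []  # a workflow without an hdfs/lib folder simply has no libraries
    return sorted(
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if os.path.isfile(os.path.join(folder, name))
        and (extension is None or name.endswith(extension))
    )


# Build a throwaway input directory that mirrors examples/java/hdfs/lib.
with tempfile.TemporaryDirectory() as input_dir:
    lib_dir = os.path.join(input_dir, "hdfs", "lib")
    os.makedirs(lib_dir)
    for name in ("demo-java-main.jar", "notes.txt"):
        open(os.path.join(lib_dir, name), "w").close()

    jar_names = [
        os.path.basename(path) for path in list_lib_files(lib_dir, extension=".jar")
    ]
    missing = list_lib_files(os.path.join(input_dir, "no-such-folder"), extension=".jar")
```

Returning an empty list for a missing folder keeps the conversion working for examples that ship no `lib` directory.
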

8 changes: 6 additions & 2 deletions o2a/converter/workflow.py
@@ -13,16 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Workflow"""
import os
from collections import OrderedDict
from typing import Set, Dict, Type

from o2a.converter.constants import HDFS_FOLDER, LIB_FOLDER
from o2a.converter.parsed_action_node import ParsedActionNode
from o2a.converter.relation import Relation
from o2a.converter.task_group import TaskGroup
from o2a.utils.file_utils import get_lib_files


-# This is a container for data, so it does not contain public methods intentionally.
-class Workflow: # pylint: disable=too-few-public-methods
+class Workflow:
"""Class for Workflow"""

def __init__(
@@ -55,6 +57,8 @@ def __init__(
            "from airflow.utils.trigger_rule import TriggerRule",
            "from airflow.utils import dates",
        }
        self.library_folder = os.path.join(self.input_directory_path, HDFS_FOLDER, LIB_FOLDER)
        self.jar_files = get_lib_files(self.library_folder, extension=".jar")

    def get_nodes_by_type(self, mapper_type: Type):
        return [node for node in self.nodes.values() if isinstance(node.mapper, mapper_type)]
1 change: 1 addition & 0 deletions o2a/converter/workflow_xml_parser.py
@@ -224,6 +224,7 @@ def parse_action_node(self, action_node: ET.Element):
            renderer=self.renderer,
            input_directory_path=self.workflow.input_directory_path,
            output_directory_path=self.workflow.output_directory_path,
            jar_files=self.workflow.jar_files,
            transformers=self.transformers,
        )
