Skip to content

Latest commit

 

History

History
90 lines (75 loc) · 3.12 KB

README.md

File metadata and controls

90 lines (75 loc) · 3.12 KB

hadoop-connectors

Apache Hadoop connectors for Pravega.

Description

Implements both the input and the output format interfaces for Hadoop. It leverages Pravega batch client to read existing events in parallel; and uses write API to write events to Pravega stream.

Build

The build script handles Pravega as a source dependency, meaning that the connector is linked to a specific commit of Pravega (as opposed to a specific release version) in order to faciliate co-development. This is accomplished with a combination of a git submodule and the use of Gradle's composite build feature.

Cloning the repository

When cloning the connector repository, be sure to instruct git to recursively checkout submodules, e.g.:

git clone --recurse-submodules https://github.com/pravega/hadoop-connectors.git

To update an existing repository:

git submodule update --init --recursive

Building Pravega

Pravega is built automatically by the connector build script.

Building Hadoop Connector

Build the connector:

./gradlew build (w/o dependencies)
./gradlew shadowJar (w/ dependencies)

Test

./gradlew test

Usage

Input Connector

        Configuration conf = PravegaInputFormat.builder()
            .withScope("myScope")
            .forStream("myInputStream")
            .withURI("tcp://127.0.0.1:9090")
            .withDeserializer(io.pravega.client.stream.impl.JavaSerializer.class.getName())
            // optional to set start and end positions
            // generally, start positions are set to the end positions in previous job,
            // so only new generated events will be processed, otherwise, start from very beginning if not set
            .startPositions(startPos)
            .endPositions(endPos)
            .build();

        Job job = new Job(conf);
        job.setInputFormatClass(PravegaInputFormat.class);

        // NOTE:
        // 1. You have the option to use existing job 'Configuration' instance as the input parameter to create a builder
        //     "PravegaInputFormat.builder(conf)"
        // 2. Key class is 'EventKey', but you won't need it at most of the time.

Output Connector

        Configuration conf = PravegaOutputFormat.builder()
            .withScope("myScope")
            .forStream("myOutputStream")
            .withURI("tcp://127.0.0.1:9090")
            .withSerializer(io.pravega.client.stream.impl.JavaSerializer.class.getName())
            // optional to set the scaling of output stream, 1 by default
            .withScaling(3)
            .build();

        Job job = new Job(conf);
        job.setOutputFormatClass(PravegaOutputFormat.class);
        // NOTE:
        // 1. You have the option to use existing job 'Configuration' instance as the output parameter to create a builder
        //     "PravegaOutputFormat.builder(conf)"