
flink-ai-extended

This project extends deep learning frameworks to run on the Flink project. It currently supports running TensorFlow on Flink.

Contents

TensorFlow support

TensorFlow is an open-source deep learning system developed by Google, widely used in the field of deep learning. Native TensorFlow is inconvenient for distributed execution and resource management, and it cannot integrate with the existing, widely used big data processing frameworks.

Flink is a data processing framework widely used for data extraction, feature preprocessing, and data cleaning.

This project combines TensorFlow with Flink and provides users with more convenient and useful tools. Currently, Flink job code is written in Java and algorithm code in Python.

Support Version

TensorFlow: 1.15.0

Flink: 1.11.0

Quick Start

Setup

Requirements:

1. python: 2.7 (Python 3 support is planned)
2. pip
3. cmake >= 3.6
4. java 1.8
5. maven >= 3.3.0

Install python2

macOS

```shell
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
export PATH="/usr/local/bin:/usr/local/sbin:$PATH"
brew install python@2
```
Ubuntu

```shell
sudo apt install python-dev
```

Install pip

```shell
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python get-pip.py
```

On Ubuntu you can install it with the command:

```shell
sudo apt install python-pip
```

Install pip dependencies

Install the pip package dependencies (if using a virtual environment, omit the --user argument):

```shell
pip install -U --user pip six numpy wheel mock grpcio grpcio-tools
```
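After installing, a quick sanity check that the dependencies resolved (a minimal sketch; the module list simply mirrors the pip packages above, and the grpc module is provided by the grpcio package):

```python
# sanity-check the pip dependencies installed above (run with python2)
import six, numpy, wheel, mock, grpc
print("numpy", numpy.__version__)
print("grpcio", grpc.__version__)
```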
Install cmake

cmake version must be >= 3.6

cmake download page

Install java 8

java download page

Install maven

maven version must be >= 3.3.0

maven download page

```shell
tar -xvf apache-maven-3.6.1-bin.tar.gz
mv apache-maven-3.6.1 /usr/local/
```

Configure environment variables:

```shell
MAVEN_HOME=/usr/local/apache-maven-3.6.1
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
```

Build From Source

Compiling the source code depends on TensorFlow 1.15.0; the build commands will install tensorflow 1.15.0 automatically.

```shell
mvn -DskipTests=true clean install
```

If you run all tests, this step may take a long time (about 20 minutes), so please be patient. You can skip the tests with the command: mvn -DskipTests=true clean install

Optional Commands

```shell
# run all tests
mvn clean install

# skip unit tests
mvn -DskipUTs=true clean install

# skip integration tests
mvn -DskipITs=true clean install
```

If the above command executes successfully, congratulations: you have successfully deployed flink-ai-extended. Now you can write algorithm programs.
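Since the build installs tensorflow 1.15.0 automatically, a quick way to confirm the python side afterwards (a minimal check):

```python
# verify the TensorFlow version installed by the build (expected: 1.15.0)
import tensorflow as tf
print(tf.__version__)
```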

Build Source in virtual environment

  • change the project pom.xml item pip.install.option from --user to -U
  • create a virtual environment:

    ```shell
    virtualenv tfenv
    ```

  • enter the virtual environment (a quick interpreter check follows this list):

    ```shell
    source tfenv/bin/activate
    ```

  • install pip dependencies:

    ```shell
    pip install -U pip six numpy wheel mock grpcio grpcio-tools
    ```

  • build source:

    ```shell
    mvn clean install
    ```

  • exit from the virtual environment:

    ```shell
    deactivate
    ```
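The quick check mentioned above, for confirming the build will use the virtualenv's interpreter (an optional sketch):

```python
# confirm the active interpreter belongs to the virtual environment
import sys
print(sys.prefix)  # should point at the .../tfenv directory while activated
```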
    

Example

  1. tensorflow add example

    python code:

```python
import tensorflow as tf
import time
import sys
from flink_ml_tensorflow.tensorflow_context import TFContext


def build_graph():
    global a
    i = 1
    a = tf.placeholder(tf.float32, shape=None, name="a")
    b = tf.reduce_mean(a, name="b")
    r_list = []
    v = tf.Variable(dtype=tf.float32, initial_value=tf.constant(1.0), name="v_" + str(i))
    c = tf.add(b, v, name="c_" + str(i))
    add = tf.assign(v, c, name="assign_" + str(i))
    sum = tf.summary.scalar(name="sum_" + str(i), tensor=c)
    r_list.append(add)
    global_step = tf.contrib.framework.get_or_create_global_step()
    global_step_inc = tf.assign_add(global_step, 1)
    r_list.append(global_step_inc)
    return r_list


def map_func(context):
    tf_context = TFContext(context)
    job_name = tf_context.get_role_name()
    index = tf_context.get_index()
    cluster_json = tf_context.get_tf_cluster()

    cluster = tf.train.ClusterSpec(cluster=cluster_json)
    server = tf.train.Server(cluster, job_name=job_name, task_index=index)
    sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False,
                                 device_filters=["/job:ps", "/job:worker/task:%d" % index])
    t = time.time()
    if 'ps' == job_name:
        from time import sleep
        while True:
            sleep(1)
    else:
        with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:' + str(index),
                                                      cluster=cluster)):
            train_ops = build_graph()
            hooks = [tf.train.StopAtStepHook(last_step=2)]
            with tf.train.MonitoredTrainingSession(master=server.target, config=sess_config,
                                                   checkpoint_dir="./target/tmp/s1/" + str(t),
                                                   hooks=hooks) as mon_sess:
                while not mon_sess.should_stop():
                    print(mon_sess.run(train_ops, feed_dict={a: [1.0, 2.0, 3.0]}))
                    sys.stdout.flush()
```
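Here map_func is the python entry point that flink-ai-extended invokes on each TensorFlow node (it is named in the Java TFConfig below), with the role name, task index, and cluster spec supplied through the TFContext. The ps task simply blocks in a sleep loop, while each worker step feeds [1.0, 2.0, 3.0], adds its mean (2.0) to the variable v, and StopAtStepHook(last_step=2) ends the session after two global steps.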

java code:

add maven dependencies:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alibaba</groupId>
    <artifactId>flink-ai-extended-examples</artifactId>
    <version>0.2.2</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.flink.ml</groupId>
            <artifactId>flink-ml-tensorflow</artifactId>
            <version>0.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.7.1</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>20.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```

You can refer to the following POM:

example pom.xml

```java
// imports shown for completeness; exact package paths may vary slightly by version
import java.util.HashMap;
import java.util.Map;

import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.alibaba.flink.ml.tensorflow.client.TFConfig;
import com.alibaba.flink.ml.tensorflow.client.TFUtils;
import com.alibaba.flink.ml.util.MLConstants;

class Add {
    public static void main(String[] args) throws Exception {
        // local zookeeper server.
        TestingServer server = new TestingServer(2181, true);
        String script = "./add.py";
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // if zookeeper has another address, configure it here
        Map<String, String> prop = new HashMap<>();
        prop.put(MLConstants.CONFIG_STORAGE_TYPE, MLConstants.STORAGE_ZOOKEEPER);
        prop.put(MLConstants.CONFIG_ZOOKEEPER_CONNECT_STR, "localhost:2181");
        TFConfig config = new TFConfig(2, 1, prop, script, "map_func", null);
        TFUtils.train(streamEnv, null, config);
        JobExecutionResult result = streamEnv.execute();
        server.stop();
    }
}
```
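A note on the TFConfig line above: judging from this example, the constructor arguments correspond to the worker count (2), the ps count (1), the property map, the python script path, the name of the python entry function ("map_func"), and the virtual environment package path (null when using the local python environment).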

Distributed Running

Deployment

Distributed running environment:

  • Start the zookeeper service: https://zookeeper.apache.org/
  • Prepare the python virtual environment; the virtual environment workflow is shown in the following figure: venv
  1. Build the python virtual environment package.
  2. Put the virtual environment package on a shared file system such as HDFS.
  3. Configure the virtual environment package address in the Flink machine learning job configuration (TensorFlow: TFConfig, PyTorch: PyTorchConfig).
  4. When the Flink job runs, each node downloads the virtual environment package and extracts it locally.

    [build docker script]

    [build virtual environment script]

Running Distributed Programs

Distributed Running Example

Setup & Build

  • install docker

docker install

  • install flink-ai-extended

    ```shell
    mvn -DskipTests=true clean install
    ```

  • Change work dir

    ```shell
    cd docker/build_cluster/
    ```

Note: ${projectRoot} is the flink-ai-extended project root path.

Start Service

  1. start zookeeper
  2. start hdfs
  3. start flink cluster

  • Start zookeeper

    [start zookeeper script]

    ```shell
    sh start_zookeeper.sh
    ```

  • Start HDFS

    [start hdfs script]

    ```shell
    sh start_hdfs.sh
    ```

  • Start flink cluster

    [start flink cluster script]

    ```shell
    sh start_flink.sh
    ```

You can also start all services with one command:

[start service script]

```shell
sh start_cluster.sh
```

Prepare data & code

  • Copy the virtual environment package to hdfs

    ```shell
    docker exec flink-jm /opt/hadoop-2.8.0/bin/hadoop fs -put -f /opt/work_home/temp/test/tfenv.zip /user/root/tfenv.zip
    ```

  • Download mnist data

    ```shell
    sh download_mnist_data.sh
    ```

  • Put train data into the docker container

    ```shell
    docker cp ${projectRoot}/flink-ml-examples/target/data/ flink-jm:/tmp/mnist_input
    ```

  • Package the user python code

    ```shell
    cd ${projectRoot}/flink-ml-examples/target/
    mkdir code && cp ${projectRoot}/flink-ml-examples/src/test/python/* code/
    zip -r ${projectRoot}/flink-ml-examples/target/code.zip code
    ```

  • Put the code package to hdfs

    ```shell
    docker exec flink-jm /opt/hadoop-2.8.0/bin/hadoop fs -put -f /opt/work_home/flink-ml-examples/target/code.zip hdfs://minidfs:9000/user/root/
    ```

Submit train job

```shell
docker exec flink-jm flink run -c com.alibaba.flink.ml.examples.tensorflow.mnist.MnistDist /opt/work_home/flink-ml-examples/target/flink-ml-examples-0.2.2.jar --zk-conn-str minizk --mode StreamEnv --setup /opt/work_home/flink-ml-examples/src/test/python/mnist_data_setup.py --train mnist_dist.py --envpath hdfs://minidfs:9000/user/root/tfenv.zip --mnist-files /tmp/mnist_input --with-restart false --code-path hdfs://minidfs:9000/user/root/code.zip
```
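Reading the submit command (as inferred from the preparation steps above): --zk-conn-str points at the zookeeper container (minizk), --mode selects the stream execution environment, --setup and --train name the python scripts for data preparation and training, --envpath is the virtual environment package on HDFS, --mnist-files is the train data path inside the container, and --code-path is the packaged user code on HDFS.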

Visit Flink Cluster

Flink Cluster Address

Stop all docker containers

[stop all script]

```shell
sh stop_cluster.sh
```

Summary

In the example above, zookeeper, flink, and HDFS can be deployed on different machines.

You can also use an existing zookeeper, hdfs, or flink cluster.

Optional Tools

Build the framework and tensorflow python packages independently

build script

Run the build_wheel.sh script; you will find the python package in the dist dir. You can install the package with the command:

```shell
pip install --user ${package_path}
```

Build custom virtual environment package

To run distributed programs, you need a virtual environment package uploaded to hdfs.

build virtual environment script

You can modify the script to add extra python packages.

Structure

structure

1. AM registers its address to zookeeper.
2. Worker and Ps get the AM address from zookeeper.
3. Worker and Ps register their addresses to AM.
4. AM collects all Worker and Ps addresses.
5. Worker and Ps get the cluster information.
6. Worker and Ps start the algorithm python process.
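As an illustrative sketch of this flow (not the project's real classes; a dict stands in for zookeeper and direct method calls stand in for RPC):

```python
# Toy model of the discovery flow above; all names here are illustrative only.
zk = {}  # stand-in for zookeeper

class ApplicationMaster(object):
    def __init__(self, address):
        self.address = address
        self.registry = {}
        zk["am"] = self  # 1. AM registers its address to "zookeeper"

    def register(self, role, index, address):
        self.registry[(role, index)] = address  # 4. AM collects all addresses

    def cluster(self):
        return dict(self.registry)  # 5. nodes fetch the cluster information

def start_node(role, index, address):
    am = zk["am"]  # 2. Worker/Ps get the AM address from "zookeeper"
    am.register(role, index, address)  # 3. register own address with AM
    cluster_info = am.cluster()  # 5. get the cluster information...
    return cluster_info  # 6. ...then start the algorithm python process

am = ApplicationMaster("am-host:8080")
start_node("ps", 0, "ps0:2222")
print(start_node("worker", 0, "worker0:2222"))
# {('ps', 0): 'ps0:2222', ('worker', 0): 'worker0:2222'}
```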

For More Information

Design document

License

Apache License 2.0
