
Running the Flink jobs

There are two ways to launch Flink jobs on YARN:

Yarn Session vs Standalone

  1. Create a Flink YARN session and submit new jobs inside it;
  2. Run each Flink application in standalone mode.

The first way is preferred, since it makes it easy to use flink list, flink cancel, etc. and to monitor our jobs (they are all listed in the Flink session web page, accessed through the ApplicationMaster URL of the session).
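
A minimal sketch of this workflow (jar path, session sizing and job id are placeholders, assuming a Flink 1.3 setup):

# start a detached YARN session with 2 task managers (placeholder sizing)
yarn-session.sh -n 2 -d

# submit a job into the running session, then list or cancel it
flink run -d /path/to/my-job.jar
flink list
flink cancel <job-id>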

From Flink 1.3 onwards, it is possible to create externalized checkpoints (see savepoints and externalized checkpoints). This is very convenient in order to recover a failed job: we just need to relaunch the job with the -s <chk dir> option.
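
For instance, recovery could look like this (the checkpoint path and jar are placeholders):

# relaunch the job from the metadata directory of an externalized checkpoint (placeholder path)
flink run -s hdfs:///checkpoints/<chk-dir> /path/to/my-job.jar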

But we have one problem: the name of a checkpoint directory is random and meaningless (it does not contain the name of the job), and the checkpoint directory is not customizable at runtime. Instead, it is specified in flink-conf.yaml:

Similarly to savepoints, an externalized checkpoint consists of a meta data file and, depending on the state back-end, some additional data files. The target directory for the externalized checkpoint's meta data is determined from the configuration key state.checkpoints.dir which, currently, can only be set via the configuration files:

state.checkpoints.dir: hdfs:///checkpoints/

(source)

Using a YARN session, the same flink-conf.yaml is used for all the applications running inside it. This means that all the applications will dump their externalized checkpoints into the same root directory and that, in case of failure, it is impossible to know which directory holds the data of a given application...
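
To illustrate (the file names below are made up; the exact layout depends on the Flink version and state back-end), listing the shared checkpoint root gives no hint about which application owns which entry:

hdfs dfs -ls hdfs:///checkpoints/
# checkpoint_metadata-7f3a2b...   -> saver or augmenter ?
# checkpoint_metadata-c41d09...   -> saver or augmenter ?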

The ticket Add option for persistent checkpoints (FLINK-4484) and the proposition FLIP-10 (FLINK-4512) are already discussing this issue.

Solution

As long as the externalized checkpoint location cannot be overridden from code, the only possibility is to launch the Flink applications in standalone mode, each with its own configuration file.

Script

For example, here is the script to launch the flink basic applications on the daplab (daplab-app-1.fri.lan:/opt/bbdata/flink/flink-basic-processing):

#!/bin/bash

export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=$(hadoop classpath)
export LD_LIBRARY_PATH=/usr/hdp/current/hadoop-client/lib/native:$LD_LIBRARY_PATH

base=/opt/bbdata/flink/flink-basic-processing

# if a second argument is given, pass it to flink run as the checkpoint/savepoint to restore from
savepoint="$2"
if [ -n "$savepoint" ]; then
    savepoint="-s hdfs://$savepoint"
fi

if [ "$1" == "saver" ]; then
    ## flink quarters
    export FLINK_CONF_DIR=/opt/bbdata/flink/conf/conf-saver
    flink run -d --yarnstreaming --jobmanager yarn-cluster -yn 1 -ynm "BBDATA saver" $savepoint -j $base/target/flink-basic-*.jar $base/../properties/flink-basic.properties saver

elif [ "$1" == "augmenter" ]; then 
    ## flink hours
    export FLINK_CONF_DIR=/opt/bbdata/flink/conf/conf-augmenter
    flink run -d --yarnstreaming --jobmanager yarn-cluster -yn 1 -ynm "BBDATA augmenter" $savepoint -j $base/target/flink-basic-*.jar $base/../properties/flink-basic.properties augmenter
else
    echo "missing or wrong argument: expecting saver or augmenter"
fi

Usage:

./launch.sh <saver|augmenter> [savepoint-directory]
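
For example (the checkpoint directory is purely illustrative; note that the script prepends hdfs:// to the second argument, so the scheme is omitted):

# normal start
./launch.sh saver

# restart from an externalized checkpoint
./launch.sh augmenter /checkpoints/augmenter/<chk-dir>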

The configuration directory should be in /opt/bbdata/flink/conf/conf-[saver|augmenter] (the only difference between the two being the target directories for checkpoints and savepoints) and the properties file should be in /opt/bbdata/flink/properties/flink-basic.properties.
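
As a sketch of the per-application difference (the HDFS paths are assumptions, not the actual daplab values), the two flink-conf.yaml files would only differ in lines such as:

# conf-saver/flink-conf.yaml (assumed paths)
state.checkpoints.dir: hdfs:///checkpoints/saver
state.savepoints.dir: hdfs:///savepoints/saver

# conf-augmenter/flink-conf.yaml (assumed paths)
state.checkpoints.dir: hdfs:///checkpoints/augmenter
state.savepoints.dir: hdfs:///savepoints/augmenter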