Running the Flink jobs
Yarn Session vs Standalone

There are two ways to launch Flink jobs on YARN:
- Create a flink yarn session and submit new jobs inside it;
- Run each flink application in standalone mode.
The first way is preferred, since it makes it easy to use
flink cancel, etc. and to monitor our jobs: they are all listed in the Flink session web page, reachable through the ApplicationMaster URL of the session.
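To make the difference concrete, here is a dry-run sketch of the two launch styles. The flag values (-n, -jm, -tm, -yn) and the jar name are assumptions in line with the Flink 1.3-era CLI; the commands are only echoed so the sketch can run outside a cluster.

```shell
#!/bin/bash
# Dry-run sketch: build the command line for each launch style.
launch_cmd() {
  local mode="$1" jar="$2"
  if [ "$mode" == "session" ]; then
    # one long-running yarn session; jobs are then submitted into it
    echo "yarn-session.sh -n 2 -jm 1024 -tm 2048 -d; flink run -d $jar"
  else
    # standalone: a dedicated per-job yarn cluster
    echo "flink run -m yarn-cluster -yn 1 -d $jar"
  fi
}

launch_cmd session myjob.jar
launch_cmd standalone myjob.jar
```

In session mode, all jobs share one JobManager and show up in the same web UI; in standalone mode, each `flink run -m yarn-cluster` spawns its own YARN application.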
Limitations of Flink 1.3
From Flink 1.3 onwards, it is possible to create externalized checkpoints (see savepoints and externalized checkpoints). This is very convenient for recovering a failed job: we just need to relaunch the job with the
-s <chk dir> option.
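A recovery then looks like the following sketch, where the hdfs checkpoint path and the jar name are placeholders and the command is only echoed so the sketch can run outside a cluster:

```shell
#!/bin/bash
# Sketch: build the relaunch command from an externalized checkpoint dir.
restore_cmd() {
  local chk_dir="$1" jar="$2"
  # -s tells Flink to restore the job's state from that checkpoint
  echo "flink run -d -s $chk_dir $jar"
}

restore_cmd hdfs:///flink/ext-checkpoints/chk-42 flink-basic.jar
```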
But we have one problem: the name of a checkpoint directory is random and meaningless (it doesn't contain the name of the job), and the checkpoint directory is not customizable at runtime. Instead, it is specified in the configuration files. From the Flink documentation:

> Similarly to savepoints, an externalized checkpoint consists of a meta data file and, depending on the state back-end, some additional data files. The target directory for the externalized checkpoint's meta data is determined from the configuration key state.checkpoints.dir which, currently, can only be set via the configuration files.
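In other words, the only knob is a single entry in flink-conf.yaml; a minimal sketch, where the hdfs path is an assumption:

```yaml
# flink-conf.yaml: the only place where the checkpoint target can be set
state.checkpoints.dir: hdfs:///flink/ext-checkpoints
```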
Using a yarn session, the same
flink-conf.yaml is used for all the applications running inside it. This means that all the applications will dump their externalized checkpoints into the same root directory and that, in case of failure, it is impossible to know which directory holds the data for a given application...
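The following sketch simulates the problem under /tmp (the directory names are made up): two jobs sharing one state.checkpoints.dir leave sibling directories whose random names say nothing about which application they belong to.

```shell
#!/bin/bash
# Simulate two applications checkpointing into one shared root.
root=/tmp/shared-chk-demo
rm -rf "$root"
mkdir -p "$root/2b1acd6adeadbeef/chk-17" "$root/9f0e33bb0badcafe/chk-23"
# Listing the shared root gives no clue about job ownership:
ls "$root"
```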
As long as the externalized checkpoint location cannot be overridden in code, the only possibility is to launch the flink applications in standalone mode, each with its own configuration file.
For example, here is the script to launch the flink basic applications on the daplab:
```bash
#!/bin/bash

export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=$(hadoop classpath)
export LD_LIBRARY_PATH=/usr/hdp/current/hadoop-client/lib/native:$LD_LIBRARY_PATH

base=/opt/bbdata/flink/flink-basic-processing

savepoint="$2"
if [ -n "$savepoint" ]; then
    savepoint="-s hdfs://$savepoint"
fi

if [ "$1" == "saver" ]; then
    ## flink quarters
    export FLINK_CONF_DIR=/opt/bbdata/flink/conf/conf-saver
    flink run -d --yarnstreaming --jobmanager yarn-cluster -yn 1 -ynm "BBDATA saver" \
        $savepoint -j $base/target/flink-basic-*.jar \
        $base/../properties/flink-basic.properties saver
elif [ "$1" == "augmenter" ]; then
    ## flink hours
    export FLINK_CONF_DIR=/opt/bbdata/flink/conf/conf-augmenter
    flink run -d --yarnstreaming --jobmanager yarn-cluster -yn 1 -ynm "BBDATA augmenter" \
        $savepoint -j $base/target/flink-basic-*.jar \
        $base/../properties/flink-basic.properties augmenter
else
    echo "missing or wrong argument: expecting saver or augmenter"
fi
```
Usage:

```
./launch.sh <saver|augmenter> [savepoint-directory]
```
The configuration directory should be in
/opt/bbdata/flink/conf/conf-[saver|augmenter] (the only difference between the two being the target directories for checkpoints and savepoints) and the properties file should be in
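A hypothetical sketch of the differing keys between the two configuration directories (the hdfs paths are assumptions; every other key in the two flink-conf.yaml files stays identical):

```yaml
# conf-saver/flink-conf.yaml
state.checkpoints.dir: hdfs:///bbdata/flink/saver/ext-checkpoints
state.savepoints.dir: hdfs:///bbdata/flink/saver/savepoints

# conf-augmenter/flink-conf.yaml
state.checkpoints.dir: hdfs:///bbdata/flink/augmenter/ext-checkpoints
state.savepoints.dir: hdfs:///bbdata/flink/augmenter/savepoints
```

With separate roots per application, a failed job's externalized checkpoint can be found unambiguously.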