
Upgrade to HDP 2.6

The DAPLAB was upgraded to HDP 2.6, which broke our existing Flink setup.

The error

At the time, we were using Flink 1.3 and launching our applications into a Flink session started with yarn-session.sh. After the update, one of two errors appeared:

java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:157)
    at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:87)
    at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:63)
    at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:168)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.serviceStart(AMRMClientAsyncImpl.java:96)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.flink.yarn.YarnFlinkResourceManager.initialize(YarnFlinkResourceManager.java:244)
    at org.apache.flink.runtime.clusterframework.FlinkResourceManager.preStart(FlinkResourceManager.java:186)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
    at akka.actor.UntypedActor.aroundPreStart(UntypedActor.scala:97)
    at akka.actor.ActorCell.create(ActorCell.scala:580)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

or

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/client/RMFailoverProxyProvider
...
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.client.RMFailoverProxyProvider

The fix

The problem is that the upgrade to HDP 2.6 changed some of the jars Flink depends on, such as hadoop-client. The binary distribution of Flink bundles its own versions of these libraries, which no longer match the ones on the cluster.

The solution is to build Flink from source and force the use of the vendor libraries during the build with the -Pvendor-repos profile. At the time of writing, the latest Flink version is 1.4-SNAPSHOT:

git clone https://github.com/apache/flink
cd flink
mvn install -DskipTests -Dhadoop.version=$(hadoop version | head -1 | sed 's/Hadoop //') -Pvendor-repos
cp -r build-target /usr/local/flink/flink-1.4-SNAPSHOT
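
If you want to check which version string will be passed to Maven, you can run the pipeline on its own first (the value in the comment below is only an illustration of what HDP 2.6 might report):

hadoop version | head -1 | sed 's/Hadoop //'
# prints the vendor version string, e.g. something like 2.7.3.2.6.1.0-129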

Then, don't forget to update your PATH and FLINK_CONF_DIR to point at the new Flink version.
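
For instance, with the install location used above, the exports could look like this (adapt the paths to your setup):

export PATH=/usr/local/flink/flink-1.4-SNAPSHOT/bin:$PATH
export FLINK_CONF_DIR=/usr/local/flink/flink-1.4-SNAPSHOT/conf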

Required exports

To run a Flink application or session on YARN, make sure the following variables are exported:

export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=$(hadoop classpath)
export LD_LIBRARY_PATH=/usr/hdp/current/hadoop-client/lib/native:$LD_LIBRARY_PATH

HADOOP_CLASSPATH is new compared to the previous configuration, and it is essential!
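
To double-check what was picked up, you can print the classpath one entry per line:

echo "$HADOOP_CLASSPATH" | tr ':' '\n' | head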

Build applications using system libraries

Once the update is done, you will be able to launch a Flink session (and applications inside it) using the yarn-session.sh script.
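
For example, a detached session with four TaskManagers could be started like this (memory figures are illustrative):

yarn-session.sh -d -n 4 -jm 1024 -tm 4096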

If you also need to launch applications in standalone mode, make sure the application jar does not bundle the Flink core libraries, but relies on the ones installed on the system instead.

Open pom.xml and set <scope>provided</scope> on at least the Flink core dependencies. For example, here is an excerpt from the flink-aggregations project:

<properties>
    <flink.version>1.3.0</flink.version>
</properties>

<dependencies>
    <!--flink-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Note that:

  • we still build against Flink 1.3 (see below)
  • all Flink libraries are marked as provided except for the Kafka connector, which needs to be bundled inside the jar (see the check below)
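
To verify that the provided dependencies were indeed left out of the fat jar, list its contents (the jar name below is hypothetical):

jar tf target/flink-aggregations-0.1.jar | grep '^org/apache/flink/api'
# should print nothing; only the bundled Kafka connector classes
# (org/apache/flink/streaming/connectors/kafka/...) remain in the jar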

To check that the application runs, use:

export FLINK_CONF_DIR=/path/to/flink-1.4/conf
flink run -d -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 -ynm <appname> <jar> <args>
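
Once submitted, the application should show up in the YARN application list:

yarn application -list | grep <appname>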

Fixing 'jetty-XX could not be resolved' and 'NoSuchMethodError KeyDeser...'

Remember to build the jar with Flink 1.3.0, not Flink 1.4-SNAPSHOT.

At first, building with Flink 1.4 gave the error:

org.mortbay.jetty:jetty-util:jar:6.1.26.hwx could not be resolved

Two months later this error had disappeared, so the jar built successfully, but on launch the following error appeared:

java.lang.NoSuchMethodError: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.<init>(Lorg/apache/flink/streaming/util/serialization/DeserializationSchema;)V

Changing the flink.version property in the pom.xml back to 1.3.0 fixed it.
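
After changing the property back, rebuild the fat jar with the usual Maven invocation:

mvn clean package -DskipTests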

Fixing 'containerized.heap-cutoff-min is higher ...'

If, when launching an application, you run into this stack trace:

java.lang.RuntimeException: Error deploying the YARN cluster
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:584)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:78)
    at org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:963)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:267)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:582)
    ... 12 more
Caused by: java.lang.IllegalArgumentException: The configuration value 'containerized.heap-cutoff-min' is higher (600) than the requested amount of memory 256
    at org.apache.flink.yarn.Utils.calculateHeapSize(Utils.java:101)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.setupApplicationMasterContainer(AbstractYarnClusterDescriptor.java:1356)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:840)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:456)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:362)
    ... 13 more

Open flink-conf.yaml and set the *.heap.mb properties to 1024 (older versions set the defaults to 256 or 512, which is below the 600 MB containerized.heap-cutoff-min):

jobmanager.heap.mb:  1024
taskmanager.heap.mb: 1024
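
With these values in place, a small test session should start without the heap-cutoff error (sizes match the config above):

yarn-session.sh -n 2 -jm 1024 -tm 1024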