Upgrade to HDP 2.6
The DAPLAB was upgraded to HDP 2.6, leading to problems with the existing Flink setup.
The error
At the time, we used Flink 1.3 and launched the different applications into a Flink session started with `yarn-session.sh`.
After the upgrade, the error was either:
```
java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:157)
    at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:87)
    at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:63)
    at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:168)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.serviceStart(AMRMClientAsyncImpl.java:96)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.flink.yarn.YarnFlinkResourceManager.initialize(YarnFlinkResourceManager.java:244)
    at org.apache.flink.runtime.clusterframework.FlinkResourceManager.preStart(FlinkResourceManager.java:186)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
    at akka.actor.UntypedActor.aroundPreStart(UntypedActor.scala:97)
    at akka.actor.ActorCell.create(ActorCell.scala:580)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
or
```
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/client/RMFailoverProxyProvider
...
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.client.RMFailoverProxyProvider
```
The fix
The problem is that the upgrade to HDP 2.6 changed some of the jars installed on the cluster, such as hadoop-client, which Flink uses. The binary distribution of Flink ships its own copies of these libraries, so after the upgrade they no longer match the cluster's versions.
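To see the mismatch, you can compare the Hadoop jars bundled with the Flink binaries against the version installed on the cluster (the lib path and jar naming below are assumptions for the Flink 1.3 binary distribution; adapt them to your install):

```bash
# Hadoop jars shipped inside the binary Flink distribution (path is an assumption):
ls /path/to/flink/lib | grep -i hadoop
# Hadoop version actually installed by HDP on the cluster:
hadoop version | head -1
```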
Build Flink from source
The solution is to build Flink from source and to force the use of the vendor-provided Hadoop libraries during the build with the `-Pvendor-repos` option.
At the time of writing, the latest Flink version was 1.4-SNAPSHOT:
```bash
git clone https://github.com/apache/flink
cd flink
mvn install -DskipTests -Dhadoop.version=$(hadoop version | head -1 | sed 's/Hadoop //') -Pvendor-repos
cp -r build-target /usr/local/flink/flink-1.4-SNAPSHOT
```
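As a sanity check, you can print the value that gets passed to `-Dhadoop.version` above; on HDP it carries a vendor suffix:

```bash
# Prints the cluster's Hadoop version, e.g. "2.7.3.2.6.0.3-8" on HDP 2.6
# (the exact value depends on your cluster):
hadoop version | head -1 | sed 's/Hadoop //'
```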
Then, don't forget to update your PATH and FLINK_CONF_DIR to point at the new Flink version.
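A minimal sketch, assuming the copy destination used above:

```bash
export PATH=/usr/local/flink/flink-1.4-SNAPSHOT/bin:$PATH
export FLINK_CONF_DIR=/usr/local/flink/flink-1.4-SNAPSHOT/conf
```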
Required exports
To be able to run a Flink application or session on YARN, ensure that you have exported the following variables:
```bash
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=$(hadoop classpath)
export LD_LIBRARY_PATH=/usr/hdp/current/hadoop-client/lib/native:$LD_LIBRARY_PATH
```
`HADOOP_CLASSPATH` is new compared to the previous configuration, and it is really important!
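A cheap way to fail fast if it was forgotten (shell sketch; the error message is illustrative):

```bash
# Aborts with the given message when HADOOP_CLASSPATH is unset or empty:
: "${HADOOP_CLASSPATH:?not set - run: export HADOOP_CLASSPATH=\$(hadoop classpath)}"
```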
Build applications using system libraries
Once the upgrade is done, you will be able to launch a Flink session (and applications inside the session) using the `yarn-session.sh` script.
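For example, a detached session could be started like this (the flag values are illustrative, not prescriptive):

```bash
# 4 task managers, 1 GB for the job manager, 4 GB per task manager:
yarn-session.sh -n 4 -jm 1024 -tm 4096 -nm <session-name> -d
```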
In case you need to launch applications in standalone mode as well, you need to ensure that the application jar does not bundle the Flink core libraries, but uses the ones installed on the system instead.
Open `pom.xml` and use `<scope>provided</scope>` for at least the Flink core dependencies. For example, here is an extract from the flink-aggregations project:
```xml
<properties>
    <flink.version>1.3.0</flink.version>
</properties>

<dependencies>
    <!-- flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```
Note that:
- we still build with Flink 1.3 (see below);
- all Flink libraries are marked as provided, except for the Kafka connector, which needs to be included inside the jar (a quick verification follows).
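A quick way to verify that the provided scope worked is to list the contents of the built jar (the jar path below is an assumption; adapt it to your project):

```bash
# Should print nothing: Flink core classes must not be bundled in the application jar.
jar tf target/flink-aggregations-1.0.jar | grep '^org/apache/flink/api/java'
```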
To check that the application runs, use:
```bash
export FLINK_CONF_DIR=/path/to/flink-1.4/conf
flink run -d -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 -ynm <appname> <jar> <args>
```
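To confirm the application was actually accepted by YARN, you can list the running applications; the name given via `-ynm` should appear:

```bash
yarn application -list -appStates RUNNING
```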
Fixing jetty-XX could not be resolved and NoSuchMethodError KeyDeser...
Remember to build the jar with Flink 1.3.0, not Flink 1.4-SNAPSHOT.
The first time, building with Flink 1.4 gave the error:

```
org.mortbay.jetty:jetty-util:jar:6.1.26.hwx could not be resolved
```
Two months later this error disappeared, so the jar was built, but on launch I got the error:
```
java.lang.NoSuchMethodError: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.<init>(Lorg/apache/flink/streaming/util/serialization/DeserializationSchema;)V
```
Changing the `flink.version` property in the `pom.xml` back to 1.3.0 fixed it.
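If you prefer to script the change, something like this works (a sketch; assumes a single flink.version property and GNU sed):

```bash
# Pin the Flink version back to 1.3.0 in the pom, in place:
sed -i 's|<flink.version>.*</flink.version>|<flink.version>1.3.0</flink.version>|' pom.xml
```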
Fixing 'containerized.heap-cutoff-min is higher ...'
If, when launching an application, you run into this stack trace:
```
java.lang.RuntimeException: Error deploying the YARN cluster
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:584)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:78)
    at org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:963)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:267)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:582)
    ... 12 more
Caused by: java.lang.IllegalArgumentException: The configuration value 'containerized.heap-cutoff-min' is higher (600) than the requested amount of memory 256
    at org.apache.flink.yarn.Utils.calculateHeapSize(Utils.java:101)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.setupApplicationMasterContainer(AbstractYarnClusterDescriptor.java:1356)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:840)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:456)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:362)
    ... 13 more
```
Open `flink-conf.yaml` and set the `*.heap.mb` properties to `1024` (older versions set the default to 256 or 512, which is not enough):
```yaml
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 1024
```
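Alternatively, the memory can be raised for a single session directly on the command line instead of in the config file (values are illustrative):

```bash
# Request enough memory up front so the heap cutoff check passes:
yarn-session.sh -n 4 -jm 1024 -tm 1024
```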