[Flink] Fixing "Could not complete snapshot 3" when submitting a job to a Flink cluster

I. Problem description

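I had always run my Flink programs inside IDEA, and they all worked without problems. When I packaged a program and submitted it to the cluster, however, I ran into trouble: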

 bin/flink run -m hadoop102:8081 -c com.ryl.cdc.FlinkCDC_01_DS ./flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar

Exception from the first submission:

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'Flink Streaming Job'.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Flink Streaming Job'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
	at com.ryl.cdc.FlinkCDC_01_DS.main(FlinkCDC_01_DS.java:44)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
	... 8 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:488)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:334)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:516)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:375)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:342)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:327)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:163)
	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:472)
	... 4 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:531)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:408)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:64)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:518)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:331)
	... 19 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:55)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:527)
	... 24 more

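After some searching, I found that Flink was missing the jars it needs to access HDFS. The innermost cause, "Hadoop is not in the classpath/dependencies", points the same way: the job's checkpoint storage (FsStateBackend) uses an hdfs:// path, and Flink cannot load a file system for the 'hdfs' scheme without Hadoop on its classpath.
The fix: add commons-cli-1.4.jar and flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar to Flink's lib directory, then restart the Flink cluster. That resolved this error.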
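Since the jars in lib/ are only loaded when the cluster processes start (hence the restart), it can be worth confirming the files are really in place first. The following is a minimal pre-flight sketch in plain Java, not part of Flink: the class name FlinkLibCheck is hypothetical, the jar names are the two listed above, and the default path /opt/module/flink-1.12.7/lib is an assumption taken from the SLF4J output later in this post.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical pre-flight check: are the HDFS-related jars present in Flink's lib directory? */
final class FlinkLibCheck {

    // The two jars added to flink/lib in this post.
    static final List<String> REQUIRED_JARS = Arrays.asList(
            "commons-cli-1.4.jar",
            "flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar");

    /** Returns the required jar names that do not exist as regular files under libDir. */
    static List<String> missingJars(Path libDir, List<String> required) {
        List<String> missing = new ArrayList<>();
        for (String jar : required) {
            if (!Files.isRegularFile(libDir.resolve(jar))) {
                missing.add(jar);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Assumed default location; pass the real lib directory as the first argument.
        Path libDir = Paths.get(args.length > 0 ? args[0] : "/opt/module/flink-1.12.7/lib");
        List<String> missing = missingJars(libDir, REQUIRED_JARS);
        if (missing.isEmpty()) {
            System.out.println("All required jars found in " + libDir);
        } else {
            System.out.println("Missing from " + libDir + ": " + missing);
        }
    }
}
```

Run it on each node that hosts a JobManager or TaskManager, since every process loads its own local lib directory.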
However, when I submitted the program again, it threw another exception:

Job has been submitted with JobID 0ef03d7e76f8bfb91f147dc7d91514ce

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0ef03d7e76f8bfb91f147dc7d91514ce)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0ef03d7e76f8bfb91f147dc7d91514ce)
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
	at com.ryl.cdc.FlinkCDC_01_DS.main(FlinkCDC_01_DS.java:44)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
	... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0ef03d7e76f8bfb91f147dc7d91514ce)
	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$24(RestClusterClient.java:670)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
	... 24 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=2000)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Could not perform checkpoint 3 for operator Source: Custom Source -> Sink: Print to Std. Out (1/1)#2.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:933)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:886)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 3 for operator Source: Custom Source -> Sink: Print to Std. Out (1/1)#2. Failure reason: Checkpoint was declined.
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:697)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:618)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:583)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:309)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1013)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:997)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:921)
	... 10 more
Caused by: java.lang.NullPointerException
	at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.snapshotOffsetState(DebeziumSourceFunction.java:262)
	at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.snapshotState(DebeziumSourceFunction.java:240)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
	... 20 more
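Both traces nest many levels of "Caused by", and the line that names the concrete failure is usually the innermost one (here, the NullPointerException). When such exceptions are caught in your own code, the chain can be walked with the plain JDK method Throwable.getCause(). The helper below is a generic sketch, not a Flink or Flink CDC API; the class name RootCause is hypothetical, and the messages in main are abbreviated stand-ins for the trace above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical helper for reading long "Caused by" chains using only the JDK. */
final class RootCause {

    /** Follows getCause() to the innermost throwable; the identity set stops on cyclic chains. */
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    /** Lists every level as "ClassName: message", outermost first, like the "Caused by" lines. */
    static List<String> describeChain(Throwable t) {
        List<String> levels = new ArrayList<>();
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable c = t; c != null && seen.add(c); c = c.getCause()) {
            levels.add(c.getClass().getName() + ": " + c.getMessage());
        }
        return levels;
    }

    public static void main(String[] args) {
        // Simulated chain with the same shape as the second trace (messages shortened).
        Exception npe = new NullPointerException();
        Exception declined = new Exception("Could not complete snapshot 3 ... Checkpoint was declined.", npe);
        Exception failed = new RuntimeException("Job execution failed.", declined);

        System.out.println(rootCause(failed).getClass().getName()); // java.lang.NullPointerException
        describeChain(failed).forEach(System.out::println);
    }
}
```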

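This time the failure came from a declined checkpoint ("Checkpoint was declined"), and the innermost cause is a NullPointerException in DebeziumSourceFunction.snapshotOffsetState. Further searching suggested that this happens because Flink cannot load some of Hadoop's libraries.
The fix: add the following line to the environment configuration file on every node where Hadoop is installed: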

export HADOOP_CLASSPATH=`hadoop classpath`
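The variable only helps if it is visible in the shell that starts the Flink cluster and runs bin/flink. As a rough sanity check, the sketch below (a hypothetical helper, not something Flink ships) reads HADOOP_CLASSPATH from its own environment and looks for a share/hadoop/common entry. `hadoop classpath` normally includes that directory in a standard Hadoop 3 layout such as the /opt/module/hadoop-3.1.3 install seen in the SLF4J output below, but using it as the marker of a usable value is an assumption.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical check that HADOOP_CLASSPATH is set and points at Hadoop's common libraries. */
final class HadoopClasspathCheck {

    /** Splits a classpath string on the platform separator (':' on Linux), dropping empty entries. */
    static List<String> entries(String classpath) {
        List<String> result = new ArrayList<>();
        if (classpath == null) {
            return result;
        }
        for (String entry : classpath.split(File.pathSeparator)) {
            if (!entry.trim().isEmpty()) {
                result.add(entry.trim());
            }
        }
        return result;
    }

    /** Heuristic (assumption): a usable value contains the share/hadoop/common directory. */
    static boolean mentionsHadoopCommon(List<String> entries) {
        for (String entry : entries) {
            if (entry.contains("share/hadoop/common")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> cp = entries(System.getenv("HADOOP_CLASSPATH"));
        if (cp.isEmpty()) {
            System.out.println("HADOOP_CLASSPATH is unset or empty in this shell");
        } else if (!mentionsHadoopCommon(cp)) {
            System.out.println("HADOOP_CLASSPATH is set, but no share/hadoop/common entry was found");
        } else {
            System.out.println("HADOOP_CLASSPATH has " + cp.size() + " entries, including Hadoop common");
        }
    }
}
```

Run it from the same shell after reloading the profile; on JDK 11 or later the source file can also be launched directly with java HadoopClasspathCheck.java.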

II. Solution

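  1. Add the following jars to Flink's lib directory:
    commons-cli-1.4.jar
    flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

  2. Add the following to the environment variables on every Hadoop node:

export HADOOP_CLASSPATH=`hadoop classpath`
  3. Reload the environment configuration file and restart the Flink cluster.
  4. Resubmit the program: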
bin/flink run -m hadoop102:8081 -c com.ryl.cdc.FlinkCDC_01_DS ./flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID d0b943a1ffeee650dd446a8630262454

The job was submitted successfully and now runs; its status can be checked in the Flink Web UI.

Finally, the links to the jar files are attached.

Article link: https://my.lmcjl.com/post/7598.html
