<>Error log
    org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.net.ConnectException: Connection timed out (Connection timed out)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:606)
        at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:97)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
<>Solution
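The data source was unstable. The socket source (SocketTextStreamFunction) timed out while connecting, and the task failed. No restart strategy was in effect (NoRestartBackoffTimeStrategy), so Flink did not attempt recovery and the job went down. Configuring a suitable restart strategy lets the job restart automatically, for example in flink-conf.yaml:

    # Restart strategy: failure rate
    restart-strategy: failure-rate
    # Maximum number of failures per interval: 3
    restart-strategy.failure-rate.max-failures-per-interval: 3
    # Interval over which failures are counted: 5 minutes
    restart-strategy.failure-rate.failure-rate-interval: 5 min
    # Delay between two restart attempts: 10 s
    restart-strategy.failure-rate.delay: 10 s

Flink supports different restart strategies, which control how a job is restarted when a failure occurs.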
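A cluster starts with a default restart strategy, which is used whenever no job-specific strategy has been defined. If a restart strategy is specified when the job is submitted, it overrides the cluster's default. The default restart strategy is set in Flink's configuration file flink-conf.yaml, where the parameter restart-strategy defines which strategy is used.

Commonly used restart strategies:
1. Fixed delay
2. Failure rate
3. No restart

If checkpointing is not enabled, the no-restart strategy is used. If checkpointing is enabled but no restart strategy has been configured, the fixed-delay strategy is used (with Integer.MAX_VALUE restart attempts).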
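The selection order just described can be modelled with plain strings. This is a minimal sketch, not Flink's code: the class RestartStrategyResolution and its resolve method are hypothetical names. The precedence is taken from the paragraphs above: the job-level setting first, then flink-conf.yaml, then the default that depends on whether checkpointing is enabled.

```java
import java.util.Optional;

/**
 * Hypothetical, simplified model (not Flink's implementation) of how the
 * effective restart strategy is chosen:
 * job-level setting > flink-conf.yaml setting > checkpointing-based default.
 */
public class RestartStrategyResolution {

    // Returns a readable name of the strategy that would apply
    static String resolve(Optional<String> jobStrategy,
                          Optional<String> clusterStrategy,
                          boolean checkpointingEnabled) {
        if (jobStrategy.isPresent()) {
            return jobStrategy.get();       // e.g. set via env.setRestartStrategy(...)
        }
        if (clusterStrategy.isPresent()) {
            return clusterStrategy.get();   // e.g. restart-strategy in flink-conf.yaml
        }
        // Nothing configured: the default depends on checkpointing
        return checkpointingEnabled
                ? "fixed-delay (Integer.MAX_VALUE attempts)"
                : "none";
    }

    public static void main(String[] args) {
        // Nothing configured, checkpointing off: no restarts at all
        System.out.println(resolve(Optional.empty(), Optional.empty(), false));
        // Nothing configured, checkpointing on: fixed delay
        System.out.println(resolve(Optional.empty(), Optional.empty(), true));
        // A job-level strategy overrides the cluster default
        System.out.println(resolve(Optional.of("failure-rate"), Optional.of("fixed-delay"), false));
    }
}
```

The first case corresponds to a job that, like the one in the log, ends up with no restarts. Whether that job had checkpointing disabled or `restart-strategy: none` set explicitly cannot be told from the log alone.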
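A restart strategy can be configured in flink-conf.yaml, where it applies globally. It can also be set in the application code, where it overrides the global configuration.

Fixed delay

The fixed-delay strategy tries to restart the job a given number of times. Once that number is exceeded, the job finally fails. Between two consecutive restart attempts it waits a fixed amount of time.

Option 1: global configuration in flink-conf.yaml

    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s

Option 2: in the application code

    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                              // number of restart attempts
        Time.of(10, TimeUnit.SECONDS)   // delay between attempts
    ));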
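The fixed-delay behaviour can be modelled in a few lines of plain Java. FixedDelayModel is a hypothetical class for illustration only, not Flink's implementation. It assumes, as described above, that the job is restarted at most `attempts` times with the same delay each time, and that it fails on the next failure.

```java
/**
 * Hypothetical, simplified model (not Flink's code) of the fixed-delay
 * restart strategy: at most maxAttempts restarts, each after the same delay.
 */
public class FixedDelayModel {
    private final int maxAttempts;
    private final long delayMillis;
    private int attemptsUsed = 0;

    FixedDelayModel(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    /** Called on each failure; returns true if the job may be restarted. */
    boolean onFailure() {
        if (attemptsUsed < maxAttempts) {
            attemptsUsed++;
            return true;
        }
        return false; // attempts exhausted: the job fails for good
    }

    /** Fixed wait before each restart. */
    long backoffMillis() {
        return delayMillis;
    }

    public static void main(String[] args) {
        // Same values as the configuration above: 3 attempts, 10 s apart
        FixedDelayModel strategy = new FixedDelayModel(3, 10_000);
        for (int failure = 1; failure <= 4; failure++) {
            boolean restart = strategy.onFailure();
            System.out.println("failure " + failure + ": "
                    + (restart ? "restart in " + strategy.backoffMillis() + " ms" : "job fails"));
        }
    }
}
```

With 3 attempts and a 10 s delay, the first three failures each lead to a restart after 10000 ms, and the fourth failure ends the job.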
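Failure rate

The failure-rate strategy restarts the job after a failure, but once the failure rate is exceeded the job is finally declared failed. Between two consecutive restart attempts it waits a fixed amount of time.

The configuration below declares the job failed if it fails 3 times within 5 minutes, with 10 s between restart attempts.

Option 1: global configuration in flink-conf.yaml

    restart-strategy: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: 3
    restart-strategy.failure-rate.failure-rate-interval: 5 min
    restart-strategy.failure-rate.delay: 10 s

Option 2: in the application code

    env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                              // maximum number of failures per interval
        Time.of(5, TimeUnit.MINUTES),   // interval over which failures are measured
        Time.of(10, TimeUnit.SECONDS)   // delay between attempts
    ));

No restart

Option 1: global configuration in flink-conf.yaml

    restart-strategy: none

Option 2: in the application code

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.noRestart());

Full code example

Each call to setRestartStrategy replaces the previous one, so only one strategy should be active. The two alternatives are therefore commented out below. The socket host and port are placeholders.

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RestartTest {
        public static void main(String[] args) throws Exception {
            // Get the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Start a checkpoint every 1000 ms (sets the checkpoint interval)
            env.enableCheckpointing(1000);

            // Fixed delay: restart up to 3 times, 10 seconds apart
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
            // Failure rate: the job fails after 3 failures within 5 minutes, 10 s between attempts
            // env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
            // No restart
            // env.setRestartStrategy(RestartStrategies.noRestart());

            // Placeholder pipeline so the job has an operator to run (host and port are assumptions)
            env.socketTextStream("localhost", 9999).print();
            env.execute("RestartTest");
        }
    }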
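The failure-rate rule from above (3 failures within 5 minutes fail the job, with 10 s between attempts) is essentially a sliding time window over recent failures. The plain-Java sketch below models it. FailureRateModel is a hypothetical class, not Flink's implementation. It assumes failure timestamps are passed in explicitly, in milliseconds, so that the behaviour is deterministic. It also assumes that two failures exactly one interval apart still count as being in the same window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, simplified model (not Flink's code) of the failure-rate
 * restart strategy: the job is declared failed once maxFailures failures
 * fall within one interval; otherwise it is restarted after a fixed delay.
 */
public class FailureRateModel {
    private final int maxFailures;
    private final long intervalMillis;
    private final long delayMillis;
    // Timestamps of failures inside the current window, oldest first
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateModel(int maxFailures, long intervalMillis, long delayMillis) {
        this.maxFailures = maxFailures;
        this.intervalMillis = intervalMillis;
        this.delayMillis = delayMillis;
    }

    /** Records a failure at nowMillis; returns true if the job may be restarted. */
    boolean onFailure(long nowMillis) {
        failureTimes.addLast(nowMillis);
        // Forget failures that lie further back than the measuring interval
        while (nowMillis - failureTimes.peekFirst() > intervalMillis) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() < maxFailures;
    }

    /** Fixed wait before each restart. */
    long restartDelayMillis() {
        return delayMillis;
    }

    public static void main(String[] args) {
        long minute = 60_000;
        // 3 failures per 5 minutes, 10 s between attempts (as in the configuration above)
        FailureRateModel strategy = new FailureRateModel(3, 5 * minute, 10_000);
        long[] failureMinutes = {0, 1, 7, 8, 9};
        for (long m : failureMinutes) {
            boolean restart = strategy.onFailure(m * minute);
            System.out.println("failure at minute " + m + ": "
                    + (restart ? "restart in " + strategy.restartDelayMillis() / 1000 + " s" : "job fails"));
        }
    }
}
```

In the main method, the failures at minutes 0 and 1 have left the 5-minute window by minute 7. The job is therefore only declared failed at minute 9, when minutes 7, 8 and 9 give three failures within one interval.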

©2019-2020 Toolsou All rights reserved,