1. xml-apis conflict

javax.xml.parsers.FactoryConfigurationError: Provider for class javax.xml.parsers.DocumentBuilderFactory cannot be created
java.lang.RuntimeException: Provider for class javax.xml.parsers.DocumentBuilderFactory cannot be created

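For Flink to work with HDFS, it needs the hadoop-hdfs package.

If the project also uses dom4j-related packages, they conflict with it and produce the following error.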

06/05/2018 11:40:40     Job execution switched to status FAILING.
javax.xml.parsers.FactoryConfigurationError: Provider for class javax.xml.parsers.DocumentBuilderFactory cannot be created
        at javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
        at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
        at javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2516)
        at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
        at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
        at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
        at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2189)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
        at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:99)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
        at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:246)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Provider for class javax.xml.parsers.DocumentBuilderFactory cannot be created
        at javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:308)
        ... 20 more

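Solution:

The cause is that the xml-apis package conflicts with dom4j.

Exclude the xml-apis dependency from every package that pulls it in.

(A side complaint about the advice online: some posts say to add this package, others say to add that one. In the end, the right fix is to exclude every xml-apis dependency.)

The snippet below is only a sample. Run mvn dependency:tree to inspect the dependencies (adding -Dincludes=xml-apis narrows the output), and make sure every occurrence is excluded.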

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.7.1</version>
  <exclusions>
    <exclusion>
      <groupId>xml-apis</groupId>
      <artifactId>xml-apis</artifactId>
    </exclusion>
  </exclusions>
</dependency>
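
To confirm whether the exclusions took effect, it can help to print where the JAXP classes are actually loaded from. The following is a minimal sketch, not part of the original fix: the class name XmlApiCheck and the helper locationOf are made up for illustration. If the reported location is a jar rather than the JDK, that jar is a candidate for another exclusion.

```java
import java.security.CodeSource;
import javax.xml.parsers.DocumentBuilderFactory;

// Hypothetical diagnostic class; run it with the same classpath as the job.
class XmlApiCheck {

    // Returns the jar or directory a class was loaded from.
    // Classes loaded by the bootstrap class loader have no code source.
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null
                ? "JDK (bootstrap class loader)"
                : String.valueOf(src.getLocation());
    }

    public static void main(String[] args) {
        // Where the javax.xml.parsers API class itself comes from.
        System.out.println("API class loaded from: "
                + locationOf(DocumentBuilderFactory.class));

        // Which implementation the factory lookup picks, and from where.
        // This is the call that fails in the stack trace above.
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        System.out.println("Implementation: " + factory.getClass().getName()
                + " loaded from: " + locationOf(factory.getClass()));
    }
}
```

Running this once before and once after adding the exclusions should show whether anything on the classpath still shadows the JDK's own XML classes.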

2. Checkpoint path problem

Could not flush and close the file system output stream *** in order to obtain the stream state handle
java.lang.Exception: Could not materialize checkpoint 1 for operator

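Two things need to change for this kind of problem:

1. The checkpoint path
2. state.backend: filesystem

If the Flink cluster configuration sets a default checkpoint path, the Flink job must use the same path when it configures checkpointing.

The error looks like this: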

java.lang.Exception: Could not perform checkpoint 1 for operator abcSink1 -> Sink: abcSink2 (4/5).
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:569)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator abcSink1 -> Sink: abcSink2 (4/5).
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:560)
        ... 8 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://localhost:8020/data/flink/checkpoint/2c418a966ed84cb035bb0574cb7c8d13/chk-1/989695d0-f3d8-40a0-addf-e4d2cc367794 in order to obtain the stream state handle
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
        at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
        at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
        at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:126)
        at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:358)
        ... 13 more
Caused by: java.io.IOException: DataStreamer Exception: 
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)

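Solution:

The problem is caused by the checkpoint path in the Flink cluster configuration not matching the path set in the job.

Path in the configuration file: state.backend.fs.checkpointdir: <path>

Path in the code: env.setStateBackend(new FsStateBackend(<path>))

These two paths must be the same.

The state.backend parameter in the configuration file must also be changed.

By default state is kept in memory; to use HDFS, set it to filesystem.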
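
The "both paths must match" rule can be checked mechanically before submitting a job. The following is only a sketch under assumptions: the class CheckpointPathCheck and its helpers readKey, normalize and samePath are hypothetical, the configuration is passed in as plain text rather than read from a real flink-conf.yaml, and the HDFS path is the one that appears in the stack trace above.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.Optional;

// Hypothetical pre-submit check; not part of Flink.
class CheckpointPathCheck {

    // Finds the value of a simple "key: value" line, skipping comment lines.
    static Optional<String> readKey(String confText, String key) {
        return Arrays.stream(confText.split("\\R"))
                .map(String::trim)
                .filter(line -> !line.startsWith("#") && line.startsWith(key + ":"))
                .map(line -> line.substring(key.length() + 1).trim())
                .findFirst();
    }

    // Drops one trailing slash and normalizes the URI so that
    // ".../checkpoint" and ".../checkpoint/" compare as equal.
    static URI normalize(String path) {
        String trimmed = path.endsWith("/")
                ? path.substring(0, path.length() - 1)
                : path;
        return URI.create(trimmed).normalize();
    }

    static boolean samePath(String a, String b) {
        return normalize(a).equals(normalize(b));
    }

    public static void main(String[] args) {
        // Sample configuration text with the two keys discussed above.
        String conf = String.join("\n",
                "state.backend: filesystem",
                "state.backend.fs.checkpointdir: hdfs://localhost:8020/data/flink/checkpoint");

        // The path the job would pass to new FsStateBackend(...).
        String jobPath = "hdfs://localhost:8020/data/flink/checkpoint/";

        String clusterPath = readKey(conf, "state.backend.fs.checkpointdir").orElse("");
        System.out.println("state.backend = "
                + readKey(conf, "state.backend").orElse("(not set)"));
        System.out.println("paths match = " + samePath(clusterPath, jobPath));
    }
}
```

The comparison is deliberately strict about scheme, host and port, since a job pointing at a different NameNode address would count as a mismatch as well.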


3. OOM problem

java.lang.OutOfMemoryError: GC overhead limit exceeded

java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOfRange(Arrays.java:3664)
        at java.lang.String.<init>(String.java:207)
        at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:177)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:166)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

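Solution:

1. Check whether there are enough slots.
2. Check whether every parallel instance the job starts was actually assigned properly (this does happen: with a parallelism of 5, only 2 instances may be active on the nodes while the other 3 have no data flowing through them).
3. Check whether the cluster has enough resources.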
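
The slot check in step 1 is simple arithmetic. As a rough sketch, assuming Flink's default slot sharing (so a job needs about as many slots as its highest operator parallelism) and that every TaskManager offers the same number of slots (taskmanager.numberOfTaskSlots), it looks like this. The class SlotCheck and the numbers are made up for illustration.

```java
// Hypothetical helper for a back-of-the-envelope slot check; not part of Flink.
class SlotCheck {

    // Total slots in the cluster: TaskManagers times slots per TaskManager.
    static int availableSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // True if a job with the given maximum parallelism fits into the cluster,
    // ignoring slots already taken by other jobs.
    static boolean fits(int maxParallelism, int taskManagers, int slotsPerTaskManager) {
        return maxParallelism <= availableSlots(taskManagers, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        int parallelism = 5;          // the example parallelism from step 2
        int taskManagers = 2;         // assumed cluster size
        int slotsPerTaskManager = 2;  // assumed taskmanager.numberOfTaskSlots

        System.out.println("available slots = "
                + availableSlots(taskManagers, slotsPerTaskManager));
        System.out.println("job fits = "
                + fits(parallelism, taskManagers, slotsPerTaskManager));
    }
}
```

Slots only bound how many parallel instances can run; the heap behind each slot still has to be large enough for the data those instances hold, which is what step 3 is about.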
