Flume fails to write data to Kafka.
Question posted by suhuadong · 2015-08-26

Flume configuration:

kafka-pull-agent.sources = log_source
kafka-pull-agent.channels = log_channel
kafka-pull-agent.sinks = log_sink

kafka-pull-agent.sources.log_source.type = spooldir
kafka-pull-agent.sources.log_source.channels = log_channel
kafka-pull-agent.sources.log_source.spoolDir = /home/git_projects/bootcamp/practise/sogouquery/data/generated
kafka-pull-agent.sources.log_source.fileHeader = true
kafka-pull-agent.sources.log_source.inputCharset=GB18030
kafka-pull-agent.sources.log_source.deserializer.outputCharset = UTF-8


kafka-pull-agent.channels.log_channel.type = file
kafka-pull-agent.channels.log_channel.checkpointDir = /mnt/flume/checkpoint
kafka-pull-agent.channels.log_channel.dataDirs = /mnt/flume/data
kafka-pull-agent.channels.log_channel.capacity = 100000000 


kafka-pull-agent.sinks.log_sink.type = org.apache.flume.sink.kafka.KafkaSink
kafka-pull-agent.sinks.log_sink.topic = search-query
kafka-pull-agent.sinks.log_sink.brokerList = SU-2:9092,SU-3:9092,SU-4:9092
kafka-pull-agent.sinks.log_sink.requiredAcks = 1
kafka-pull-agent.sinks.log_sink.batchSize = 20

kafka-pull-agent.sinks.log_sink.channel = log_channel
kafka-pull-agent.sources.log_source.channel = log_channel
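
As far as I understand, Flume only applies properties whose key starts with the name of the agent being launched, so a key with a mistyped agent prefix would be silently skipped. A minimal sketch of a check for that follows; the function name `find_unknown_agent_keys` and the sample text are made up for illustration and are not part of Flume.

```python
def find_unknown_agent_keys(properties_text, agent_name):
    """Return property keys whose first dotted component is not agent_name."""
    unknown = []
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines, comments, and anything that is not key = value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key = line.split("=", 1)[0].strip()
        if key.split(".", 1)[0] != agent_name:
            unknown.append(key)
    return unknown


# Hypothetical sample: the third key uses a different agent prefix.
sample = """
kafka-pull-agent.sources = log_source
kafka-pull-agent.sources.log_source.type = spooldir
other-agent.sources.log_source.fileHeader = true
"""
print(find_unknown_agent_keys(sample, "kafka-pull-agent"))
```

Running the same function over the real properties file with the agent name `kafka-pull-agent` should list any key that does not belong to that agent.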

Flume log errors:

26 Aug 2015 17:43:16,984 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Fetching metadata from broker id:0,host:SU-2,port:9092 with correlation id 37404 for 1 topic(s) Set(search-query)
26 Aug 2015 17:43:16,985 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Connected to SU-2:9092 for producing
26 Aug 2015 17:43:16,985 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Disconnecting from SU-2:9092
26 Aug 2015 17:43:16,986 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.warn:83)  - Error while fetching metadata     partition 0   leader: none     replicas:       isr:    isUnderReplicated: false for topic partition [search-query,0]: [class kafka.common.LeaderNotAvailableException]
26 Aug 2015 17:43:16,986 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.error:97)  - Failed to send requests for topics search-query with correlation ids in [37397,37404]
26 Aug 2015 17:43:16,986 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:139)  - Failed to publish events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:129)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
26 Aug 2015 17:43:16,986 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:150)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:129)
        ... 3 more
26 Aug 2015 17:43:21,987 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.warn:83)  - Failed to collate messages by topic,partition due to: No leader for any partition in topic search-query
26 Aug 2015 17:43:21,987 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Back off for 100 ms before retrying send. Remaining retries = 3
26 Aug 2015 17:43:21,989 INFO  [Log-BackgroundWorker-log_channel] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:230)  - Start checkpoint for /mnt/flume/checkpoint/checkpoint, elements to sync = 20
26 Aug 2015 17:43:22,003 INFO  [Log-BackgroundWorker-log_channel] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:255)  - Updating checkpoint metadata: logWriteOrderID: 1439864425839, queueSize: 58780, queueHead: 14704
26 Aug 2015 17:43:22,017 INFO  [Log-BackgroundWorker-log_channel] (org.apache.flume.channel.file.Log.writeCheckpoint:1016)  - Updated checkpoint for file: /mnt/flume/data/log-2 position: 17860541 logWriteOrderID: 1439864425839
26 Aug 2015 17:43:22,088 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Fetching metadata from broker id:2,host:SU-4,port:9092 with correlation id 37406 for 1 topic(s) Set(search-query)
26 Aug 2015 17:43:22,089 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.error:103)  - Producer connection to SU-4:9092 unsuccessful
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:129)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
26 Aug 2015 17:43:22,089 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.warn:89)  - Fetching topic metadata with correlation id 37406 for topics [Set(search-query)] from broker [id:2,host:SU-4,port:9092] failed
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:129)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
26 Aug 2015 17:43:22,089 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Fetching metadata from broker id:1,host:SU-3,port:9092 with correlation id 37406 for 1 topic(s) Set(search-query)
26 Aug 2015 17:43:22,089 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Connected to SU-3:9092 for producing
26 Aug 2015 17:43:22,090 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Disconnecting from SU-3:9092
26 Aug 2015 17:43:22,090 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.warn:83)  - Error while fetching metadata     partition 0   leader: none     replicas:       isr:    isUnderReplicated: false for topic partition [search-query,0]: [class kafka.common.LeaderNotAvailableException]
26 Aug 2015 17:43:22,091 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.warn:83)  - Failed to collate messages by topic,partition due to: No leader for any partition in topic search-query
26 Aug 2015 17:43:22,091 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Back off for 100 ms before retrying send. Remaining retries = 2
26 Aug 2015 17:43:22,191 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Fetching metadata from broker id:0,host:SU-2,port:9092 with correlation id 37408 for 1 topic(s) Set(search-query)
26 Aug 2015 17:43:22,192 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Connected to SU-2:9092 for producing
26 Aug 2015 17:43:22,193 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Disconnecting from SU-2:9092
26 Aug 2015 17:43:22,193 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.warn:83)  - Error while fetching metadata     partition 0   leader: none     replicas:       isr:    isUnderReplicated: false for topic partition [search-query,0]: [class kafka.common.LeaderNotAvailableException]
26 Aug 2015 17:43:22,193 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.warn:83)  - Failed to collate messages by topic,partition due to: No leader for any partition in topic search-query
26 Aug 2015 17:43:22,193 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (kafka.utils.Logging$class.info:68)  - Back off for 100 ms before retrying send. Remaining retries = 1
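
The log above shows `Connection refused` for SU-4:9092 while SU-2 and SU-3 accept connections. A quick way to confirm this from the Flume host is to try opening a TCP connection to every entry in `brokerList`. This is only a rough sketch: the helper names `parse_broker_list`, `is_reachable`, and `check_brokers` are my own, and a successful TCP connect only shows that something is listening on the port, not that the broker is healthy.

```python
import socket


def parse_broker_list(broker_list):
    """Split a 'host:port,host:port' string into (host, port) tuples."""
    brokers = []
    for entry in broker_list.split(","):
        host, port = entry.strip().rsplit(":", 1)
        brokers.append((host, int(port)))
    return brokers


def is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


def check_brokers(broker_list, timeout=3.0):
    """Map 'host:port' to True/False for every broker in the list."""
    return {
        f"{host}:{port}": is_reachable(host, port, timeout)
        for host, port in parse_broker_list(broker_list)
    }
```

Calling `check_brokers("SU-2:9092,SU-3:9092,SU-4:9092")` on the Flume machine returns one True/False entry per broker.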

The Kafka logs are as follows:

On machine SU-2:

 tail -500 state-change.log

[2015-08-23 15:20:55,296] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:2,ISR:2,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:1),AllReplicas:2) for partition [search-query,0] in response to UpdateMetadata request sent by controller 2 epoch 12 with correlation id 58 (state.change.logger)
[2015-08-23 15:23:14,339] TRACE Controller 0 epoch 13 started leader election for partition [search-query,0] (state.change.logger)
[2015-08-23 15:23:14,367] ERROR Controller 0 epoch 13 initiated state change for partition [search-query,0] from OfflinePartition to OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [search-query,0] is alive. Live brokers are: [Set(0, 1)], Assigned replicas are: [List(2)]
        at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75)
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:357)
        at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:206)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
        at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:70)
        at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:314)
        at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:161)
        at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:81)
        at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:139)
        at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
        at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
        at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2015-08-23 15:23:14,388] TRACE Controller 0 epoch 13 sending UpdateMetadata request (Leader:2,ISR:2,LeaderEpoch:0,ControllerEpoch:12) with correlationId 2 to broker 1 for partition [search-query,0] (state.change.logger)
[2015-08-23 15:23:14,388] TRACE Controller 0 epoch 13 sending UpdateMetadata request (Leader:2,ISR:2,LeaderEpoch:0,ControllerEpoch:12) with correlationId 2 to broker 0 for partition [search-query,0] (state.change.logger)
[2015-08-23 15:23:14,397] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:2,ISR:2,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:1),AllReplicas:2) for partition [search-query,0] in response to UpdateMetadata request sent by controller 0 epoch 13 with correlation id 2 (state.change.logger)
[2015-08-23 15:23:14,420] TRACE Controller 0 epoch 13 received response UpdateMetadataResponse(2,0) for a request sent to broker id:1,host:SU-3,port:9092 (state.change.logger)
[2015-08-23 15:23:14,420] TRACE Controller 0 epoch 13 received response UpdateMetadataResponse(2,0) for a request sent to broker id:0,host:SU-2,port:9092 (state.change.logger)
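
The `NoReplicaOnlineException` line above already names the problem: the only assigned replica for [search-query,0] is broker 2, and broker 2 is not among the live brokers. In the Flume log, broker id 2 is SU-4, the host that refuses connections. A small sketch that pulls this out of the message is below; the function name `offline_replicas` is my own, and the regular expressions assume the exact message format shown in this log.

```python
import re


def offline_replicas(message):
    """Return assigned replica ids that are missing from the live broker set."""
    live = re.search(r"Live brokers are: \[Set\(([^)]*)\)\]", message)
    assigned = re.search(r"Assigned replicas are: \[List\(([^)]*)\)\]", message)
    if not live or not assigned:
        raise ValueError("message does not look like a NoReplicaOnlineException line")

    def to_ids(text):
        # "0, 1" -> {0, 1}; an empty string gives an empty set.
        return {int(part) for part in text.split(",") if part.strip()}

    return sorted(to_ids(assigned.group(1)) - to_ids(live.group(1)))


# The exception line copied from state-change.log above.
line = ("kafka.common.NoReplicaOnlineException: No replica for partition "
        "[search-query,0] is alive. Live brokers are: [Set(0, 1)], "
        "Assigned replicas are: [List(2)]")
print(offline_replicas(line))  # [2]
```

With a replication factor of 1, as the log reports, the partition has no other replica to elect as leader while broker 2 is down, which would match the `LeaderNotAvailableException` seen on the Flume side.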

 

On machine SU-4:

 tail -200 server.log
[2015-08-18 22:27:06,814] ERROR Closing socket for /10.116.41.239 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at kafka.utils.Utils$.read(Utils.scala:380)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:444)
        at kafka.network.Processor.run(SocketServer.scala:340)
        at java.lang.Thread.run(Thread.java:745)
