首页 > 代码库 > 实时事件统计项目:优化flume:用file channel代替mem channel
实时事件统计项目:优化flume:用file channel代替mem channel
背景:利用kafka+flume+morphline+solr做实时统计。
solr从12月23号开始一直没有数据。查看日志发现,因为有一个同事加了一条格式错误的埋点数据,导致大量error。
据推断,是因为使用mem channel占满,消息来不及处理,导致新来的数据都丢失了。
修改flume使用file channel:
kafka2solr.sources = source_from_kafkakafka2solr.channels = file_channelkafka2solr.sinks = solrSink# For each one of the sources, the type is defined kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSourcekafka2solr.sources.source_from_kafka.channels = file_channelkafka2solr.sources.source_from_kafka.batchSize = 100kafka2solr.sources.source_from_kafka.useFlumeEventFormat=falsekafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092kafka2solr.sources.source_from_kafka.kafka.topics = eventCountkafka2solr.sources.source_from_kafka.kafka.consumer.group.id = flume_solr_callerkafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=latest# file channel kafka2solr.channels.file_channel.type = filekafka2solr.channels.file_channel.checkpointDir = /var/log/flume-ng/checkpointkafka2solr.channels.file_channel.dataDirs = /var/log/flume-ng/datakafka2solr.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSinkkafka2solr.sinks.solrSink.channel = file_channel#kafka2solr.sinks.solrSink.batchSize = 1000#kafka2solr.sinks.solrSink.batchDurationMillis = 1000kafka2solr.sinks.solrSink.morphlineFile = morphlines.confkafka2solr.sinks.solrSink.morphlineId=morphline1kafka2solr.sinks.solrSink.isIgnoringRecoverableExceptions=true
使得数据持久化到磁盘不会丢失。
实时事件统计项目:优化flume:用file channel代替mem channel
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。