首页 > 代码库 > Kafka+Flume+Morphline+Solr+Hue数据组合索引
Kafka+Flume+Morphline+Solr+Hue数据组合索引
背景:Kafka消息总线的建成,使各个系统的数据得以在kafka节点中汇聚,接下来面临的任务是最大化数据的价值,让数据“慧”说话。
环境准备:
Kafka服务器*3。
CDH 5.8.3服务器*3,安装Flume,Solr,Hue,HDFS,Zookeeper服务。
Flume提供了可扩展的实时数据传输通道,Morphline提供了轻量级的ETL功能,SolrCloud+Hue提供了高性能搜索引擎和多样的数据展现形式。
一.环境安装(略)
二.修改CDH默认配置:
1.在Flume配置界面配置Flume依赖Solr。
2.在Solr配置界面配置Solr使用Zookeeper存储配置文件,使用HDFS存储索引文件。
3.在Hue配置界面配置Hue依赖Solr
4.配置Hue界面可以被外网访问。
三.按场景配置各CDH服务及开发代码。
Kafka Topic: eventCount
Topic数据格式:
{ "timestamp": "1481077173000", "accountName": "旺小宝", "tagNames": [ "incoming" ], "account": "WXB", "eventType": "phone", "eventTags": [ { "value": 1, "name": "incoming" } ]}
1.Solr创建对应Collection。
1)登录任意CDH节点。生成collection配置文件骨架。
$ solrctl instancedir --generate $HOME/solr_configs
2)找到文件夹中的schema.xml文件,修改collection的schema。
第一步:修改filed。schema.xml中预定义了很多filed,除了name=id,_root_,_version_不能去掉之外,其他的全部可以去掉。field对应的是json中需要被索引的字段。
(Notice:json中的timestamp对应的是下面的eventTime,而下面的timestamp是flume接受kafka数据的时间。这是通过Morphline配置实现的转换)
<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" /> <!-- points to the root document of a block of nested documents. Required for nested document support, may be removed otherwise --> <field name="_root_" type="string" indexed="true" stored="false"/> <field name="account" type="string" indexed="true" stored="true"/> <field name="accountName" type="string" indexed="true" stored="true"/> <field name="subaccount" type="string" indexed="true" stored="true"/> <field name="subaccountName" type="string" indexed="true" stored="true"/> <field name="eventTime" type="tlong" indexed="false" stored="true"/> <field name="eventType" type="string" indexed="true" stored="true"/> <field name="eventTags" type="string" indexed="true" stored="true" multiValued="true"/> <field name="_attachment_body" type="string" indexed="false" stored="true"/> <field name="timestamp" type="tlong" indexed="false" stored="true"/> <field name="_version_" type="long" indexed="true" stored="true"/>
第二步:去掉所有copy field。
第三步:添加动态字段dynamicFiled。
<dynamicField name="tws_*" type="text_ws" indexed="true" stored="true" multiValued="true"/>
3) 上传配置,创建collection
$ solrctl instancedir --create event_count_records solr_configs$ solrctl collection --create event_count_records -s 3 -c event_count_records
2.Flume配置
创建一个新的角色组kafka2solr,修改代理名称为kafka2solr,并为该角色组分配服务器。
# 配置 source channel sink 的名字kafka2solr.sources = source_from_kafkakafka2solr.channels = mem_channelkafka2solr.sinks = solrSink# 配置Source类别为kafkakafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSourcekafka2solr.sources.source_from_kafka.channels = mem_channelkafka2solr.sources.source_from_kafka.batchSize = 100kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092kafka2solr.sources.source_from_kafka.kafka.topics = eventCountkafka2solr.sources.source_from_kafka.kafka.consumer.group.id = flume_solr_callerkafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=latest#配置channel type为memory,通常生产环境中设置为file或者直接用kafka作为channelkafka2solr.channels.mem_channel.type = memorykafka2solr.channels.mem_channel.keep-alive = 60 # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel kafka2solr.channels.mem_channel.capacity = 10000 kafka2solr.channels.mem_channel.transactionCapacity = 3000 # 配置sink到solr,并使用morphline转换数据kafka2solr.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSinkkafka2solr.sinks.solrSink.channel = mem_channelkafka2solr.sinks.solrSink.morphlineFile = morphlines.confkafka2solr.sinks.solrSink.morphlineId=morphline1kafka2solr.sinks.solrSink.isIgnoringRecoverableExceptions=true
3.Flume-NG的Solr接收器配置
SOLR_LOCATOR : { # Name of solr collection collection : event_count_records # ZooKeeper ensemble
#CDH的专有写法,开源版本不支持。 zkHost : "$ZK_HOST" }morphlines : [ { id : morphline1 importCommands : ["org.kitesdk.**", "org.apache.solr.**"] commands : [ { #Flume传过来的kafka的json数据是用二进制流的形式,需要先读取json readJson{}}{ #读出来的json字段必须转换成filed才能被solr索引到extractJsonPaths { flatten:true paths:{account:/accountaccountName:/accountNamesubaccount:/subaccountsubaccountName:/subaccountNameeventTime:/timestampeventType:/eventTypeeventTags:"/eventTags[]/name"#按分钟存timestampeventTimeInMinute_tdt:/timestamp#按小时存timestampeventTimeInHour_tdt:/timestamp#按天存timestampeventTimeInDay_tdt:/timestamp#_tdt后缀会被动态识别为日期类型的索引字段#按不同时间间隔存索引以增加查询性能} }}#转换long型时间为Date格式{convertTimestamp { field : eventTimeInMinute_tdt inputFormats : ["unixTimeInMillis"] inputTimezone : UTC outputFormat : "yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z/MINUTE‘" outputTimezone : Asia/Shanghai}}{convertTimestamp { field : eventTimeInHour_tdt inputFormats : ["unixTimeInMillis"] inputTimezone : UTC outputFormat : "yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z/HOUR‘" outputTimezone : Asia/Shanghai}}{convertTimestamp { field : eventTimeInDay_tdt inputFormats : ["unixTimeInMillis"] inputTimezone : UTC outputFormat : "yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z/DAY‘" outputTimezone : Asia/Shanghai}}#kafka中的json数据传到flume中时会被放入_attachment_body字段,readJson后会变成JsonNode对象,需要toString之后才能保存{toString { field : _attachment_body }}#为每一条记录生成一个UUID{generateUUID { field : id}}#对未定义的Solr字段加tws前缀,根据schema.xml中定义的tws_*为text_ws类型,会动态未未定义的字段建索引。 { sanitizeUnknownSolrFields { # Location from which to fetch Solr schema solrLocator : ${SOLR_LOCATOR} renameToPrefix:"tws_" } } #将数据导入到solr中 {loadSolr {solrLocator : ${SOLR_LOCATOR}}} ] }]
重启被影响的Flume节点,数据开始导入solr。
3.通过Hue查询Solr中的数据。
见Solr+Hue实战。
Kafka+Flume+Morphline+Solr+Hue数据组合索引