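Kafka + Spark Streaming + Tranquility Server: sending data to Druid
I spent a long time trying to embed Tranquility in my own code, as described on the Druid website, to send data to Druid in real time, and it failed. The failures took many forms and I still have not found the root cause: the job runs inside IDEA but refuses to work once deployed to production. If anyone has this working, please post a link so I can learn from it. I then tried sending data from Kafka to Druid directly in real time, but that also produced errors and did not feel reliable. In the end I fell back to Tranquility Server.
For configuring and starting Tranquility Server, see: https://github.com/druid-io/tranquility/blob/master/docs/server.md
(1) After starting your customized server, you can use generate-example-metrics in Druid's bin directory to generate test data (the customized server.json is shown below).
server.json configuration: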
{
  "dataSources" : {
    "zcx_metrics" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "reynold",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : [],
                "dimensionExclusions" : [ "timestamp", "value" ]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            { "type" : "count", "name" : "count" },
            { "name" : "value_sum", "type" : "doubleSum", "fieldName" : "value" },
            { "fieldName" : "value", "name" : "value_min", "type" : "doubleMin" },
            { "type" : "doubleMax", "name" : "value_max", "fieldName" : "value" }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "tagtic-master:2181,tagtic-slave02:2181,tagtic-slave03:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "http.port" : "8200",
    "http.threads" : "16"
  }
}
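The windowPeriod of PT10M in the configuration above matters when testing: as far as I understand, Tranquility drops events whose timestamps fall too far from the current time. The following is a minimal sketch of that idea, assuming a simple "within windowPeriod of now" rule; the real rule also involves segmentGranularity, and the names WindowCheck and withinWindow are made up for illustration.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper, not part of Tranquility: approximates the
// "is this event still inside the window" check.
object WindowCheck {
  // True when eventTime is at most `window` away from `now`, in either direction.
  def withinWindow(eventTime: Instant, now: Instant, window: Duration): Boolean = {
    val distance = Duration.between(eventTime, now).abs()
    distance.compareTo(window) <= 0
  }
}

object WindowCheckDemo {
  def main(args: Array[String]): Unit = {
    val window = Duration.parse("PT10M") // same value as windowPeriod in server.json
    val now = Instant.now()
    // An event stamped five minutes ago is inside the window.
    println(WindowCheck.withinWindow(now.minusSeconds(300), now, window))
    // An event stamped one hour ago is outside the window.
    println(WindowCheck.withinWindow(now.minusSeconds(3600), now, window))
  }
}
```

If test events never show up in Druid, comparing their timestamp field against the current time in this way is a cheap first check.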
(2) Create the Kafka topic and send data to it
Delete the topic: kafka-topics --delete --topic reynold --zookeeper localhost:2181
Create the topic: kafka-topics --create --topic reynold --zookeeper localhost:2181 --partitions 10 --replication-factor 1
Consume data: kafka-console-consumer --topic reynold --zookeeper localhost:2181 --from-beginning
Produce data: kafka-console-producer --broker-list tagtic-master:9092 --topic reynold
Sample messages to enter into the producer:
{"count": 1, "value_min": 74.0, "timestamp": "2017-03-09T02:34:24.000Z", "value_max": 74.0, "metricType": "request/latency", "server": "www5.example.com", "http_method": "GET", "value_sum": 74.0, "http_code": "200", "unit": "milliseconds", "page": "/"}
{"count": 1, "value_min": 75.0, "timestamp": "2017-03-09T02:34:24.000Z", "value_max": 75.0, "metricType": "request/latency", "server": "www5.example.com", "http_method": "GET", "value_sum": 75.0, "http_code": "200", "unit": "milliseconds", "page": "/list"}
{"count": 1, "value_min": 143.0, "timestamp": "2017-03-09T02:38:06.000Z", "value_max": 143.0, "metricType": "request/latency", "server": "www2.example.com", "http_method": "GET", "value_sum": 143.0, "http_code": "200", "unit": "milliseconds", "page": "/"}
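For repeated tests it can be handy to generate messages with a fresh timestamp instead of pasting fixed ones. The sketch below is an illustration only: server.json above aggregates a field named value and excludes timestamp and value from the dimensions, so this sketch emits a raw value field, while the remaining field names are copied from the sample messages. SampleEvents and event are hypothetical names, and no JSON escaping is done, so the inputs are assumed to contain no quotes or backslashes.

```scala
import java.time.Instant

// Hypothetical generator for test messages; not part of Druid or Tranquility.
object SampleEvents {
  // Builds one JSON line. `ts` is rendered in ISO-8601, which the
  // "auto" timestamp format in server.json is expected to accept.
  def event(ts: Instant, server: String, page: String, value: Double): String =
    s"""{"timestamp": "$ts", "metricType": "request/latency", "unit": "milliseconds", "server": "$server", "http_method": "GET", "http_code": "200", "page": "$page", "value": $value}"""
}

object SampleEventsDemo {
  def main(args: Array[String]): Unit = {
    // Print three messages stamped with the current time, one per line,
    // ready to be piped into kafka-console-producer.
    Seq("/", "/list", "/get/1").foreach { page =>
      println(SampleEvents.event(Instant.now(), "www5.example.com", page, 74.0))
    }
  }
}
```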
(3) Use Spark Streaming to consume the data from Kafka and send it to Druid over HTTP
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkDruid {
  // Connection settings for the Kafka direct stream (Kafka 0.8 integration).
  val kafkaParam = Map[String, String](
    "metadata.broker.list" -> "tagtic-master:9092,tagtic-slave01:9092,tagtic-slave02:9092,tagtic-slave03:9092",
    "auto.offset.reset" -> "smallest"
  )

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("SparkDruidBeam"))
    val ssc = new StreamingContext(sparkContext, Seconds(3))

    val topic: String = "reynold"        // name of the topic to consume
    val topics: Set[String] = Set(topic) // set of topic names used when creating the stream

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)

    // Each record is a (key, value) pair; POST every value to Tranquility Server.
    kafkaStream.map(msg => msg._2).foreachRDD { rdd =>
      rdd.foreach(strJson => Https.post("http://tagtic-master:8200/v1/post/zcx_metrics", strJson))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
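The job above issues one HTTP request per message. One possible refinement is to group the messages of each partition into batches and POST each batch once. The sketch below shows only the grouping step in plain Scala; it assumes that Tranquility Server accepts several newline-separated JSON objects in one request body, which should be confirmed against server.md. PostBatches and bodies are hypothetical names.

```scala
// Hypothetical helper: turns a stream of JSON lines into request bodies
// that each hold up to `batchSize` newline-separated events.
object PostBatches {
  def bodies(events: Iterator[String], batchSize: Int): Iterator[String] =
    events.grouped(batchSize).map(_.mkString("\n"))
}

object PostBatchesDemo {
  def main(args: Array[String]): Unit = {
    val events = Iterator("""{"value": 1}""", """{"value": 2}""", """{"value": 3}""")
    // With batchSize = 2 this yields two bodies: one with two events, one with one.
    PostBatches.bodies(events, 2).foreach { body =>
      println(body)
      println("----")
    }
  }
}
```

Inside the Spark job this could be used as rdd.foreachPartition(part => PostBatches.bodies(part, 500).foreach(body => Https.post(url, body))), where 500 is an arbitrary batch size and url is the same endpoint as above.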
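The Https class used above is a small helper whose post(url, json) method sends a JSON string to Tranquility Server over HTTP.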
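The author's Https listing is not available here, so the following is only a stand-in sketch built on the JDK's HttpURLConnection. The return type, the timeouts, and the Content-Type header are assumptions, not the original implementation.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import scala.io.Source

// Hypothetical stand-in for the Https helper called by SparkDruid.
object Https {
  // POSTs `json` to `url` and returns (HTTP status code, response body).
  def post(url: String, json: String): (Int, String) = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("POST")
      conn.setDoOutput(true)
      conn.setConnectTimeout(5000)  // assumed timeout, in milliseconds
      conn.setReadTimeout(10000)    // assumed timeout, in milliseconds
      conn.setRequestProperty("Content-Type", "application/json")

      val out = conn.getOutputStream
      try out.write(json.getBytes(StandardCharsets.UTF_8)) finally out.close()

      val code = conn.getResponseCode
      // Error responses are exposed on a separate stream, which may be null.
      val stream = if (code >= 400) conn.getErrorStream else conn.getInputStream
      val body =
        if (stream == null) ""
        else {
          val src = Source.fromInputStream(stream, "UTF-8")
          try src.mkString finally src.close()
        }
      (code, body)
    } finally {
      conn.disconnect()
    }
  }
}
```

Returning the status code and body makes it possible to log failed requests in the Spark job instead of discarding them silently.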