首页 > 代码库 > Easticsearch 数据迁移至influxdb【python】
Easticsearch 数据迁移至influxdb【python】
Easticsearch 数据迁移至influxdb python
需求:将Easticsearch部分数据迁移至influxdb中。
见过从mysql,influxdb迁移至Easticsearch中的,没见过从Easticsearch迁移至influxdb中,迁移的数据是一些实时性的流量数据,influxdb时序性数据库对这类数据的支撑比较客观。
解决方案:大批量从Easticsearch取数据,两种方案。1.from...size 2.scroll (类似于数据库的游标) 脚本采用第二种scroll方案对Easticsearch 查询取数据。循环通过scrool_id进行查询并写入influxdb中。
#!/usr/bin/env python #coding=utf-8 import sys import json import datetime import elasticsearch from influxdb import InfluxDBClient #连接Easticsearch class ES(object): @classmethod def connect_host(cls): url = "http://192.168.121.33:9202/" es = elasticsearch.Elasticsearch(url,timeout=120) return es es = ES.connect_host() #连接influxdb client = InfluxDBClient(host="192.168.121.33", port="8086", username=‘admin‘, password=‘admin‘, database=‘esl‘) client.create_database(‘esl‘) #DSL查询语法 data = { "query": { "match_all" : {}}, "size": 100 } # 设置要过滤返回的字段值,要什么字段。 ‘hits.hits._source.resource_id‘, ‘hits.hits._source.timestamp‘, ‘hits.hits._source.counter_volume‘, ‘hits.hits._source.@timestamp‘, ] # 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用 res = es.search( index=‘pipefilter_meters*‘, doc_type =‘canaledge.flow.bytes‘, body=data, search_type="scan", scroll="10m" ) scroll_id = res[‘_scroll_id‘] response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,) scroll_id = response[‘_scroll_id‘] #获取第二次scroll_id hits = response[‘hits‘][‘hits‘] in_data = [] while len(hits) > 0: for i in hits: res_id = i[‘_source‘][‘resource_id‘] r_id, r_type = res_id.split(‘:‘) datas = { "measurement": "es_net", "tags": { "resource_id": r_id, "type": r_type }, "time": i[‘_source‘][‘timestamp‘], "fields": { "counter_volume": i[‘_source‘][‘counter_volume‘] } } in_data.append(datas) #循环写入influxdb client.write_points(in_data) in_data = [] #每次循环完重新定义列表为空 data = { "query": { "match_all" : {}}, "size": 100 } ## 设置要过滤返回的字段值,要什么字段。 ‘_scroll_id‘, ‘hits.hits._source.resource_id‘, ‘hits.hits._source.timestamp‘, ‘hits.hits._source.counter_volume‘, ‘hits.hits._source.@timestamp‘, ] ## 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用 response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,) #调试 #if not response.get(‘hits‘): # print response # sys.exit(1) #else: hits = response[‘hits‘][‘hits‘] scroll_id = response["_scroll_id"] #获取第三次scroll_id
本文出自 “生锈的老枪_技术博客” 博客,转载请与作者联系!
Easticsearch 数据迁移至influxdb【python】
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。