首页 > 代码库 > Durid(一): 原理架构

Durid(一): 原理架构

      Durid是在2013年底开源出来的,当前最新版本0.9.2, 主要解决的是对实时数据以及较近时间的历史数据的多维查询提供高并发(多用户),低延时,高可靠性的问题。对比Druid与其他解决方案,Kylin对数据按照分区每天构建前一天的cube数据提供给用户查询,用户查询的是历史数据。而Druid不断的从ingest去拉取数据,持续构建cube,提供实时查询,主要作者下面两位, 其中一位创建了一家公司继续发展druid (Impty.io)

技术分享          技术分享


目录:

  • Druid特性
  • 使用场景
  • Druid介绍
  • 角色功能
  • 架构介绍
  • Segment
  • 分布式集群

Durid特性


  • 亚秒级查询:druid提供了快速的聚合能力以及亚秒级的OLAP查询能力,多租户的设计,是面向用户分析应用的理想方式
  • 实时数据注入:druid支持流数据的注入,并提供了数据的事件驱动,保证在实时和离线环境下事件的实效性和统一性
  • 可扩展的PB级存储:druid集群可以很方便的扩容到PB的数据量,每秒百万级别的数据注入。即便在加大数据规模的情况下,也能保证时其效性
  • 多环境部署:druid既可以运行在商业的硬件上,也可以运行在云上。它可以从多种数据系统中注入数据,包括hadoop,spark,kafka,storm和samza等
  • 丰富的社区:druid拥有丰富的社区,供大家学习

使用场景


第一:适用于清洗好的记录实时录入,但不需要更新操作
第二:支持宽表,不用join的方式(换句话说就是一张单表)
第三:可以总结出基础的统计指标,可以用一个字段表示
第四:对时区和时间维度(year、month、week、day、hour等)要求高的(甚至到分钟级别)
第五:实时性很重要
第六:对数据质量的敏感度不高
第七:用于定位效果分析和策略决策参考

Druid介绍


  • Druid将数据集分成三种类型: Timestamp column, Dimension columns(过滤数据), Metric columns(聚合和计算)
    1. 事件的定义: 时间戳
    2. Dimensions: (things to filter on) 参与事件过滤
    3. Metrics: (things to aggregate over) 要聚合的字段
  • 以Ad中的点击数据产生的事件为例,Druid会将publisher,advertiser,gender,country当做维度列,click和price当做指标列,如下:
    timestamp             publisher          advertiser  gender  country  click  price
    2011-01-01T01:01:35Z  bieberfever.com    google.com  Male    USA      0      0.65
    2011-01-01T01:03:63Z  bieberfever.com    google.com  Male    USA      0      0.62
    2011-01-01T01:04:51Z  bieberfever.com    google.com  Male    USA      1      0.45
    2011-01-01T01:00:00Z  ultratrimfast.com  google.com  Female  UK       0      0.87
    2011-01-01T02:00:00Z  ultratrimfast.com  google.com  Female  UK       0      0.99
    2011-01-01T02:00:00Z  ultratrimfast.com  google.com  Female  UK       1      1.53
  • ruid读取数据的入口并不会直接存储原始数据, 而是使用Roll-up这种first-level聚合操作压缩原始数据,如下:
    timestamp             publisher          advertiser  gender country impressions clicks revenue
    2011-01-01T01:00:00Z  ultratrimfast.com  google.com  Male   USA     1800        25     15.70
    2011-01-01T01:00:00Z  bieberfever.com    google.com  Male   USA     2912        42     29.18
    2011-01-01T02:00:00Z  ultratrimfast.com  google.com  Male   UK      1953        17     17.31
    2011-01-01T02:00:00Z  bieberfever.com    google.com  Male   UK      3194        170    34.01
  •  用SQL表示类似于对时间撮和所有维度列进行分组,并以原始的指标列做常用的聚合操作

    GROUP BY timestamp, publisher, advertiser, gender, country
      :: impressions = COUNT(1),  clicks = SUM(click),  revenue = SUM(price)
  •  为什么不存原始数据? 因为原始数据量可能非常大,对于广告的场景,一秒钟的点击数是以千万计数. 如果能够在读取数据的同时就进行一点聚合运算,就可以大大减少数据量的存储.这种方式的缺点是不能查询单条事件,也就是你无法查到每条事件具体的click和price值了.由于后面的查询都将以上面的查询为基础,所以Roll-up的结果一定要能满足查询的需求.通常count和sum就足够了,因此Rollup的粒度是你能查询的数据的最小时间单位. 假设每隔1秒Rollup一次,后面的查询你最小只能以一秒为单位,不能查询一毫秒的事件.默认的粒度单位是ms.


  • Druid的分片是Segment文件. Druid首先总是以时间撮进行分片, 因为事件数据总是有时间撮. 假设以小时为粒度创建下面的两个Segment文件技术分享
  • 在几乎所有的NoSQL中都有数据分片的概念,比如ES的分片,HBase的Region,都表示的是数据的存储介质.为什么要进行分片,因为数据大了,不能都存成一个大文件吧,所以要拆分成小文件以便于快速查询. 伴随拆分通常都有合并小文件
  • 从Segment文件的名称可以看出它包含的数据一定是在文件名称对应的起始和结束时间间隔之内的
  • Segment文件名称的格式:dataSource_interval_version_partitionNumber.最后一个分区号是当同一个时间撮下数据量超过阈值要分成多个分区了
    1. 分片和分区都表示将数据进行切分. 分片是将不同时间撮分布在不同的文件中, 而分区是相同时间撮放不下了,分成多个分区
    2. 巧合的是Kafka中也有Segment和Partition的概念.Kafka的Partition是topic物理上的分组,一个topic可以分为多个partition,它的partition物理上由多个segment组成.即Partition包含Segment,而Druid是Segment包含Partition.

角色功能


  • realtime node: 负责监听输入流数据并使得数据在Druid系统中可以立即被使用. 实时节点响应Broker的查询请求并返回结果给Broker节点.过期的数据会被推送/转存到DeepStorage, 并通知ZooKeeper. 历史节点会接收到这个通知发现有新的Segments需要被加载/或被删除
    1. 实时处理可以使用标准的实时节点或者IndexService(两者的逻辑处理是一样的). 实时处理会读取数据,索引数据,创建Segments
    2. 并且将Segments转存到历史节点. 数据一旦被实时节点接收就可用于查询. 在实时节点转存到历史节点这段时间内,查询仍然是可用的
    3. 只有历史节点通知说已经成功转存,可以用于查询时, 实时节点之前保存的Segments才会被删除. 这份Segment数据现在存在于历史节点中了
  • historical node:是Druid集群的支柱,它会从DeepStorage下载不可变的Segments文件到本地,并加载Segment服务于查询请求.
    1. 历史节点是ShareNothing的架构(当掉任何一个节点都没有关系),它会负责加载Segments,删除Segments,并在Segment上查询数据
    2. 历史节点负责存储数据,并且查询历史的数据.历史节点会从DeepStorage下载Segments,对Broker的查询进行响应,返回结果给Broker.
    3. 历史节点会写入ZooKeeper通知自己(的节点信息)以及自己服务了/保存了哪些Segments.(正如HDFS的DN上保存了哪些Block会向NN通知)
    4. 历史节点还会从ZooKeeper中得到任务:加载新的Segments(从实时节点转存过来的)或者删除自己已经保存的旧的Segments
    5. 技术分享
    6. 每个历史节点维护和ZooKeeper的持续连接,新到来的Segment信息会写到ZK的节点中,历史节点会监听这个节点的配置
    7. 协调节点会负责分配新的Segment给历史节点: 在ZK中为分配到任务的历史节点的load queue path创建一个临时节点
    8. 历史节点之间没有相互联系,也不会和协调节点直接联系,而是通过ZK来完成任务的分发和任务的领取
  • coordinator node:管理历史节点上的Segments.协调节点告诉历史节点加载新的Segments,删除旧的Segments,或者移动Segments进行负载均衡.
    1. 协调节点监测历史节点的分组,确保数据是可用的,副本充足的
    2. 从元数据存储中读取Segment的元数据,并决定哪些Segments需要被加载到集群中
    3. 使用ZooKeeper查看已经存在的历史节点都有哪些
    4. 创建一个ZK的条目告诉历史节点加载加载或删除新的Segments
    5. 协调节点类似于Master,Master要负责管理,并指派任务给Worker执行

             协调节点对Segment的负载均衡(如果某个节点数据量太大不应该分配任务,而把任务分配给数据量小的节点)是如何实现的?

    1. 当新创建了一个Segment,协调节点还没有分配给历史节点时,它要选择一个适合的历史节点,而不是随便选择一个历史节点.
    2. 协调节点会将系统中可用的历史节点列表首先按照容量排序,最小容量的服务器将获得最高的优先级(不会让穷人越穷富人越富)

            当历史节点重启或不可用时,处理方式:

    1. 当历史节点重启或不可用时,协调节点会注意到节点丢失并认为这个节点服务的所有Segments都是dropped
    2. 如果给予一段充足的时间,这些segments也许会被重新分配给集群中的其他历史节点. 然后被丢弃的每一个Segment并不会被立即遗忘掉.
    3. 实际上存在一种transitional的数据结构用来存储所有丢弃的segments,并且关联了一个使用期
    4. 这个使用期表示一段时间内协调节点不会重新分配丢弃的segments. 因此如果一个历史节点下线后在很短的时间内又上线了,历史节点会直接从它的缓存中服务这些segments(这些segments都没有在跨越集群重新分配)
  • broker node: 负责将查询请求分发到历史节点和实时节点,并聚合这些节点的结果数据
    1. Broker节点知道Segment都存放在哪些节点上,Broker如何知道Segment的存放路径?
    2. 首先Broker查询实时节点的Segment,这些Segment都在实时节点的内存中
    3. 只有历史的数据(非实时的数据)在转存到历史节点之后, 历史节点就会通知ZK说自己已经成功地加载了某一个Segment
    4. 这样Broker只需要查询ZK就知道Segment都属于哪些节点,从而将请求转发到对应的历史节点,由历史节点去查询需要的数据.
    5. 通常在ShareNothing的架构中,如果一个节点变得不可用了,会有一个服务将下线的这个节点的数据搬迁到其他节点
    6. 但是如果这个节点下线后又立即重启,而如果服务在一下线的时候就开始搬迁数据,是会产生跨集群的数据传输,实际上是没有必要的
    7. 因为分布式文件系统对同一份数据会有多个副本,搬迁数据实际上是为了满足副本数.而下线又重启的节点上的数据不会有什么丢失的
    8. 因此短期的副本不足并不会影响整体的数据健康状况.何况跨机器搬迁数据也需要一定的时间,何不如给定一段时间如果它真的死了,才开始搬迁.

                补充:

    1.  在实时节点启动的时候, 实时节点会将自己,以及它自己服务了哪些Segments的信息都发布到ZK中
    2. Broker可以从发布到ZK的元数据了解到哪些Segments存在于哪个节点上,并且会路由查询请求,命中到正确的节点.
    3. 一般的Druid查询会指定包含的时间间隔,表示要请求的数据是在这个时间段之内的. 与此同时,Druid的Segments也是以指定的时间间隔粒度分区存储的.
      并且这些Segments分布在集群的各个节点. 如果查询的时间跨度比存储的时间粒度要大,则查询会命中不止一个Segment,因此会命中多个节点
    4. 为了决定请求分发到哪些节点上,Broker会首先构建一个ZooKeeper的视图信息.在ZK中维护了历史节点和实时节点以及他们服务的Segments
    5. 对于ZK中的每个数据源,Broker会构建Segment的时间线,以及服务这些Segment的节点. 当查询指定数据库和时间段,Broker会寻找时间线中
      符合数据源和时间段的Segment,并获取包含这次查询数据的节点, 最后Broker会将查询分发到这些节点上去.
    6. Broker节点对于缓存使用了LRU失效策略. Broker的缓存会保存每个Segment的结果. 缓存可以是本地的(针对Broker节点)或者分布式比如MemCached.
      Broker接收到每一次查询,首先将本次查询映射到一系列的Segments. 这些Segments集合的子集可能已经存在于缓存中,它们的结果可以直接从缓存中获取.
    7. 对于没有存在于缓存中的Segments结果,Broker节点会将查询分发到历史节点.当历史节点返回结果,Broker就会结果也缓存到Cache中.
      注意实时节点的Segments永远不会被缓存,所以实时数据的查询总是会被分发到实时节点.因为实时数据总是在变化,缓存实时数据是不可靠的

架构介绍


  • Druid 由上面介绍的角色组成的构架图:
  • 技术分享
  • 查询路径:红色箭头:①客户端向Broker发起请求,Broker会将请求路由到②实时节点和③历史节点
  • Druid数据流转:黑色箭头:数据源包括实时流和批量数据. ④实时流经过索引直接写到实时节点,⑤批量数据通过IndexService存储到DeepStorage,⑥再由历史节点加载. ⑦实时节点也可以将数据转存到DeepStorage

  • Druid的集群依赖了ZooKeeper来维护数据拓扑. 每个组件都会与ZooKeeper交互,如下:
  • 技术分享
  • 实时节点在转存Segment到DeepStorage, 会写入自己转存了什么Segment
  • 协调节点管理历史节点,它负责从ZooKeeper中获取要同步/下载的Segment,并指派任务给具体的历史节点去完成
  • 历史节点从ZooKeeper中领取任务,任务完成后要将ZooKeeper条目删除表示完成了任务
  • Broker节点根据ZooKeeper中的Segment所在的节点, 将查询请求路由到指定的节点
  • 对于一个查询路由路径,Broker只会将请求分发到实时节点和历史节点, 因此元数据存储和DeepStorage都不会参与查询中(看做是后台的进程).

    MetaData Storage 与 Zookeeper


  • MetaStore和ZooKeeper中保存的信息是不一样的. ZooKeeper中保存的是Segment属于哪些节点. 而MetaStore则是保存Segment的元数据信息
  • 为了使得一个Segment存在于集群中,MetaStore存储的记录是关于Segment的自描述元数据: Segment的元数据,大小,所在的DeepStorage
  • 元数据存储的数据会被协调节点用来知道集群中可用的数据应该有哪些(Segment可以通过实时节点转存或者批量数据直接写入).

  • 除了上面介绍的节点角色外,Druid还依赖于外部的三个组件:ZooKeeper, Metadata Storage, Deep Storage,数据与查询流的交互图如下:
  • 技术分享
  • ① 实时数据写入到实时节点,会创建索引结构的Segment.
  • ② 实时节点的Segment经过一段时间会转存到DeepStorage
  • ③ 元数据写入MySQL; 实时节点转存的Segment会在ZooKeeper中新增一条记录
  • ④ 协调节点从MySQL获取元数据,比如schema信息(维度列和指标列)
  • ⑤ 协调节点监测ZK中有新分配/要删除的Segment,写入ZooKeeper信息:历史节点需要加载/删除Segment
  • ⑥ 历史节点监测ZK, 从ZooKeeper中得到要执行任务的Segment
  • ⑦ 历史节点从DeepStorage下载Segment并加载到内存/或者将已经保存的Segment删除掉
  • ⑧ 历史节点的Segment可以用于Broker的查询路由 

 


  • 由于各个节点和其他节点都是最小化解耦的, 所以下面两张图分别表示实时节点和批量数据的流程:
  • 技术分享
  • 数据从Kafka导入到实时节点, 客户端直接查询实时节点的数据.
  • 技术分享
  •  批量数据使用IndexService,接收Post请求的任务,直接产生Segment写到DeepStorage里.DeepStorage中的数据只会被历史节点使用.
    所以这里要启动的服务有: IndexService(overlord), Historical, Coordinator(协调节点通知历史节点下载Segment),

segment


  • 数据进入到Druid首先会进行索引, 这给予了Druid一个机会可以进行分析数据, 添加索引结构, 压缩, 为查询优化调整存储结构
    1. 转换为列式结构
    2. 使用BitMap索引
    3. 使用不同的压缩算法
  • 索引的结果是生成Segment文件,Segment中除了保存不同的维度和指标,还保存了这些列的索引信息
  • Druid将索引数据保存到Segment文件中,Segment文件根据时间进行分片. 最基本的设置中, 每一个时间间隔都会创建一个Segment文件
  • 这个时间间隔的长度配置在granularitySpec的segmentGranularity参数.为了Druid工作良好,通常Segment文件大小为300-700M
  • 前面Roll-up时也有一个时间粒度:queryGranularity指的是在读取时就进行聚合.segmentGranularity则是用于分片进来之后的数据.

分布式集群(测试)


  • 2台 Kafka
  • 2台realtime节点,并安装了zookeeper节点
  • 1台broker节点
  • 3台historical节点,这几台机器比较独立
  • 1台coordinator节点,并安装了mysql服务,zookeeper节点
  • deep storage -> hdfs

Durid(一): 原理架构