« 上一篇下一篇 »

分布式高容错的体系架构Druid,10亿级数据实时查询处理能力的开源数据交互系统

 前言

    大数据时代,我们不仅仅要从网络上获得大量的数据基础,将它分析,归类,总结,而且还要将它以图像,动画的形式把它可视化,以最简单直观的方式呈现在受众面前。只有这样才能够适应今天这样高效率高节奏的工作及生活状态。

    为了实现这个,我们的工程师可谓花了大量的心思。Druid是一个拥有大数据实时查询和分析的高容错、高性能开源分布式系统,旨在快速处理大规模的数据,并能够实现快速查询和分析。尤其是当发生代码部署、机器故障以及其他产品系统遇到宕机等情况时,Druid仍然能够保持100%正常运行。创建Druid的最初意图主要是为了解决查询延时问题,当时试图使用hadoop来实现交互式查询分析,但是很难满足实时分析的需要。而Druid提供了以交互方式访问数据的能力,并权衡了查询的灵活性和性能二采取了特殊的存储格式。Druid允许以类似Dremel和PowerDrill的方式进行单表查询,同时还增加了一些新特性,如为局部嵌套数据结构提供列式存储格式、为快速过滤做索引、实时摄取和查询、高容错的分布式体系架构等。

     大数据技术从最早的Hadoop项目开始已经有十多年的历史了,而Druid是在2013年年底才开源的,虽然目前还不是Apache顶级项目,但是作为后起之秀,依然吸引了大量用户的目光,社区也非常活跃。那么,为什么会有Druid,而Druid又解决了传统大数据处理框架下的哪些“痛点”问题,下面我们来一一解答。
大数据时代,如何从海量数据中提取有价值的信息,是一个亟待解决的难题。针对这个问题,IT巨头们已经开

     发了大量的数据存储与分析类产品,比如IBM Netezza、HP Vertica、EMC GreenPlum等,但是他们大多是昂贵的商业付费类产品,业内使用者寥寥。
而受益于近年来高涨的开源精神,业内出现了众多优秀的开源项目,其中最有名的当属Apache Hadoop生态圈。时至今日,Hadoop已经成为了大数据的“标准”解决方案,但是,人们在享受Hadoop便捷数据分析的同时,也必须要忍受Hadoop在设计上的许多“痛点”,下面就罗列三方面的问题:

     Druid 是一个为在大数据集之上做实时统计分析而设计的开源数据存储。这个系统集合了一个面向列存储的层,一个分布式、shared-nothing的架构,和一个高级的索引结构,来达成在秒级以内对十亿行级别的表进行任意的探索分析。以下将详细阐述Druid的架构,如何支持快速聚合、灵活的过滤、和低延迟数据导入。Druid是一个用于大数据实时查询和分析的高容错、高性能开源分布式系统,旨在快速处理大规模的数据,并能够实现快速查询和分析。
在最近几年,互联网技术的快速增长已经产生了大量由机器产生的数据。单独来看,这些数据包含很少的有用信息,价值都是很低的。从这些巨大的数据里面分析出有用的信息需要大量的时间和资源。

   作为解决方案之一,Google推出了MapReduce,应运而生的是Hadoop。Hadoop擅长的是存储和大规模MapReduce处理离线数据,但是它并不提供任何性能上的保证它能多快获取到数据。此外,虽然Hadoop是一个高可用的系统,但是在高并发负载下性能会下降。最后,Hadoop对于存储数据可以工作得很好,但是并没有对数据导入进行优化,使导入的数据立即可读。Hadoop是一个很好的后端、批量处理和数据仓库系统。然而Hadoop却无法在高并发环境下(1000+用户)保证查询性能和数据可用性和提供产品级别的保证。

 
传统的流式处理与批式处理存在以下问题:
1.大数据量查询速度慢
2.大规模集群构建成本高
3.多维度交叉计算能力差
4.流式计算无法回溯

   
     而受益于近年来高涨的开源精神,业内出现了众多优秀的开源项目,其中最有名的当属Apache Hadoop生态圈。时至今日,Hadoop已经成为了大数据的“标准”解决方案,但是,人们在享受Hadoop便捷数据分析的同时,也必须要忍受Hadoop在设计上的许多“痛点”,下面就罗列三方面的问题:
1. 何时能进行数据查询?对于Hadoop使用的Map/Reduce批处理框架,数据何时能够查询没有性能保证。
2. 随机IO问题。Map/Reduce批处理框架所处理的数据需要存储在HDFS上,而HDFS是一个以集群硬盘作为存储资源池的分布式文件系统,那么在海量数据的处理过程中,必然会引起大量的读写操作,此时随机IO就成为了高并发场景下的性能瓶颈。
3. 数据可视化问题。HDFS是一个优秀的分布式文件系统,但是对于数据分析以及数据的即席查询,HDFS并不是最优的选择。
传统的大数据处理架构Hadoop更倾向于一种“后台批处理的数据仓库系统”,其作为海量历史数据保存、冷数据分析,确实是一个优秀的通用解决方案,但是如何保证高并发环境下海量数据的查询分析性能,以及如何实现海量实时数据的查询分析与可视化,Hadoop确实显得有些无能为力。
    于是Druid出现了,一个开源的、分布式、列存储、实时分析的数据存储。在许多方面,Druid和其他OLAP系统有很多相似之处,交互式查询系统,内存数据库(MMDB),众所周知的分布式数据存储。其中的分布式和查询模型都参考了当前的一些搜索引擎的基础架构。Druid是一个为大型冷数据集上实时探索查询而设计的开源数据分析和存储系统,提供极具成本效益并且永远在线的实时数据摄取和任意数据处理。
   Druid的母公司MetaMarket在2011年以前也是Hadoop的拥趸者,但是在高并发环境下,Hadoop并不能对数据可用性以及查询性能给出产品级别的保证,使得MetaMarket必须去寻找新的解决方案,当尝试使用了各种关系型数据库以及NoSQL产品后,他们觉得这些已有的工具都不能解决他们的“痛点”,所以决定在2011年开始研发自己的“轮子”Druid,他们将Druid定义为“开源、分布式、面向列式存储的实时分析数据存储系统”,所要解决的“痛点”也是上文中反复提及的“在高并发环境下,保证海量数据查询分析性能,同时又提供海量实时数据的查询、分析与可视化功能”。
Druid的创建允许获取大量很少变化的数据集,Druid设计的主要目的是作为一种服务面对代码部署、机器故障或者其他不可预测的事件发生的生产系统,能够保证100%的正常运行。

主要特性:
• 为分析而设计——Druid是为OLAP工作流的探索性分析而构建。它支持各种filter、aggregator和查询类型,并为添加新功能提供了一个框架。用户已经利用Druid的基础设施开发了高级K查询和直方图功能。
• 交互式查询——Druid的低延迟数据摄取架构允许事件在它们创建后毫秒内查询,因为Druid的查询延时通过只读取和扫描有必要的元素被优化。Aggregate和 filter没有坐等结果。
• 高可用性——Druid是用来支持需要一直在线的SaaS的实现。你的数据在系统更新时依然可用、可查询。规模的扩大和缩小不会造成数据丢失。
• 可伸缩——现有的Druid部署每天处理数十亿事件和TB级数据。Druid被设计成PB级别。亚秒级的OLAP查询分析。Druid采用了列式存储、倒排索引、位图索引等关键技术,能够在亚秒级别内完成海量数据的过滤、聚合以及多维分析等操作。
• 实时流数据分析。区别于传统分析型数据库采用的批量导入数据进行分析的方式,Druid提供了实时流数据分析,采用LSM(Long structure merge)-Tree结构使Druid拥有极高的实时写入性能;同时实现了实时数据在亚秒级内的可视化。
• 丰富的数据分析功能。针对不同用户群体,Druid提供了友好的可视化界面、类SQL查询语言以及REST 查询接口。
• 高可用性与高可拓展性。Druid采用分布式、SN(share-nothing)架构,管理类节点可配置HA,工作节点功能单一,不相互依赖,这些特性都使得Druid集群在管理、容错、灾备、扩容等方面变得十分简单。

Druid现在可以做类似Dremel和PowerDrill的单表查询。同时增加了一些新特性:
1.为局部嵌套数据结构提供列式存储格式
2.分布式查询树模型
3.为快速过滤做索引
4.实时摄入数据和查询(摄入的数据是立即可用于查询)
5.高容错的分布式体系架构

    至于和其他系统的比较,Druid功能介于Dremel和PowerDrill之间,Druid实现了几乎Dremel的所有功能(Dremel可以处理任意嵌套数据结构,而Druid只能处理单一的数组类型),从PowerDrill借鉴了一下比较实用的数据布局和压缩方法。
Druid是一个大数据流、单一的数据摄取产品的不错的选择。特别是如果你的目标是想构建一个无停机和流入的数据以时间为导向的产品,Druid很适合。当谈查询速度的话题时,要明确一个很重要说明,"fast"的意义是什么:Druid完全有可能实现不到一秒钟完成对数以万亿行数据的查询。
 

Druid集群配置
3.1 环境信息
我这里有两台机器,node1有32G内存,上面部署了Histotical Node和Coordinator Node;node2有72G内存,上面部署了其他四个服务。

海量数据实时OLAP分析系统-Druid.io安装配置和体验

3.2 通用配置(Common Configuration)
##创建MySQL数据库

CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
grant all on druid.* to druid@’%’ identified by ‘druid1234′ WITH GRANT OPTION;
flush privileges;

##配置文件

cd $DRUID_HOME/config/_common
vi common.runtime.properties(所有节点)

##使用Mysql存储元数据
druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight", "io.druid.extensions:mysql-metadata-storage"]

##zookeeper
druid.zk.service.host=zkNode1:2181,zkNode2:2181,zkNode3:2181

##Mysql配置
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://node1:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd1234

##配置deep storage到HDFS
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://cdh5/tmp/druid/storage

##配置查询缓存,暂用本地,可配置memcached
druid.cache.type=local
druid.cache.sizeInBytes=10737418240

##配置监控
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]

##配置Indexing service的名字
druid.selectors.indexing.serviceName=druid/overlord

##
druid.emitter=logging
3.3 Overlord Node(Indexing Service)
在运行Overlord Node节点上:

cd $DRUID_HOME/config/overlord
vi runtime.properties

druid.host=node2
druid.port=8090
druid.service=druid/overlord

# Only required if you are autoscaling middle managers
druid.indexer.autoscale.doAutoscale=true
druid.indexer.autoscale.strategy=ec2
druid.indexer.autoscale.workerIdleTimeout=PT90m
druid.indexer.autoscale.terminatePeriod=PT5M
druid.indexer.autoscale.workerVersion=0

# Upload all task logs to deep storage
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog

# Run in remote mode
druid.indexer.runner.type=remote
druid.indexer.runner.minWorkerVersion=0

# Store all task state in the metadata storage
druid.indexer.storage.type=metadata
3.4 MiddleManager Node
在运行MiddleManager Node节点上:
cd $DRUID_HOME/config/middleManager
vi runtime.properties

druid.host=node2
druid.port=8091
druid.service=druid/middlemanager

druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog

# Resources for peons
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
druid.indexer.task.baseTaskDir=/tmp/persistent/task/
3.5 Coordinator Node
在运行Coordinator Node节点上:
cd $DRUID_HOME/config/coordinator
vi runtime.properties

druid.host=node1
druid.port=8081
druid.service=coordinator

druid.coordinator.startDelay=PT5M
3.6 Historical Node
在运行Historical Node节点上:
cd $DRUID_HOME/config/historical
vi runtime.properties

druid.host=node1
druid.port=8082
druid.service=druid/historical

druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true

druid.processing.buffer.sizeBytes=1073741824
druid.processing.numThreads=9

druid.server.http.numThreads=9
druid.server.maxSize=300000000000

druid.segmentCache.locations=[{"path": " /tmp/druid/indexCache", "maxSize": 300000000000}]

druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor", "com.metamx.metrics.JvmMonitor"]

以上仅供大家探讨,如有错误请多多指出交流,数据吧/云群网络科技有限公司