大数据特点是数据量大(volume)、数据类型多(variety)、处理速度快(velocity)、数据价值密度低(value)。
大数据技术包括数据的采集与预处理、数据存储与管理、数据的计算与分析、数据可视化、数据安全和隐私保护。
- 大数据采集与预处理。利用ETL工具将分布在异构数据源中的数据,如关系型数据库中的数据、文本日志文件、数据库binlog等,抽取到临时中间层后进行清洗、转换,最后加载到数据仓库中用于后续的数据分析和挖掘。或是利用数据采集工具把实时采集的数据进行转换作为流计算的输入进行实时处理分析。
- 大数据存储与管理。利用关系型数据库、NoSQL数据库、数据仓库、分布式文件系统来实现大数据的存储和读取。
- 大数据计算与分析。利用分布式并行编程模型和计算框架,有时会结合机器学习和数据挖掘算法,实现对海量数据的处理和分析。对分析结果进行可视化呈现。
- 大数据安全和隐私。构建数据安全体系和隐私数据保护体系保护数据安全和个人隐私。
Hadoop生态
Hadoop是一个分布式处理大型数据集的框架。不依赖硬件配置而是使用多台普通电脑通过简单的编程模型来实现数据的可靠性和可扩展性。Hadoop提供了一个稳定的数据存储和分析系统,其中数据分析由MapReduce实现,数据存储由HDFS也就Hadoop分布式文件系统实现。
HDFS是为以流式数据访问模式存储超大文件设计的分布式文件系统。HDFS提供了在廉价服务器集群中进行大规模分布式文件存储的能力。HDFS具有很好的容错能力,并且兼容廉价的硬件设备,因此,可以以较低的成本利用现有机器实现大流量和大数据量的读写。除了Hadoop默认的HDFS实现外,还提供了文件系统的抽象接口。其它文件系统可以通过实现这些接口来无缝的融入Hadoop生态,例如Aliyun OSS、Tencent COS、Amazon S3、Azure Blob Storage、Azure Data Lake Storage、OpenStack Swift等。
HBase是一个将HDFS作为底层存储的高可靠、高性能、可伸缩、实时读写的分布式列式存储NoSQL数据库。HBase与传统关系数据库的一个重要区别是,HBase采用基于列的存储。HBase具有良好的横向扩展能力,可以通过不断增加廉价的商用服务器来提高存储能力。HBase利用MapReduce来处理HBase中的海量数据实现高性能计算;利用 Zookeeper 作为协同服务,实现稳定服务和失败恢复;使用HDFS作为高可靠的底层存储。此外,为了方便在HBase上进行数据处理,Sqoop为HBase提供了高效、便捷的RDBMS数据导入功能,Pig和Hive为HBase提供了高层语言支持。
YARN是Hadoop的计算资源管理和调度系统,接受任务请求,并根据请求的需要来分配资源,调度任务的执行。YARN 的目标是实现“一个集群多个框架”,即在一个集群上部署一个统一的资源调度管理框架YARN,在YARN之上可以部署其他各种计算框架,比如MapReduce、Tez、Storm、Giraph、Spark、OpenMPI等,由YARN为这些计算框架提供统一的资源调度管理服务(包括 CPU、内存等资源),并且能够根据各种计算框架的负载需求,调整各自占用的资源,实现集群资源共享和资源弹性收缩。通过这种方式,可以实现一个集群上的不同应用负载混搭,有效提高了集群的利用率,同时,不同计算框架可以共享底层存储(如HDFS),在一个集群上集成多个数据集,使用多个计算框架来访问这些数据集,从而避免了数据集跨集群移动,最后,这种部署方式也大大降低了企业运维成本。目前,可以运行在YARN之上的计算框架包括离线批处理框架MapReduce、内存计算框架Spark、流计算框架Storm和DAG计算框架Tez等。和YARN一样提供类似功能的其他资源管理调度框架还包括Mesos、Torca、Corona、Borg等。
MapReduce是线性的可伸缩的编程模型,它将并行计算抽象为两个函数Map和Reduce来完成海量数据的计算。MadReduce的核心思想就是分治,它把输入的数据集切分为若干独立的数据块,分发给一个主节点管理下的各个分节点来共同并行完成;最后,通过整合各个节点的中间结果得到最终结果。
Hive是一个基于Hadoop的数据仓库工具,可以用于对Hadoop文件中的数据集进行数据整理、特殊查询和分析存储。Hive提供了类似于关系数据库 SQL 的查询语言HiveQL,可以通过 HiveQL 语句快速实现简单的 MapReduce 任务,Hive 自身可以将HiveQL语句转换为MapReduce任务运行,而不必开发专门的MapReduce应用,因而适合数据仓库的统计分析。
Pig是一种数据流语言和运行环境,适合于使用Hadoop来查询大型半结构化数据集。虽然编写MapReduce应用程序不是十分复杂,但毕竟也是需要一定的开发经验的。Pig的出现大大简化了Hadoop常见的工作任务,它在MapReduce的基础上创建了更简单抽象的过程语言,为Hadoop应用程序提供了一种更加接近结构查询语言的接口。Pig是一种相对简单的语言,它可以执行语句,因此当我们需要从大型数据集中搜索满足某个给定搜索条件的记录时,采用Pig要比MapReduce具有明显的优势,前者只需要编写一个简单的脚本在集群中自动并行处理与分发,后者则需要编写一个单独的MapReduce应用程序。
Mahout提供一些可扩展的机器学习领域经典算法的实现,旨在帮助开发人员更加方便快捷地创建智能应用程序。Mahout包含许多实现,如聚类、分类、推荐过滤、频繁子项挖掘等。此外,通过使用Apache Hadoop库,Mahout可以有效地扩展到云中。
ZooKeeper提供了高可靠的分布式协调服务,可以用于维护配置信息、命名服务、分布式锁、集群管理。ZooKeeper使用Java编写,很容易编程接入,它使用了一个和文件树结构相似的数据模型,可以使用Java或者C来进行编程接入。
Flume是一个高可用的、高可靠的、分布式的海量日志采集、聚合和传输的系统。Flume 支持在日志系统中定制各类数据发送方,用于收集数据。同时,Flume 提供对数据进行简单处理并写到各种数据接收方的能力。
Sqoop是SQL-to-Hadoop的缩写,主要用来在Hadoop和关系数据库之间交换数据,可以改进数据的互操作性。通过Sqoop可以方便地将数据从MySQL、Oracle、PostgreSQL等关系数据库中导入Hadoop(可以导入HDFS、HBase或Hive),或者将数据从Hadoop导出到关系数据库,使传统关系数据库和Hadoop之间的数据迁移变得非常方便。Sqoop主要通过Java数据库连接(Java DataBase Connectivity,JDBC)和关系数据库进行交互,理论上,支持JDBC的关系数据库都可以使Sqoop和Hadoop进行数据交互。Sqoop是专门为大数据集设计的,支持增量更新,可以将新记录添加到最近一次导出的数据源上,或者指定上次修改的时间戳。
Apache Ambari是一种基于Web的工具,支持Apache Hadoop集群的安装、部署、配置和管理。Ambari支持大多数Hadoop组件,包括HDFS、MapReduce、Hive、Pig、HBase、ZooKeeper、Sqoop等。
大数据采集与预处理技术
ETL
ETL是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)到定义好的数据仓库模型的过程。
大数据存储技术
HDFS
HDFS是为以流式数据访问模式存储超大文件的分布式文件系统。
分布式文件系统的特点
- 透明性。用户感知不到它是本地文件系统还是分布式文件系统。
- 可伸缩性。节点可以动态的加入和退出而不影响
- 跨平台兼容。不同的硬件和操作系统都可以加入文件系统。
- 并发控制。多个客户端都可以访问。
- 容错。
HDFS缺点
- HDFS主要是用于大规模数据批量处理,采用流式数据读取,具有很高的数据吞吐率,但是,这也意味着较高的延迟。因此HDFS 不适合用在需要较低延迟(如数十毫秒)的应用场合。对于低延时要求的应用程序适合使用HBase。
- 另外,HDFS无法高效存储大量小文件,过多小文件会给系统扩展性和性能带来诸多问题。HDFS采用名称节点来管理文件系统的元数据,这些元数据被保存在内存中,从而使客户端可以快速获取文件实际存储位置。通常每个文件、目录和块大约占150Byte,如果有1000万个文件,每个文件对应一个块,那么,名称节点要消耗大量内存来保存这些元数据信息。这时元数据检索的效率就比较低了,需要花费较多的时间找到一个文件的实际存储位置。其次,用MapReduce处理大量小文件时,会产生过多的Map任务,进程管理开销会大大增加,因此处理大量小文件的速度远远低于处理同等规模的大文件的速度。再次,访问大量小文件的速度远远低于访问几个大文件的速度,因为访问大量小文件,需要不断从一个数据节点跳到另一个数据节点,严重影响性能。Hadoop Archives(HAR)是一个高效的将文件放入HDFS块中文件存档设备,减少文件名称节点内存使用的同时仍然允许对文件进行透明的访问。
- 隔离问题。由于集群中只有一个名称节点,只有一个命名空间,因此无法对不同应用程序进行隔离。
HDFS体系结构
文件系统一般会把磁盘空间划分为每512Byte一组,称为“磁盘块”,块是文件系统进行读写操作的最小单位,文件系统的块通常是磁盘块的整数倍,即每次读写的数据量必须是磁盘块大小的整数倍。HDFS也一样,HDFS更注重读取整个数据集的时间而不是读取一条记录的时间,磁盘中的块大小是它能够读写的最小数据集,HDFS中块的默认大小为64MB。HDFS的块比磁盘的块大目的是为了减小寻址开销,通过让一个块足够大,从磁盘转移数据的时间能远大于定位这个块开始端的时间。采用抽象块后可以支持大规模文件存储并且简化数据备份。
HDFS集群采用主从结构模型有两种节点以管理者-工作者模式运行,即一个名称节点和多个数据节点。名称节点保存了两个核心的数据结构FsImage和EditLog,管理文件系统的命名空间,维护文件系统树及树内所有文件和索引目录。FsImage用于维护文件系统树以及文件树中所有的文件和文件夹的元数据,操作日志文件 EditLog 中记录了所有针对文件的创建、删除、重命名等操作。名称节点还记录了每个文件的每个块所在的数据节点,但它不会永久保存块的位置,因为这些信息会在系统启动时由数据节点重建。数据节点存储并提供定位块的服务,并定时向名称节点发送它们存储的块的列表。客户端通过名称节点和数据节点来访问整个文件系统。
HDFS存储原理
为了保证系统的容错性和可用性,HDFS 采用了多副本方式对数据进行冗余存储,通常一个数据块的多个副本会被分布到不同的数据节点上。如图,数据块1被分别存放到数据节点A和C上,数据块2被存放在数据节点A和B上等。这种多副本方式的优点是:
- 加快数据传输速度。多个客户端需要同时访问同一个文件时,可以让各个客户端分别从不同的数据块副本中读取数据,这就大大加快了数据传输速度。
- 容易检查数据错误。HDFS 的数据节点之间通过网络传输数据,采用多个副本可以很容易判断数据传输是否出错。
- 保证数据的可靠性。即使某个数据节点出现故障失效,也不会造成数据丢失。
数据存取策略包括数据存放、数据读取和数据复制等。
数据存放:为了提高数据的可靠性与系统的可用性,同时充分利用网络带宽,HDFS采用了以机架(Rack)为基础的数据存放策略。一个HDFS集群包含多个机架,不同机架之间的数据通信需要经过交换机或者路由器,同一个机架中不同机器之间的通信则不需要经过交换机和路由器,因此同一个机架中不同机器之间的通信要比不同机架之间机器的通信带宽大。HDFS默认每个数据节点都在不同的机架上,这种方法的缺点是写入数据的时候不能充分利用同一机架内部机器之间的带宽。但可以获得很高的数据可靠性,即使一个机架发生故障,位于其他机架上的数据副本仍然是可用的;其次,可以在多个机架上并行读取数据,提高数据读取速度;最后,可以更容易地实现系统内部负载均衡和错误处理。
数据读取:HDFS提供了API可以确定一个数据节点所属的机架ID,客户端也可以调用API获取自己所属的机架ID。当客户端读取数据时,从名称节点获得数据块不同副本的存放位置列表,列表中包含了副本所在的数据节点,可以调用API来确定客户端和这些数据节点所属的机架ID。当发现某个数据块副本对应的机架ID和客户端对应的机架ID相同时,就优先选择该副本读取数据,如果没有发现,就随机选择一个副本读取数据。
数据错误与恢复:
- 名称节点恢复:名称节点的FsImage文件和EditLog文件保存了所有的元数据信息,如果这两个文件损坏,整个HDFS实例将失效。为了保证名称节点的高可用,Hadoop提供了两种机制。1.数据在写入本地磁盘的同时写入远程NFS挂载,这些写操作具有同步性和原子性。2. 运行一个二级名称节点定期合并EditLog与FsImage,减小EditLog文件大小缩短名称节点重启时间。其次,它可以作为名称节点的“检查点”,保存名称节点中的元数据信息。
- 数据节点恢复:数据节点会定期向名称节点发送“心跳”报告自己的状态。当数据节点发生故障,名称节点无法收到数据节点的“心跳”,数据节点就会被标记为“死机”,节点上面的所有数据都会被标记为“不可读”,名称节点也不会再发送任何I/O请求。数据节点的不可用可能会导致一些数据块的副本数量小于冗余因子。名称节点会定期检查这种情况,一旦发现某个数据块的副本数量小于冗余因子,就会启动数据冗余复制,为它生成新的副本。
- 数据出错:客户端在读取数据后,会采用MD5和SHA-1对数据块进行校验,以确定读取正确的数据。在文件被创建时,客户端就会对每一个文件块进行信息摘录,并把这些信息写入同一个路径的隐藏文件里面。当客户端读取文件的时候,会先读取该信息文件,然后利用该信息文件对每个读取的数据块进行校验。如果校验出错,客户端就会请求到另外一个数据节点读取该文件块,并且向名称节点报告这个文件块有错误,名称节点会定期检查并且重新复制这个块。
HDFS访问过程
当客户端需要访问一个文件时,客户端通过FileSystem对象的open()来读取文件,FileSystem通过使用RPC来调用名称节点获取文件开头部分块的位置和对应的排序后的数据节点和副本的数据节点。然后当客户端调用read()方法时就是依次从每个块的一个数据节点上读取数据。文件系统的一致模型描述了对文件读写的数据可见性。写入数据时当前正在被写入数据的块是不可见的,大小也没有被计算。HDFS以透明方式校验所有写入它的数据,并在默认设置下,会在读取数据时验证校验和。针对每个io.bytes.per.checksum字节,都会创建一个单独的校验和。默认值为512字节,因为CRC-32校验和是4字节,存储开销小于1%。
HDFS是一个部署在集群上的分布式文件系统,因此很多数据需要通过网络进行传输。所有的HDFS通信协议都是构建在TCP/IP上,客户端通过一个可配置的端口向名称节点主动发起TCP连接,并使用客户端协议与名称节点进行交互。名称节点和数据节点之间则使用数据节点协议进行交互。客户端与数据节点的交互通过远程过程调用来实现。在设计上,名称节点不会主动发起RPC,而是响应来自客户端和数据节点的RPC请求。
Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。其它的还有LocalFileSystem、HftpFileSystem、S3FileSystem。
文件压缩可以减少存储文件所需要的空间还可以加快数据在网络上的传输速度。在考虑如何压缩数据时,考虑压缩格式是否支持分割很重要,因为文件会被分割存储到不同的块,每个块的数据都会被不同的mapreduce任务处理,不会将整个文件合到一起。序列化算法也对数据传输的性能有较大影响。
sequenceFile文件是Hadoop用来存储二进制形式的[Key,Value]对而设计的一种平面文件。可以把SequenceFile当做是一个容器,把所有的文件打包到SequenceFile类中可以高效的对小文件进行存储和处理。SequenceFile文件并不按照其存储的Key进行排序存储,SequenceFile的内部类Writer提供了append功能。在存储结构上,SequenceFile主要由一个Header后跟多条Record组成,Header主要包含了Key classname,value classname,存储压缩算法,用户自定义元数据等信息,此外,还包含了一些同步标识,用于快速定位到记录的边界。每条Record以键值对的方式进行存储,用来表示它的字符数组可以一次解析成:记录的长度、Key的长度、Key值和value值,并且Value值的结构取决于该记录是否被压缩。
MapFile是经过排序的带索引的sequencefile,可以根据键进行查找。mapfile可以认为是map的一种持久化形态,它会增长乃至超过内存大小。
常用命令:
- hadoop fs -help
- hadoop fs -copyFromLocal [htsf://blog.xiexiaoyuan.com/]
- hadoop fs -copyToLocal
- hadoop distcp -m 1000 hdfs://xxx hadf://xxx 用于高效的在集群间复制数据。
- hadoop fs -text | head
HBase
Hbase是一个高可靠、高性能、面向列、可伸缩的分布式数据库。Hbase使用HDFS作为底层存储,可以通过水平扩展支持超大规模数据存储;利用MapReduce处理海量数据的高性能计算;利用Zookeeper作为分布式协调服务,实现服务的可靠和失败恢复。
HBase数据模型
从逻辑视图看,Hbase以表的形式保存数据。从物理视图看,Hbase是一个稀疏的、分布式的、多维排序的由键值构成的Map。
HBase的表由行和列组成,每个行有一个行键,用来唯一标识一行。列由多个列族组成,每个列族包含多个列。通过行键、列族和列可以唯一定位一个单元格,每个单元格中保存着一个数据的多个版本,每个版本对应不同的时间戳。当用户对Hbase的数据进行新建、修改、删除时,Hbase不会直接修改原有数据,而是为新的数据生成一个新的时间戳,因此时间戳最大的数据就是最新的数据。因此在Hbase的Map中键就是[行键,列族、列限定符、时间戳、操作类型]。
HBase采用以列族为单位的列式存储,同一个列族下的数据保存在相同的文件中。列式存储适合批量处理和部分字段的实时查询。当只需要对部分字段进行处理时,列式存储需要扫描的磁盘块更少,处理效率更高。列式存储模型(Decomposition Storage Model,DSM)的缺点是连接操作性能低。
HBase系统架构
HBase系统架构中主要包括客户端、ZK、Master主服务器和一组Region服务器。
- 用户通过客户端访问ZK,查询要访问的数据所在的Region服务器列表,并缓存数据所在的Region服务器和Region,加快后续数据访问。
- ZK集群负责提供分布式协调服务,通过监听Region服务器的上下线调整服务策略。因此zk中维护中Region服务器列表,另外还维护Master服务器地址和定位数据所在Region时的Root表地址。
- Master服务器负责表和Region的管理。管理业务表的增删改查,在Region发生分裂和合并时重新调整Region的分布,Region服务器发生故障时迁移服务器上的Region。客户端不需要与Master服务器连接。
- Region服务器负责维护Region并响应用户的读写请求,HBase一般采用HDFS作为底层存储。
HBase表逻辑结构
Region服务器内部结构
一个Region服务器由一个HLog、一个BlockCache和多个Region组成。HLog用于记录用户对该Region服务器下的所有Region的操作,用来保证数据的可靠性。Blockcache用来缓存数据提升数据的读取性能。Region是业务表的水平分片,负责保存数据。每个Region包含多个Store,每个Store保存水平分片下的一个列族的数据。每个Store采用LSM数据模型,由一个内存块MemStore和多个HFile(StoreFile)组成。其中,MemStore的数据结构是跳表(ConcurrentSkipListMap),HFile是一个rowKey有序的文件。
HBase写入流程
HBase的写入流程主要分3个阶段:寻址、写入、flush。
寻址:HBase采用3级寻址。客户端首先从zk获取root所在的region服务器及region。然后获取meta表的region服务器,最后定位用户数据表的region服务器。
写入:HBase的任何写入操作都会首先追加到HLog文件中,后台线程会周期性的进行日志滚动,新建一个日志文件。日志滚动主要是为了方便以文件为单位删除过期的HLog。写入数据一旦从MemStore中落盘,对应的日志数据就会失效,当所有日志数据失效时,日志文件就会失效,HBase会将失效的日志文件移到特定的失效文件夹。HBase会周期性的检查失效文件夹删除所有失效日志文件。
Flush:写入HLog后数据会写入MemStore,MemStore是跳表结构的内存块,通过先将数据写入内存,待内存数据大小超过阈值时再顺序写入生成HFile文件。因为HBase是有序的键值数据库,数据是按照rowKey有序排列的,使用LSM树模型的优点是:1. 将一次随机写入转换成了一次顺序写入,提高写入性能;2. MemStore保存这最近写入的数据可以提高读取性能;3. 对于只保存最新版本数据的场景,MemStore写入HFile时可以丢弃老版本数据,压缩数据量。
正常情况下,大部分MemStore Flush操作都不会对业务读写产生太大影响。然而,一旦触发RegionServer级别限制导致flush,就会对用户请求产生较大的影响。在这种情况下,系统会阻塞所有落在该RegionServer上的写入操作,直至MemStore中数据量降低到配置阈值内。
随着HFile的文件越来越多,需要将Hfile进行合并以提升数据的读取性能。HFile合并采用归并排序进行合并,保证合并后的文件的有序性。
随着数据不断写入,当region的大小达到阈值时,会进行region分裂,将一个region分裂成2个region。(region分裂后,缓存的客户端的region信息就与region服务器的实际数据不一致,此时客户端需要重新查询并缓存)。
Kudu
Kudu是一个分布式的列式存储引擎。与HDFS和HBase相比,既有HDFS的高吞吐,又有HBase的随机读写性能,因此适用于既有随机访问,也有批量数据扫描的复合场景。
数据仓库技术
数据仓库也就是OLAP(online analyze process,联机分析处理)数据库。数据仓库的特点是数据容量大(PB级的数据),数据分析能力强,对事务的支持较弱。OLTP与OLAP数据库的本质区别在于,OLTP数据库通过规范化来减少数据冗余,保证数据一致性,实现事务;而OLAP数据库通过逆规范化,将所有相关数据保存在宽表(列非常多)中,来提高数据的分析能力。常见的数据技术有Hive、HBase、Kylin、ClickHouse等。
在使用OLAP进行数据分析时,需要对原始数据进行维度建模之后再进行分析。维度建模理论中,基于事实表和维度表构建数据仓库。在实际操作中,一般会使用ODS(Operational Data Store,运营数据存储)层、DW(Data Warehouse,数据仓库)层、ADS(Application Data Service,应用数据服务)层三级结构。
- ODS层一般作为业务数据库的镜像。在项目中,数仓工程师通常通过数据抽取工具(例如Sqoop、DataX等)将业务库的数据复制到数仓的ODS层,供后续建模使用。
- 将数据导入ODS层后对数据进行清洗、建模,生成DW层的数据。DW层的数据就是经过逆规范化的数据。
- ADS层保存分析后的供业务使用的数据。
ClickHouse
ClickHouse是一个开源的单机的列式的高性能分析型SQL数据库管理系统,适用于OLAP场景。它不仅仅是一种存储技术,而是一个完整的数据管理系统,包括计算引擎、存储引擎和数据存储。
ClickHouse采用MPP(大规模并行处理)架构。存储引擎采用列式存储,相比行存储减少了数据分析时大量无效数据的IO扫描。计算引擎采用采用向量化引擎,借助CPU的SIMD技术(Single instruction Multiple Data,单指令多数据流),充分发挥单机性能,提高实时分析性能。MPP和主从架构最大的区别在于主从架构将集群中所有的服务器整合成一个单独的系统,统一对外提供服务,而MPP架构则是一个松散的集群,集群中的任意一台服务器都可以单独对外提供服务,是一个多主的结构。
ClickHouse的体系结构分为计算引擎、存储引擎、服务接口、管理工具和后台服务等组成部分。存储引擎负责将内存中的数据按照特定的规则持久化到磁盘;计算引擎则将用户提交的SQL语句转换成执行计划并对内存中的数据进行计算;后台服务负责执行一些分区合并、数据删除等后台工作;管理工具进行数据库的配置、管理。
ClickHouse数据模型
ClickHouse是一个面向列存的数据仓库,ClickHouse将一列的多个字段作为整体进行处理。ClickHouse字段的数据类型有100多种,通过对不同数据类型使用不同的内置函数来提高系统处理性能。数据类型可分为两类:内存对齐的数据类型和内存不对齐的数据类型。内存对齐的数据类型只保存数据数组,在计算时也不需要额外处理数据边界,因此内存对齐的数据具有更高的存储与计算效率,内存中无法对齐的数据还额外保存了一个用于定界的offset数组。
ClickHouse组件之间在内存中以块(Block)为单位计算数据,块由数据区和索引区组成。数据区存储每一列的数据,索引区记录列名等元数据信息,ClickHouse默认每块存储8192行数据。
ClickHouse表支持不同的表引擎,表引擎决定了数据表中的数据如何以文件的形式保存,包括数据的组织形式、文件的组织形式等。而存储引擎则决定了表引擎中数据文件的存储位置,是保存到本地磁盘,还是保存到远程文件存储服务中。其中最常用的表引擎是MergeTree,它的特点是查询速度快,但不支持数据修改。
作为分析型数据库,ClickHouse的主要性能瓶颈在于有大量的IO。因此,ClickHouse通过LSM数据模型预排序来提高范围查询性能、通过列式存储减少无效数据的IO、通过数据压缩减少数据量,来减少IO耗时(与HBase理念一致)。
大数据计算技术
大数据计算技术包括批处理计算、流计算、图计算、查询分析计算。
批处理计算主要解决大规模数据的批量处理。MapReduce和Spark是最流行的计算技术,它们都是线性的可伸缩的编程模型。MapReduce将并行计算抽象为两个函数Map和Reduce来完成海量数据的计算,在MapReduce模型中数据的输入和输出源是文件系统,计算的中间结果全都保存在磁盘中。Spark是一个低延迟的分布式集群计算系统,它启用了内存分布式数据集,中间结果优先保存在内存中,因此比MapReduce快很多。
流计算主要解决流数据的实时计算,并给出秒级响应。流数据是指时间分布和数量上无限的动态数据集合,比如实时日志、实时用户行为、实时天气温度。主要开源流计算框架有Storm、Yahoo S4、Spark Streaming、Flink、Flume。
图计算主要解决图或网络形式的数据的计算。主要的框架有Pregel、Spark GraphX、Flink Gelly、PowerGraph。
查询分析计算主要为超大规模数据的存储管理和查询分析提供实时或准实时的响应。主要的框架有Hive、Impala、Dremel、Cassandra。
MapReduce
MapReduce是一种线性的可伸缩的并行编程模型,用于大规模数据的并行计算,它将复杂的、运行于大规模集群上的并行计算过程高度抽象为两个函数:Map和Reduce。适合用MapReduce来处理的数据集需要满足一个前提条件:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。MapReduce理念是“计算向数据靠拢”。Map任务的执行节点和输入数据的存储节点是同一个节点,可以解决网络带宽瓶颈,达到数据局部性优化(data locality optimization)。Map函数的任务将输入数据转换成格式,然后经过业务逻辑计算将执行结果以一批的格式输出到本地磁盘上,然后等待Reduce任务读取数据执行。Reduce函数的任务将输入的一系列具有相同键的键值对以某种方式组合起来,输出处理后的键值对,输出结果会合并成一个文件,待Reduce任务执行完成后删除Map任务的中间结果数据。通常情况下Reduce任务的输入往往是所有Map任务的输出的指定的key,因此Map的输出必须通过网络传输到Reduce任务运行的节点,经过Reduce任务计算后输出到HDFS中。通常Reduce输出的HDFS块第一个副本存储在本地节点,其它副本存储在其它节点。
Map和Reduce任务之间的数据流称为Shuffle洗牌。
Map任务的输出结果首先被写入缓存,默认大小是100 MB,当缓存满时就启动溢写操作,把缓存中的数据写入磁盘文件并清空缓存。当启动溢写操作时,首先需要对缓存中的数据进行分区,然后对每个分区的数据进行排序和合并,再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束之前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的Reduce任务来“领取”属于自己处理的数据。为了保证Map结果能够持续写入缓存,不受溢写过程的影响,就必须让缓存中一直有可用的空间,不能等到全部占满才启动溢写过程,所以一般会设置一个溢写比例,如0.8。也就是说,当100 MB大小的缓存被填入80 MB数据时,就启动溢写过程,把已经写入的80 MB数据写入磁盘,剩余20 MB空间供Map结果继续写入。合并操作需要实现Combiner函数,Combiner将Map输出作为输入,Combiner输出作为Reduce输出。通过Combiner对Map的输出进行一定的聚合,减少溢写到磁盘的数据量,减少网络传输的数据量,但是Hadoop不一定为执行Combiner,所以我们需要保证是否调用Combiner,Reduce的输出结果都是一样的。磁盘文件归并指对所有溢写文件中的数据进行归并,生成一个经过分区和排序的溢写文件。进行磁盘文件归并时,如果磁盘中已经生成的溢写文件的数量过多,就可以再次运行Combiner,对数据进行合并操作,从而减少写入磁盘的数据量。
Reduce任务从Map端的不同Map机器“领取”属于自己处理的那部分数据,然后对数据进行归并后交给Reduce处理。每个Reduce任务会不断地通过RPC向JobTracker询问Map任务是否已经完成;JobTracker监测到一个Map任务完成后,就会通知相关的Reduce任务来“领取”数据;一旦一个Reduce任务收到JobTracker的通知,它就会到该Map任务所在机器上把属于自己处理的分区数据领取到本地磁盘中。Reduce任务会执行Reduce函数中定义的各种映射,输出最终结果,并将其保存到分布式文件系统中。Hadoop把输入数据划分为等长的小数据发送到MapReduce成为分片,Hadoop为每个分片创建Map任务,然后并行运行用户实现的Map函数,对于大多数作业一个理想的分片大小时一个HDFS块的大小,默认是64MB,这样可以最大程度保证一个分片的数据在同一个存储节点上。
在Hadoop中YARN负责资源管理和调度,MapReduce以客户端的形式向YARN提交任务,然后运行在YARN管理的各个节点上。
MapReduce也有其局限性:
- MapReduce的执行速度慢,大量中间结果需要写到磁盘上。
- 相比SQL过于底层易用性差。
- 不是所有算法都能用MapReduce实现。
Spark
Spark是一个开源的内存大数据计算平台。Spark能够提供内存计算框架,也可以支持SQL即席查询(Spark SQL)、流式计算(Spark Streaming)、机器学习(MLlib)和图计算(GraphX)等。Spark 可运行于独立的集群模式中,也可以运行在Hadoop环境中,并可以访问多种数据源 HDFS、HBase、Hive、Cassandra等。Spark支持使用Scala、Java、Python和R语言进行编程,并且可以通过Spark Shell进行交互式编程。
Hadoop中的MapReduce计算框架主要存在以下缺点:
- 表达能力有限。计算都必须要转化成Map和Reduce两个操作,但这不适合所有的情况,难以描述复杂的数据处理过程;
- 磁盘I/O开销大。每次执行时都需要从磁盘读取数据,并且在计算完成后需要将中间结果写入到磁盘中,I/O开销较大,延迟高。
- 一次计算可能需要分解成一系列按顺序执行的MapReduce任务,任务之间的衔接由于涉及到I/O开销,会产生较高延迟。而且在前一个任务执行完成之前,其他任务无法开始,因此,难以胜任复杂、多阶段的计算任务。
相比于MapReduce,Spark主要具有如下优点:
- Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活;
- Spark提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率;
- Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制。
Spark Core包含Spark最基础和最核心的功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等,主要面向批数据处理。Spark Core建立在统一的抽象RDD之上,使其可以以基本一致的方式应对不同的大数据处理场景。Spark Core通常被简称为Spark。RDD(弹性分布式数据集)是Spark数据在内存中的存储模型,可读不可写。
Spark SQL是用于结构化数据处理的组件,允许开发人员直接处理RDD,同时也可查询Hive、HBase等外部数据源。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员不需要自己编写Spark应用程序,开发人员可以轻松地使用SQL命令进行查询,并进行更复杂的数据分析。
BlinkDB 是建立在 Hive 和 Shark之上的近似查询引擎,已不再维护。
Spark Streaming是一种流计算框架,可以支持高吞吐量、可容错处理的实时流数据处理,其核心思路是将流数据分解成一系列短小的批处理作业,每个短小的批处理作业都可以使用Spark Core进行快速处理。Spark Streaming支持多种数据输入源,如Kafka、Flume和TCP套接字等。
MLBase是用于进行大规模机器学习计算,包括3个组件。MLlib提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等,降低了机器学习的门槛,开发人员只要具备一定的理论知识就能进行机器学习方面的工作。MLI实现了高层的机器学习程序的抽象。ML Optimizer目标是使机器学习的管道构建自动化,解决了机器学习算法和特征提取中的搜索问题。MLI和ML Optimizer都还在开发中。
GraphX是Spark中用于图计算的API,可认为是Pregel在Spark上的重写及优化,GraphX性能良好,拥有丰富的功能和运算符,能在海量数据上运行复杂的图算法。
无论是Spark SQL、Spark Streaming、MLlib还是GraphX,都可以使用Spark Core的API处理问题,它们的方法几乎是通用的,处理的数据也可以共享,不同应用之间的数据可以无缝集成。
Spark Core运行架构
Spark运行架构包括集群资源管理器(Cluster Manager)、每个应用的任务控制节点(Driver Program)运行作业任务的工作节点(Worker Node)和每个工作节点上负责具体任务的执行进程(Executor)。Cluster Manager可以是 Spark 自带的资源管理器,也可以是YARN或Mesos等资源管理框架。就系统架构而言,Spark采用“主从架构”,包含一个Master(即Driver Program)和若干个Worker。
当执行一个应用时,任务控制节点(Driver Program)会向集群管理器(Cluster Manager)申请资源,启动Executor并向Executor发送应用程序代码和文件,然后在Executor上执行应用程序,运行结束后执行结果会返回给任务控制节点Driver,写到HDFS或者其他数据库中。
Executor是运行在工作节点(Worker Node)上的一个进程,负责运行任务,并为应用程序存储数据,Executor是利用多线程来执行具体任务,减少任务的启动开销。Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备(默认使用内存,当内存不够时,会写到磁盘),当需要多轮迭代计算时可以将中间结果存储到这个存储模块里,下次需要时,就可以直接读取该存储模块里的数据,而不需要读取HDFS等文件系统的数据,因而有效减少了I/O开销。在交互式查询场景下,也可以把表提前缓存到这个存储系统上,提高读写I/O性能。
在Spark中,一个应用程序由一个任务控制节点(Driver Program)和多个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。
Spark程序运行流程
当执行一个应用程序时,任务控制节点会向集群管理器申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后,执行结果会返回给任务控制节点,写到HDFS或者其他数据库中。
- 当一个Spark应用被提交时,首先由任务控制节点创建一个SparkContext对象,由SparkContext负责和资源管理器的通信以及进行资源的申请、任务的分配和监控等,SparkContext 会向资源管理器注册并申请运行Executor的资源,SparkContext可以看成是应用程序连接集群的通道。
- 资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上。
- SparkContext根据RDD的依赖关系构建DAG图(有向无环图),DAG图提交给DAG调度器(DAGScheduler)进行解析,将DAG图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把“阶段”提交给底层的任务调度器进行处理;Executor向SparkContext申请任务,任务调度器将任务分发给Executor运行,同时,SparkContext将应用程序代码和文件发放给Executor。
- 任务在Executor上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
每个Spark应用程序都有独立的Executor进程,并且该进程在应用运行期间一直驻留,Executor进程以多线程的方式运行任务。任务采用了流水线优化、数据本地性、推测执行等优化机制。数据本地性是尽量将计算移到数据所在的节点上进行,即“计算向数据靠拢”,因为移动计算比移动数据所占的网络资源要少得多。而且,Spark采用了延时调度机制,可以在更大的程度上实现执行过程优化。比如,拥有数据的节点当前正被其他的任务占用,那么,在这种情况下是否需要将数据移动到其他的空闲节点呢?答案是不一定。因为如果经过预测发现当前节点结束当前任务的时间要比移动数据的时间少,那么调度就会等待直到当前节点可用。
RDD(Resilient Distributed Dataset)编程
RDD特点
RDD是一个逻辑上的只读的多分区的分布式的内存数据集合。Spark SQL、Spark Streaming底层都是基于RDD来计算的,也因此Spark的各个组件可以无缝进行集成,在同一个应用程序中完成大数据计算任务。
- 内存:RDD是一个内存模型,数据保存在内存中。
- 只读:RDD的只读可以和Java中的String对象做类比。String对象是不可修改的,每次“修改”实际上是新建一个String对象。RDD也一样,每次“修改”只能在原有RDD上经过计算生成新的RDD。
- 分布式:String对象是保存在单一物理机的单一进程下的内存中。而RDD中的数据是分布式的保存在Spark集群的多个工作节点的内存上。
- 多分区:一个RDD的每个工作节点上的数据构成了RDD的一个分区,所以RDD是多分区的,分区的大小取决于分区的用户指定的数量。
RDD的特点是具有高效的容错性。有的分布式共享内存、键值存储、内存数据库等,为了实现容错,必须在集群节点之间进行数据复制或者记录日志,也就是在节点之间会发生大量的数据传输,这对于数据密集型应用而言会带来很大的开销。在RDD的设计中,数据只读,必须从父RDD转换到子RDD,由此在不同RDD之间建立了血缘关系。所以RDD是一种天生具有容错机制的特殊集合,不需要通过数据冗余的方式(比如检查点)实现容错,而只需通过RDD父子依赖(血缘)关系重新计算得到丢失的分区来实现容错,无需回滚整个系统,这样就避免了数据复制的高开销,而且重算过程可以在不同节点之间并行进行,实现了高效的容错。此外,RDD提供的转换操作都是一些粗粒度的操作(比如map、filter和join),RDD依赖关系只需要记录这种粗粒度的转换操作,而不需要记录具体的数据和各种细粒度操作的日志(比如对哪个数据项进行了修改),这就大大降低了数据密集型应用中的容错开销。
RDD操作
对RDD的计算包括两种操作:Transformation和Action。Transformation接受一个RDD并返回一个RDD,如map、filter、groupBy、join等。Action接受RDD返回非RDD而是返回一个值或结果,如count、collect等。RDD的计算采用惰性调用机制,类似于Java Stream,在调用Transformation是并不会真正去执行计算,只是记录了RDD之间的依赖关系生成DAG图,而是在遇到Action操作时才会触发计算。
Shuffle对Spark任务的影响
在MapReduce中Shuffle是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。Shuffle过程分为Map端的操作和Reduce端的操作。Map端在溢写时对缓存中的数据进行分区、排序,不同分区的数据保存在磁盘中的不同逻辑桶中。Reduce端将多个Map任务的属于自己分区的数据重新归并排序。
Spark计算也存在相同的Shuffle操作,但是做了部分优化,Map端和Reduce根据RDD的操作来决定是否需要对数据进行排序,还是简单的聚合。Shuffle过程中每个Map任务会产生两个文件:数据文件和索引文件,数据文件是存储当前Map任务的输出结果,索引文件中则存储了数据文件中的数据的分区信息。下一个阶段的Reduce任务就是根据索引文件来获取属于自己处理的那个分区的数据。Reduce任务所拥有的内存,必须足以存放属于自己处理的所有key和value值,否则就会产生内存溢出问题,因此涉及到大量数据的Shuffle操作的时候尽量增加分区的数量,也就是增加Map和Reduce任务的数量。增加Map和Reduce任务的数量虽然可以减小分区的大小,使得内存可以容纳这个分区。但是,在Shuffle 写入环节,桶的数量是由Map和Reduce任务的数量决定的,任务越多桶的数量就越多,就需要更多的缓冲区,带来更多的内存消耗。因此,在内存使用方面,我们会陷入一个两难的境地,一方面,为了减少内存的使用,需要采取增加Map和Reduce任务数量的策略,另一方面,Map和Reduce任务数量的增多又会带来内存开销更大的问题。最终,为了减少内存的使用,只能将Aggregator的操作从内存移到磁盘上进行。也就是说尽管Spark经常被称为“基于内存的分布式计算框架”,但它的Shuffle过程依然需要把数据写入磁盘。
RDD 中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency),二者的主要区别在于是否包含Shuffle操作。总体而言,如果父RDD的一个分区只被一个子RDD的一个分区所使用就是窄依赖,否则就是宽依赖。窄依赖典型的操作包括map、filter、union等不会包含Shuffle操作;宽依赖典型的操作包括groupByKey、sortByKey等会包含Shuffle操作。Spark的这种依赖关系设计使得RDD数据集通过“血缘关系”记住了它是如何从其他 RDD 中演变过来的,当一个RDD的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,由此带来了性能的提升。在两种依赖关系中,窄依赖的失败恢复更为高效,它只需要根据父RDD分区重新计算丢失的分区,而且可以并行地在不同节点进行重新计算。而对于宽依赖而言,单个节点失效通常意味着重新计算过程会涉及多个父RDD分区,开销较大。此外,Spark还提供了数据检查点和记录日志,用于持久化中间RDD,从而使得在进行失败恢复时不需要追溯到最开始的阶段。在进行故障恢复时Spark会对数据检查点开销和重新计算RDD分区的开销进行比较,从而自动选择最优的恢复策略。
Spark根据DAG图中的RDD依赖关系,把一个作业分成多个阶段。逻辑上,每个RDD操作都是一个fork/join,把计算fork到每个 RDD 分区,完成计算后对各个分区得到的结果进行 join 操作,然后fork/join下一个RDD操作。如果把一个Spark作业直接翻译到物理实现,即执行完一个RDD操作再继续执行另外一个RDD操作,是很不经济的。首先,每一个RDD(即使是中间结果)都需要保存到内存或磁盘中,时间和空间开销大;其次,join 作为全局的路障(Barrier)代价是很昂贵的,所有分区上的计算都要完成以后,才能进行 join 得到结果,这样作业执行进度就会严重受制于最慢的那个节点。如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join 合并为一个;如果连续的变换操作序列都是窄依赖,就可以把很多个 fork/join 合并为一个,通过这种合并,不但减少了大量的全局路障(Barrier),而且无需保存很多中间结果RDD,这样可以极大地提升性能。在Spark中,这个合并过程就被称为“流水线(Pipeline)优化”。
只有窄依赖可以实现流水线优化。对于窄依赖的RDD,可以以流水线的方式计算所有父分区。宽依赖的RDD都有Shuffle操作,即首先需要计算好所有父分区数据,然后在节点之间进行Shuffle,这个过程会涉及不同任务之间的等待,无法实现流水线方式处理。因此,RDD之间的依赖关系就成为把DAG图划分成不同阶段的依据。Spark通过分析各个RDD之间的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分阶段,具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的阶段中(Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing)。每个阶段内部都代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集合,各个阶段之间存在依赖关系。每个任务集合会被提交给任务调度器(TaskScheduler)进行处理,由任务调度器将任务分发给Executor运行。
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。对RDD进行分区,第一个功能是增加并行度,第二个功能是减少通信开销。通过对数据来源合理的分区,使得在执行连接操作时只需要在自身数据节点上数据进行连接,避免大量数据混洗,从而减少通信开销。RDD 分区的一个原则是使分区的个数尽量等于集群中的 CPU 核心(Core)数目。
RDD的转换操作 | filtermapflatMapgroupByKeyreduceByKey |
RDD的行动操作 | countcollectfirsttakereduceforeach |
持久化 | persistunpersistcache |
分区 | parallelizeclass Partitioner |
键值对操作 | groupByKeyreduceByKeykeysvaluessortByKeymapValuesjoincombineByKey |
Spark SQL
Spark SQL是Spark中用于结构化数据处理的组件,它提供了一种通用的访问多种数据源的方式,可访问的数据源包括Hive、Avro、Parquet、ORC、JSON和JDBC等。Spark SQL采用了DataFrame数据模型(即带有Schema信息的RDD),支持用户在Spark SQL中执行SQL语句,实现对结构化数据的处理。目前Spark SQL支持Scala、Java、Python等编程语言。Spark SQL可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系操作;其次,可以支持大量的数据源和数据分析算法,组合使用Spark SQL和Spark MLlib,可以融合传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力,满足各种复杂的应用需求。
DataFrame是一种以RDD为基础的分布式数据集,并提供了详细的结构信息,就相当于关系数据库的一张表。
val spark = SparkSession
# 读取文件创建DataFrame
val df = spark.read.csv("hdfs:x1/x2.csv")
# DataFrame写入文件
df.select("field1","field2").write.json("xx")
# 利用反射机制推断RDD转换为DataFrame
val df = spark.sparkContext.textFile("xxx.txt").map(v->Person(v(0),v(1))))).toDF()
# 编程定义RDD模式
# 定义字段 val fields = Array(StructField("name",StringType,true),StructField("age", IntegerType,true))
val schema = StructType(fields)
# 解析记录
val rdd = spark.sparkContext.textFile("xxx.txt").map(_.split(",")).map(v->Row(v(0),v(1))))).toDF()
# 合并
val df = spark.createDataFrame(rdd, schema)
Spark Streaming
Spark Streaming是构建在Spark上的流计算框架,Spark Streaming 可整合多种输入数据源,如 Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。Spark Streaming的基本原理是将实时输入数据流以时间片(通常在0.5~2秒之间)为单位进行拆分,然后采用Spark引擎以类似批处理的方式处理每个时间片数据。
Spark Streaming最主要的抽象是离散化数据流(Discretized Stream,DStream),表示连续不断的数据流。在内部实现上,Spark Streaming的输入数据按照时间片(如1秒)分成一段一段,每一段数据转换为Spark中的RDD,并且对DStream的操作都最终被转变为对相应的RDD的操作。在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的任务(Task)运行在一个Executor上,每个Receiver都会负责一个DStream输入流(如从文件中读取数据的文件流、套接字流或者从Kafka中读取的一个输入流等)。Receiver组件接收到数据源发来的数据后,会提交给Spark Streaming程序进行处理。
Spark Streaming和Storm最大的区别在于,Spark Streaming无法实现毫秒级的流计算,而Storm可以实现毫秒级响应。因为Spark Streaming将流数据分解为一系列批处理作业,在这个过程中,会产生多个Spark作业,且每一段数据的处理都会经过Spark DAG图分解、任务调度等过程,需要一定的开销,因此无法实现毫秒级响应。Spark Streaming难以满足对实时性要求非常高的场景。相比之下,Storm处理的数据单位为元组,只会产生极小的延迟。
Spark MLlib
Hive
Hive是一个基于Hadoop的数据仓库工具,可以对存储在Hadoop文件中的数据集进行数据整理、特殊查询和分析处理。Hive本身不存储和处理数据,Hive只是提供了一种用户编程接口,可以通过将HiveQL(类SQL) 解析为MapReduce任务,进而在MapReduce引擎上执行,然后输出查询结果。通过使用HiveQL可以避免编写复杂的MapReduce程序,适合数据仓库的统计分析。除了MapReduce,Hive还支持Tez,Kudu和Spark。
Hive HA是指将多个Hive实例组成资源池进行统一管理,由HAProxy对外提供统一的高可用的接口。
Hive与关系型数据库相比:
- Hive使用类SQL进行操作
- Hive操作的数据存储在Hadoop中,支持的文件格式包括textfile、sequencefile、rcfile。
- Hive支持在某些列上创建索引,但不支持主键和外键
- Hive使用一般使用MapReduce计算引擎
- Hive DML执行延迟较高。
示例代码
create database if not exists hive;
-- hive表氛围内部表和外部表。创建内部表时创建在hive配置的指定的目录下,创建外部表时需要制定文件保存的路径,路径中已经存在的文件会被当作已有的数据加载到表中。当删除内部表是表的元数据和文件数据都会被删除,删除外部表时只会删除表的元数据。
-- 创建hive内部表
create table if not exists usr(
id bigint,
name string,
age int comment '年龄'
) comment '用户信息表'
-- 按照ds和country分区,每个分区在单独的多级目录下。分区是表的部分列的集合,可以提高查询效率
partitioned by(ds string, country string)
-- 在分区的基础上通过哈希来实现分桶,将数据平均分配到3份文件中,可以同来进行数据抽样或提高JOIN操作效率
clustered by(id) sorted by(age desc) into 3 buckets
row format delimited -- 指定分隔符
fields terminated by ',' -- 字段之间用逗号分割
store as TEXTFILE;
-- 创建hive外部表
create external table if not exists usr(
id bigint,
name string,
age int comment '年龄'
) comment '用户信息表'
row format delimited
fields terminated by ','
store as TEXTFILE
location '/usr/local/...';
Impala
Impala是用于处理存储在Hadoop集群中大数据的MPP(大规模并行处理)SQL查询引擎,能够查询存储在HDFS、HBase、Kudu上的海量数据。Impala与Hive的区别主要在于,Hive底层采用MapReduce作为查询的执行引擎,而Impala采用基于内存的分布式查询引擎。MapReduce作为面向批处理的非实时计算框架,无法提供实时查询,而Impala则可以在几秒内分析PB级别的海量数据。因此Hive和Impala可以部署在同一套大数据平台上提供不同的功能。
Impala主要由3部分组成:Impala查询客户端、State Store和Impalad。
- Impala提供了3种查询客户端:Impala-shell、Hue界面和ODBC/JDBC驱动程序。
- Impalad(impala daemon)是安装在每个节点上的进程,负责协调客户端提交的查询的执行,给其它impalad分配任务以及收集其它impalad的执行结果进行汇总。Impalad进程主要包含query planner、query coordiantor和query exec engine3个模块,与HDFS的数据节点(HDFS DN)运行在同一个节点上,并完成分布运行在MPP架构上。
- State Store负责收集分布在集群中的各个Impalad进程的资源信息,跟踪impalad的健康状态及位置信息,处理impalad的注册订阅和心跳信息。各Impalad都会缓存一份State Store中的信息。当State Store离线后,Impalad一旦发现State Store处于离线状态时,就会进入恢复模式,并进行反复注册。当State Store重新加入集群后,自动恢复正常,更新缓存数据。
- 此外Impala海复用了Hive Metastore维护表的元数据。
Impala查询执行过程
- 执行查询前,impala先创建impalad进程负责具体的查询任务,impalad进程向State Store提交注册订阅信息。
- Impalad的Query Planner对SQL语句进行解析,生成解析树。然后,Planner把这个查询的解析树变成若干PlanFragment,发送到Query Coordinator。其中,PlanFragment由PlanNode组成的,能被分发到单独的节点上执行。每个PlanNode表示一个关系操作和对其执行优化需要的信息。
- Query Coordinator从MySQL元数据库中获取元数据,从 HDFS 的名称节点中获取数据节点地址。
- Query Coordinator初始化相应Impalad上的任务,把查询任务分配给所有存储这个查询相关数据的数据节点。
- Query Executor通过流式交换中间输出,并由Query Coordinator汇聚来自各个Impalad的结果
Flink
Flink是一个兼具高吞吐、低延迟和高性能的实时流计算框架。Flink同时支持批处理和流处理、状态管理、有条件的精确一次的状态一致性保障、时间窗口等。Flink可运行于独立的集群模式中,也可以运行在YARN、Mesos、Kubernates等。
第一代流处理框架Storm虽然可以做到低延迟,但是无法实现高吞吐,也不能在故障发生时准确地处理计算状态。
第二代流处理框架Spark Streaming通过采用微批处理方法实现了高吞吐和容错性,但是牺牲了低延迟和实时处理能力。
第三代流处理框架Flink实现了Google Dataflow流计算模型,是一种兼具高吞吐、低延迟和高性能的实时流计算框架,并且同时支持批处理和流处理。此外,Flink支持高度容错的状态管理,防止状态在计算过程中因为系统异常而出现丢失。
Flink Runtime核心层负责对上层接口提供基础服务,是Flink分布式计算框架的核心实现层。Runtime核心层提供了两套核心的API:流处理(DataStream API)和批处理(DataSet API)。
API&Libraries层在Runtime核心层的基础上抽象出不同应用类型的组件库,如CEP(基于流处理的复杂事件处理库)、SQL&Table库(既可以基于流处理,也可以基于批处理)、FlinkML(基于批处理的机器学习库)、Gelly(基于批处理的图计算库)等。
Flink运行架构
Flink运行架构包括JobManager和TaskManager。就系统架构而言,Flink也采用“主从架构”,包含一个Master(JobManager)和多个Slave(TaskManager)。
当执行一个Flink应用时,首先将应用提交给Job Client,然后Job Client将应用提交给资源管理器/分发器,然后分发器为每个应用创建一个单独的任务管理器,然后由任务管理器向资源管理器申请需要的资源。资源申请完成后,将任务提交给申请到的任务管理器。在接收任务时,TaskManager启动一个线程以开始执行。同时,TaskManager会向JobManager报告任务的状态更改。作业可以有各种状态,例如开始执行、正在进行或已完成。作业执行完成后,其结果将发送回客户端(Job Client)。
Flink程序运行流程
- 当执行一个Flink程序时,首先将程序Jar包提交给Job Client,JobClient将业务逻辑转换为JobGraph数据结构。在对数据进行转换过程中,Flink会将多个函数组成一个算子,每个程序的JobGraph就是一组算子的有向无环图。
- Job Client将应用提交给资源管理器/分发器。分发器负责为每个应用创建一个单独的任务管理器。
- 任务管理器将JobGraph转化为ExecutionGraph并向资源管理器申请资源,然后将ExecutionGraph以及具体的子任务部署到多个TaskManager上。JobManager还负责管理多个TaskManager,包括收集作业的状态信息、生成检查点、必要时进行故障恢复等。因为程序是被部署到多个TaskManager执行的,每个算子的并行度都可以自定义。每个酸子有多少并行度,就有多少个算子子任务。因此Flink需要根据JobGraph解析出ExecutionGraph数据结构,然后生成物理执行视图,也就是每个算子子任务的有向无环图。在构造物理执行视图时,Flink还会对算子子任务链接在一起组成算子链提交给同一个TaskManager执行,减少算子子任务的传输开销。
- TaskManager是实际负责执行计算的节点。同MapReduce、Spark一样,算子子任务执行期间存在交换数据的情况。
DataStream编程
Flink程序的编写分为5部分:
- 设置执行环境
- Source读取数据源
- Transformation对数据进行转换
- Sink输出转换结果
- 调用作业执行函数
Source和Sink
Flink常用的Source和Sink有标准输出、基于Socket、基于文件系统、Kafka、ES、HBase、Flume、Kudu、Redis等。我们也可以通过实现SourceFunction、RichSourceFunction、CheckpointedFunction、ParallelSourceFunction、RichParallelSourceFunction等来自定义Source。
在后续的Flink中对Source做了优化以支持批处理计算,实现批流一体的编程模型。新的Source接口包括三个组件:
- 分片(Split):Split是将数据源切分后的一小部分。
- 读取器(SourceReader):SourceReader负责Split的读取和处理,SourceReader运行在TaskManager上,可以分布式地并行运行。比如,某个SourceReader可以读取文件夹里的单个文件,多个SourceReader实例共同完成读取整个文件夹的任务。
- 分片枚举器(SplitEnumerator):SplitEnumerator负责发现和分配Split。SplitEnumerator运行在JobManager上,它会读取数据源的元数据并构建Split,然后按照负载均衡策略将多个Split分配给多个SourceReader。
Transformation
转换操作主要包括4种。
- 单数据流转换,如map、filter、flatMap;
- 基于key的分组转换:keyBy。将DataStream转换为KeyedStream,相同的key的数据会被发送到同一个算子子任务;
- 多数据流转换:union、 connect。union将多个同类型的DataStream合并为一个DataStream,connect将两个不同类型的DataStream合并为一个DataStream。connect()经常被应用于使用一个控制流对另一个数据流进行控制的场景,控制流可以是阈值、规则、机器学习模型或其他参数。
- 数据重分布转换:shuffle、rebalance、rescale、broadcast、global、partitionCustom。默认情况下,数据是自动分配到多个子任务上。shuffle基于正态分布,将数据随机分布到下游各算子子任务上。rebalance使用Round-Ribon方法将数据均匀分布到各子任务上。rescale也是将数据均匀分布到下游各子任务上,是就近发送给下游子任务,传输开销更小。global会将所有数据发送给下游算子的第一个子任务上。
时间窗口
窗口
流处理中时间窗口包括三种:滑动窗口、滚动窗口和会话窗口。还有一种基于数量的窗口
滑动窗口(Sliding Window)模式设有一个固定的窗口长度和一个固定的滑动间隔,滑动间隔和窗口长度可以不相等。比如滑动间隔为1分钟,窗口大小为5分钟,这种情况下窗口和窗口之间会有重叠。
滚动窗口(Tumbling Window)是滑动窗口的特例,窗口长度和滑动间隔相等。
会话窗口(Session Window)模式的窗口长度不固定,而是通过一个间隔来确定窗口,这个间隔被称为会话间隔(Session Gap)。当两个事件之间的间隔大于会话间隔,则两个事件被划分到不同的窗口中;当事件之间的间隔小于会话间隔,则两个事件被划分到同一窗口。
时间语义
Flink支持3种时间语义。
- Event Time指数据流中每个元素或者每个事件自带的时间属性,一般是事件发生的时间。由于事件从发生到进入Flink时间算子之间有很多环节,一个较早发生的事件因为延迟可能较晚到达,因此使用Event Time意味着事件到达有可能是乱序的。
- Processing Time指使用算子的当前节点的操作系统时间。
- Ingestion Time是事件到达Flink Source的时间。从Source到下游各个算子中间可能有很多计算环节,任何一个算子处理速度的快慢可能影响下游算子的Processing Time。而Ingestion Time定义的是数据流最早进入Flink的时间,因此不会被算子处理速度影响。
在Flink中使用Event Time作为时间语义时需要设置Watermark。Watermark是Flink插入到数据流中的一种特殊的数据结构,它包含一个时间戳,并假设后续的数据的Event Time大于该时间戳的数据。我们可以通过参数设置Flink的JobManager周期性的向算子中发送Warkmark,并设置Watermark的准确度和延迟。比如,设置Watermark每5秒发送一次,假如截止到目前数据的最大Event Time为10秒,延迟5秒发送,时间戳为最大EventTime+3秒,那么在第15秒的时候JobManager会向算子中发送水印,时间戳为13秒,表示在第15秒的时候,EventTime为13秒内的数据已经发送完成。当算子接收到该Watermark时就认为该窗口的数据已经接收完成,从而触发后续的计算。Watermark是一种在延迟和准确性之间平衡的策略,Watermark与事件的时间戳贴合较紧,一些重要数据有可能被当成迟到数据,影响计算结果的准确性;Watermark设置得较大,整个应用的延迟增加,更多的数据会先被缓存以等待计算,会增加内存的压力。因此对待具体的业务场景,我们可能需要反复尝试,不断迭代和调整时间策略。
对于Event Time,我们使用Watermark来判断数据是否迟到。一个迟到数据是指数据到达窗口算子时,该数据本该被分配到某个窗口,但由于延迟窗口已经触发计算。对于迟到的数据有3种处理方式:直接丢弃;发送到另一个数据流;重新执行计算。
Flink支持在KeyedStream上使用Timer(定时器),通过Timer注册一个未来的时间,当时间到达时,Flink会回调指定的函数,指定用户自定义的业务逻辑。
状态管理
Flink支持数据的状态管理,通过状态我们可以支持数据去重、故障恢复、有条件的精确一次保障、检查点机制、保存点持久化备份机制、并行度平滑扩展。
Flink状态分为2种:托管状态(Managed State)和原生状态(Raw State)。托管状态由Flink Runtime托管,自动存储、自动恢复、自动扩展。原生状态需要有开发者自己维护。托管状态分为Keyed State和Operator State。Keyed State是KeyedStream上的状态,每个Key对应一个自己的状态。Operator State可以用在所有算子上,每个算子子任务共享一个状态,流入这个算子子任务的所有数据都可以访问和更新这个状态。
根据数据结构KeyedState包括BroadcastState、ValueState、MapState、AppendingState(ListState、ReducingState、AggregatingState)
检查点
Flink定期保存状态数据到存储空间上,故障发生后从之前的备份中恢复,这个过程被称为Checkpoint机制。默认情况下,Checkpoint机制是关闭的,需要调用env.enableCheckpointing(n)来开启,每隔n毫秒进行一次Checkpoint。Checkpoint是一种负载较重的任务,如果状态比较大,同时n又比较小,那可能一次Checkpoint还没完成,下次Checkpoint已经被触发,占用太多本该用于正常数据处理的资源。增大n意味着一个作业的Checkpoint次数更少,整个作业用于进行Checkpoint的资源更少,可以将更多的资源用于正常的流数据处理。同时,更大的n意味着重启后,整个作业需要从更长的Offset开始重新处理数据。
Checkpoint Barrier是Flink插入到数据流中的检查点屏障,将数据流分割成一段一段,用来表示此时需要进行保存检查点。每个Checkpoint Barrier一个唯一id,从上次Checkpoint Barrier到本次Checkpoint Barrier之间的数据属于该Checkpoint Barrier。当ID为n的Checkpoint Barrier到达算子后,表示要对n-1和n之间状态更新做snapshot。当算子做完snapshot后,还会给Checkpoint Coodinator发送确认(Acknowledgement,ACK),告知自己已经做完了相应的工作。同时向下游广播Checkpoint Barrier。下游算子可能接收多个上游的Checkpoint Barrier,必须等待上游所有算子的相同ID的Checkpoint Barrier到达才能进行snapshot。
Flink对Checkpoint机制做了一定的优化:
- 异步快照。使用后台线程复制本地状态进行snapshot,当进行snapshot的时,立即向下广播Checkpoint Barrier,不会停止数据处理。
- 允许跳过对齐(Unaligned Checkpoint)。一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,就可以直接广播Checkpoint Barrier。对晚到的数据再次进行snapshot。
保存点
Checkpoint机制和Savepoint机制在代码层面使用的分布式快照逻辑基本相同,生成的数据也几乎一样。Checkpoint机制的目的是为了故障重启,使得作业中的状态数据与故障重启之前的保持一致,是一种应对意外情况的有力保障。Savepoint机制的目的是手动备份数据,以便进行调试、迁移、迭代等,是一种协助开发者的支持功能。就像数据库的binlog日志、redolog和dump的区别。
精确一次(Exactly-Once)保障
精确一次是指系统对每一条数据处理一次,即使是在发生故障时也能够保证,不漏掉每一条数据,不重复处理每一条数据。
checkpoint机制可以让Flink重启或故障恢复时保证Flink内部状态的Exactly-Once一致性,但并不能保证端到端的精确一次。checkpoint机制只能让算子恢复到之前的snapshot状态,但是系统停止之前的已经被发送到外部系统的还没有来得及进行下一次checkpoint的数据就会在服务启动后重新处理,导致数据重新发往外部系统。
端到端的精确一次需要依赖source和sink。source本身要支持记录状态且支持重发,如kafka、pulsar,像socket就无法支持数据重发;Sink则要支持幂等。当发生故障时,一部分数据有可能已经流入系统,但还未进行Checkpoint,Source的Checkpoint记录了输入的Offset;当重启时,Flink能把最近一次的Checkpoint恢复到内存中,并根据Offset,让Source从该位置重新发送一遍数据,以保证数据不丢失、不重复。
想要保证数据对下游只产生一次影响还需要sink支持幂等写或事务写。如果是支持幂等写,同时业务要考虑到时间闪回的问题。如果是事务写,Flink支持两种方式:预写日志(Write-Ahead-Log,WAL)和两阶段提交(Two-Phase-Commit,2PC)。Write-Ahead-Log方式使用Operator State缓存待输出的数据。事务写能提供端到端的Exactly-Once保障,它的代价是牺牲延迟,输出数据不再实时写入外部系统,而是分批次地提交。
预写日志方式:当Checkpoint Barrier到达时,sink会将待输出批次写入预写日志(Operator State),同时后续数据开启新的批次,后续数据属于下一次Checkpoint Barrier。然后调用外部系统检查该批次是否已经写入,如果还未提交则写入,同时外部系统更新写入记录,Sink删除Operator State中存储的已经提交的数据。不过,Write-Ahead-Log仍然无法提供百分之百的Exactly-Once保障,写入外部系统可能失败或者写入外部系统后Sink可能没有收到返回结果,都可能导致数据被多次写入。
Table API和SQL
Table API和SQL以关系型数据库中表为基础模型,提供一个批流一体的编程模型。使用Table API和SQL时我们可以像访问关系型数据库表一样通过SQL访问流数据。Flink使用执行计划器将Table API和SQL转换为可执行的Flink作业。具体步骤为Table API和SQL首先转换为一个没有优化过的逻辑执行计划(Logical Plan),之后优化器(Optimizer)会对Logical Plan进行优化得到物理执行计划(Physical Plan),然后将物理执行计划转换为Flink的Transformation,然后转换为JobGraph,最后JobGraph提交到Flink集群执行。
在对数据进行处理之前首先要定义并注册表到TableEnvironment中。注册表的方式有3种。
- 使用SQL DDL中的CREATE TABLE…创建表。
- 使用TableEnvironment.connect()连接一个外部系统,如文件系统、kafka、es、hbase。
- 从Catalog中获取已注册的表。
Catalog用于记录并管理各类元数据信息,包括数据库、数据库表、数据存储形式 (文件、MQ等)、函数等。常见的Catalog包括:GenericInMemoryCatalog、HiveCatalog和用户自定义的Catalog。在企业级的应用中我们都是用HiveCatalog管理元数据,Hive可以管理包括纯Hive表和非Hive表。
当在流数据使用SQL查询时需要根据流入的数据动态的输出查询结果。动态表的输出有2种模式。一是在之前的输出结果末尾追加,称为追加模式(Append-only)(DataStream dsRow = tEnv.toAppendStream(table, Row.class));二是既在末尾追加,有对已有数据进行更新,称为更新模式(Update)。Update分为2种:撤回模式(Retract)是将旧数据撤回再添加新数据,使用Retract模式会使用一个标志位来表示数据是否已经撤回(DataStream> ds = tEnv.toRetractStream(table, Row.class)),Retract模式会导致表的数据越来越大;更新或插入模式(Upsert)是直接在旧数据上进行更新。
SQL中专门进行窗口处理的函数为OVER WINDOW。OVER WINDOW与GROUP BY有些不同,它对每一行数据都生成窗口,在窗口上进行聚合,聚合的结果会生成一个新字段。或者说,OVER WINDOW一般是指一行变一行。
程序示例
// 快照存储在HDFS
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"));
// 启用checkpoint机制
env.enableCheckpointing(n);
//清除空闲状态数据
env.getConfig().setIdleStateRetentionTime(Time.hours(1), Time.hours(2));
// 使用Event Time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 添加source
DataStream<T> stream = env.addSource(new SimpleSource());
// 侧输出OutputTag
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
stream
.windowAll(<WindowAssigner>) // 数据流中的元素分配到相应窗口
.trigger(<Trigger>) // 窗口触发器决定何时启动窗口处理函数来处理窗口中的数据
.evictor(<Evictor>) // 窗口清除器用来在窗口处理函数执行前和执行后清除一些数据
.reduce/aggregate/process() // 窗口处理函数处理窗口中的元素集合
DataStream result = stream
.keyBy(<KeySelector>) // 数据流中的元素按key分组
.window(<WindowAssigner>) // 数据流中的元素分配到相应窗口
.allowedLateness(<time>) // 允许迟到多久
.sideOutputLateData(lateOutputTag) // 迟到数据发送到侧输出
.trigger(<Trigger>) // 窗口触发器
.evictor(<Evictor>) // 窗口清除器
.reduce/aggregate/process() // 窗口处理函数处理窗口中的元素集合
DataStream lateStrema = result.getSideOutput(lateOutputTag); // 获取测输出流
// 基于StreamExecutionEnvironment创建TableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 读取数据源创建表
tEnv.connect(...)
.createTemporaryTable("t1");
tEnv.connect(...)
.createTemporaryTable("t2");
// 使用API查询
Table r1 = tEnv.from("t1").select(...);
Table r2 = tEnv.sqlQuery("select f1,f2 from t1");
//输出查询结果
r1.executeInsert("t2");
-- 基于SQL的时间窗口
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
-- 在原有Schema基础上添加一列proctime
proctime as PROCTIME()
) WITH (
...
);
-- 使用processtime进行计算
SELECT
user_id,
COUNT(behavior) AS behavior_cnt,
-- 滚动窗口结束时间。TUMBLE_START或TUMBLE_END返回的是可展示的时间,已经不再是一个时间属性,无法被后续其他查询用来作为时间属性做进一步查询。假如我们想基于窗口时间戳做进一步的查询,比如内联视图子查询或Join等操作,我们需要使用TUMBLE_ROWTIME和TUMBLE_PROCTIME
TUMBLE_END(proctime, INTERVAL '1' MINUTE) AS end_ts
FROM user_behavior
-- 窗口聚合
GROUP BY user_id, TUMBLE(proctime, INTERVAL '1' MINUTE)
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
-- 定义ts字段为Event Time时间戳,Watermark比时间戳最大值还延迟了5秒
WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
...
);
SELECT
TUMBLE_END(rowtime, INTERVAL '20' MINUTE),
user_id,
SUM(cnt)
FROM (
SELECT
user_id,
COUNT(behavior) AS cnt,
TUMBLE_ROWTIME(ts, INTERVAL '10' SECOND) AS rowtime
FROM user_behavior
GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)
)
GROUP BY TUMBLE(rowtime, INTERVAL '20' MINUTE), user_id
-- 基于行的over window
SELECT
user_id,
behavior,
COUNT(*) OVER w AS behavior_count,
ts
FROM user_behavior
WINDOW w AS (
PARTITION BY user_id
ORDER BY ts
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)
-- 基于时间的over window
SELECT
user_id,
COUNT(*) OVER w AS behavior_count,
ts
FROM user_behavior
WINDOW w AS (
PARTITION BY user_id
ORDER BY ts
RANGE BETWEEN INTERVAL '2' SECOND PRECEDING AND CURRENT ROW
)
-- hivecatalog
TableEnvironment tEnv = ...
// 创建一个HiveCatalog
// 4个参数分别为:catalogName、databaseName、hiveConfDir、hiveVersion
Catalog catalog = new HiveCatalog("mycatalog", null, "<path_of_hive_conf>",
"<hive_version>");
// 注册Catalog,取名为mycatalog
tEnv.registerCatalog("mycatalog", catalog);
// 创建一个Database,取名为mydb
tEnv.executeSql("CREATE DATABASE mydb WITH (...)");
// 创建一个表,取名为mytable
tEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
// 返回所有表
tEnv.listTables();
图计算
略
Beam
Beam是由谷歌贡献的Apache顶级项目,是一个开源的统一的编程模型,开发者可以使用Beam SDK来创建数据处理管道,然后,这些程序可以在任何支持的执行引擎上运行,比如运行在Spark、Flink上。如图1-10所示,终端用户用Beam来实现自己所需的流计算功能,使用的终端语言可能是Python、Java等,Beam为每种语言提供了一个对应的SDK,用户可以使用相应的SDK创建数据处理管道,用户写出的程序可以被运行在各个Runner上,每个Runner都实现了从Beam管道到平台功能的映射。通过这种方式,Beam使用一套高层抽象的API屏蔽了多种计算引擎的区别,开发者只需要编写一套代码就可以运行在不同的计算引擎之上。
大数据集群资源管理调度
YARN
在YARN中JobTracker在集群中扮演Master的角色,负责任务调度和集群资源监控,不参与具体的计算。一个Hadoop集群只有一个JobTracker,存在单点故障的可能。TaskTracker在集群中扮演了Slave的角色,负责汇报心跳和执行JobTracker的命令。一个集群可以有多个TaskTracker,但一个节点只会有一个TaskTracker,并且TaskTracker和DataNode运行在同一个节点中,这样一个节点既是计算节点又是存储节点。JobTracker通过调度任务在TaskTracker上执行。TaskTracker运行任务的同时把进度报告到JobTracker,如果其中一个任务失败,JobTracker则重新调度任务到另一个TaskTracker。
用户向Hadoop提交作业,JobTracker会将该作业拆分为多个任务,并根据心跳信息交由空闲的TaskTracker启动。一个TaskTracker能够启动的任务数量是由TaskTracker配置的任务槽(slot)决定。槽是Hadoop的计算资源的表示模型,Hadoop将各个节点上的多维度资源(CPU、内存等)抽象成一维度的槽,这样就将多维度资源分配问题转换成一维度的槽分配的问题。在实际情况中,Map任务和Reduce任务需要的计算资源不尽相同,Hadoop又将槽分成Map槽和Reduce槽,并且Map任务只能使用Map槽,Reduce任务只能使用Reduce槽。
大数据搜索引擎
ElasticSearch
ElasticSearch是一款强大的开源搜索引擎和大数据近实时分析引擎,可以快速从海量的数据中找到相应的内容,它可以将海量的结构化的、非结构化的数据配合其它中间件按需创建可视化的报表,对监控数据设置报警阈值,甚至使用机器学习技术自动识别异常及时发出报警。es是一个分布式的搜索引擎,集群的规模可以从单个节点扩展到数百个节点,从而实现服务的高可用和水平扩展。es提供了多种语言的sdk,同时还提供了RESTful API,可以很方面的接入。在实际开发中,es可以单独存储数据,也可以结合支持事务的数据库结合使用。
除了搜索还可以结合Elastic Stack中的其它中间件像Kibana、Logstash、Beats, es还可以应用于大数据的近实时分析,比如日志分析、指标监控等 。在elastic生态中,es负责存储和计算数据,kibana提供可视化的洁面,Logstash和beat进行数据抓取。logstash是服务器端数据处理管道,可以从不同的来源采集数据,转换数据然后发送到不同的存储库。beats是轻量级的数据采集器。cerebro可以原来管理es集群。
技术对比
Hive与ClickHouse
优点 | 缺点 | 适用场景 | |
Hive | 横向无限扩展支持更新、删除操作 | 部署、运维效率低。Hive构建在Hadoop生态上,需要部署对Hadoop组建进行部署运维。延迟高。Hive 一般采用MapReduce作为计算引擎,MapReduce作为面向批处理的非实时计算框架,无法提供实时查询。成本高。HDFS默认使用三副本策略存储数据,存储成本高。Hadoop组件较多HDFS、ZK、YARN、MapReduce等,运维成本高。 | |
ClickHouse | 实时查询速度快单机性能强大部署运维成本低 | 单机数据量有限,需要通过MPP架构实现数据横向扩展不支持更新、删除操作 | 实时报表生成日志分析营销人群圈选 |
HBase与ClickHouse
优点 | 缺点 | 适用场景 | |
HBase | 底层使用了HDFS,具备大数据海量存储能力HBase是KV数据库,具有高效的点查能力 | ||
ClickHouse | 提供实时分析查询单机性能强大部署运维成本低 | 单机数据量有限,需要通过MPP架构实现数据横向扩展不支持更新、删除操作点查能力弱 | 实时报表生成日志分析营销人群圈选 |
ES与ClickHouse
日志分析
参考
- 各种框架的官方网站
- 《大数据技术原理与应用:概念、存储、处理、分析鱼应用》
- 《Hadoop权威指南》
- 《Spark编程基础》
- 《Flink原理与实践》
- 《HBase原理与实践》