标点符(钱魏 Way)

HLLC基数估算算法在腾讯数据仓库TDW中应用

分布式数据仓库(TDW)是一个以hive + hadoop为基础的大规模分布式系统,它提供了一种类SQL语言(称为HQL)让用户可以方便在其上进行编程开发。在数据分析领域,经常需要计算数据集不同元素的唯一值(Distinct值),也称为基数计算。 精确的基数计算需要消耗过多的计算资源,当数据量非常大时,这种资源的消耗就更加严重。因此在大数据领域,采用估值算法降低基数计算的成本成为一种新的选择。目前已经出现了一些优秀的基数估值算法,例如HLLC(HyperLogLog Counting)等。TDW引入了HLLC算法,用户可以写HQL简单的使用它。在能够容忍一定精度损失的前提下,HLLC算法往往拥有比精确计算高得多的计算效率。HLLC算法在TDW上已经取得了较多的应用并且收到了较好的效果。

TDW提供了一个特殊的聚合函数—est_distinct来实现基数估值的算法。这个函数的使用方法与一般的聚合函数基本一致。例如需要计算表 table1的a,b两列的基数,以c列分组,精确计算和估值的语法如下:

  • 精确计算:Select count(distinct a), count(distinct b) from table1 group by c;
  • 估值计算:Select est_distinct(a), est_distinct(b) from table1 group by c

这样,数据分析人员只需要简单写一条SQL就可以在TDW中使用基数估值算法了。

est_distinct的基本流程

1、HQL翻译

HQL最终需要翻译成MapReduce Job才能够执行。具体的翻译过程包括:语法解析,语意分析,逻辑执行计划生成,优化和物理执行计划生成等过程。est_distinct函数自然也不例外。est_distinct函数生成的逻辑执行计划和物理执行计划如下图所示:

hsq

由图可知,est_distinct函数最终会生成一个MapReduce Job,它的计算过程主要由一个Map端局部聚合算子,MapReduce Shuffle和一个Reduce端全局聚合算子三个阶段组成。

2、Map端计算

Map端主要计算过程实际上就是维护一张hash表。这张hash表的key就是Group by的key值,value是一个被划分为M个桶的数组(可以根据不同的精度要求选择不同的桶个数,典型值为64K,下面均以64K为例来说明算法流程,每个桶的大小为1个字节)。

map

当一行新的数据输入时,首先得到这一行数据的group by key值,如果这个key在Hash表中不存在,则需要将这个新的Key值插入hash表中,并为它初始化一个64K字节数组。接下来解析出需要做基数估值的列值,根据该值计算一个hash值(TDW使用的是murmurhash),这是一个64位的整数值:前16为作为桶的序号,取后48位的第一个“1”出现的位置作为当前行的桶存储值,如果该值比桶中已有的值大,则需要更新桶中的值,即保证桶中存储的值永远是最大的。这样Map段局部聚合的输出结构是一个<key,value>对,key就是Hash表的key值,value就是key对应的64K字节数组。接下来,Hadoop的shuffle过程将所有相同key的输出行送入同一个reduce中。由Reduce计算出最终的基数估算值。

3、Reduce端计算

Reduce计算的第一步就是要将相同key下所有的value进行合并得到一个全局的数组。合并的过程如下图所示:取得所有value的相同序号的桶的最大值,将这个最大值作为结果桶的值。待64K个桶都处理完毕后,得到一个全局的<key,value>对。

reduce

得到全局的<key,value>对后,就可以使用HLLC的公式估算基数值了:

gongshi

其中m为桶的个数(m=64K),M[i]为第i个桶的值

4、内存管理机制

在TDW中实现HLLC算法的难点并不在于实现算法流程本身,而是如何高效的使用内存和IO资源。为了保证足够的精度,HLLC算法通常需要更大的桶个数,这就意味着需要更多的内存。不管是map端局部的聚合还是reduce端全局的聚合,内存资源都是很宝贵的,这两者都需要维护一张巨大的hash表。在理想情况下,数据分布比较集中,也就是key个数较小,那么整张hash表都可以缓存在内存中,在计算过程中不用将hash表记录落地到磁盘,这样map端输出的数据量将非常小,shuffle过程可以迅速完成。

但实际情况往往并非如此:数据分布比较分散,数据比较倾斜,数据集中于少数几个key,其余绝大部分key对应的记录都很少。这两种情况下就意味着内存hash表行数较多,内存容纳不下整张hash表,因此如何在有限的内存中存下尽可能多的hash表记录成为了决定算法效率的关键。

分而治之是最容易想到的方法。在分配hash桶的时候没有必要一次性分配一个完整大小的桶,而是每次分配一小块内存(可配置)。这样的hash桶分配方式具有一定的自适应性,对应记录越多的key被分配的内存块越多,提高了内存使用效率。块大小也会对内存使用效率产生影响,块越小每次使用效率越高,但是需要更高的维护代价。当hash表占用的内存达到堆内存的设定比例时,需要将一部分<key,value>对输出到环形缓冲区。如何选择需要输出的key也会在一定程度上影响map输出结果的大小,目前是采用随机选择的方式。选出需要输出的key后需要对相应的<key,value>进行序列化操作,为了降低序列化的计算成本,同时为了减小序列化产生的数据量,TDW专门为此实现了一套特殊的SerDe方式。如下图所示,value事实上是由很多个小的内存块组成,它们被加上序号后写入二进制流。

kv

这种方案的优点是实现简单,也是TDW目前采用的实现方式,在实际应用中也取得了比较好的效果。一种更好的方案是采用内存池,进一步降低内存分配和维护的代价;同时也可以考虑自适应的hash表key淘汰策略,这些是TDW未来进一步优化的方向。

在实际应用中如何选择不同的算法

在允许有一定精度损失的条件下,TDW目前有两种计算方式可选:精确计算(count(distinct))和估值计算(est_distinct)。如何在实际应用中选择更加高效的算法呢?从前面几部分描述的算法过程也可以看得出来,估值算法的效率和数据分布有着很大的关系。下图简单描述了两种算法的内存和IO的对比。精确算法采用hash表进行去重,然后将去重后的value与key组成一对对<key,value>对输出,所以它的内存占用和IO消耗都和value的个数成正比,而估值算法的内存占用虽然也与value个数相关,但是有一个上限,不会超过设置的桶个数M。因此,数据分布越集中,估值算法的优势越明显。精确计算(上)vs 估值计算(下)内存和IO使用对比:

duibi

计算效果

下表展示了精确计算和估值计算的效率对比。选取了一个在生产环境使用的表,分别使用精确算法和估值算法对其中一些列进行基数计算(不带group by语句),分别测试了单列distinct和multi-distinct(2~5个列)。

在测试过程中,我们使用了64K的桶大小,估值计算的精度在99.4%以上。可以看出,在较为理想的条件下(数据分布集中),估值算法能够较大程度提升计算效率。较理想条件下估值算法和精确算法效率对比:

test

参考文档:http://mta.qq.com/mta/bigdata/?p=567

码字很辛苦,转载请注明来自标点符《HLLC基数估算算法在腾讯数据仓库TDW中应用》

评论