Druid的Select查询初探


Druid是一个分布式的支持实时分析的数据存储系统,在大数据分析领域比较常用。如果想要了解更多相关知识,可以阅读官方的文档,并且推荐《Druid实时大数据分析原理与实践》这本书。


因为Druid的原始数据量很大,大部分情况都是查询按指定维度聚合后数据,此时一般用groupby/timeseries/topn方法,如果要查询原始的行,则需要用到select/scan方法,本文简单介绍一下select方法。Druid的查询语句使用json格式构建,然后发送http请求到对应的查询节点,返回的结果也是json格式,现在我们用pythonpydruid来查询,形式上区别不大。


SELECT查询示例代码

from pydruid.client import *

# Druid的broke节点地址
broke_host = 'ec2-xx-xx-xx-xxx.cn-north-1.compute.amazonaws.com.cn'
port = 8082
end_point = 'druid/v2'

client = PyDruid("http://{}:{}/".format(broke_host, port), end_point)

# 查询的参数
query = {
    'datasource': 'your datasource name',  # 类似SQL中的table 
    'intervals': '2018-07-30T19:25:04+00:00/2018-08-06T19:25:04+00:00',
    'paging_spec': {"pagingIdentifiers": {}, "threshold": 5}
}

# 进行查询
client.select(**query)

# 获取结果
result = client.__dict__['result']

关于查询的参数

Druidselect查询可以传入很多参数,详情可以查看Druid Doc Select Queries,这里说说granularitypaging_spec两个参数,其他参数相对含义比较简单。

granularity

granularity顾名思义就是粒度,指的是时间粒度,select查询返回的结果是原始的行,因此这个参数只会改变查询结果的形式,而内容不会变。上面的代码中的query没有granularity,此时使用的是默认值granularity='all',在这种情况下,返回的结果只有一个bucket,所有结果都在这个bucket里,如果参数是granularity='hour',那么每一个小时就有一个bucket,每个bucket里是属于这个小时的数据。举个例子就明白了:

# 原始数据
''' 
timestamp                user        money
2018-08-01T00:00:00      a           1.2
2018-08-01T01:00:00      b           1.3
2018-08-02T00:00:00      c           1.4
2018-08-02T00:00:00      d           1.5
'''

# granularity='all'时的结果,返回的列表只有1个元素,即1个bucket
result = [{'timestamp': '2018-08-01T00:00:00', 
           'result':{
                'pagingIdentifiers': {...省略...},
                'events': { 4行数据 }
                }}]

# granularity='day'时的结果,返回的列表有2个元素,即2个bucket
result = [{'timestamp': '2018-08-01T00:00:00', 
           'result':{
                'pagingIdentifiers': {...省略...},
                'events': { 08-01的2行数据 }
                }},
        {'timestamp': '2018-08-02T00:00:00', 
           'result':{
                'pagingIdentifiers': {...省略...},
                'events': { 08-02的2行数据 }
                }}]
                        
# granularity='hour'时的结果,返回的列表有3个元素,即3个bucket                      
result = [{'timestamp': '2018-08-01T00:00:00', 
           'result':{
                'pagingIdentifiers': {...省略...},
                'events': { 08-01 0时的1行数据 }
                }},
        {'timestamp': '2018-08-01T01:00:00', 
           'result':{
                'pagingIdentifiers': {...省略...},
                'events': { 08-01 1时的1行数据 }
                }},
        {'timestamp': '2018-08-02T00:00:00', 
           'result':{
                'pagingIdentifiers': {...省略...},
                'events': { 08-02 0时的2行数据 }
        }}]

因此我们可以看到,不论granularity选择什么,都可以获取的原始的4行数据,只是分组的形式不一样。


paging_spec

paging_spec是分页的参数,上面的代码中我们传入的参数是paging_spec={"pagingIdentifiers": {}, "threshold":5}threshold很好理解,类似SQL中的limit,表示每个bucket只返回5条数据,那么pagingIdentifiers就可以理解成从哪里开始分页,当它为空的时候,表示从第一条数据开始计数共计返回5条数据,此时返回的结果如上面所示,也有一个pagingIdentifiers字段,如果把query中的pagingIdentifiers替换成上一次查询结果中的pagingIdentifiers,我们就可以获取到下一页的结果了。也简单举个例子如下:

# 原始数据
''' 
timestamp                user        money
2018-08-01T00:00:00      a           1.2
2018-08-01T01:00:00      b           1.3
2018-08-02T00:00:00      c           1.4
2018-08-02T00:00:00      d           1.5
'''

# granularity='all', paging_spec={"pagingIdentifiers": {}, "threshold": 1}时的结果
result = [{'timestamp': '2018-08-01T00:00:00', 
           'result':{
                'pagingIdentifiers': {key1: value1},
                'events': { 第1行数据 }
                }}]
                
# 把上次查询结果中的pagingIdentifiers放入到查询query的pagingIdentifiers里
# granularity='all', paging_spec={"pagingIdentifiers": {key1: value1}, "threshold": 1}
result = [{'timestamp': '2018-08-01T00:00:00', 
           'result':{
                'pagingIdentifiers': {key2: value2},
                'events': { 第2行数据 }
                }}]             

以此类推,因此我们要查询某段时间的所有行,可以不断替换pagingIdentifiers参数,直到events的结果是空的时候停止,当然也可以把threshold设得特别大,一次返回所有的行。还有一点需要注意的就是如果有多个bucket每次要把每个bucketpagingIdentifiers都放入到querypagingIdentifiers,否则对应的bucket还是查的第一页的数据。