实现上面的接口:后端使用 Django 框架,数据存储使用 Elasticsearch(ES)和 MySQL,横轴的聚合周期为小时、天、周、月、年。
from datetime import datetime, timedelta, timezone

# Aggregation period for the x axis: "hour", "day", "week", "month" or "year".
interval = 'day'
es_search = do_query(query_sql, agg_index=True)

# Bucket-key (key_as_string) format handed to Elasticsearch (Java date-format syntax).
format_dict = {
    "hour": "yyyy-MM-dd HH:mm:ss",
    "day": "yyyy-MM-dd",
    "week": "Y年第w周",
    "month": "yyyy-MM",
    "year": "yyyy",
}
# The same layouts in Python strftime syntax, used to render the upper bound
# of extended_bounds so Elasticsearch can parse it with format_dict.
bounds_format_dict = {
    "hour": "%Y-%m-%d %H:%M:%S",
    "day": "%Y-%m-%d",
    "week": "%Y年第{}周",
    "month": "%Y-%m",
    "year": "%Y",
}

# Evaluate "now" in the same zone as the aggregation's time_zone
# (Asia/Shanghai is a fixed UTC+8 offset), not in the server's local zone,
# so the string upper bound lands in the right bucket wherever the server runs.
now = datetime.now(timezone(timedelta(hours=8)))
now_date = now.strftime(
    bounds_format_dict.get(interval).format(now.strftime("%W")),
)

if interval != "week":
    max_bound = now_date
else:
    # For weeks the bound is given as an absolute epoch-millisecond timestamp
    # (presumably because the "Y年第w周" label is awkward to parse back).
    max_bound = int(round(now.timestamp() * 1000))

agg = EsAggBuilder(es_search).bucket_date_histogram(  # see the wrapper class below
    interval,
    "create_time",  # time field, stored as a millisecond epoch timestamp
    format=format_dict.get(interval),
    time_zone="Asia/Shanghai",
    min_doc_count=0,  # keep empty buckets so gaps show up with doc_count 0
    # Only "max" is set: empty buckets are filled up to now, but not before
    # the first matching document. NOTE(review): add "min" if leading gaps matter.
    extended_bounds={"max": max_bound},
)
agg.metric_cardinality("md5")  # distinct count of md5 per bucket, see wrapper below
result = extract_aggregations_result(agg, True)
上面代码中涉及 ES 的操作做了如下封装:
class EsAggBuilder:
    """Fluent helper for building Elasticsearch aggregations.

    A builder wraps either a search object (the root builder) or an
    aggregation node (a nested bucket). Bucket methods return a new builder
    for the nested node so metrics can be attached to it. Every nested builder
    keeps a reference to the root, so the whole query can be executed from
    any level.
    """

    def __init__(self, search_or_bucket, is_bucket=False, root_builder=None):
        """
        :param search_or_bucket: for the root builder, a search object exposing
            ``extra()`` and ``aggs`` (presumably an elasticsearch-dsl
            ``Search``). For a nested builder, an aggregation node.
        :param is_bucket: treat ``search_or_bucket`` as an aggregation node
            that has no owning root builder.
        :param root_builder: the root builder this nested node belongs to.
        """
        if root_builder is not None or is_bucket:
            # Nested node: it has no search object of its own.
            self.search = None
            self.aggs = search_or_bucket
            self.root_builder = root_builder
        else:
            # Root: only aggregations are wanted, so request zero hits.
            self.search = search_or_bucket.extra(size=0)
            self.aggs = self.search.aggs
            self.root_builder = None

    def bucket_date_histogram(self, period, field="ctime", bucket_name="timeline", format="yyyy-MM-dd", **kwargs):
        """Add a ``date_histogram`` bucket and return a builder for it.

        :param period: calendar interval, e.g. "hour", "day", "week", "month", "year".
        :param field: date field to bucket on.
        :param bucket_name: name of the aggregation in the response.
        :param format: date format used for each bucket's ``key_as_string``.
        :param kwargs: extra date_histogram options (time_zone, min_doc_count,
            extended_bounds, ...), passed through unchanged.
        :return: a nested ``EsAggBuilder`` wrapping the new bucket.
        """
        bucket = self.aggs.bucket(
            bucket_name, "date_histogram",
            field=field, calendar_interval=period, format=format, **kwargs
        )
        return self.__class__(bucket, True, self.root_builder or self)

    def metric_cardinality(self, field, metric_name="cardinality", **kwargs):
        """Attach a ``cardinality`` (approximate distinct count) metric to this node.

        :return: ``self``, so further metrics can be chained.
        """
        self.aggs.metric(metric_name, "cardinality", field=field, **kwargs)
        return self

    def execute(self):
        """Execute the whole query from the root builder and return the response.

        ``extract_aggregations_result`` calls ``execute()`` on the builder it
        receives, which is usually a nested bucket builder. This method
        therefore delegates to the root builder that owns the search object.

        :raises ValueError: if no search object is reachable, i.e. the builder
            was created with ``is_bucket=True`` and no ``root_builder``.
        """
        root = self.root_builder or self
        if root.search is None:
            raise ValueError("EsAggBuilder has no search to execute; pass root_builder")
        return root.search.execute()
def extract_aggregations_result(agg, need_count=False):
    """Execute ``agg`` and convert its aggregations into bucket data.

    :param agg: an object whose ``execute()`` returns a response exposing
        ``to_dict()`` (e.g. an ``EsAggBuilder``).
    :param need_count: forwarded unchanged to ``loads_buckets_result``.
    :return: ``[]`` when the response dict has no ``aggregations`` key,
        otherwise whatever ``loads_buckets_result`` returns.
    """
    result = agg.execute().to_dict()
    if 'aggregations' not in result:
        return []
    return loads_buckets_result(result['aggregations'], need_count)
结果:
用 Django ORM 查询 MySQL 时就没有上面 ES 那么方便了:ES 能对缺失的时间段自动补全(例如按天聚合时,数据中没有 8 月 19 日的记录,ES 也会补出一个 doc_count 为 0 的桶),而 MySQL 的分组查询不会。
# Group MySQL rows by the formatted time label and count the rows per label.
# NOTE(review): `select` is assumed to map "key_as_string" to an SQL expression
# that formats the time column the same way get_date_list() labels the same
# interval. Confirm this, otherwise the later merge cannot match keys.
result = list(
    MessageModel.objects.filter(query_sql)  # filter conditions (presumably a Q object)
    .extra(select=select, order_by=["key_as_string"])
    .values("key_as_string")
    .annotate(doc_count=Count("id"))  # number of rows per bucket (a COUNT, not a sum)
)
结果发现,缺失时间段的数据没有被补全,时间轴不连续:
网上搜到的方案大多是在 SQL 层面处理的,写起来并不友好,因此这里改为在查询结果上做填充。
def get_date_list(self, interval, start=None, end=None):
    """Return zero-count placeholder buckets covering ``[start, end]``.

    Each element is ``{"key_as_string": <bucket label>, "doc_count": 0}``.
    The list is meant to be merged with the real aggregation rows so that
    periods without data still appear on the time axis.

    The first bucket is aligned to the start of its period (top of the hour,
    midnight, first day of the month or year). Buckets are generated while
    their start is ``<= end``, so the period containing ``end`` is always
    included.

    :param interval: "hour", "day", "month" or "year". Any other value
        (e.g. "week") falls back to daily steps from midnight, labelled with
        "%Y-%m-%d %H:%M:%S" as the previous implementation did.
    :param start: range start as a millisecond epoch timestamp (int or
        numeric string), converted using the server's local time zone.
    :param end: inclusive range end, same encoding as ``start``.
    :return: placeholder dicts in chronological order; empty when ``end`` is
        before ``start``.
    :raises TypeError: if ``start`` or ``end`` is None.

    NOTE(review): labels must match the ``key_as_string`` format produced by
    the database query for the same interval; confirm this for month/year.
    """
    from datetime import datetime, timedelta

    if start is None or end is None:
        raise TypeError("get_date_list() requires both start and end (ms timestamps)")

    start_dt = datetime.fromtimestamp(int(start) / 1000)
    end_dt = datetime.fromtimestamp(int(end) / 1000)
    midnight = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}

    # Align the first bucket to its period boundary so the generated labels
    # equal the truncated labels produced by the aggregation query.
    if interval == "hour":
        current = start_dt.replace(minute=0, second=0, microsecond=0)
        key_format = "%Y-%m-%d %H:%M:%S"
    elif interval == "month":
        current = start_dt.replace(day=1, **midnight)
        key_format = "%Y-%m"
    elif interval == "year":
        current = start_dt.replace(month=1, day=1, **midnight)
        key_format = "%Y"
    elif interval == "day":
        current = start_dt.replace(**midnight)
        key_format = "%Y-%m-%d"
    else:
        # Fallback for unsupported intervals such as "week".
        current = start_dt.replace(**midnight)
        key_format = "%Y-%m-%d %H:%M:%S"

    def _next(dt):
        # Start of the period following the one that starts at ``dt``.
        if interval == "hour":
            return dt + timedelta(hours=1)
        if interval == "month":
            return dt.replace(year=dt.year + dt.month // 12, month=dt.month % 12 + 1)
        if interval == "year":
            return dt.replace(year=dt.year + 1)
        return dt + timedelta(days=1)

    data = []
    while current <= end_dt:
        data.append({"key_as_string": current.strftime(key_format), "doc_count": 0})
        current = _next(current)
    return data
def gen_dates(self, b_date, days, interval):
    """Yield evenly spaced datetimes starting at ``b_date``.

    ``days`` is a span measured in days. When ``interval`` is "hour" the
    span is walked hour by hour (``days * 24`` values). For any other
    interval it is walked day by day (``days`` values).
    """
    if interval == "hour":
        step, count = timedelta(hours=1), days * 24
    else:
        step, count = timedelta(days=1), days
    for offset in range(count):
        yield b_date + offset * step
补全缺失:
# Zero-count placeholder buckets for every period in the requested range.
date_list = self.get_date_list(
    interval=interval,
    start=start_timestamp,
    end=end_timestamp,
)
# Merge the placeholders with the real query rows, keyed by bucket label.
# Placeholders are inserted first, so their order is kept. A real row with
# the same label overwrites the placeholder's doc_count of 0. Rows whose
# label has no placeholder are appended at the end.
# NOTE(review): consumers presumably need list(new_result.values()).
new_result = {}
for row in chain(date_list, result):
    new_result.setdefault(row['key_as_string'], {}).update(row)
大功告成:
写得比较仓促,大家有疑问可以私信我哦
页面更新:2024-05-16
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号