Django中使用Mysql、ES实现Timeline

目的

Django中使用Mysql、ES实现Timeline

Django中使用Mysql、ES实现Timeline

实现上面接口,后端Django框架,数据存储使用的是ES和Mysql,横轴聚合周期是小时、天、月、年。

ES实现timeline

interval='day' //聚合周期:小时、天、周、月、年
es_search = do_query(query_sql, agg_index=True)

format_dict = {
  "hour": "yyyy-MM-dd HH:mm:ss",
  "day": "yyyy-MM-dd",
  "week": "Y年第w周",
  "month": "yyyy-MM",
  "year": "yyyy",
}
bounds_format_dict = {
  "hour": "%Y-%m-%d %H:%M:%S",
  "day": "%Y-%m-%d",
  "week": "%Y年第{}周",
  "month": "%Y-%m",
  "year": "%Y",
}
now_date = time.strftime(
  bounds_format_dict.get(interval).format(time.strftime("%W")),
  time.localtime(),
)
agg = EsAggBuilder(es_search).bucket_date_histogram( //见下面封装
  interval,
  "create_time", //时间字段,毫秒时间戳
  format=format_dict.get(interval),
  time_zone="Asia/Shanghai",
  min_doc_count=0,
  extended_bounds={
    "max": "{}".format(now_date)
    if not interval == "week"
    else int(round(time.time() * 1000))
}
)
agg.metric_cardinality("md5")  //去重计数, 见下面封装
result = extract_aggregations_result(agg, True)

此处涉及ES的操作做了封装

class EsAggBuilder:

    def __init__(self, search_or_bucket, is_bucket=False, root_builder=None):
        if root_builder is not None:
            self.search = None
            self.aggs = search_or_bucket
            self.root_builder = root_builder
        elif is_bucket:
            self.search = None
            self.aggs = search_or_bucket
            self.root_builder = None
        else:
            self.search = search_or_bucket.extra(size=0)
            self.aggs = self.search.aggs
            self.root_builder = None

    def bucket_date_histogram(self, period, field="ctime", bucket_name="timeline", format="yyyy-MM-dd", **kwargs):
        return self.__class__(self.aggs.bucket(
            bucket_name, "date_histogram", field=field, calendar_interval=period, format=format, **kwargs),
            True, self.root_builder or self)

		def metric_cardinality(self, field, metric_name="cardinality", **kwargs):
        self.aggs.metric(metric_name, "cardinality", field=field, **kwargs)
        return self

def extract_aggregations_result(agg, need_count=False):
    result = agg.execute().to_dict()
    return [] if 'aggregations' not in result else 
        loads_buckets_result(result.get('aggregations', {}), need_count)

结果:

Django中使用Mysql、ES实现Timeline

Mysql实现timeline

Django中并没有上面ES那么强大,不足的是ES能对缺失的数据补全,比如:按天分割,数据中没有8.19的数据,ES可以进行补全,而Mysql就不能。

result = list(
  MessageModel.objects.filter(
      query_sql //过滤条件
  ).extra(
    select=select, order_by=["key_as_string"]
  ).values("key_as_string").annotate(
      doc_count=Count("id") //聚合后求和
  )
)

结果发现,中间数据没有补全,且不连贯:

Django中使用Mysql、ES实现Timeline

上网搜了好多,都是从SQL层面来做的,并且不友好,只能从结果上做填充了

def get_date_list(self, interval, start=None, end=None):
    start = datetime.fromtimestamp(int(start) / 1000)
    end = datetime.fromtimestamp(int(end) / 1000)
    data = []
    for d in self.gen_dates(start, (end - start).days + 1, interval):
        data.append({
            "key_as_string": d.strftime("%Y-%m-%d" if interval == "day" else "%Y-%m-%d %H:%M:%S"),
            "doc_count": 0
        })
    return data

def gen_dates(self, b_date, days, interval):
    day_count = timedelta(days=1)
    if interval == "hour":
        day_count = timedelta(hours=1)
        days *= 24

    for i in range(days):
        yield b_date + day_count * i

补全缺失:

date_list = self.get_date_list(interval=interval, start=start_timestamp, end=end_timestamp)
new_result = {}
for d in chain(date_list, result):
    key_as_string = d['key_as_string']
    if key_as_string in new_result:
        new_result[key_as_string].update(d)
    else:
        new_result[key_as_string] = dict(d)

大功告成:

Django中使用Mysql、ES实现Timeline

总结

写得比较仓促,大家有疑问可以私信我哦

展开阅读全文

页面更新:2024-05-16

标签:目的   大功告成   天分   字段   私信   仓促   缺失   层面   周期   框架   接口   强大   小时   时间   数据   科技

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top