Java项目,做数据统计的技术方案

项目中一般都会有数据统计模块,比如说,按天、月、年统计订单、会员的数量、金额等。对于这类需求,下面我将简单介绍一下我的技术方案,抛砖引玉。

一、如果项目小,数据量不大,就直接写 SQL 语句去实时统计;
二、数据库做集群,主从复制,在从库上做数据统计;
三、建一个数据汇总表,把统计数据写入这个表,然后统计报表从这个表去查询,这样性能就好很多了;
四、把数据同步到 ElasticSearch 这类分布式搜索和分析引擎,然后用 ElasticSearch 去做统计。数据同步,可以采用阿里巴巴的 Canal 工具,也可以在发生业务的时候推送到 ElasticSearch。

关于方案三,数据写入汇总表,然后统计数据从汇总表去查询,这个可能是最常用的方案。那么,怎么把数据写入汇总表呢?这里介绍 2 种办法:
1、发生业务的时候,把数据写入汇总表。但这样的不好之处是:旧的数据没有统计到;耦合度高,容易 bug。至于性能,则可以考虑异步处理。
2、写个 Python 这类脚本来定时统计。

我喜欢的是用 Python 来做定时统计。Python 很适合用来做这类工作,代码量绝对比 Java 少多了,这就是动态语言的厉害之处。Java 这类语言,适合团队协作,多人开发,而 Python 这类语言很适合做一些小工具,更适合敏捷开发。代码例子如下:

# coding: utf-8
# !/usr/bin/python3
import argparse
import sys
import time

sys.path.append('..')
import datetime
import pymysql
import os
# 引入 logging 模块
import logging
# 注意安装命令是:pip3 install python-dateutil
from dateutil.relativedelta import relativedelta


logging.basicConfig(level=logging.INFO,  # 控制台打印的日志级别
                    filename=os.path.join(os.getcwd(), 'data_count' + datetime.date.today().strftime("%Y-%m-%d") + '.txt'),
                    filemode='a',  ## 模式,有 w 和 a,w 就是写模式,每次都会重新写日志,覆盖之前的日志
                    # a 是追加模式,默认如果不写的话,就是追加模式
                    format='%(process)d %(asctime)s %(filename)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s'
                    # 日志格式
                    )
# active 参数,正式环境 prod,测试环境 test 用法:data_count.py --active=test
parser = argparse.ArgumentParser(description='命令行参数测试')
parser.add_argument('--active', type=str, default='')
args = parser.parse_args()
active = args.active

"""这个程序的作用是:数据统计"""
class dataCount(object):
    def __init__(self):
        if active == 'prod':
            self.conn_default = pymysql.connect(host='count-db.mysql.polardb.rds.aliyuncs.com',
                                                user='user', passwd='passwd',
                                                db='count_db',
                                                port=3306, charset='utf8', cursorclass=pymysql.cursors.DictCursor,
                                                connect_timeout=7200)
        else:
            self.conn_default = pymysql.connect(host='192.168.0.8',
                                                user='user', passwd='passwd',
                                                db='test_db',
                                                port=3306, charset='utf8', cursorclass=pymysql.cursors.DictCursor,
                                                connect_timeout=7200)

        self.cursor_default = self.conn_default.cursor()

    # 检查日序列是否存在
    def exist_day_index(self, start_day: str):
        day_index = start_day.replace('-', '')
        sql = f"""select count(*) as count from statistics t where t.day_index = {day_index} and t.del_flag = 0"""
        self.cursor_default.execute(sql)
        row = self.cursor_default.fetchone()
        if not row or row.get('count') == 0:
            sql = f"""
            insert into statistics(day_index, contract_num, contract_amount)
        values({day_index}, 0, 0)
            """
            self.cursor_default.execute(sql)


    def get_start_day(self):
        now = datetime.datetime.now()
        return now.strftime("%Y-%m-%d")

    # 根据身份证号码获取性别
    def get_gender(self, id_card: str):
        # 男:0 女:1
        num = int(id_card[16:17])
        if num % 2 == 0:
            return 1
        else:
            return 0

    # 统计合同总数
    def count_contracts(self, start_day: str):
        start_time = start_day + '00:00:00'
        end_time = start_day + '23:59:59'
        day_index = start_day.replace('-', '')
        sql = f"""select count(*) as count from contract where create_date >='{start_time}'and create_date <='{end_time}'and del_flag = 0"""
        self.cursor_default.execute(sql)
        row = self.cursor_default.fetchone()
        num = row.get('count')
        if num and num > 0:
            sql = f"""
                            UPDATE statistics t
                        SET t.contract_num = {num}
                        WHERE t.day_index = {day_index} and t.del_flag = 0
                                """
            self.cursor_default.execute(sql)

    # 统计合同金额
    def count_contract_amount(self, start_day: str):
        start_time = start_day + '00:00:00'
        end_time = start_day + '23:59:59'
        day_index = start_day.replace('-', '')
        sql = f"""select sum(money) as money from contract where create_date >='{start_time}'and create_date <='{end_time}'and del_flag = 0"""
        self.cursor_default.execute(sql)
        row = self.cursor_default.fetchone()
        money = row.get('money')
        if money and money > 0:
            sql = f"""
                            UPDATE statistics t
                        SET t.contract_amount = {money}
                        WHERE t.day_index = {day_index} and t.del_flag = 0
                                """
            self.cursor_default.execute(sql)

    # 更新性别
    def update_gender(self):
        sql = """
        select sys_user.id, sys_user.gender, info_user.id_card from sys_user INNER JOIN info_user ON info_user.user_id = sys_user.id
            WHERE info_user.id_card is not null;
        """
        self.cursor_default.execute(sql)
        rows = self.cursor_default.fetchall()
        for row in rows:
            id = row.get('id')
            id_card = row.get('id_card')
            gender_database = row.get('gender')
            # 计算性别
            gender = self.get_gender(id_card)
            if gender_database != gender:
                logging.info(f"身份证号码为 {id_card} 的会员性别有误")
                sql = f"""update sys_user set gender  = {gender} where id = {id} and gender != {gender}"""
                self.cursor_default.execute(sql)
                # 延迟 1 秒
                time.sleep(1)


    # 循环日期去统计
    def loop_date_count(self, days: int):
        begin = datetime.datetime.now()
        for i in range(0, days):
            day = begin + datetime.timedelta(days=-i)
            start_day = day.strftime("%Y-%m-%d")
            # 更新错误的性别
            self.update_gender()
            self.exist_day_index(start_day)
            # 统计合同总数
            self.count_contracts(start_day)
            # 统计合同金额
            self.count_contract_amount(start_day)


if __name__ == '__main__':
    data_count = dataCount()
    # 任务相关统计
    data_count.loop_date_count(3)
    # start_day = data_count.get_start_day()
    # 数据库提交
    data_count.conn_default.commit()
    # 游标关闭
    data_count.cursor_default.close()
    # 连接关闭
    data_count.conn_default.close()
    logging.info("已经完成了")
    sys.exit(1)

怎么样?很简单吧。

Linux 操作系统一般都自带 Python,新的系统都普遍是 Python3 了。直接在服务器上做个定时任务,每天执行,就完事了。当然,也可以 Java 去执行 shell 命令的方式去运行 Python。办法有很多。

正文完
 0
评论(没有评论)