项目中一般都会有数据统计模块,比如说,按天、月、年统计订单、会员的数量、金额等。对于这类需求,下面我将简单介绍一下我的技术方案,抛砖引玉。
一、如果项目小,数据量不大,就直接写 SQL 语句去实时统计;
二、数据库做集群,主从复制,在从库上做数据统计;
三、建一个数据汇总表,把统计数据写入这个表,然后统计报表从这个表去查询,这样性能就好很多了;
四、把数据同步到 ElasticSearch 这类分布式搜索和分析引擎,然后用 ElasticSearch 去做统计。数据同步,可以采用阿里巴巴的 Canal 工具,也可以在发生业务的时候推送到 ElasticSearch。
关于方案三,数据写入汇总表,然后统计数据从汇总表去查询,这个可能是最常用的方案。那么,怎么把数据写入汇总表呢?这里介绍 2 种办法:
1、发生业务的时候,把数据写入汇总表。但这样的不好之处是:旧的数据没有统计到;耦合度高,容易 bug。至于性能,则可以考虑异步处理。
2、写个 Python 这类脚本来定时统计。
我喜欢的是用 Python 来做定时统计。Python 很适合用来做这类工作,代码量绝对比 Java 少多了,这就是动态语言的厉害之处。Java 这类语言,适合团队协作,多人开发,而 Python 这类语言很适合做一些小工具,更适合敏捷开发。代码例子如下:
# coding: utf-8
# !/usr/bin/python3
import argparse
import sys
import time
sys.path.append('..')
import datetime
import pymysql
import os
# 引入 logging 模块
import logging
# 注意安装命令是:pip3 install python-dateutil
from dateutil.relativedelta import relativedelta
logging.basicConfig(level=logging.INFO, # 控制台打印的日志级别
filename=os.path.join(os.getcwd(), 'data_count' + datetime.date.today().strftime("%Y-%m-%d") + '.txt'),
filemode='a', ## 模式,有 w 和 a,w 就是写模式,每次都会重新写日志,覆盖之前的日志
# a 是追加模式,默认如果不写的话,就是追加模式
format='%(process)d %(asctime)s %(filename)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s'
# 日志格式
)
# active 参数,正式环境 prod,测试环境 test 用法:data_count.py --active=test
parser = argparse.ArgumentParser(description='命令行参数测试')
parser.add_argument('--active', type=str, default='')
args = parser.parse_args()
active = args.active
"""这个程序的作用是:数据统计"""
class dataCount(object):
def __init__(self):
if active == 'prod':
self.conn_default = pymysql.connect(host='count-db.mysql.polardb.rds.aliyuncs.com',
user='user', passwd='passwd',
db='count_db',
port=3306, charset='utf8', cursorclass=pymysql.cursors.DictCursor,
connect_timeout=7200)
else:
self.conn_default = pymysql.connect(host='192.168.0.8',
user='user', passwd='passwd',
db='test_db',
port=3306, charset='utf8', cursorclass=pymysql.cursors.DictCursor,
connect_timeout=7200)
self.cursor_default = self.conn_default.cursor()
# 检查日序列是否存在
def exist_day_index(self, start_day: str):
day_index = start_day.replace('-', '')
sql = f"""select count(*) as count from statistics t where t.day_index = {day_index} and t.del_flag = 0"""
self.cursor_default.execute(sql)
row = self.cursor_default.fetchone()
if not row or row.get('count') == 0:
sql = f"""
insert into statistics(day_index, contract_num, contract_amount)
values({day_index}, 0, 0)
"""
self.cursor_default.execute(sql)
def get_start_day(self):
now = datetime.datetime.now()
return now.strftime("%Y-%m-%d")
# 根据身份证号码获取性别
def get_gender(self, id_card: str):
# 男:0 女:1
num = int(id_card[16:17])
if num % 2 == 0:
return 1
else:
return 0
# 统计合同总数
def count_contracts(self, start_day: str):
start_time = start_day + '00:00:00'
end_time = start_day + '23:59:59'
day_index = start_day.replace('-', '')
sql = f"""select count(*) as count from contract where create_date >='{start_time}'and create_date <='{end_time}'and del_flag = 0"""
self.cursor_default.execute(sql)
row = self.cursor_default.fetchone()
num = row.get('count')
if num and num > 0:
sql = f"""
UPDATE statistics t
SET t.contract_num = {num}
WHERE t.day_index = {day_index} and t.del_flag = 0
"""
self.cursor_default.execute(sql)
# 统计合同金额
def count_contract_amount(self, start_day: str):
start_time = start_day + '00:00:00'
end_time = start_day + '23:59:59'
day_index = start_day.replace('-', '')
sql = f"""select sum(money) as money from contract where create_date >='{start_time}'and create_date <='{end_time}'and del_flag = 0"""
self.cursor_default.execute(sql)
row = self.cursor_default.fetchone()
money = row.get('money')
if money and money > 0:
sql = f"""
UPDATE statistics t
SET t.contract_amount = {money}
WHERE t.day_index = {day_index} and t.del_flag = 0
"""
self.cursor_default.execute(sql)
# 更新性别
def update_gender(self):
sql = """
select sys_user.id, sys_user.gender, info_user.id_card from sys_user INNER JOIN info_user ON info_user.user_id = sys_user.id
WHERE info_user.id_card is not null;
"""
self.cursor_default.execute(sql)
rows = self.cursor_default.fetchall()
for row in rows:
id = row.get('id')
id_card = row.get('id_card')
gender_database = row.get('gender')
# 计算性别
gender = self.get_gender(id_card)
if gender_database != gender:
logging.info(f"身份证号码为 {id_card} 的会员性别有误")
sql = f"""update sys_user set gender = {gender} where id = {id} and gender != {gender}"""
self.cursor_default.execute(sql)
# 延迟 1 秒
time.sleep(1)
# 循环日期去统计
def loop_date_count(self, days: int):
begin = datetime.datetime.now()
for i in range(0, days):
day = begin + datetime.timedelta(days=-i)
start_day = day.strftime("%Y-%m-%d")
# 更新错误的性别
self.update_gender()
self.exist_day_index(start_day)
# 统计合同总数
self.count_contracts(start_day)
# 统计合同金额
self.count_contract_amount(start_day)
if __name__ == '__main__':
data_count = dataCount()
# 任务相关统计
data_count.loop_date_count(3)
# start_day = data_count.get_start_day()
# 数据库提交
data_count.conn_default.commit()
# 游标关闭
data_count.cursor_default.close()
# 连接关闭
data_count.conn_default.close()
logging.info("已经完成了")
sys.exit(1)
怎么样?很简单吧。
Linux 操作系统一般都自带 Python,新的系统都普遍是 Python3 了。直接在服务器上做个定时任务,每天执行,就完事了。当然,也可以 Java 去执行 shell 命令的方式去运行 Python。办法有很多。