python3 多线程 与 mongo亿级消费日志数据 新鲜demo

原创
02/07 15:11
阅读数 61

还在为没有亿级数据应用而苦恼么 还在为没有mongo实际操作er遗憾么 还在为没有 python 多线程操作而惋惜么 现在,新鲜、热乎、刚出炉的 demo 出炉了,

基本完成,但是还有很多需要优化,上代码

# -*- coding: utf-8 -*-
"""
模拟单日 1.2亿 的交易流水
平均每秒的交易数据约为 1389 条
开启 10 个线程,每个线程每秒 139 条
时间改变一秒的概率为 1:200
根据业务量单线程用时较长,所以采用多线程生产数据,
同时也可以测试出多线程出现的常见问题
author -- acclea
date -- 2021-02-07
"""

try:
    from pymongo import MongoClient, ASCENDING, DESCENDING
except: "can not found pymongo lib, make it install"

try:
    import redis
except: "can not found redis lib, make it install"

import random, time, hashlib, json
import threading

"""
connect mongoDB
"""
def conn_mongo():
    conn = MongoClient('127.0.0.1', 27017)
    return conn.mon_test  #连接mydb数据库,没有则自动创建


"""
connect redis
"""
def conn_redis():
    return redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)


"""
create mongo set by set_name
"""
def mongo_create_set(set_name):
    mongo_db = conn_mongo()
    return mongo_db[set_name]  # 使用test_set集合,没有则自动创建


"""
insert data to mongo
"""
def mongo_insert(data = {}):
    if len(data) < 1:return False
    # mongo_set = mongo_create_set(set_name)
    return cost_set.insert_one(data)


"""
redis get
进行 json 逆向处理
"""
def rds_get(key):
    try:
        rds = conn_redis()
        return json_decode(rds.get(key))
    except:
        return False
        # return "this key is error, check it, please"


"""
redis set
所有输入的数据全部转化为 json 处理
"""
def rds_set(key,val,exp=3600):
    try:
        rds = conn_redis()
        rds.set(key, json_encode(val))
        rds.expire(key, exp)
    except: return False


# sha256=>64位
def sha256hex(data):
    sha256 = hashlib.sha256()
    sha256.update(data.encode())
    res = sha256.hexdigest()
    # print("sha256加密结果:", res)
    return res

# json_decode
def json_decode(input_data):
    try:
        if str(type(input_data)).find('object') == -1:
            return json.loads(input_data, encoding="utf-8")
        else:
            f = open(input_data, encoding="utf-8")
            return json.load(f, encoding="utf-8")
    except:
        return False
        # return "this input data is error, can not decode json"


# json_encode
def json_encode(input_data):
    try:
        if str(type(input_data)).find('Object') == -1:
            return json.dumps(input_data)
        else:
            f = open(input_data, encoding="utf-8")
            return json.dump(f)
    except:
        return False
        # return "input data is error, can not encode to json"


"""
generate user name --- 生成随机用户名
"""
def generation_user_name():
    random_upper_letter_list = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', ]
    random_lower_letter_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
    name = ''.join([random.choice(random_upper_letter_list),''.join(random.sample(random_lower_letter_list, random.randint(2, 10)))])
    print(name)
    return name


"""
generate user id 
用于mongo 系统的唯一 id 不支持 int/bigint/tinyint/smallint 类的 id 自增,所以需要自己实现
1、通过借助第三方来生成唯一自增 id ,如使用 redis 自增,
    注:redis 自增 有一个缺点,只要 redis 没有重启,那个自增值就会一直自增,不利于测试,重启后自增值 从 0 开始
2、通过 自己每次 取最大值 +1 实现
"""
def generation_user_id(set_name):
    redis_user_id_key = "user_id"
    try:
        user_id = int(rds_get(redis_user_id_key))
        # user_id = 0
    except:
        user_id = 0
    # print(user_id)


    if user_id == None or user_id < 1:
        print('user_id 缓存 过期')
        try:
            # query the max user_id in mongo set
            # cost_set = mongo_create_set(set_name)
            # 只输出 user_id 字段,第一个参数为查询条件,空代表查询所有
            # 如果需要输出的字段比较多,不想要某个字段,可以用排除字段的方法  mongo_set.find( {}, {"cost": 0 } )
            # 在MongoDB中使用sort()方法对数据进行排序,sort()方法可以通过参数指定排序的字段,并使用 1 和 -1 来指定排序的方式,其中 1 为升序,-1为降序

            results = cost_set.find({}, {"user_id": 1}).sort('user_id', DESCENDING).limit(1)
            user_id = [result['user_id'] for result in results][0]
            # user_id = 0

            print('=======')
            print(user_id)
        except:
            user_id = 0

        user_id = int(user_id)

    # user_id = rds.incr(user_id)

    if user_id > 80000000:
        user_id = random.randint(1,80000000)
    else:
        user_id += 1

    rds_set(redis_user_id_key, user_id, 60*5)
    print(user_id)
    return user_id


def time_unix(input_time = None, is_full = False):
    """
    时间戳
    :return:
    """
    if input_time == None:
        if is_full:
            return time.time()
        return int(time.time())

    else:
        try:
            time_list = time.strptime(input_time, "%Y-%m-%d %H:%M:%S")
            if is_full:
                return int(time.mktime(time_list))
            return int(time.mktime(time_list))
        except: return "date info is error"


def date_str(format ,t = 0):
    """
    时间戳转时间
    %Y
    %m
    %d
    %H
    %M
    %S
    %z
    %a
    %A
    %b
    %B
    %c
    %I
    %p
    :param format:
    :param t:
    :return:
    """
    if int(t) < 1: t = time.time()
    timeArray = time.localtime(t)
    try:
        return time.strftime(format, timeArray)
    except Exception as e:
        print("转化错误:case%s"%e)
        return False


def total_cost_by_day():
    # todo
    # this day total cost
    pass

def day_cost_for_hour():
    # todo
    # this day per hour total cost
    pass

def top50_cost_user():
    # todo
    # this day user cost top50
    pass


def earliest_cost_user():
    # todo
    # this day earliest cost user, limit 50
    pass

def latest_cost_user():
    # todo
    # this day latest cost user, limit 50
    pass


"""
create user cost info
模拟生成消费数据
"""
def create_user_info():
    # 以当前的日期作为集合的名称
    default_date = date_str("%Y-%m-%d")
    default_time = " ".join([default_date, '00:00:00'])

    # 获取---锁
    user_id_lock.acquire()

    # 时间, 当天的 0:00:00 开始记录,截至时间为 23:59:59, === 由于为模拟数据所以设定时间,实际业务的时间以系统时间为准 ===
    try:
        # user_id = 0

        max_time_results = cost_set.find({}, {"create_time": 1}).sort('create_time', DESCENDING).limit(1)
        base_time_str = [result['create_time'] for result in max_time_results][0]
        print('mongo---time')
        print(base_time_str)

        if base_time_str < 1:
            base_time_str = time_unix(default_time)

    except:
        base_time_str = time_unix(default_time)
    # print(default_time)
    print(base_time_str)

    # 根据当前数据量,时间改变一秒的概率为 1:200
    incre_second_flag = False
    temp_random_num = random.randint(1, 200)
    if temp_random_num == 200:
        incre_second_flag = True

    if incre_second_flag == True:
        if str(type(base_time_str)).find('int') == -1: base_time_str = int(base_time_str)
        base_time_str += 1


    cur_date = date_str("%Y-%m-%d %H:%M:%S", base_time_str)


    user_name = generation_user_name()

    # 根据缓存用户信息判断当前用户是否存在,缓存有效期为当天 23:59:59,实际业务根据数据量决定
    cache_user_info = sha256hex(user_name)

    user_id = rds_get(cache_user_info)
    if user_id == False:
        user_id = generation_user_id(set_name)
        rds_set(cache_user_info, user_id, 24*60*60)

    # 释放---锁
    user_id_lock.release()

    # 消费  2 位小数
    cost = random.uniform(1, 80000)
    cost = round(cost, 2)

    # 写入mongo
    user_cost_info = {"user_name": user_name, "user_id": user_id, "cost": cost, "create_time": base_time_str, "create_at": cur_date, }
    mongo_insert(user_cost_info)

if __name__ == '__main__':
    # 开启线程数
    srd_num = 10

    # 最大测试数据量
    total_data_num = 120000000


    # 创建--线程锁,
    # 注:确保线程锁的有效/唯一,因此只能把线程锁放在线程开启前
    user_id_lock = threading.Lock()

    # pymongo.errors.AutoReconnect: 127.0.0.1:27017: [WinError 10048] 通常每个套接字地址(协议/网络地址/端口)只允许使用一次
    # 出现该错误,是由于把连接数据库的操作也加入了多线程,需要把 连接数据库 独立出来
    set_name = "_".join(["user_cost_log", date_str("%Y%m%d")])
    cost_set = mongo_create_set(set_name)


    for i in range(1, total_data_num, srd_num):
        if srd_num < 1 or srd_num > total_data_num:
            break

        print(range(i, i + srd_num))
        thread_list = []
        try:
            for cost_user in range(i, i + srd_num):
                thr_node = threading.Timer(0, create_user_info)
                thr_node.start()
                thread_list.append(thr_node)

            for thr in thread_list:
                thr.join()
        except:
            print("=====线程无法启动====")

找时间优化

原文地址

展开阅读全文
0
0 收藏
分享
加载中
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部