Python 常用工具类
Python 常用工具类
猿神出窍 发表于11个月前
Python 常用工具类
  • 发表于 11个月前
  • 阅读 101
  • 收藏 0
  • 点赞 0
  • 评论 0

【腾讯云】买域名送云解析+SSL证书+建站!>>>   

def _create_secret(length=10):
    """
    获取由10位随机大小写字母、数字组成的值
    :param length:
    :return:
    """
    key = ''
    chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
    len_chars = len(chars) - 1
    for i in xrange(length):
        # 每次从chars中随机取一位
        key += chars[random.randint(0, len_chars)]
    return key


def _create_percent():
    """
    创建赢家百分比
    :return:
    """
    return random.uniform(1, 100)


def _get(url, params=None, timeout=5, headers=_get_headers()):
    """
    Get请求
    :param url:
    :param params:
    :param timeout:
    :param headers:
    :return:
    """
    try:
        response = requests.get(url=url, params=params, timeout=timeout, headers=headers)
        response.raise_for_status()
        result = response.json()
    except BaseException as e:
        logger.info(e.message)
        client.captureException()
        result = None
    return result


def _post(url, params=None, json=None, timeout=5, headers=_get_headers()):
    """
    Post请求
    :param url:
    :param params:
    :param json:
    :param timeout:
    :param headers:
    :return:
    """
    try:
        response = requests.post(url=url, data=params, json=json, headers=headers, timeout=timeout)
        response.raise_for_status()
        result = response.json()
    except BaseException as e:
        logger.info(e.message)
        client.captureException()
        result = {'status': False, 'message': ''}
    return result

def valid_signature(params, timeout=settings.SIGNATURE_TIMEOUT):
    """
    验签
    :param params: 
    :param timeout: 
    :return: 
    """
    if not params or not isinstance(params, dict):
        return False

    signature = params.pop('signature', '')
    app_id = params.get('app_id', '')
    if not signature or not app_id:
        return False

    timestamp = params.get('timestamp', '')
    current_timestamp = int(time.time())
    if current_timestamp - int(timestamp) > timeout:
        return False

    website = Website.obj.get_by_appid(app_id)
    if not website:
        return False

    sign = generate_signature(website.secret_key, params)
    if sign != signature:
        return False

    return True


def generate_signature(secret_key, params):
    """
    生成签名
    :param secret_key: 
    :param params: 
    :return: 
    """
    params['secret_key'] = secret_key
    keys = sorted(params.keys())
    tmp_str = ''
    for key in keys:
        tmp_str = '{tmp_str}{key}={value}'.format(tmp_str=tmp_str, key=key, value=unicode(params.get(key)))
    md5_str = _md5(tmp_str.encode('utf-8'))
    params.pop('secret_key')
    return md5_str


def _md5(str):
    """
    加密
    :param str:
    :return:
    """
    try:
        my_md5 = hashlib.md5()
        my_md5.update(str)
        md5_str = my_md5.hexdigest()
    except BaseException as e:
        return None
    return md5_str.upper()


def createVocabList(dataSet):
    """
    将二维数组转换为一维数组
    :param dataSet:
    :return:
    """
    vocabSet = set([])
    for document in dataSet:
        vocabSet = vocabSet | set(document)
    return list(vocabSet)

def get_user_ticket(room_id, bet_id, bet_amount):
    """
    计算中奖票号
    :param room_id:
    :param bet_id:
    :param bet_amount:
    :return:
    """
    bet_amount = float(bet_amount)
    user_list = RouletteStake.objects.filter(room_id=room_id, bet_id=bet_id).order_by('created')
    amount = 0
    for user in user_list:
        amount += float(user.amount)

    if amount == 0:
        tickets = [1, int(bet_amount * 100)]
    else:
        tickets = [int(amount * 100) + 1, int(amount * 100 + bet_amount * 100)]

    return tickets


def _draw_algor(user_list, percentage, room_id, bet_id):
    """
    抽奖算法
    :param user_list:
    :param percentage:
    :return:
    """
    result = dict()
    amount = 0
    for user in user_list:
        amount += float(user.get('amount'))

    user_ratio = dict()
    for user in user_list:
        user_ratio[user.get('id')] = float(decimal_div(user.get('amount'), amount, 4) * 100)

    # 中奖票号
    ticket = int(math.floor((float(amount) * 100 - 0.0000000001) * (percentage / 100)))

    # 找出中奖者 组装票
    process_list = RouletteStake.objects.filter(room_id=room_id, bet_id=bet_id).values('account_id', 'tickets',
                                                                                       'account__ucenter_id')
    for stake in process_list:
        if stake.get('tickets')[0] <= ticket <= stake.get('tickets')[1]:
            result['win_id'] = stake.get('account_id')
            result['ucenter_id'] = stake.get('account__ucenter_id')
            break
    result['win_ticket'] = ticket
    result['amount'] = amount
    result['user_ratio'] = user_ratio
    return result

# -*- coding:utf-8 -*-
from pyDes import *
from binascii import b2a_hex, a2b_hex

KEY = "_KEY"  # 密钥
IV = "__IV"  # 偏转向量


def encrypt_des(data, key=KEY, iv=IV):
    """
    对称加密
    :param data:
    :param key:
    :param iv:
    :return:
    """
    k = des(key, CBC, iv, pad=None, padmode=PAD_PKCS5)
    d = k.encrypt(data)
    return b2a_hex(d)


def decrypt_des(code, key=KEY, iv=IV):
    """
    解密
    :param code:
    :param key:
    :param iv:
    :return:
    """
    k = des(key, CBC, iv, pad=None, padmode=PAD_PKCS5)
    code = a2b_hex(code)
    return k.decrypt(code)

得到IP:
 

# coding=utf-8
import urllib2
import random
import time
import requests


def dl():
    a1={
    'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64)'}
    o_g=['114.239.3.149:808','61.232.254.39:3128','218.18.232.29:8080']
    a=0
    for a in range(0,3):
        proxies_l = {'http': o_g[a],

             }
        print(proxies_l['http'])

        try:
            req=requests.get('http://httpbin.org/ip',headers=a1,proxies=proxies_l)
            print('finish')
            print (req.text)
        except:
            print('no proxies')
        sleep_time=random.randint(1,3)
        time.sleep(sleep_time)
        print('Wait%ds'%sleep_time)

dl()

django批量操作:

# -*- coding:utf-8 -*-
from __future__ import unicode_literals


def insert_many(objects, using="default"):
    """
    Insert list of Django objects in one SQL query.
    Objects must be of the same Django model.
    Note that save is not called and signals on the model are not raised.
    Mostly from: http://people.iola.dk/olau/python/bulkops.py
    """
    if not objects:
        return

    import django.db.models
    from django.db import connections
    from django.db import transaction
    con = connections[using]

    model = objects[0].__class__
    fields = [f for f in model._meta.fields
              if not isinstance(f, django.db.models.AutoField)]
    parameters = []
    for o in objects:
        params = tuple(f.get_db_prep_save(f.pre_save(o, True), connection=con)
                       for f in fields)
        parameters.append(params)

    table = model._meta.db_table
    column_names = ",".join(con.ops.quote_name(f.column) for f in fields)
    placeholders = ",".join(("%s",) * len(fields))
    con.cursor().executemany("insert into %s (%s) values (%s)"
                             % (table, column_names, placeholders), parameters)
    transaction.commit()


def update_many(objects, fields=None, using="default"):
    """Update list of Django objects in one SQL query, optionally only
    overwrite the given fields (as names, e.g. fields=["foo"]).
    Objects must be of the same Django model. Note that save is not
    called and signals on the model are not raised.

    Mostly from: http://people.iola.dk/olau/python/bulkops.py
    """
    if not objects:
        return

    import django.db.models
    from django.db import connections
    from django.db import transaction
    con = connections[using]

    if fields is None:
        fields = []
    names = fields
    meta = objects[0]._meta

    fields = [f for f in meta.fields
              if not isinstance(f, django.db.models.AutoField)
              and (not names or f.name in names)
              and (not (hasattr(f, 'auto_now_add') and f.auto_now_add))]

    if not fields:
        raise ValueError("No fields to update, field names are %s." % names)

    fields_with_pk = fields + [meta.pk]
    parameters = []
    for o in objects:
        parameters.append(tuple(f.get_db_prep_save(f.pre_save(o, True),
                                                   connection=con) for f in fields_with_pk))

    table = meta.db_table
    assignments = ",".join(("%s=%%s" % con.ops.quote_name(f.column))
                           for f in fields)

    con.cursor().executemany("update %s set %s where %s=%%s"
                             % (table, assignments,
                                con.ops.quote_name(meta.pk.column)),
                             parameters)

    transaction.commit()

读取文件:

def read_file(fpath): 
   BLOCK_SIZE = 1024 
   with open(fpath, 'rb') as f: 
       while True: 
           block = f.read(BLOCK_SIZE) 
           if block: 
               yield block 
           else: 
               return

 

标签: Python
  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 15
博文 156
码字总数 55877
×
猿神出窍
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: