TB级NFS数据平滑迁移方案设计与实现

2019/06/01 17:00
阅读数 174

平台原来挂载的/mnt/ccdbfs不变,由于里面有很多软链接,因此如果迁移后把平台代码里面的存储路径改了的话,万一有没改的地方就不好处理了。

故先设计双写,保证在复制老数据的时候, /mnt/ccdbfs 和/mnt/mfs同时写入新数据,保证数据一致性。等全部复制完后,观察没有diff了,再切换挂载,让 /mnt/ccdbfs 挂载到新搭建的 moosefs 主服务器。

inotify文件系统监控机制。 rsync是linux系统下的数据镜像备份工具。使用快速增量备份工具Sync可以远程同步,支持本地复制,或者与其他SSH、rsync主机同步。

有用户组和权限双重问题,后面临时修改系统cp命令,完全切换玩后又切回去。

下文前两种方案,是最开始调研的网上大多数人使用的方案修改出来的,但是实测后发现,对超大目录的同步非常无力,光inotify就要扫很久,并且扫完后,对新文件的检测也会出现问题。故最后是用最后一种方案,修改了老nfs服务的日志级别,从中捞取文件变化,引入延迟同步+diff无则同步策略。

下文出现ccdbfs的地方代表老nfs服务,mfs代表新nfs服务。

inotify+rsync

对于文件里量不大的目录,复制起来快,且代码量少。

但是对超大目录,扫码起来无力。且事件感知会出现重复。

copy_ccdbfs_update_to_mfs.sh

# rsync auto sync script with inotify
# 2019/03/21

# configs
source_path=/mnt/ccdbfs/
target_path=/mnt/mfs/
rsync_bin=/usr/bin/rsync
INOTIFY_EXCLUDE='aladata.*'


cd $(dirname $0)

RSYNC_EXCLUDE=$(pwd)/rsync_exclude.lst

current_date=$(date +%Y-%m-%d_%H%M%S)
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)

log_file=$(pwd)/runcopy.log.${current_date}
inotifywait_bin=$(pwd)/inotifywait

inotify_fun(){
    ${inotifywait_bin} -mrq --timefmt '%Y/%m/%d-%H:%M:%S' --format '%T %w %f' \
     --exclude ${INOTIFY_EXCLUDE} -e modify,create,move,attrib ${source_path} \
    | while read file
        do
            ${rsync_bin} -auvrtzopgP --exclude-from=${RSYNC_EXCLUDE} --progress --bwlimit=800000 ${source_path} ${target_path}
        done
}

#inotify log
inotify_fun &> ${log_file} &

inotify+自定义复制脚本

inotify启一个光监控文件新增修改事件的脚本,导入到change_file.list。

自定义复制脚本去读取并且逻辑控制后复制。 inotify_ccdbfs_change.sh

# find ccdbfs change with inotify, export to file
# 2019/03/21

# configs
source_path=/mnt/ccdbfs/
INOTIFY_EXCLUDE='.*(\.swx|\.swp)$|aladata.*'

cd $(dirname $0)
current_date=$(date +%Y-%m-%d_%H%M%S)
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)
inotifywait_bin=$(pwd)/inotifywait

${inotifywait_bin} -mrq --timefmt '%Y/%m/%d-%H:%M:%S' --format '%w%f %e %T' \
    --exclude ${INOTIFY_EXCLUDE} -e create,close_write ${source_path} \
| while read line
    do
        echo ${line} >> $(pwd)/change_file.list
    done &

change_file.list

/mnt/mfs/dirtest4 CREATE,ISDIR 2019/03/21-22:30:06
/mnt/mfs/dirtest4/5 CREATE,ISDIR 2019/03/21-22:30:06
/mnt/mfs/a.test CLOSE_WRITE,CLOSE 2019/03/21-22:30:17
/mnt/mfs/c.test CLOSE_WRITE,CLOSE 2019/03/21-22:30:24
/mnt/mfs/aa CREATE 2019/03/21-22:30:45
/mnt/mfs/aa CLOSE_WRITE,CLOSE 2019/03/21-22:30:45

inotify_ccdbfs_change_cp.py

# -*- coding: utf-8 -*-
# 持续读取inotify_ccdbfs_change.sh监控到的变化文件change_file.list,从ccdbfs同步到mfs
# 2019/03/22
import os
import time


SOURCE_ROOT = '/mnt/ccdbfs'
TARGET_ROOT = '/mnt/mfs'

def deal_ori_change_file_list():
    '''获取change_file.list文件得到变化文件list
       顺便追加change_file.list到change_file.bak
       然后清空change_file.list
    '''
    change_file_path = os.getcwd() + '/change_file.list'
    change_file_back_path = os.getcwd() + '/change_file.bak'
    with open(change_file_path) as f:
        row_list = f.read().splitlines()
    # 追加备份
    os.system('echo "" >> ' + change_file_back_path)  # 相当于换行
    os.system('cat ' + change_file_path + ' >> ' + change_file_back_path)

    # 清空原change_file.list
    os.system('cat /dev/null > ' + change_file_path)

    return row_list


def uniq_row_list(row_list):
    '''对获取到的原始row_list拆分和保序去重
       返回 dir_list, file_list
    '''
    dir_list = []
    file_list = []

    for row in row_list:
        try:
            obj, effect, _ = row.split(' ')
            if 'ISDIR' in effect:
                if obj not in dir_list:
                    dir_list.append(obj)
            else:
                if obj not in file_list:
                    file_list.append(obj)
        except:
            print(str(time.time()) + 'uniq_row_list except')

    return dir_list, file_list


def copy_change(dir_list, file_list):
    '''在mfs中复制ccdbfs中的变化
       先复制创建目录,再复制
    '''
    # 优先在mfs中创建没有的目录,再去复制文件
    for a_dir in dir_list:
        dst = a_dir.replace(SOURCE_ROOT, TARGET_ROOT)
        if os.path.exists(dst) == False:
            os.system('mkdir -p ' + dst)
    
    for a_file in file_list:
        dst_file = a_file.replace(SOURCE_ROOT, TARGET_ROOT)
        # 防止目标地址目录没创建
        (file_path,file_name) = os.path.split(dst_file)
        if os.path.exists(file_path) == False:
            os.system('mkdir -p ' + file_path)
        # 复制文件或者软链接
        os.system('cp -d ' + a_file + ' ' + dst_file)


if __name__ == '__main__':
    last_time = 0
    while True:
        change_file_path = os.getcwd() + '/change_file.list'
        change_file_back_path = os.getcwd() + '/change_file.bak'
        # 获取change_file.list文件得到变化文件list
        with open(change_file_path) as f:
            row_list = f.read().splitlines()

        # 执行备份条件是 变化记录满100条 或 间隔5s且有变化记录
        # 所以当没满100条,且没满5s的时候,或者满了5s但是没有变化记录,不进行后面的备份
        if len(row_list) < 100:
            if time.time() - last_time < 5:
                print('row_list < 100 and delta_time < 5')
                time.sleep(1)
                continue
            elif len(row_list) == 0:
                print('delta_time > 5 but row_list == 0')
                time.sleep(1)
                continue

        # 满足条件,开始备份
        print('cp start')
        last_time = time.time()
        # 追加备份历史记录
        os.system('echo "" >> ' + change_file_back_path)  # 相当于换行
        os.system('cat ' + change_file_path + ' >> ' + change_file_back_path)
        # 清空原change_file.list
        os.system('cat /dev/null > ' + change_file_path)

        dir_list, file_list = uniq_row_list(row_list)
        print('dir_list: ', dir_list)
        print('file_list: ', file_list)
        copy_change(dir_list, file_list)
        time.sleep(1)

增量双写,diff复制策略(实操方案)

捞取老nfs日志+自定义双写脚本

cp_ccdbfs_change.py 增量更新同步双写脚本。集群业务机器,有对老ccdbfs写入的都启一个。

对于捞取到的文件变化日志进行双重筛选判断,因:

  • 测试发现,复制没问题,但是清理复制过的记录的时候,要去查询一下文件的修改时间,又产生了一条读的操作,还是一模一样的日志,又会再删除后新添加进全局map去,所以这样的话就是无限循环删不掉。
  • 打算修改方案,在复制后,不把那条记录删除,而是把存的log时间改为0,当轮询线程取出来的记录,时间为0,直接跳过,不去查询修改时间了。
  • 之后如果该文件再次修改,触发了日志后,会看如果时间为0,证明上次复制过了,但是现在又新修改了,那么重新刷成触发日志的新时间。
  • 顺便,如果是目录,时间存成1,也不再处理。
  • 内存应该撑得住,因为存储的map只是文件的信息而不是文件内容,并且,经常有进入目录或者查询修改时间产生的日志记录。
  • 上面这样的话,目录可以排除了,但是对于文件,还是会因为查询修改时间,产生日志记录,并且刷新log_time。所以还应该设置一个检查过的map记录(值为上一次复制或检查完成的时间),两个结合判断。
# -*- coding: utf-8 -*-
# 持续读取从ccdbfs日志捞取出来的变更文件,并同步到mfs对应路径下
# 2019/03/26
import os
import time
import sys
import subprocess
import threading

CCDBFS_LOG_PATH = '/home/work/nfs_client/log/Client.log'
SOURCE_ROOT = '/mnt/ccdbfs'
TARGET_ROOT = '/mnt/mfs'
CCDBFS_SAVE_ROOT = '/disk'
CCDBFS_SEARCH_TOOLS_ROOT = '/home/work/copy_ccdbfs_to_mfs/output'

EXCLUDE_LIST = ['/mnt/ccdbfs/aladata']

DELAY_DEAL_TIME = 120

MAP_DICT = {}  # 存储有变更的文件信息
INODE_PATH_DICT = {}  # 提供存储和查询inode对应路径用
FINISHED_DICT = {}  # 存储复制过的文件信息,值记录上次检查完成时间,值为1表示不再做检查

def myprint(*args):
    ltime = time.localtime(time.time())
    now_time = time.strftime("%Y-%m-%d %H:%M:%S, ", ltime)        
    print(now_time, args)


def copy_change(obj_path):
    '''在mfs中复制ccdbfs中的变化
    '''
    try:
        target_path = obj_path.replace(SOURCE_ROOT, TARGET_ROOT)
        # 先在mfs目标路径下判断路径是否存在,没有则创建
        (file_path,file_name) = os.path.split(target_path)
        if os.path.exists(file_path) == False:
            os.system('mkdir -p ' + file_path)

        # 复制文件或者软链接
        # -p 修改时间和访问权限也复制到新文件
        # -d 复制时保留链接
        # -u 源文件修改时间更新才复制
        os.system("""sh -c 'nohup cp -pdu "%s" "%s" &'""" % (obj_path, target_path))
    except:
        myprint('cp change error: ' + obj_path)

def extract_dir_objname(log_buff):
    '''从日志语句中提取出父目录以及存储对象名
    '''
    try:
        tmplist = log_buff.split(' ')
        log_time = '2019-' + tmplist[1] + ' ' + tmplist[2]
        log_time_stamp = time.mktime(time.strptime(log_time, "%Y-%m-%d %H:%M:%S:"))
        # 去掉 parentDirInode:0xa0000000101b9c32, 的多余字符
        dir_inode = tmplist[8].replace('parentDirInode:', '').replace(',', '')
        # 只取出存储对象名字
        objname = tmplist[9].replace('\n','').split(':')[1]
        return log_time_stamp, dir_inode, objname
    except:
        myprint('extract_dir_objname error, ' + log_buff)
        return None, None, None


def tail_log_add_map():
    '''
    Desc: 持续捞取文件变更日志,并将满足条件的变更存入全局MAP_DICT
    '''
    # 持续捞取长这样的日志,必须包含 ]Lookup, 有逗号
    # DEBUG: 03-26 11:45:56:  Client * 15176 [ccdb:NFSClient.cpp:625:15176]Lookup, parentDirInode:0xa0000001198bee40, dentryName:filelinktest
    p = subprocess.Popen('tail -F ' + CCDBFS_LOG_PATH, shell=True, stdout=subprocess.PIPE)
    while True:
        buff = p.stdout.readline()
        # 克服直接在Popen中grep有问题的办法
        if ']Lookup,' not in buff:
            continue
        if buff == '' and p.poll() != None:
            break
        
        log_time_stamp, dir_inode, objname = extract_dir_objname(buff)
        if (log_time_stamp == None) or (dir_inode == None) or (objname == None):
            continue

        # 若不存在,添加之
        map_value = MAP_DICT.get(dir_inode + '+' + objname)
        if  map_value == None:
            MAP_DICT[dir_inode + '+' + objname] = log_time_stamp
        else:
            last_finished_time = FINISHED_DICT.get(dir_inode + '+' + objname)
            # 之前复制完成log_time变成了0,再次有修改才变换这里的log_time
            # 上次检测或者复制完成10s内,视为读操作,不修改log_time
            if last_finished_time == None:
                # FINISHED_DICT无记录,表示要去检测
                last_finished_time = 0
            if (map_value == 0) and (time.time() - last_finished_time > 10):
                    MAP_DICT[dir_inode + '+' + objname] = log_time_stamp


def check_map_dict():
    '''
    Desc: 轮询遍历MAP_DICT将满足条件的文件进行复制或删除记录
    '''
    global MAP_DICT
    while True:
        for k, log_time in MAP_DICT.items():
            # myprint(k, log_time)
            # log_time为0表示上一次复制或检查完成,log_time为1表示是目录永远不再检查,不用操作
            # 在复制文件前会检查,如果目录不存在会创建
            if log_time == 0 or log_time == 1:
                continue
            dir_inode, objname = k.split('+')
            # 先查全局INODE_PATH_DICT中存了路径没,没有再去查ccdbfs客户端
            dir_path = INODE_PATH_DICT.get(dir_inode)
            if dir_path == None:
                try:
                    op = os.popen("cd %s && echo 'lookup %s' | ./bin/Cli -x" % (CCDBFS_SEARCH_TOOLS_ROOT, dir_inode))
                    ret = op.read()
                    # ccdbfs客户端存的路径是/disk开头,挂载路径是/mnt/ccdbfs
                    dir_path = ret.split(' ')[5].replace(CCDBFS_SAVE_ROOT, SOURCE_ROOT)
                    INODE_PATH_DICT[dir_inode] = dir_path
                    myprint('search ccdbfs client:%s->%s' % (dir_inode, dir_path))
                except:
                    myprint('search dir path error: ' + k)
                    # 若获取不到路径,跳过这次复制
                    MAP_DICT[k] = 0
                    FINISHED_DICT[k] = time.time()
                    continue

            # 拼接文件路径
            obj_path = dir_path + objname

            # 若obj_path本身是目录,不用处理
            if os.path.isdir(obj_path) == True:
                # 修改log_time为1,当做标记用
                MAP_DICT[k] = 1
                continue

            # 排除一些不需要监控的目录
            exclude_flag = False
            for exclude in EXCLUDE_LIST:
                if exclude in dir_path:
                    exclude_flag = True
                    break
            if exclude_flag == True:
                MAP_DICT[k] = 1  # 故意置1,永不检查
                continue

            # 若是临时文件,不存在了,跳过这次复制
            if os.path.exists(obj_path) == False:
                myprint(obj_path + ' is inexistent')
                MAP_DICT[k] = 0
                FINISHED_DICT[k] = time.time()
                continue

            # 获取最新修改时间
            try:
                modify_time = os.stat(obj_path).st_mtime
            except:
                myprint('os.stat().st_mtime error: ' + obj_path)
                continue
            now_time = time.time()
            # -5秒的判断,是防止极短时间写入后,log日志时间稍大于修改时间。
            if now_time - modify_time > DELAY_DEAL_TIME and modify_time - log_time >= -5:
                # 复制同步,并修改键值表示复制过
                myprint('cp and set MAP_DICT[k]=0, FINISHED_DICT=now_time, ' + obj_path)
                copy_change(obj_path)
                MAP_DICT[k] = 0
                FINISHED_DICT[k] = now_time
            else:
                # 也不能让MAP_DICT野蛮增长,对于读操作(日志时间大于修改时间5s以上)的log,要清除掉
                if now_time - modify_time > DELAY_DEAL_TIME and log_time - modify_time > 5:
                    # 修改键值表示检查过
                    myprint('no change, set MAP_DICT[k]=0, FINISHED_DICT=now_time, ' + obj_path)
                    MAP_DICT[k] = 0
                    FINISHED_DICT[k] = now_time
            # time.sleep(2)
        time.sleep(60)

if __name__ == '__main__':
    t1 = threading.Thread(target = tail_log_add_map)
    t2 = threading.Thread(target = check_map_dict)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

copy_inexistent_file.py 同步剩余全量文件,若无则同策略。另起一台机器连接新老nfs客户端即可启动之。

# -*- coding: utf-8 -*-
# 平台nfs未被inotify检测到的文件迁移
# 2019/03/18
import os
import time

DIRS_LIST = []  # 记录老文件夹下不断遍历添加进来的目录名,检测完后抛出,当队列使用

# SOURCE_ROOT,TARGET_ROOT提供程序中替换目录路径用
# 为防止其他文件也用到ccdbfs字眼,所以全部执行完后
# 手动修改新mfs目录里ccdbfs软链接,而不再程序中特殊控制
SOURCE_ROOT = '/mnt/ccdbfs'
TARGET_ROOT = '/mnt/mfs'
# SOURCE_ROOT = '/mnt/srcdir'
# TARGET_ROOT = '/mnt/tardir'

EXCLUDE_LIST = ['/mnt/ccdbfs/aladata']
# EXCLUDE_LIST = ['/mnt/srcdir/aladata']

def myprint(*args):
    ltime = time.localtime(time.time())
    now_time = time.strftime("%Y-%m-%d %H:%M:%S, ", ltime)        
    print(now_time, args)


def dir_info(file_dir):
    for root, dirs, files in os.walk(file_dir):
        # myprint('root_dir:', root)  # 当前目录路径
        # myprint('sub_dirs:', dirs)  # 当前路径下所有子目录
        # myprint('files:', files)  # 当前路径下所有非目录子文件
        # 补全目录全路径
        full_path_dirs = [root + '/' + i for i in dirs]
        full_path_dirs_res = []

        # 过滤某些不拷贝的目录
        for i in full_path_dirs:
            exclude_flat = False
            for exclude in EXCLUDE_LIST:
                if exclude in i:
                    exclude_flat = True
                    break
            if exclude_flat == False:
                full_path_dirs_res.append(i)

        # 补全文件全路径
        full_path_files = [root + '/' + i for i in files]
        return full_path_dirs_res, full_path_files


def doing_copy_file(dir_path):
    '''传入原ccdbfs目录下的某文件夹路径,复制其中文件
       并将其中文件夹推入全局DIRS_LIST
    '''
    dirs, files = dir_info(dir_path)
    # 发现目录,推入DIRS_LIST
    global DIRS_LIST
    DIRS_LIST += dirs

    for src_file in files:
        # mfs下属目录中不存在该文件,才复制
        dst_file = src_file.replace(SOURCE_ROOT, TARGET_ROOT)
        if os.path.exists(dst_file) == False:
            try:
                # 防止目标地址目录没创建
                (file_path,file_name) = os.path.split(dst_file)
                if os.path.exists(file_path) == False:
                    os.system('mkdir -p ' + file_path)
                # 正式复制该文件
                myprint('cp -pdu "%s" "%s"' % (src_file, dst_file))
                os.system('cp -pdu "%s" "%s"' % (src_file, dst_file))
            except:
                myprint('copy file except: ' + dst_file)


def make_soft_link(soft_file):
    '''将ccdbfs中的软链接复制到对应mfs目录相同层级下
    '''
    try:
        dst_file = soft_file.replace(SOURCE_ROOT, TARGET_ROOT)
        if os.path.exists(dst_file) == True:
            # 存在则不用重复复制
            myprint(dst_file + 'already exists')
            return
        # 防止目标地址目录没创建
        (file_path,file_name) = os.path.split(dst_file)
        if os.path.exists(file_path) == False:
            os.system('mkdir -p ' + file_path)
        # 正式复制该软链接
        myprint('cp -pdu "%s" "%s"' % (soft_file, dst_file))
        os.system('cp -pdu "%s" "%s"' % (soft_file, dst_file))
    except:
        myprint('make soft link except, old:' + soft_file)


if __name__ == '__main__':
    # 先把源头根目录下的文件复制,以及其文件夹推入DIRS_LIST
    doing_copy_file(SOURCE_ROOT)

    # 一层一层目录遍历完,DIRS_LIST清空程序才结束
    while len(DIRS_LIST) > 0:
        current = DIRS_LIST.pop(0)
        # myprint('now doing->', current)
        if os.path.islink(current) == True:
            # 若是软链接,不进入目录,只获取软链接信息并在新mfs里创建同名软链接
            make_soft_link(current)
        else:
            # 在mfs下属文件夹中该目录不存在则创建
            dst = current.replace(SOURCE_ROOT, TARGET_ROOT)
            if os.path.exists(dst) == False:
                os.system('mkdir -p ' + dst)
            doing_copy_file(current)
           
    print('done!')
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部