文档章节

我的第一个go程序(四)

huangcx
 huangcx
发布于 2016/01/31 10:02
字数 1383
阅读 6
收藏 0

就快要过年了,祝大家在新的一年里有新的收获!

下面以一个发送短信的模块为例子,注:具体发送短信是通过调用第三方接口完成的,我们只需要将短信成功发送到接口即可,可根据接口实际情况查询或返回短信发送情况(发送成功与否)。

每个模块相互独立,都有自己的配置信息,因此设置常量:

const (
    moduleCode  = "sms.send"   // 模块标识,固定值
    configCode  = "sms.zt.api" // 相关参数配置标识,固定值
    maxExecuted = 5            // 限制失败次数
)

设置系统中必要的变量

var (
    dsnBasic   string            // Basic 数据库dsn
    dsnMsg     string            // Msg 数据库dsn
    dbBasic    *sql.DB           // 数据库对象
    dbMsg      *sql.DB           // 数据库对象
    taskModule *basic.TaskModule // 模块结构
    taskConfig *basic.Config     // 业务配置结构
    stopped    = false           // 停止标志
    stopMutex  *sync.RWMutex     // 停止标志锁
    limit      = 50              // 每次获取任务数量
    sleepTime  = 1               // 间隔N秒获取一次
    wokers     = runtime.NumCPU()
)

var (
    apiGateway   string // 接口URL
    apiUsername  string // 接口用户名
    apiPassword  string // 接口密码
    apiProductId string // 接口产品ID
)

模块要设置、检测停止状态,增加一个获取以及设置停止标志的函数:

// 获取是否停止标志
func isStop() bool {
    //这里只是读取值,加个锁反而觉得是多余的
    stopMutex.RLock()
    defer stopMutex.RUnlock()
    return stopped
}
// 设置停止标志
func stop() {
    stopMutex.Lock()
    defer stopMutex.Unlock()
    stopped = true
}

模块初始化时设置相应的变量值:

// 初始化
func init() {
    var err error
    // 加载配置文件
    c, _ := config.Read("app.cfg", "# ", "=", false, false)
    // 读取数据库配置
    dsnBasic, _ = c.String("database", "dsnBasic")
    dsnMsg, _ = c.String("database", "dsnMsg")
    // 两个数据库操作对象
    if dbMsg, err = sql.Open("mysql", dsnMsg); err != nil {
        logger.Error().Panicf("数据库[%s]异常:%s\n", dsnMsg, err)
    }
    // 设置最大连接数
    dbMsg.SetMaxOpenConns(wokers)
    dbMsg.SetMaxIdleConns(wokers - 1)

    if dbBasic, err = sql.Open("mysql", dsnBasic); err != nil {
        logger.Error().Panicf("数据库[%s]异常:%s\n", dsnBasic, err)
    }
    // 获取模块信息
    if taskModule, err = basic.GetTaskModule(dbBasic, moduleCode); err != nil {
        logger.Error().Panicf("获取模块[%s]异常:%s\n", moduleCode, err)
    }
    // 获取业务相关配置
    if taskConfig, err = basic.GetConfig(dbBasic, configCode); err != nil {
        logger.Error().Panicf("获取配置[%s]异常:%s\n", configCode, err)
    }

    // 获取接口配置
    var configData map[string]interface{}
    if err := json.Unmarshal([]byte(taskConfig.JsonData), &configData); err != nil {
        logger.Error().Panicf("获取配置节点[%s:JsonData]异常:%s\n", configCode, err)
    }
    apiGateway = configData["gateway"].(string)
    apiUsername = configData["username"].(string)
    apiPassword = configData["password"].(string)
    apiProductId = configData["product_id"].(string)

    // 锁对象
    stopMutex = new(sync.RWMutex)
}

为这个模块取个名字:sms,并实现Tasker接口

// 发送短信结构,实现 task.Tasker接口
type sms struct {
    done  chan struct{}
    tasks chan *task.Task
}

// 短信内容结构
type smsContent struct {
	id       string         // 内容ID
	content  string         // 短信内容
	sendTime string         // 待发送时间
	extCode  sql.NullString // 扩展编号
}

这个结构里的done 通道,是检测到需要停止执行的时候,往done通道发送信号,外部接收到后,表示模块正常结束,tasks 是需要执行的任务通道,可通过多线程执行并行获取通道里的任务执行。

smsContent 则保存了短信的具体内容,略。

sms 要实现Tasker接口,则需要实现相应的 Start 与 Stop 方法

// 外部调用Start方法开始执行
func (tasker *sms) Start() {
    go tasker.started()
    // 开启多个工作线程
    // 需要注意以及解决的问题
    // tasks通道未设置缓冲区,是通过阻塞避免重复提取相同数据,但最后一条或N(线程数)条数据还是有重复的可能:
    // 如:最后一条数据的状态在数据库中还未更新,线程又从数据库批量获取任务时,会导致重复获取
    for i := 0; i < wokers; i++ {
        go tasker.processTask(i)
    }
}
// 外部调用Stop方法停止执行
// 等待所有工作线程结束,并关闭数据库连接
func (tasker *sms) Stop(done chan<- task.Done) {
    stop()
    // 等待所有工作线程结束
    for i := 0; i < wokers; i++ {
        <-tasker.done
    }
    // 关闭数据库连接
    dbBasic.Close()
    dbMsg.Close()
    // 发送模块完成信号
    done <- task.Done{ModuleCode: taskModule.Code, ModuleName: taskModule.Name}
}
// 无限循环方法
func (tasker *sms) started() {
    // 开启一个无限循环
    logger.Debug().Printf("模块[%s]开始运行", moduleCode)
    for {
        if isStop() {
            break
        }
        // 获取任务
        tasks, err := task.GetTasks(dbMsg, taskModule, limit, maxExecuted)
        //tasks, err := getTasksTest(dbMsg, taskModule, limit)
        if err != nil {
            logger.Error().Printf("获取模块[%s]任务错误:%s\n", taskModule.Code, err)
        }
        for _, t := range tasks {
            if isStop() {
                break
            }
            tasker.tasks <- t
        }
        // 休眠一段时间
        time.Sleep(time.Duration(sleepTime) * time.Second)
    }
    logger.Debug().Printf("模块[%s]结束运行\n", moduleCode)
    close(tasker.tasks)
}

//实际业务处理,参数 i没有任何意义,测试用,可去掉
func (tasker *sms) processTask(i int) {
    for tk := range tasker.tasks { //进入阻塞
       // 这里执行具体的业务,
       // 可根据实际情况自行设置规则
       _, err := sendToServer(tk)

        if err != nil {
            logger.Error().Printf("执行任务失败[taskid=%s]:%s\n", tk.Id, err.Error())
            // 再次发送
            //执行N次失败后,不再执行
            if tk.ExecCount < maxExecuted {
                nextExecTime := getNextExecTime(int(tk.ExecCount))
                _, err = tk.AddChild(dbMsg, nextExecTime.Format("2006-01-02 15:04:05"), false)
                if err != nil {
                    logger.Error().Printf("增加子任务失败[taskid=%s]:%s\n", tk.Id, err.Error())
                }
            }
        }

        //增加到历史记录并删除
        _, err = tk.AddToHistory(dbMsg)
        if err != nil {
            logger.Error().Printf("增加任务到历史记录失败[taskid=%s]:%s\n", tk.Id, err.Error())
        }
    }
    // 往结束通道发送消息
    tasker.done <- struct{}{}
}
func sendToServer(tk *task.Task) (int, error) {
    //发送信息到接口,略
}
//获取延迟发送时间
func getNextExecTime(n int) time.Time {
    switch {
    case n < 3:
        return time.Now().Add(10 * time.Second) // 1~2次失败则延迟10秒钟
    case n < maxExecuted:
        return time.Now().Add(1 * time.Minute) // 3~4次失败则延迟1分钟
    case n >= maxExecuted:
        return time.Now().Add(30 * time.Minute) //
    }
    return time.Now()
}


sms结构 不对外访问,因此我们通过New方法获取

// 初始化一个Tasker对象
func New() task.Tasker {
    tasker := &sms{done: make(chan struct{}), tasks: make(chan *task.Task)}
    return tasker
}

外部调用方式:

tasker := sms.New()
tasker.Start() 开始执行
tasker.Stop() 停止执行


好,代码差不多了,实现sendToServer方法,基本就可以了。

© 著作权归作者所有

共有 人打赏支持
huangcx
粉丝 0
博文 4
码字总数 7225
作品 0
宁波
程序员
android string.xml %问题String sAgeFormat1 = getR...

在android的开发中,经常会遇见一句话,比如“我今年23岁了”;这个23需要在程序中生成,但是遇到一个问题,这完整的一句话是一个TextView中的,而不是三个textView拼接成的,而且是引用的s...

Koon.LY
2012/05/15
0
0
linux学习(一) linux磁盘相关

一 设备命名 装置 装置在 Linux 内癿文件名 IDE 硬盘机 /dev/hd[a-d] SCSI/SATA/USB硬盘机 /dev/sd[a-p] USB 快闪碟 /dev/sd[a-p](与 SATA 相同) 软盘驱劢器 /dev/fd[0-1] 打印机 25 针: /de...

等待救赎
2016/04/18
57
0
ZenTaoPMS新年推出第一个BETA版本

禅道项目管理软件(ZenTaoPMS)是一款国产的,基于LGPL协议,开源免费的项目管理软件,它集产品管理、项目管理、测试管理于一体,同时还包含了事务管理、组织管理等诸多功能,是中小型企业项目...

开源春哥
2010/01/04
426
3
Android adt bundle 开发环境配置及第一个“Hello world”程序运行

最近在学习Android 顺便记录下学习过程当作复习吧,这是写的第一篇正式博客。 一、jdk环境配置 二、android adt bundle 下载 三、安装SDK 四、模拟器及真机调试 五、第一个程序 Hello world!...

程序猿付显
2014/07/23
0
0
android中string.xml中%一$s、%1$d等的用法

1、整型,比如“我今年23岁了”,这个23是整型的。在string.xml中可以这样写,<string name="old">我今年%1$d岁了</string> 在程序中,使用 [java] view plain copy String sAgeFormat = get......

青莲居士
2016/04/10
67
0

没有更多内容

加载失败,请刷新页面

加载更多

利用碎片化时间Get Linux系统

起初,我做着一份与IT毫无关系的工作,每月领着可怜的工资,一直想改变现状,但无从下手,也就是大家熟知的迷茫。我相信,每一个人都会或多或少的经历过迷茫,迷茫每一个选择,迷茫工作或者生...

linuxprobe16
今天
5
0
OSChina 周日乱弹 —— 恨不得给你买张飞机挂票

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @开源中国首席灵魂师:分享张希/曹方的单曲《认真地老去》 来不及认真的年轻过,就认真的老去! 《认真地老去》- 张希/曹方 手机党少年们想听...

小小编辑
今天
281
6
如何实现靠谱的分布式锁?

分布式锁,是用来控制分布式系统中互斥访问共享资源的一种手段,从而避免并行导致的结果不可控。基本的实现原理和单进程锁是一致的,通过一个共享标识来确定唯一性,对共享标识进行修改时能够...

郑加威
今天
3
0
Mac OS X下Maven的安装与配置

Mac OS X 安装Maven: 下载 Maven, 并解压到某个目录。例如/Users/robbie/apache-maven-3.3.3 打开Terminal,输入以下命令,设置Maven classpath $ vi ~/.bash_profile 添加下列两行代码,之后...

TonyStarkSir
今天
5
0
关于编程,你的练习是不是有效的?

最近由于工作及Solution项目的影响,我在重新学习DDD和领域建模的一些知识。然后,我突然就想到了这个问题,以及我是怎么做的? 对于我来说,提升技能的项目会有四种: 纯兴趣驱动的项目。即...

问题终结者
今天
15
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部