文档章节

我的第一个go程序(四)

huangcx
 huangcx
发布于 2016/01/31 10:02
字数 1383
阅读 6
收藏 0

就快要过年了,祝大家在新的一年里有新的收获!

下面以一个发送短信的模块为例子,注:具体发送短信是通过调用第三方接口完成的,我们只需要将短信成功发送到接口即可,可根据接口实际情况查询或返回短信发送情况(发送成功与否)。

每个模块相互独立,都有自己的配置信息,因此设置常量:

const (
    moduleCode  = "sms.send"   // 模块标识,固定值
    configCode  = "sms.zt.api" // 相关参数配置标识,固定值
    maxExecuted = 5            // 限制失败次数
)

设置系统中必要的变量

var (
    dsnBasic   string            // Basic 数据库dsn
    dsnMsg     string            // Msg 数据库dsn
    dbBasic    *sql.DB           // 数据库对象
    dbMsg      *sql.DB           // 数据库对象
    taskModule *basic.TaskModule // 模块结构
    taskConfig *basic.Config     // 业务配置结构
    stopped    = false           // 停止标志
    stopMutex  *sync.RWMutex     // 停止标志锁
    limit      = 50              // 每次获取任务数量
    sleepTime  = 1               // 间隔N秒获取一次
    wokers     = runtime.NumCPU()
)

var (
    apiGateway   string // 接口URL
    apiUsername  string // 接口用户名
    apiPassword  string // 接口密码
    apiProductId string // 接口产品ID
)

模块要设置、检测停止状态,增加一个获取以及设置停止标志的函数:

// 获取是否停止标志
func isStop() bool {
    //这里只是读取值,加个锁反而觉得是多余的
    stopMutex.RLock()
    defer stopMutex.RUnlock()
    return stopped
}
// 设置停止标志
func stop() {
    stopMutex.Lock()
    defer stopMutex.Unlock()
    stopped = true
}

模块初始化时设置相应的变量值:

// 初始化
func init() {
    var err error
    // 加载配置文件
    c, _ := config.Read("app.cfg", "# ", "=", false, false)
    // 读取数据库配置
    dsnBasic, _ = c.String("database", "dsnBasic")
    dsnMsg, _ = c.String("database", "dsnMsg")
    // 两个数据库操作对象
    if dbMsg, err = sql.Open("mysql", dsnMsg); err != nil {
        logger.Error().Panicf("数据库[%s]异常:%s\n", dsnMsg, err)
    }
    // 设置最大连接数
    dbMsg.SetMaxOpenConns(wokers)
    dbMsg.SetMaxIdleConns(wokers - 1)

    if dbBasic, err = sql.Open("mysql", dsnBasic); err != nil {
        logger.Error().Panicf("数据库[%s]异常:%s\n", dsnBasic, err)
    }
    // 获取模块信息
    if taskModule, err = basic.GetTaskModule(dbBasic, moduleCode); err != nil {
        logger.Error().Panicf("获取模块[%s]异常:%s\n", moduleCode, err)
    }
    // 获取业务相关配置
    if taskConfig, err = basic.GetConfig(dbBasic, configCode); err != nil {
        logger.Error().Panicf("获取配置[%s]异常:%s\n", configCode, err)
    }

    // 获取接口配置
    var configData map[string]interface{}
    if err := json.Unmarshal([]byte(taskConfig.JsonData), &configData); err != nil {
        logger.Error().Panicf("获取配置节点[%s:JsonData]异常:%s\n", configCode, err)
    }
    apiGateway = configData["gateway"].(string)
    apiUsername = configData["username"].(string)
    apiPassword = configData["password"].(string)
    apiProductId = configData["product_id"].(string)

    // 锁对象
    stopMutex = new(sync.RWMutex)
}

为这个模块取个名字:sms,并实现Tasker接口

// 发送短信结构,实现 task.Tasker接口
type sms struct {
    done  chan struct{}
    tasks chan *task.Task
}

// 短信内容结构
type smsContent struct {
	id       string         // 内容ID
	content  string         // 短信内容
	sendTime string         // 待发送时间
	extCode  sql.NullString // 扩展编号
}

这个结构里的done 通道,是检测到需要停止执行的时候,往done通道发送信号,外部接收到后,表示模块正常结束,tasks 是需要执行的任务通道,可通过多线程执行并行获取通道里的任务执行。

smsContent 则保存了短信的具体内容,略。

sms 要实现Tasker接口,则需要实现相应的 Start 与 Stop 方法

// 外部调用Start方法开始执行
func (tasker *sms) Start() {
    go tasker.started()
    // 开启多个工作线程
    // 需要注意以及解决的问题
    // tasks通道未设置缓冲区,是通过阻塞避免重复提取相同数据,但最后一条或N(线程数)条数据还是有重复的可能:
    // 如:最后一条数据的状态在数据库中还未更新,线程又从数据库批量获取任务时,会导致重复获取
    for i := 0; i < wokers; i++ {
        go tasker.processTask(i)
    }
}
// 外部调用Stop方法停止执行
// 等待所有工作线程结束,并关闭数据库连接
func (tasker *sms) Stop(done chan<- task.Done) {
    stop()
    // 等待所有工作线程结束
    for i := 0; i < wokers; i++ {
        <-tasker.done
    }
    // 关闭数据库连接
    dbBasic.Close()
    dbMsg.Close()
    // 发送模块完成信号
    done <- task.Done{ModuleCode: taskModule.Code, ModuleName: taskModule.Name}
}
// 无限循环方法
func (tasker *sms) started() {
    // 开启一个无限循环
    logger.Debug().Printf("模块[%s]开始运行", moduleCode)
    for {
        if isStop() {
            break
        }
        // 获取任务
        tasks, err := task.GetTasks(dbMsg, taskModule, limit, maxExecuted)
        //tasks, err := getTasksTest(dbMsg, taskModule, limit)
        if err != nil {
            logger.Error().Printf("获取模块[%s]任务错误:%s\n", taskModule.Code, err)
        }
        for _, t := range tasks {
            if isStop() {
                break
            }
            tasker.tasks <- t
        }
        // 休眠一段时间
        time.Sleep(time.Duration(sleepTime) * time.Second)
    }
    logger.Debug().Printf("模块[%s]结束运行\n", moduleCode)
    close(tasker.tasks)
}

//实际业务处理,参数 i没有任何意义,测试用,可去掉
func (tasker *sms) processTask(i int) {
    for tk := range tasker.tasks { //进入阻塞
       // 这里执行具体的业务,
       // 可根据实际情况自行设置规则
       _, err := sendToServer(tk)

        if err != nil {
            logger.Error().Printf("执行任务失败[taskid=%s]:%s\n", tk.Id, err.Error())
            // 再次发送
            //执行N次失败后,不再执行
            if tk.ExecCount < maxExecuted {
                nextExecTime := getNextExecTime(int(tk.ExecCount))
                _, err = tk.AddChild(dbMsg, nextExecTime.Format("2006-01-02 15:04:05"), false)
                if err != nil {
                    logger.Error().Printf("增加子任务失败[taskid=%s]:%s\n", tk.Id, err.Error())
                }
            }
        }

        //增加到历史记录并删除
        _, err = tk.AddToHistory(dbMsg)
        if err != nil {
            logger.Error().Printf("增加任务到历史记录失败[taskid=%s]:%s\n", tk.Id, err.Error())
        }
    }
    // 往结束通道发送消息
    tasker.done <- struct{}{}
}
func sendToServer(tk *task.Task) (int, error) {
    //发送信息到接口,略
}
//获取延迟发送时间
func getNextExecTime(n int) time.Time {
    switch {
    case n < 3:
        return time.Now().Add(10 * time.Second) // 1~2次失败则延迟10秒钟
    case n < maxExecuted:
        return time.Now().Add(1 * time.Minute) // 3~4次失败则延迟1分钟
    case n >= maxExecuted:
        return time.Now().Add(30 * time.Minute) //
    }
    return time.Now()
}


sms结构 不对外访问,因此我们通过New方法获取

// 初始化一个Tasker对象
func New() task.Tasker {
    tasker := &sms{done: make(chan struct{}), tasks: make(chan *task.Task)}
    return tasker
}

外部调用方式:

tasker := sms.New()
tasker.Start() 开始执行
tasker.Stop() 停止执行


好,代码差不多了,实现sendToServer方法,基本就可以了。

© 著作权归作者所有

共有 人打赏支持
huangcx
粉丝 0
博文 4
码字总数 7225
作品 0
宁波
程序员
私信 提问
android string.xml %问题String sAgeFormat1 = getR...

在android的开发中,经常会遇见一句话,比如“我今年23岁了”;这个23需要在程序中生成,但是遇到一个问题,这完整的一句话是一个TextView中的,而不是三个textView拼接成的,而且是引用的s...

Koon.LY
2012/05/15
0
0
linux学习(一) linux磁盘相关

一 设备命名 装置 装置在 Linux 内癿文件名 IDE 硬盘机 /dev/hd[a-d] SCSI/SATA/USB硬盘机 /dev/sd[a-p] USB 快闪碟 /dev/sd[a-p](与 SATA 相同) 软盘驱劢器 /dev/fd[0-1] 打印机 25 针: /de...

等待救赎
2016/04/18
57
0
ZenTaoPMS新年推出第一个BETA版本

禅道项目管理软件(ZenTaoPMS)是一款国产的,基于LGPL协议,开源免费的项目管理软件,它集产品管理、项目管理、测试管理于一体,同时还包含了事务管理、组织管理等诸多功能,是中小型企业项目...

开源春哥
2010/01/04
426
3
Android adt bundle 开发环境配置及第一个“Hello world”程序运行

最近在学习Android 顺便记录下学习过程当作复习吧,这是写的第一篇正式博客。 一、jdk环境配置 二、android adt bundle 下载 三、安装SDK 四、模拟器及真机调试 五、第一个程序 Hello world!...

程序猿付显
2014/07/23
0
0
android中string.xml中%一$s、%1$d等的用法

1、整型,比如“我今年23岁了”,这个23是整型的。在string.xml中可以这样写,<string name="old">我今年%1$d岁了</string> 在程序中,使用 [java] view plain copy String sAgeFormat = get......

青莲居士
2016/04/10
67
0

没有更多内容

加载失败,请刷新页面

加载更多

Go 使用channel控制并发

前言 channel一般用于协程之间的通信,channel也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程,这种场景下channel也可轻易实现。 场景示例 总结 ...

恋恋美食
22分钟前
1
0
Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

摘要: 实际问题 我们知道在流计算场景中,数据是源源不断的流入的,数据流永远不会结束,那么计算就永远不会结束,如果计算永远不会结束的话,那么计算结果何时输出呢?本篇将介绍Apache Fl...

阿里云官方博客
25分钟前
3
0
斐波那契堆的理解,节点mark属性和势函数

斐波那契堆 看了好多博客,都是照搬算法导论的内容,没有自己的理解,比如为什么有mark属性,势函数的作用,以及为什么叫斐波那契堆,下面说说鄙人的理解。 势函数 势函数是根节点个数加上2...

杨喆
27分钟前
2
0
NIO源码详解

阻塞io和无阻塞io: 阻塞io是指jdk1.4之前版本面向流的io,服务端需要对每个请求建立一堆线程等待请求,而客户端发送请求后,先咨询服务端是否有线程相应,如果没有则会一直等待或者遭到拒 ...

沉稳2018
31分钟前
0
0
如何把已经提交的commit, 从一个分支放到另一个分支

在本地master提交了一个commit(8d85d4bca680a5dbcc3e5cfb3096d18cd510cc9f),如何提交的test_2分之上? git checkout test_2git cherry-pick 8d85d4bca680a5dbcc3e5cfb3096d18cd510cc9f......

stephen_wu
35分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部