我的第一个go程序(四)
博客专区 > huangcx 的博客 > 博客详情
我的第一个go程序(四)
huangcx 发表于2年前
我的第一个go程序(四)
  • 发表于 2年前
  • 阅读 5
  • 收藏 0
  • 点赞 1
  • 评论 0

【腾讯云】如何购买服务器最划算?>>>   

就快要过年了,祝大家在新的一年里有新的收获!

下面以一个发送短信的模块为例子,注:具体发送短信是通过调用第三方接口完成的,我们只需要将短信成功发送到接口即可,可根据接口实际情况查询或返回短信发送情况(发送成功与否)。

每个模块相互独立,都有自己的配置信息,因此设置常量:

const (
    moduleCode  = "sms.send"   // 模块标识,固定值
    configCode  = "sms.zt.api" // 相关参数配置标识,固定值
    maxExecuted = 5            // 限制失败次数
)

设置系统中必要的变量

var (
    dsnBasic   string            // Basic 数据库dsn
    dsnMsg     string            // Msg 数据库dsn
    dbBasic    *sql.DB           // 数据库对象
    dbMsg      *sql.DB           // 数据库对象
    taskModule *basic.TaskModule // 模块结构
    taskConfig *basic.Config     // 业务配置结构
    stopped    = false           // 停止标志
    stopMutex  *sync.RWMutex     // 停止标志锁
    limit      = 50              // 每次获取任务数量
    sleepTime  = 1               // 间隔N秒获取一次
    wokers     = runtime.NumCPU()
)

var (
    apiGateway   string // 接口URL
    apiUsername  string // 接口用户名
    apiPassword  string // 接口密码
    apiProductId string // 接口产品ID
)

模块要设置、检测停止状态,增加一个获取以及设置停止标志的函数:

// 获取是否停止标志
func isStop() bool {
    //这里只是读取值,加个锁反而觉得是多余的
    stopMutex.RLock()
    defer stopMutex.RUnlock()
    return stopped
}
// 设置停止标志
func stop() {
    stopMutex.Lock()
    defer stopMutex.Unlock()
    stopped = true
}

模块初始化时设置相应的变量值:

// 初始化
func init() {
    var err error
    // 加载配置文件
    c, _ := config.Read("app.cfg", "# ", "=", false, false)
    // 读取数据库配置
    dsnBasic, _ = c.String("database", "dsnBasic")
    dsnMsg, _ = c.String("database", "dsnMsg")
    // 两个数据库操作对象
    if dbMsg, err = sql.Open("mysql", dsnMsg); err != nil {
        logger.Error().Panicf("数据库[%s]异常:%s\n", dsnMsg, err)
    }
    // 设置最大连接数
    dbMsg.SetMaxOpenConns(wokers)
    dbMsg.SetMaxIdleConns(wokers - 1)

    if dbBasic, err = sql.Open("mysql", dsnBasic); err != nil {
        logger.Error().Panicf("数据库[%s]异常:%s\n", dsnBasic, err)
    }
    // 获取模块信息
    if taskModule, err = basic.GetTaskModule(dbBasic, moduleCode); err != nil {
        logger.Error().Panicf("获取模块[%s]异常:%s\n", moduleCode, err)
    }
    // 获取业务相关配置
    if taskConfig, err = basic.GetConfig(dbBasic, configCode); err != nil {
        logger.Error().Panicf("获取配置[%s]异常:%s\n", configCode, err)
    }

    // 获取接口配置
    var configData map[string]interface{}
    if err := json.Unmarshal([]byte(taskConfig.JsonData), &configData); err != nil {
        logger.Error().Panicf("获取配置节点[%s:JsonData]异常:%s\n", configCode, err)
    }
    apiGateway = configData["gateway"].(string)
    apiUsername = configData["username"].(string)
    apiPassword = configData["password"].(string)
    apiProductId = configData["product_id"].(string)

    // 锁对象
    stopMutex = new(sync.RWMutex)
}

为这个模块取个名字:sms,并实现Tasker接口

// 发送短信结构,实现 task.Tasker接口
type sms struct {
    done  chan struct{}
    tasks chan *task.Task
}

// 短信内容结构
type smsContent struct {
	id       string         // 内容ID
	content  string         // 短信内容
	sendTime string         // 待发送时间
	extCode  sql.NullString // 扩展编号
}

这个结构里的done 通道,是检测到需要停止执行的时候,往done通道发送信号,外部接收到后,表示模块正常结束,tasks 是需要执行的任务通道,可通过多线程执行并行获取通道里的任务执行。

smsContent 则保存了短信的具体内容,略。

sms 要实现Tasker接口,则需要实现相应的 Start 与 Stop 方法

// 外部调用Start方法开始执行
func (tasker *sms) Start() {
    go tasker.started()
    // 开启多个工作线程
    // 需要注意以及解决的问题
    // tasks通道未设置缓冲区,是通过阻塞避免重复提取相同数据,但最后一条或N(线程数)条数据还是有重复的可能:
    // 如:最后一条数据的状态在数据库中还未更新,线程又从数据库批量获取任务时,会导致重复获取
    for i := 0; i < wokers; i++ {
        go tasker.processTask(i)
    }
}
// 外部调用Stop方法停止执行
// 等待所有工作线程结束,并关闭数据库连接
func (tasker *sms) Stop(done chan<- task.Done) {
    stop()
    // 等待所有工作线程结束
    for i := 0; i < wokers; i++ {
        <-tasker.done
    }
    // 关闭数据库连接
    dbBasic.Close()
    dbMsg.Close()
    // 发送模块完成信号
    done <- task.Done{ModuleCode: taskModule.Code, ModuleName: taskModule.Name}
}
// 无限循环方法
func (tasker *sms) started() {
    // 开启一个无限循环
    logger.Debug().Printf("模块[%s]开始运行", moduleCode)
    for {
        if isStop() {
            break
        }
        // 获取任务
        tasks, err := task.GetTasks(dbMsg, taskModule, limit, maxExecuted)
        //tasks, err := getTasksTest(dbMsg, taskModule, limit)
        if err != nil {
            logger.Error().Printf("获取模块[%s]任务错误:%s\n", taskModule.Code, err)
        }
        for _, t := range tasks {
            if isStop() {
                break
            }
            tasker.tasks <- t
        }
        // 休眠一段时间
        time.Sleep(time.Duration(sleepTime) * time.Second)
    }
    logger.Debug().Printf("模块[%s]结束运行\n", moduleCode)
    close(tasker.tasks)
}

//实际业务处理,参数 i没有任何意义,测试用,可去掉
func (tasker *sms) processTask(i int) {
    for tk := range tasker.tasks { //进入阻塞
       // 这里执行具体的业务,
       // 可根据实际情况自行设置规则
       _, err := sendToServer(tk)

        if err != nil {
            logger.Error().Printf("执行任务失败[taskid=%s]:%s\n", tk.Id, err.Error())
            // 再次发送
            //执行N次失败后,不再执行
            if tk.ExecCount < maxExecuted {
                nextExecTime := getNextExecTime(int(tk.ExecCount))
                _, err = tk.AddChild(dbMsg, nextExecTime.Format("2006-01-02 15:04:05"), false)
                if err != nil {
                    logger.Error().Printf("增加子任务失败[taskid=%s]:%s\n", tk.Id, err.Error())
                }
            }
        }

        //增加到历史记录并删除
        _, err = tk.AddToHistory(dbMsg)
        if err != nil {
            logger.Error().Printf("增加任务到历史记录失败[taskid=%s]:%s\n", tk.Id, err.Error())
        }
    }
    // 往结束通道发送消息
    tasker.done <- struct{}{}
}
func sendToServer(tk *task.Task) (int, error) {
    //发送信息到接口,略
}
//获取延迟发送时间
func getNextExecTime(n int) time.Time {
    switch {
    case n < 3:
        return time.Now().Add(10 * time.Second) // 1~2次失败则延迟10秒钟
    case n < maxExecuted:
        return time.Now().Add(1 * time.Minute) // 3~4次失败则延迟1分钟
    case n >= maxExecuted:
        return time.Now().Add(30 * time.Minute) //
    }
    return time.Now()
}


sms结构 不对外访问,因此我们通过New方法获取

// 初始化一个Tasker对象
func New() task.Tasker {
    tasker := &sms{done: make(chan struct{}), tasks: make(chan *task.Task)}
    return tasker
}

外部调用方式:

tasker := sms.New()
tasker.Start() 开始执行
tasker.Stop() 停止执行


好,代码差不多了,实现sendToServer方法,基本就可以了。

共有 人打赏支持
粉丝 0
博文 4
码字总数 7225
×
huangcx
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: