swoole4.0之打造自己的web开发框架(4)

原创
2018/12/20 13:05
阅读数 263

上一篇:swoole4.0之打造自己的web开发框架(3) 我们介绍了在协程下使用全局变量的问题以及实现了一个Context,用于协程内的上下文传递,这基本算是和FPM最需要注意的地方了,本篇将实现一个完整的CRUD功能

 

1、分层

一般我的习惯是代码分为以下几个层

  1. controller, 此层只负责功能的调度,参数的控制,结果的输出

  2. service,此层负责业务逻辑相关的开发

  3. dao,此层负责与数据的交互

  4. entity, 此层定义为数据表的一个映射

 

2、连接池

在之前的系列文章:swoole4.0 mysql连接池之读写分离有介绍,这里我们直接整合进来就行了。

数据库操作代码:

<?php
//file framework/Family/Db/Mysql.php
namespace Family\Db;

use Family\Core\Log;
use Swoole\Coroutine\MySQL as SwMySql;

class Mysql
{
    /**
     * @var MySQL
     */
    private $master;   //主数据库连接
    private $slave;     //从数据库连接list
    private $config;    //数据库配置
    /**
     * @param $config
     * @return mixed
     * @throws \Exception
     * @desc 连接mysql
     */
    public function connect($config)
    {
        //创建主数据连接
        $master = new SwMySql();
        $res = $master->connect($config['master']);
        if ($res === false) {
            //连接失败,抛弃常
            throw new \Exception($master->connect_error, $master->errno);
        } else {
            //存入master资源
            $this->master = $master;
        }

        if (!empty($config['slave'])) {
            //创建从数据库连接
            foreach ($config['slave'] as $conf) {
                $slave = new MySQL();
                $res = $slave->connect($conf);
                if ($res === false) {
                    //连接失败,抛弃常
                    throw new \Exception($slave->connect_error, $slave->errno);
                } else {
                    //存入slave资源
                    $this->slave[] = $slave;
                }
            }
        }

        $this->config = $config;
        return $res;
    }

    /**
     * @param $type
     * @param $index
     * @return MySQL
     * @desc 单个数据库重连
     * @throws \Exception
     */
    public function reconnect($type, $index)
    {
        //通过type判断是主还是从
        if ('master' == $type) {
            //创建主数据连接
            $master = new SwMySql();
            $res = $master->connect($this->config['master']);
            if ($res === false) {
                //连接失败,抛弃常
                throw new \Exception($master->connect_error, $master->errno);
            } else {
                //更新主库连接
                $this->master = $master;
            }
            return $this->master;
        }

        if (!empty($this->config['slave'])) {
            //创建从数据连接
            $slave = new SwMySql();
            $res = $slave->connect($this->config['slave'][$index]);
            if ($res === false) {
                //连接失败,抛弃常
                throw new \Exception($slave->connect_error, $slave->errno);
            } else {
                //更新对应的重库连接
                $this->slave[$index] = $slave;
            }
            return $slave;
        }
    }

    /**
     * @param $name
     * @param $arguments
     * @return mixed
     * @desc 利用__call,实现操作mysql,并能做断线重连等相关检测
     * @throws \Exception
     */
    public function __call($name, $arguments)
    {
        $sql = $arguments[0];
        $res = $this->chooseDb($sql);
        $db = $res['db'];
//        $result = call_user_func_array([$db, $name], $arguments);
        $result = $db->$name($sql);
        Log::info($sql);
        if (false === $result) {
            Log::warning('mysql query false', [$sql]);
            if (!$db->connected) { //断线重连
                $db = $this->reconnect($res['type'], $res['index']);
                Log::info('mysql reconnect', $res);
                $result = $db->$name($sql);
                return $this->parseResult($result, $db);
            }

            if (!empty($db->errno)) {  //有错误码,则抛出弃常
                throw new \Exception($db->error, $db->errno);
            }
        }
        return $this->parseResult($result, $db);
    }

    /**
     * @param $result
     * @param $db MySQL
     * @return array
     * @desc 格式化返回结果:查询:返回结果集,插入:返回新增id, 更新删除等操作:返回影响行数
     */
    public function parseResult($result, $db)
    {
        if ($result === true) {
            return [
                'affected_rows' => $db->affected_rows,
                'insert_id' => $db->insert_id,
            ];
        }
        return $result;
    }


    /**
     * @param $sql
     * @desc 根据sql语句,选择主还是从
     * @ 判断有select 则选择从库, insert, update, delete等选择主库
     * @return array
     */
    protected function chooseDb($sql)
    {
        if (!empty($this->slave)) {
            //查询语句,随机选择一个从库
            if ('select' == strtolower(substr($sql, 0, 6))) {
                if (1 == count($this->slave)) {
                    $index = 0;
                } else {
                    $index = array_rand($this->slave);
                }
                return [
                    'type' => 'slave',
                    'index' => $index,
                    'db' => $this->slave[$index],

                ];
            }
        }

        return [
            'type' => 'master',
            'index' => 0,
            'db' => $this->master
        ];
    }

}

连接池代码:

<?php
//file framework/Family/Pool/Mysql.php
namespace Family\Pool;

use Family\Db\Mysql as DB;
use chan;

class Mysql
{
    private static $instance;
    private $pool;  //连接池容器,一个channel
    private $config;

    /**
     * @param null $config
     * @return Mysql
     * @desc 获取连接池实例
     * @throws \Exception
     */
    public static function getInstance($config = null)
    {
        if (empty(self::$instance)) {
            if (empty($config)) {
                throw new \Exception("mysql config empty");
            }
            self::$instance = new static($config);
        }

        return self::$instance;
    }

    /**
     * Mysql constructor.
     * @param $config
     * @throws \Exception
     * @desc 初始化,自动创建实例,需要放在workerstart中执行
     */
    public function __construct($config)
    {
        if (empty($this->pool)) {
            $this->config = $config;
            $this->pool = new chan($config['pool_size']);
            for ($i = 0; $i < $config['pool_size']; $i++) {
                $mysql = new DB();
                $res = $mysql->connect($config);
                if ($res == false) {
                    //连接失败,抛弃常
                    throw new \Exception("failed to connect mysql server.");
                } else {
                    //mysql连接存入channel
                    $this->put($mysql);
                }
            }
        }
    }

    /**
     * @param $mysql
     * @desc 放入一个mysql连接入池
     */
    public function put($mysql)
    {
        $this->pool->push($mysql);
    }

    /**
     * @return mixed
     * @desc 获取一个连接,当超时,返回一个异常
     * @throws \Exception
     */
    public function get()
    {
        $mysql = $this->pool->pop($this->config['pool_get_timeout']);
        var_dump($mysql);
        if (false === $mysql) {
            throw new \Exception("get mysql timeout, all mysql connection is used");
        }
        return $mysql;
    }

    /**
     * @return mixed
     * @desc 获取当时连接池可用对象
     */
    public function getLength()
    {
        return $this->pool->length();
    }

}

application/config/default.php 加上数据库配置

'mysql' => [
    'pool_size' => 3,     //连接池大小
    'pool_get_timeout' => 0.5, //当在此时间内未获得到一个连接,会立即返回。(表示所以的连接都已在使用中)
    'master' => [
        'host' => '127.0.0.1',   //数据库ip
        'port' => 3306,          //数据库端口
        'user' => 'root',        //数据库用户名
        'password' => '123456', //数据库密码
        'database' => 'test',   //默认数据库名
        'timeout' => 0.5,       //数据库连接超时时间
        'charset' => 'utf8mb4', //默认字符集
        'strict_type' => true,  //ture,会自动表数字转为int类型
    ],
],

在Family/run方法里,初始化连接池

 

$http->on('workerStart', function (\swoole_http_server $serv, int $worker_id) {
    if (function_exists('opcache_reset')) {
        //清除opcache 缓存,swoole模式下其实可以关闭opcache
        \opcache_reset();
    }
    try {
        $mysqlConfig = Config::get('mysql');
        if (!empty($mysqlConfig)) {
            //配置了mysql, 初始化mysql连接池
            Pool\Mysql::getInstance($mysqlConfig);
        }
    } catch (\Exception $e) {
        //初始化异常,关闭服务
        print_r($e);
        $serv->shutdown();
    } catch (\Throwable $throwable) {
        //初始化异常,关闭服务
        print_r($throwable);
        $serv->shutdown();
    }
});

 

框架层的代码就完成,下面接着看业务逻辑

 

我们创建一个user表,用于做CRUD的操作表

 

3、entity定义

entity是表的映射,所以这个代码简单:

<?php
//file: application/entity/user.phU
namespace entity;


use Family\MVC\Entity;

class User extends Entity
{
    /**
     * 对应的数据库表名
     */
    const TABLE_NAME = 'user';
    /**
     * 主键字段名
     */
    const PK_ID = 'id';

    //以下对应的数据库字段名
    public $id;
    public $name;
    public $password;

}

这里有个Entity基类,主要目的是: mysql查询回来的数据是一个数组,我们能够把数据自动填充到entity里来,代码如下:
 

<?php
//file framework/Family/MVC/Entity.php
namespace Family\MVC;


class Entity
{
    /**
     * Entity constructor.
     * @param array $array
     * @desc 把数组填充到entity
     */
    public function __construct(array $array)
    {
        if (empty($array)) {
            return $this;
        }

        foreach ($array as $key => $value) {
            if (property_exists($this, $key)) {
                $this->$key = $value;
            }
        }
    }
}

 

4、Dao封装

dao层是与数据库的交互,这个我们可以抽离出一个基类,把常见的crud操作封封起来, 代码如下:

<?php
//file framework/Family/MVC/Dao.php
namespace Family\MVC;

use Family\Db\Mysql;
use Family\Pool\Mysql as MysqlPool;
use Family\Coroutine\Coroutine;


class Dao
{
    /**
     * @var entity名
     */
    private $entity;

    /**
     * @var mysql连接数组
     * @desc 不同协程不能复用mysql连接,所以通过协程id进行资源隔离
     */
    private $dbs;

    /**
     * @var Mysql
     */
    private $db;

    //表名
    private $table;

    //主键字段名
    private $pkId;


    public function __construct($entity)
    {
        $this->entity = $entity;
        $coId = Coroutine::getId();
        if (empty($this->dbs[$coId])) {
            //不同协程不能复用mysql连接,所以通过协程id进行资源隔离
            //达到同一协程只用一个mysql连接,不同协程用不同的mysql连接
            $this->dbs[$coId] = MysqlPool::getInstance()->get();
            $entityRef = new \ReflectionClass($this->entity);
            $this->table = $entityRef->getConstant('TABLE_NAME');
            $this->pkId = $entityRef->getConstant('PK_ID');
            defer(function () {
                //利用协程的defer特性,自动回收资源
                $this->recycle();
            });
        }
        $this->db = $this->dbs[$coId];
    }

    /**
     * @throws \Exception
     * @desc mysql资源回收到连接池
     */
    public function recycle()
    {
        $coId = Coroutine::getId();
        if (!empty($this->dbs[$coId])) {
            $mysql = $this->dbs[$coId];
            MysqlPool::getInstance()->put($mysql);
            unset($this->dbs[$coId]);
        }
    }

    /**
     * @return mixed
     * @desc 获取表名
     */
    public function getLibName()
    {
        return $this->table;
    }

    /**
     * @param $id
     * @param string $fields
     * @return mixed
     * @desc 通过主键查询记录
     */
    public function fetchById($id, $fields = '*')
    {
        return $this->fetchEntity("{$this->pkId} = {$id}", $fields);
    }

    /**
     * @param string $where
     * @param string $fields
     * @param null $orderBy
     * @return mixed
     * @desc 通过条件查询一条记录,并返回一个entity
     */
    public function fetchEntity($where = '1', $fields = '*', $orderBy = null)
    {
        $result = $this->fetchArray($where, $fields, $orderBy, 1);
        if (!empty($result[0])) {
            return new $this->entity($result[0]);
        }
        return null;
    }

    /**
     * @param string $where
     * @param string $fields
     * @param null $orderBy
     * @param int $limit
     * @return mixed
     * @desc 通过条件查询记录列表,并返回entity列表
     */
    public function fetchAll($where = '1', $fields = '*', $orderBy = null, $limit = 0)
    {
        $result = $this->fetchArray($where, $fields, $orderBy, $limit);
        if (empty($result)) {
            return $result;
        }
        foreach ($result as $index => $value) {
            $result[$index] = new $this->entity($value);
        }
        return $result;
    }


    /**
     * @param string $where
     * @param string $fields
     * @param null $orderBy
     * @param int $limit
     * @return mixed
     * @desc 通过条件查询
     */
    public function fetchArray($where = '1', $fields = '*', $orderBy = null, $limit = 0)
    {
        $query = "SELECT {$fields} FROM {$this->getLibName()} WHERE {$where}";

        if ($orderBy) {
            $query .= " order by {$orderBy}";
        }

        if ($limit) {
            $query .= " limit {$limit}";
        }
        return $this->db->query($query);
    }

    /**
     * @param array $array
     * @return bool
     * @desc 插入一条记录
     */
    public function add(array $array)
    {

        $strFields = '`' . implode('`,`', array_keys($array)) . '`';
        $strValues = "'" . implode("','", array_values($array)) . "'";
        $query = "INSERT INTO {$this->getLibName()} ({$strFields}) VALUES ({$strValues})";
        if (!empty($onDuplicate)) {
            $query .= 'ON DUPLICATE KEY UPDATE ' . $onDuplicate;
        }
        echo $query . PHP_EOL;
        $result = $this->db->query($query);
        if (!empty($result['insert_id'])) {
            return $result['insert_id'];
        }

        return false;
    }

    /**
     * @param array $array
     * @param $where
     * @return bool
     * @throws \Exception
     * @desc 按条件更新记录
     */
    public function update(array $array, $where)
    {
        if (empty($where)) {
            throw new \Exception('update 必需有where条件限定');
        }
        $strUpdateFields = '';
        foreach ($array as $key => $value) {
            $strUpdateFields .= "`{$key}` = '{$value}',";
        }
        $strUpdateFields = rtrim($strUpdateFields, ',');
        $query = "UPDATE {$this->getLibName()} SET {$strUpdateFields} WHERE {$where}";
        echo $query;
        $result = $this->db->query($query);
        return $result['affected_rows'];
    }

    /**
     * @param $where
     * @return mixed
     * @throws \Exception
     * @desc 按条件删除记录
     */
    public function delete($where)
    {
        if (empty($where)) {
            throw new \Exception('delete 必需有where条件限定');
        }

        $query = "DELETE FROM {$this->getLibName()} WHERE {$where}";
        $result = $this->db->query($query);
        return $result['affected_rows'];
    }
}

这里有个需要注意的地方就是,不同的协程不能复用一个mysql连接,但同一协程内可以复用,所以在Dao类做了一些优化处理

 

 

5、业务DAO实现

有了dao的基类,我们的业务dao实现就非常简单的了,代码如下:

<?php
//file application/dao/User.php
namespace dao;


use Family\MVC\Dao;
use Family\Core\Singleton;

class User extends Dao
{
    use Singleton;

    public function __construct()
    {
        parent::__construct('\entity\User');
    }
}

这里有个

trait Singleton

作用是代码复用,因为实现单例的方法都一样,代码如下:

<?php
//file frame/Family/Core/Singleton.php
namespace Family\Core;


trait Singleton
{
    private static $instance;

    static function getInstance(...$args)
    {
        if (!isset(self::$instance)) {
            self::$instance = new static(...$args);
        }
        return self::$instance;
    }
}

 

6、业务Service实现

业务逻辑也简单,实现了几个功能,代码如下:

<?php
//file application/service/User.php
namespace service;

use dao\User as UserDao;
use Family\Core\Singleton;


class User
{
    use Singleton;

    /**
     * @param $id
     * @return mixed
     * @desc 通过uid查询用户信息
     */
    public function getUserInfoByUId($id)
    {
        return UserDao::getInstance()->fetchById($id);
    }

    /**
     * @return mixed
     * @desc 获取所有用户列表
     */
    public function getUserInfoList()
    {
        return UserDao::getInstance()->fetchAll();
    }

    /**
     * @param array $array
     * @return bool
     * @desc 添加一个用户
     */
    public function add(array $array)
    {
        return UserDao::getInstance()->add($array);
    }

    /**
     * @param array $array
     * @param $id
     * @return bool
     * @throws \Exception
     * @desc 按id更新一个用户
     */
    public function updateById(array $array, $id)
    {
        return UserDao::getInstance()->update($array, "id={$id}");
    }

    /**
     * @param $id
     * @return mixed
     * @throws \Exception
     * @desc 按id删除用户
     */
    public function deleteById($id)
    {
        return UserDao::getInstance()->delete("id={$id}");
    }
}

 

7、Controller实现

最后就是实现controller了,代码也很简单,如下:

<?php
//file: application/controller/Index.php
namespace controller;


use Family\MVC\Controller;
use service\User as UserService;

class Index extends Controller
{
    public function index()
    {

        return 'i am family by route!' . json_encode($this->request->get);
    }

    public function tong()
    {
        return 'i am tong ge';
    }

    /**
     * @return false|string
     * @throws \Exception
     * @desc 返回一个用户信息
     */
    public function user()
    {
        if (empty($this->request->get['uid'])) {
            throw new \Exception("uid 不能为空 ");
        }
        $result = UserService::getInstance()->getUserInfoByUId($this->request->get['uid']);
        return json_encode($result);

    }

    /**
     * @return false|string
     * @desc 返回用户列表
     */
    public function list()
    {
        $result = UserService::getInstance()->getUserInfoList();
        return json_encode($result);

    }

    /**
     * @return bool
     * @desc 添加用户
     */
    public function add()
    {
        $array = [
            'name' => $this->request->get['name'],
            'password' => $this->request->get['password'],
        ];

        return UserService::getInstance()->add($array);
    }

    /**
     * @return bool
     * @throws \Exception
     * @desc 更新用户信息
     */
    public function update()
    {
        $array = [
            'name' => $this->request->get['name'],
            'password' => $this->request->get['password'],
        ];
        $id = $this->request->get['id'];
        return UserService::getInstance()->updateById($array, $id);
    }

    /**
     * @return mixed
     * @throws \Exception
     * @desc 删除用户信息
     */
    public function delete()
    {
        $id = $this->request->get['id'];
        return UserService::getInstance()->deleteById($id);
    }

}

这里也把controller一些操作抽出成一个基类,目前只是把request对象抽出来了,后续如:参数的获取和判断,安全过滤等等,都可以在基类里补充,代码如下:

<?php
//file framework/Family/MVC/Controller.php
namespace Family\MVC;


use Family\Pool\Context;

class Controller
{
    protected $request;

    public function __construct()
    {
        //通过context拿到$request, 再也不用担收数据错乱了
        $context = Context::getContext();
        $this->request = $context->getRequest();
    }
}

 

8、运行

http://127.0.0.1:9501/Index/add?name=shenzhe5&password=555

http://127.0.0.1:9501/Index/list

http://127.0.0.1:9501/Index/update?name=shenzhe0&password=555&id=5

http://127.0.0.1:9501/Index/delete?id=5

至此,我们就拥有一个完整的CRUD的操作,并且把MVC的基础功能封装在框架里了

可以看到,代码的思路和习惯和常用的fpm web开发框架几无差别

 

下一篇:我们将拥抱composer, 并通过整合一个composer router包,把router功能升级,有了composer,我们将有了取之不尽的各类库,框架的能力也将大大的得到拓展

 

composer: https://getcomposer.org/

github地址: https://github.com/shenzhe/family

 

--------------伟大的分割线----------------

PHP饭米粒(phpfamily) 由一群靠谱的人建立,愿为PHPer带来一些值得细细品味的精神食粮!

饭米粒只发原创或授权发表的文章,不转载网上的文章

所发的文章,均可找到原作者进行沟通。

也希望各位多多打赏(算作稿费给文章作者),更希望大家多多投搞。

投稿请联系:

shenzhe163@gmail.com

 

本文由 半桶水 授权 饭米粒 发布,转载请注明本来源信息和以下的二维码(长按可识别二维码关注)

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部