编程学习网 > PHP技术 > php高级 > swoole教程:swoole4.0操作mysql连接池之读写分离
2019
01-29

swoole教程:swoole4.0操作mysql连接池之读写分离

为什么要读写分离?


  1. 一般的系统都是读多写少,利用读写分离,可以提升mysql的效率

  2. 读写分离后,从库可以水平扩展



下面我们开始代码之旅吧


配置先改造:

$config = [ 'host' => '127.0.0.1', //数据库ip     'port' => 3306, //数据库端口     'user' => 'root', //数据库用户名     'password' => '123456', //数据库密码     'database' => 'test', //默认数据库名     'timeout' => 0.5, //数据库连接超时时间     'charset' => 'utf8mb4', //默认字符集     'strict_type' => true, //ture,会自动表数字转为int类型     'pool_size' => '3', //连接池大小     'pool_get_timeout' => 0.5, //当在此时间内未获得到一个连接,会立即返回。(表示所以的连接都已在使用中) ];

改成:

$config = [ 'pool_size' => '3', //连接池大小     'pool_get_timeout' => 0.5, //当在此时间内未获得到一个连接,会立即返回。(表示所以的连接都已在使用中)     'master' => [ 'host' => '127.0.0.1', //数据库ip         'port' => 3306, //数据库端口         'user' => 'root', //数据库用户名         'password' => '123456', //数据库密码         'database' => 'test', //默认数据库名         'timeout' => 0.5, //数据库连接超时时间         'charset' => 'utf8mb4', //默认字符集         'strict_type' => true, //ture,会自动表数字转为int类型     ], 'slave' => [
        [ 'host' => '127.0.0.1', //从数据库1ip             'port' => 3306, //从数据库1端口             'user' => 'root', //从数据库1用户名             'password' => '123456', //从数据库1密码             'database' => 'test', //默认数据库名             'timeout' => 0.5, //数据库连接超时时间             'charset' => 'utf8mb4', //默认字符集             'strict_type' => true, //ture,会自动表数字转为int类型         ],
        [ 'host' => '127.0.0.1', //从数据库2ip             'port' => 3306, //从数据库2端口             'user' => 'root', //从数据库2用户名             'password' => '123456', //数据库密码             'database' => 'test', //默认数据库名             'timeout' => 0.5, //数据库连接超时时间             'charset' => 'utf8mb4', //默认字符集             'strict_type' => true, //ture,会自动表数字转为int类型         ]
    ],
];

上面模拟的一主两从,由于我本机没有搭建mysql的主从,所以这里主从配置一样


由于我们要做到对上层业务无感知,所以我们只需要改动mydb.php这一层就行了

<?php use Swoole\Coroutine\MySQL; class mydb { /**      * @var MySQL      */     private $master; //主数据库连接     private $slave; //从数据库连接list     private $config; //数据库配置     /**      * @param $config      * @return mixed      * @desc 连接mysql      */     public function connect($config)
    { //创建主数据连接         $master = new MySQL();
        $res = $master->connect($config['master']); if ($res === false) { //连接失败,抛弃常             throw new RuntimeException($master->connect_error, $master->errno);
        } else { //存入master资源             $this->master = $master;
        } //创建从数据库连接         foreach ($config['slave'] as $conf) {
            $slave = new MySQL();
            $res = $slave->connect($conf); if ($res === false) { //连接失败,抛弃常                 throw new RuntimeException($slave->connect_error, $slave->errno);
            } else { //存入slave资源                 $this->slave[] = $slave;
            }
        }

        $this->config = $config; return $res;
    } /**      * @param $type      * @param $index      * @return MySQL      * @desc 单个数据库重连      */     public function reconnect($type, $index)
    { //通过type判断是主还是从         if ('master' == $type) { //创建主数据连接             $master = new MySQL();
            $res = $master->connect($this->config['master']); if ($res === false) { //连接失败,抛弃常                 throw new RuntimeException($master->connect_error, $master->errno);
            } else { //更新主库连接                 $this->master = $master;
            } return $this->master;
        } //创建从数据连接         $slave = new MySQL();
        $res = $slave->connect($this->config['slave'][$index]); if ($res === false) { //连接失败,抛弃常             throw new RuntimeException($slave->connect_error, $slave->errno);
        } else { //更新对应的重库连接             $this->slave[$index] = $slave;
        } return $slave;
    } /**      * @param $name      * @param $arguments      * @return mixed      * @desc 利用__call,实现操作mysql,并能做断线重连等相关检测      */     public function __call($name, $arguments)
    {
        $sql = $arguments[0];
        $res = $this->chooseDb($sql); print_r($res);
        $db = $res['db'];
        $result = call_user_func_array([$db, $name], $arguments); if (false === $result) { if (!$db->connected) { //断线重连                 echo "mysql reconnect" . PHP_EOL;
                $db = $this->reconnect($res['type'], $res['index']);
                $result = call_user_func_array([$db, $name], $arguments); return $this->parseResult($result, $db);
            } if (!empty($db->errno)) { //有错误码,则抛出弃常                 throw new RuntimeException($db->error, $db->errno);
            }
        } return $this->parseResult($result, $db);
    } /**      * @param $result      * @param $db MySQL      * @return array      * @desc 格式化返回结果:查询:返回结果集,插入:返回新增id, 更新删除等操作:返回影响行数      */     public function parseResult($result, $db)
    { if ($result === true) { return [ 'affected_rows' => $db->affected_rows, 'insert_id' => $db->insert_id,
            ];
        } return $result;
    } /**      * @param $sql      * @desc 根据sql语句,选择主还是从      * @ 判断有select 则选择从库, insert, update, delete等选择主库      * @return array      */     protected function chooseDb($sql)
    { //查询语句,随机选择一个从库         if ('select' == strtolower(substr($sql, 0, 6))) { if (1 == count($this->slave)) {
                $index = 0;
            } else {
                $index = array_rand($this->slave);
            } return [ 'type' => 'slave', 'index' => $index, 'db' => $this->slave[$index],

            ];
        } return [ 'type' => 'master', 'index' => 0, 'db' => $this->master         ];
    }

}
  1. connect的时候,我们自动把主,从所有的连接都建立好

  2. 执行sql的时候,我们跟据sql语句是否有select关键词,来判断选择主库还是从库

  3. 重连操作,只重连有问题的连接,不重连所有的连接


一个支持读写分离的mysql数据库接连池就OK了,是不是很简单的,大家完全可以在此基础上封装自己的CRUD等相关操作


我们在http server里增加 Insert的逻辑:

if ($request->server['path_info'] == '/add') {
    go(function () use ($request, $response) { //从池子中获取一个实例         try {
            $pool = MysqlPool::getInstance(); echo "当前可用连接数:" . $pool->getLength() . PHP_EOL;
            $mysql = $pool->get(); echo "当前可用连接数:" . $pool->getLength() . PHP_EOL;
            defer(function () use ($mysql) { //协程执行完成,归还$mysql到连接池                 MysqlPool::getInstance()->put($mysql); echo "当前可用连接数:" . MysqlPool::getInstance()->getLength() . PHP_EOL;
            });
            $ct = time();
            $title = $request->get['title'];
            $result = $mysql->query("insert into test values(NULL, '{$title}', '{$ct}')");
            $response->end(json_encode($result));
        } catch (\Exception $e) {
            $response->end($e->getMessage());
        }
    }); return;
}


访问:http://127.0.0.1:9501/add?title=测试

再访问: http://127.0.0.1:9501/list, 看看是否有最新的



抛个问题:

主从分离后,如果写入之后马上读,大概率会读不到?为什么?有什么方案解决?

扫码二维码 获取免费视频学习资料

Python编程学习

查 看2022高级编程视频教程免费获取