编程学习网 > PHP技术 > swoole > Swoole教程案例之进程池模块的使用
2021
05-28

Swoole教程案例之进程池模块的使用

在实际项目中经常需要写一些长期运行的脚本,如基于redis、kafka、rabbitmq实现的多进程队列消费者,多进程爬虫等等。程序员需要使用pcntl和posix相关的扩展库实现多进程编程,需要开发者具备深厚的Linux系统编程功底,否则很容易出现问题。

Swoole提供的进程管理器来自于Swoole\Server,经过大量生产项目验证,稳定性和健壮性都非常高。可大大简化多进程脚本编程工作。下面就给大家分享一下swoole教程

一、 创建进程池

在PHP代码中使用new Swoole\Process\Pool即可创建一个进程池,构造方法的第一个参数传入工作进程的数量。使用on方法设置WorkerStart即可在工作进程启动时执行指定的代码,可以在这里进行while(true)循环从redis队列中获取任务并处理。使用start方法启动所有进程,管理器开始进入wait状态。

  1. $workerNum = 10;
  2. $pool = new Swoole\Process\Pool($workerNum);
  3. $pool->on("WorkerStart", function ($pool, $workerId) {
  4. echo "Worker#{$workerId} is started\n";
  5. $redis = new Redis();
  6. $redis->pconnect('127.0.0.1', 6379);
  7. $key = "key1";
  8. while (true) {
  9. $msgs = $redis->brpop($key, 2);
  10. if ( $msgs == null) continue;
  11. var_dump($msgs);
  12. }
  13. });
  14. $pool->on("WorkerStop", function ($pool, $workerId) {
  15. echo "Worker#{$workerId} is stopped\n";
  16. });
  17. $pool->start();

使用进程管理器,可以保证工作进程的稳定性。

  • 某个工作进程遇到致命错误、主动退出时管理器会进行回收,避免出现僵尸进程
  • 工作进程退出后,管理器会自动拉起、创建一个新的工作进程

二、信号处理

Swoole进程管理器自带了信号处理,向管理器进程发送:

  • SIGTERM信号:中止服务,向所有工作进程发送SIGTERM关闭进程
  • SIGUSR1信号:重启工作进程,管理器会逐个重启工作进程

在工作进程中可以配合使用pcntl_signal和pcntl_signal_dispatch实现信号处理。

  1. $pool->on("WorkerStart", function ($pool, $workerId) {
  2. $running = true;
  3. pcntl_signal(SIGTERM, function () use (&$running) {
  4. $running = false;
  5. });
  6. echo "Worker#{$workerId} is started\n";
  7. $redis = new Redis();
  8. $redis->pconnect('127.0.0.1', 6379);
  9. $key = "key1";
  10. while ($running) {
  11. $msgs = $redis->brpop($key, 2);
  12. pcntl_signal_dispatch();
  13. if ( $msgs == null) continue;
  14. var_dump($msgs);
  15. }
  16. });

三、任务投递

Swoole进程管理器自带了消息队列和TCP-Socket消息投递的支持。可设置监听系统队列或者TCP端口,接收任务数据。此项功能是可选的,要使用任务投递功能,需要对进程池对象设置onMessage回调。

消息队列

  1. $pool = new Swoole\Process\Pool(2, SWOOLE_IPC_MSGQUEUE, 0x7000001);
  2. $pool->on("WorkerStart", function ($pool, $workerId) {
  3. echo "Worker#{$workerId} is started\n";
  4. });
  5. $pool->on("Message", function ($pool, $message) {
  6. echo "Message: {$message}\n";
  7. });
  8. $pool->start();

需要在构造方法的第二个参数传入SWOOLE_IPC_MSGQUEUE,第三个参数设置监听的消息队列KEY。其他程序中使用消息队列相关API就可以向工作进程投递任务了。

TCP 端口

  1. $pool = new Swoole\Process\Pool(2, SWOOLE_IPC_SOCKET);
  2. $pool->on("WorkerStart", function ($pool, $workerId) {
  3. echo "Worker#{$workerId} is started\n";
  4. });
  5. $pool->on("Message", function ($pool, $message) {
  6. echo "Message: {$message}\n";
  7. });
  8. $pool->listen('127.0.0.1', 8089);
  9. $pool->start();

使用TCP端口监听,需要设置构造方法的第二个参数为SWOOLE_IPC_SOCKET,并使用listen方法设置监听的主机和端口。

底层使用了4字节长度+包体的协议。其他程序中向此端口发送数据时,需要在数据包之前增加一个长度字段。


	

  1. $fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\n");
  2. $msg = json_encode(['data' => 'hello', 'uid' => 1991]);
  3. fwrite($fp, pack('N', strlen($msg)).$msg);
  4. fclose($fp);

以上就是这次分享的进程池模块的所有内容,如果觉得对你有用的话欢迎持续关注编程学习网,获取更多资讯与技能


扫码二维码 获取免费视频学习资料

Python编程学习

查 看2022高级编程视频教程免费获取