编程学习网 > PHP技术 > php高级 > php中RabbitMQ的使用
2020
04-29

php中RabbitMQ的使用



什么是队列

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回。消息使用者再从MQ中取消息进行逻辑处理。对于消耗较大的请求,可以立马返回处理结果。减少服务器压力。为各个子系统之间解耦和异步处理。


rabbitmq的整体结构

attachments-2020-04-3T3Kwqhm5ea912e3c3922.png

rmq简单来说就是一个(生产/消费)的模型结构。消息生产者把数据丢到队列中,消息消费者从队列中取出数据进行逻辑处理。


那么如何确保,生产者添加的数据,能够到达指定的队列中呢?

rmq(消息队列)主要提供了三个概念(中间件?)来确保消息的分发,Exchange(交换机)、RoutingKey(路由)、Queue(队列)

从上面的图也可以看出来, 处理消息的接收、分发,主要在Broker模块中。


Exchange 所有生产消息的入口都是到交换机这里。

exchange通过进来的路由(RoutingKey),去和已binding的规则进行匹配,找到指定的队列。

RoutingKey 我的理解,这里相当于一把钥匙。而binding的操作相当于一把锁头。

Queue 消息的存放区域,等到消费者来取。

Binding Exchange和Queue之间的一个绑定。

从这些概念来看,影响规则的主要是依赖Exchange。


那rmq提供了哪些类型,都有什么特点呢?


exchange类型

RabbitMQ提供了四种Exchange类型

  1. direct
  2. fanout
  3. topic
  4. header(header类型在实际使用中较少,所以在这里就不进行说明。)


Direct Exchange

v2-19822f39451050f423dfce011aa8e4c0_720w.jpg

direct 的规则比较简单。

在发布消息前,需要把exchange和queue做一个绑定。

如果发布消息的时候,RoutingKey 和绑定的值(key)一致。则将消息投递到该队列中。

如果不存在对应的队列,则消息会被丢弃。 (这时候访问rmq管理web时。可以看到消息进来,但是队列中没有值)


Fanout Exchange


v2-3515f8ceefbe3077f3359a42fb84a685_720w.jpg


fanout 类型则更简单一些。 只要exchange和队列做了绑定。发布的消息都会到队列中去。


Topic Exchange


v2-0e5d232064196ce9950957b902b25b73_720w.jpg


相对来说 topic类型要复杂一些。 和direct类型相比。topic相当于模糊匹配,而direct为全等。类似mysql中 ‘like’关键词。

针对direct 类型写一个实例

实例分两部分 生产者、消费者(回调函数)

因为我的代码,对mq的部分做了封装,懒得拆分出来。 所以我只贴业务代码和封装的核心方法。


生产者代码

$mqModel = new Rabbitmq();   // 初始化(rmq连接操作)
$newResult = ['tom','bill','jack'];
if ($mqModel) {
    $mqRoute = 'push_data_to_crm_routing';  // 路由
    $mqExchange = 'push_data_to_crm_exchange';  // 交换机
    $mqQuery = 'push_data_to_crm_queue';  // 队列
    // 建立连接,设置交换机,设置队列
    $mqModel->setChannel()->setExchange($mqExchange,AMQP_EX_TYPE_DIRECT,AMQP_DURABLE)->setQueue($mqQuery,AMQP_DURABLE,$mqExchange,$mqRoute);
    foreach ($newResult as $k => $v){
        $push_data = $v;
		$mqModel->publishMessage($push_data,$mqRoute); // 消息推送
	}
}

消费者代码

$mqModel = new Rabbitmq();
// $mqRoute = 'push_data_to_crm_routing'; 消费者用不上路由,因为不需要指定。 只要想取队列,消费即可。 
$mqExchange = 'push_data_to_crm_exchange';
$mqQuery = 'push_data_to_crm_queue';
$mqModel->setChannel()->setExchange($mqExchange,'', AMQP_PASSIVE)->setQueue($mqQuery, AMQP_PASSIVE);
$zmq->consume(function($msg){
    var_dump($msg);
    return true;
});

封装类中的核心方法

//设置交换机
public function setExchange($changeName = '', $changeType = '', $flags = false) {	
	$errorMsg = '';
	try{
		if(!$this->channel){
			throw new \AMQPQueueException("Error channel on method setExchange", 1);
		}
		$this->exchange = new \AMQPExchange($this->channel);
		if($changeName){
			$this->changeName = $changeName; // 交换机名称
			$this->exchange->setName($changeName); // 设置名称
			$changeType = $changeType ? $changeType : AMQP_EX_TYPE_DIRECT;  // 交换机类型
		}else{
			$this->changeName = '';
		}
		if($changeType){
			$this->changeType = $changeType;
			$this->exchange->settype($changeType);  // 设置交换机类型
		}else{
			$this->changeType = '';
		}
		if($flags){
			$this->exchange->setFlags($flags);  //交换机标志
		}
		if($changeType || $flags){
			$this->exchange->declareExchange();  // 创建
		}
	} catch(AMQPQueueException $ex) {
		$errorMsg = "AMQPQueueException error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	} catch(Exception $ex) {
		$errorMsg = "Exception error exchange:  {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	}
	if($errorMsg){
		throw new Exception($errorMsg, 1);
	}
	return $this;
}

// 设置队列
public function setQueue($queueName = '', $flags = '', $exchange_name = '', $routing_key = '', $arguments=[] ){
	$errorMsg = '';
	try{
		if(!$this->channel){
			throw new \AMQPQueueException("Error channel on method setQueue", 1);
		}
		$this->queue = new \AMQPQueue($this->channel);
		if(!$queueName){
			return false;
		}
		$this->queueName = $queueName;  // 队列名称
		$this->queue->setName($queueName);
		if($flags){
			$this->queue->setFlags($flags);  // 队列标志。与消息持久化有关。 这篇文字不涉及这一块的说明
		}
        if(is_array($arguments) && !empty($arguments)){
            $this->queue->setArguments($arguments);  // 参数配置
        }
		$this->queue->declareQueue();  // 创建一个队列
		$exchange_name = $exchange_name === false ? 
						'' : 
						($exchange_name === true || !$exchange_name ? $this->changeName : $exchange_name);

		$routing_key = $routing_key ? $routing_key : $this->queueName;

		if($exchange_name && $routing_key ){
			$this->queue->bind($exchange_name, $routing_key);  // 交换机和队列的绑定操作
		}

	} catch(AMQPQueueException $ex) {
		$errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	} catch(Exception $ex) {
		$errorMsg = "Exception error queue:  {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	}

	if($errorMsg){
		throw new Exception($errorMsg, 1);
	}

	return $this;
}

// 发布消息
public function publishMessage($message = '', $routing_key = '', $flags = AMQP_NOPARAM, $attributes = []){
	if(!$message){
		return false;
	}
	$routing_key = $routing_key ? $routing_key : $this->queueName;
	// 发布消息,带有路由key。如果需要,则会用于关联。
	$this->exchange->publish($message, $routing_key, $flags, $attributes);
	return true;
}

// 消费
public function consume($callback = null, $qos = 0, $isAct = true){
	if($qos){
		$this->channel->qos(0, $qos);
	}
	$errorMsg = '';
	try{
		if(!$this->queue){
			throw new \AMQPQueueException("Error queue on method consume", 1);
		}
		$this->callBackFnc = $callback;
		$this->isAct = $isAct;
		$callback = function($envelope, $queue){
						if(is_callable($this->callBackFnc)){
							call_user_func($this->callBackFnc, $envelope->getBody());
							if($this->isAct){
								$queue->ack($envelope->getDeliveryTag());
							}else{
								$queue->nack($envelope->getDeliveryTag());
							}
						}
					};
		$this->queue->consume($callback);  
	} catch(AMQPQueueException $ex) {
		$errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	} catch(Exception $ex) {
		$errorMsg = "Exception error queue:  {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	}
	if($errorMsg){
		throw new \Exception($errorMsg, 1);
	}
}

因为封装代码里写了很多 try catch 所以看起来特别乱。 还有部分兼容的逻辑。 看起来不舒服,就先删掉再看吧。


执行结果

先跑一遍生产者代码,这里可以用浏览器直接访问。 执行完了之后。 到rabbitmq 的web管理页面中查看。 发现消息已经正常添加到队列中。(web管理页面可查询别的文章开启)


attachments-2020-04-LcAcwXz85ea914fef2e95.jpg

这时候再执行消费者代码。 消费者代码需要在cli下执行。因为消费者为轮询等待,是死循环,无法在浏览器下执行。

v2-e3d16524a8e3befb13094e2c5a5082a9_720w.png

终端输出结果,消息已经被消费掉了。 再次返回web管理页面,队列中的三条记录也没有了。


结尾

实际上,这样的一个流程,是可以直接在实际项目中使用的。

这就是我上一个项目上使用的rmq(仅限应用架构)。

只不过这是一个简略版的。为了确保子系统的独立。

消费者应该单独成一个小系统。


在回调函数上,使用curl请求业务应用。

这样两个系统分来。

业务逻辑还是在原应用中。

还没结束,到这里似乎缺少点人性化的东西。

因为,没建一个队列,都需要单独去运行一个消费者代码。

这样很繁琐,而且很多人都不会有权限直接操作服务器。

所以,这里还需要搭建一个平台,用来自动部署消费端代码运行,停止,更改。

如何去部署,那就需要思考了。


扫码二维码 获取免费视频学习资料

Python编程学习

查 看2022高级编程视频教程免费获取