服务器环境 Ubuntu 18.04.5 LTS
PHP 7.2.24
RabbitMQ 3.6.10
php-amqplib 2.7
apt-get install erlang-nox apt-get install rabbitmq-server rabbitmqctl add_user admin admin rabbitmqctl set_user_tags admin administrator rabbitmqctl set_permissions -p / admin '.*' '.*' '.*' //开启web管理页面 //cd到安装目录 我这里是/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10 cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10 rabbitmq-plugins enable rabbitmq_management cd 到项目目录 composer require php-amqplib/php-amqplib 在需要使用的地方 use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable;
//1.1 建立连接 $conn = new AMQPStreamConnection($host, $port, $user, $password, $vhost); 参数: $host: RabbitMQ服务器主机IP地址 $port: RabbitMQ服务器端口 $user: 连接RabbitMQ服务器的用户名 $password: 连接RabbitMQ服务器的用户密码 $vhost: 连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost)
//1.2 建立信道$channel = $conn->channel($channel_id); 参数: $channel_id 信道id,不传则获取$channel[“”]信道,再无则循环$this->channle数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异常No free channel ids
//1.3 声明交换器 $channel->exchange_declare($exhcange_name, $type, $passive, $durable, $auto_delete); 参数: $exhcange_name 交换器名字 $type 交换器类型 $passive 是否检测同名队列 $durable 交换机是否开启持久化 $auto_detlete 通道关闭后是否删除队列 (1)交换器类型 枚举 [ direct: (默认)直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue, fanout: 广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue, topic: 主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。(* 表是匹配一个任意词组,#表示匹配0个或多个词组), headers:根据消息体的header匹配 ]
//1.4 声明队列 $channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_delete); 参数: $queue_name 队列名称 $passive 是否检测同名队列 $durable 是否开启队列持久化 $exclusive 队列是否可以被其他队列访问 $auto_delete 通道关闭后是否删除队列
//1.5 创建要发送的信息 ,可以创建多个消息 $msg = new AMQPMessage($data, $properties) $data 要发送的消息 $properties Array 设置的属性,比如设置该消息持久化['delivery_mode'=>2] //单个发送 $channel->basic_publish($msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null); 参数: $msg 消息内容 $exchange 交换器 $routing_key routing_key $mandatory 匹配不到队列时,是否立即丢弃消息 $immediate 队列无消费者时,是否立即丢弃消息 $ticket 这个俺也不知道 坐等大佬 //多个发送 1.多次调用 $channel->batch_basic_publish($msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null) 内部实现:往$this->batch_messages[]塞 2.再调用一次$channel->publish_batch(),完成发送
1.6 路由绑定 $channel->queue_bind( $queue, $exchange, $routing_key = '', $nowait = false, $arguments = array(), $ticket = null ) 参数: $queue 队列名 $exchange 交换器名 $routing_key routing_key $nowait 同上 俺也不知 $arguments $ticket
1.7 消费消息 $channel->basic_consume( $queue = '', $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array() ) 参数: $queue 队列名 $consumer_tag $no_local $no_ack 是否不需要手动ack:true就是不需要ack|false需要手动ack $exclusive $nowait $callback 消息回调函数 $ticket $arguments
1.8 手动ack 示例
$callback = function($msg) {
sleep($msg->body);
echo " [x] Received sleep ", $msg->body, "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
echo " [x] Ack "."\n";
};1.9 限制分发 示例 限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。 $channel->basic_qos(null,1,null);
这里也顺手补一下最基础的使用,连接参数作为demo就直接写死了哈~
public function send()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
}
public function consume()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
}生产者和消费者均增加 $channel->basic_qos(null,1,null); 即可。
例如注册后需要发送欢迎短信和邮件,将注册行为广播至短信和邮件
生产者//定义交换器$channel->exchange_declare('register','fanout',false,false,false);
$msg = new AMQPMessage('register event');
$channel->basic_publish($msg, 'register');
注册短信消费者
$channel->exchange_declare('register','fanout',false,false,false);
$channel->queue_declare('register.sms', false, false, false, false);
$channel->queue_bind('register.sms', 'register');
注册邮件消费者
$channel->exchange_declare('register','fanout',false,false,false);
$channel->queue_declare('register.mail', false, false, false, false);
$channel->queue_bind('register.mail', 'register');例如我想一个消费者接受所有日志,一个消费者只接收Error级别日志
生产者//定义交换器
$channel->exchange_declare('log','topic',false,false,false);
$num = rand(0,10);
if ($num%3 == 0) {
$level = 'error';
}elseif($num%3 == 1){
$level = 'warning';
}else{
$level = 'common';
}
$msg = new AMQPMessage('log event '.$level);
$channel->basic_publish($msg, 'log', 'log.'.$level);
全量日志消费者
$channel->exchange_declare('log','topic',false,false,false);
$channel->queue_declare('log.all', false, false, false, false);
$channel->queue_bind('log.all', 'log', 'log.*');
Error日志消费者
$channel->exchange_declare('log','topic',false,false,false);
$channel->queue_declare('log.error', false, false, false, false);
$channel->queue_bind('log.error', 'log', 'log.error');例如我想一个消费者接受所有日志,一个消费者只接收Error级别日志
生产者//定义交换器
$channel->exchange_declare('log2','headers',false,false,false);
$num = rand(0,10);
if ($num%3 == 0) {
$level = 'error';
}elseif($num%3 == 1){
$level = 'warning';
}else{
$level = 'common';
}
$msg = new AMQPMessage('log2 event '.$level);
$bindArguments = [
'level' => $level,
'type' => 'log'
];
$headers = new AMQPTable($bindArguments);
$msg->set('application_headers', $bindArguments);
$channel->basic_publish($msg, 'log2');
全量日志消费者
$channel->exchange_declare('log2','headers',false,false,false);
$channel->queue_declare('log2.all', false, false, false, false);
$bindArguments = [
'type' => 'log',
//'x-match' => 'any' //默认any
];
$headers = new AMQPTable($bindArguments);
$channel->queue_bind('log2.all', 'log2', '', false, $headers);
Error日志消费者
$channel->exchange_declare('log2','headers',false,false,false);
$channel->queue_declare('log2.error', false, false, false, false);
$bindArguments = [
'type' => 'log',
'level' => 'error',
'x-match' => 'all' //默认any
];
$headers = new AMQPTable($bindArguments);
$channel->queue_bind('log2.error', 'log2', '', false, $headers);//2.5.1 定义一个没有消费者,5s后消息过期的队列
//生产者
$arguments = new AMQPTable([
'x-dead-letter-exchange' => 'dead',
'x-message-ttl' => 5000, //消息存活时间毫秒
'x-dead-letter-routing-key' => 'dead'
]);
//定义队列 不要交换器
$channel->queue_declare('no_consume', false, false, false, false, false, $arguments);
$now = time();
$msg = new AMQPMessage($now);
$channel->basic_publish($msg, '', 'no_consume');
echo " [x] Sent no_consume :".date('Y-m-d H:i:s',$now)."\n";
$channel->close();
$connection->close();
//消费者
$channel->exchange_declare('dead','topic',false,false,false);
$channel->queue_declare('dead.all', false, false, false, false);
$channel->queue_bind('dead.all', 'dead', 'dead');
$channel->basic_qos(null,1,null);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
var_dump('msg:'.date('Y-m-d H:i:s',$msg->body));
var_dump('now:'.date('Y-m-d H:i:s'));
echo " [x] Received log error ", $msg->body, "\n";
};
$channel->basic_consume('dead.all', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}https://segmentfault.com/a/1190000038779279