原创

php进程通信-消息队列

温馨提示:
本文最后更新于 2018年06月08日,已超过 2,295 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

php多进程通信,有各种各样的方法(进程信号,消息队列,管道,共享内存,socket等等)

本文主要讲php利用linux 消息队列的通信方法

注意:多进程系列文章,都建立在linux环境,php-cli运行模式下

一:消息队列通信介绍

消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。  每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。

消息队列的最佳定义是:内核地址空间中的内部链表。消息可以顺序地发送到队列中,并以几种不同的方式从队列中获取。当然,每个消息队列都是由 IPC标识符所唯一标识的。

二:php消息队列扩展

php如果要使用linux的消息队列,需要安装sysvmsg扩展,官方文档地址:http://php.net/manual/zh/book.sem.php

三:php使用消息队列

1:获取一个IPC标识符ftok();

ftok,可将项目路径与文件标识转换成一个IPC标识符,该标识符可用于创建消息队列

仙士可博客

2:获取/创建一个消息队列msg_get_queue()

仙士可博客

使用linux命令ipcs -q 可查看系统当前的消息队列数

仙士可博客

3:插入数据到队列msg_send()


msg_send (

resource $queue(消息队列资源句柄) ,

int $msgtype(插入数据的类型,用来标识该队列自己的消息类型,自己自定义,必须大于0) ,

mixed $message(插入的数据,可以为数组,下一个参数可以序列化数据)

[, bool $serialize = TRUE(是否序列化数据,默认是)

[, bool $blocking = TRUE (如果消息太大而无法放入队列(linux消息队列限制),则脚本将等待另一个进程从队列中读取消息,并释放足够的空间以发送消息。这被称为阻塞; 您可以通过设置可选blocking参数来防止阻塞FALSE,在这种情况下,如果消息对于队列来说太大,msg_send()将立即返回,并将可选参数FALSE设置 errorcode为MSG_EAGAIN,表示您应稍后尝试再次发送消息。)

[, int &$errorcode ]]] (错误标识));

仙士可博客

插入成功之后,ipcs可看到message多了一条:

仙士可博客

4:取出一条数据msg_receive

msg_receive (

resource $queue , (消息队列资源句柄)

int $desiredmsgtype (要取出的消息队列类型,如果为0,则不筛选类型,直接返回最先插入的那条,大于0,则筛选类型,返回最先插入的类型数据,小于0,则返回小于等于绝对值的数据,如果消息队列暂无满足要求的数据,则阻塞或者返回false,由flag参数配置),

int &$msgtype (当取出数据时,该变量会赋值为该数据的类型),

int $maxsize (消息的最大大小被指定的被接受 maxsize; 如果队列中的消息大于此大小,则该功能将失败(除非flags按照以下说明设置 )该参数较迷,没有理解),

mixed &$message (当取出数据时,该变量会赋值为该数据)

[, bool $unserialize = TRUE(是否反序列化数据)

[, int $flags = 0

该选项flags允许您将标志传递给低级msgrcv系统调用。它默认为0,但您可以指定一个或多个以下值(通过将它们相加或相加)。

msg_receive的标志值

MSG_IPC_NOWAIT

如果没有消息 desiredmsgtype,立即返回,不要等待。该函数将失败并返回对应的整数值MSG_ENOMSG

MSG_EXCEPT

将此标志与desiredmsgtype大于0 结合使用 会导致函数接收到不等于的第一条消息 desiredmsgtype

MSG_NOERROR

如果消息长于maxsize,则设置此标志将截断消息, maxsize并且不会发出错误信号。

[, int &$errorcode ]]] )如果该函数失败,errorcode 则可选项将被设置为系统errno变量的值。

仙士可博客

5:删除队列msg_remove_queue ( resource $queue )

顾名思义,该函数可删除一个消息队列

四:linux相关操作

在linux中,主要用ipcs(查看) ipcrm(删除)

1:ipcs


ipcs -h,可查看帮助

仙士可博客

主要需要记住的是:

ipcs -q (查看消息队列)

仙士可博客

ipcs -l  (查看系统配置)

仙士可博客

2:ipcrm

ipcrm -h:

仙士可博客

ipcrm,只要能删除就行啦~~

ipcrm -q id  (删除指定消息队列)

仙士可博客

3:注意!

在使用消息队列时,请注意消息队列的默认限制(限制消息队列数,和消息队列大小),

当到达上限时,会使得写入消息队列操作阻塞(默认阻塞)

五:封装类

创建队列方法,好像有点问题(创建后无法正确使用队列,估计是__FILE__常量问题),暂时没查

使用封装类方法:

$message_queue_key= ftok(__FILE__, 'a');
if(msg_queue_exists($message_queue_key)){//如果有该消息队列,则删除,用于清空之前队列的无用数据
    msg_remove_queue(msg_get_queue($message_queue_key, 0666));
}
$message_queue= msg_get_queue($message_queue_key, 0666);
$msg_queue = new MsgQueue($message_queue);
<?php

/**
 * Created by PhpStorm.
 * User: tioncico
 * Date: 18-5-29
 * Time: 下午11:00
 */
class MsgQueue
{

    public $queue;

    public function __construct($queue)
    {
        $this->queue = $queue;
    }

    public function push($data, $type = 1)
    {
        $result = msg_send($this->queue, $type, $data);
        return $result;
    }

    public function pop($type = 0,$flags = MSG_IPC_NOWAIT)
    {
        msg_receive($this->queue, $type, $message_type, 1024, $message,true,$flags);
//        var_dump($message_type);
//        msg_receive($this->queue,$type,$message_type,1024,$message);
        return $message;
    }

    public function close()
    {
        return msg_remove_queue($this->queue);
    }

    /**
     * 创建一个队列(TODO:疑问待解决)
     * @param string $path_name
     * @param string $prop
     * @param string $perms
     * @return array
     */
    public static function getQueue($path_name = __FILE__, $prop = '1', $perms = '0666')
    {
        $data              = array();
        $data['queue_key'] = ftok($path_name, $prop);
        $data['queue']     = msg_get_queue($data['queue_key'], $perms);
        return $data;
    }
}

七:使用例子

<?php
include_once 'new/MsgQueue.php';
$message_queue_key= ftok(__FILE__, 'a');
$message_queue= msg_get_queue($message_queue_key, 0666);
$queue_obj = new MsgQueue($message_queue);
$pid = pcntl_fork();
if($pid>0){//主进程入列
    while(1){
        $msg = $queue_obj->push((array('a'=>321312,'v'=>'casd')),12456);
        sleep(2);
    }
}else{//子进程出列
    while(1){
        $message = $queue_obj->pop();
        var_dump($message);
        sleep(1);
    }
}

输出:

仙士可博客

正文到此结束
本文目录