php实现多进程socket服务器class

该类实现了多进程的socket服务,目前只写了关于TCP协议,待完善其他内容

所需扩展:socket  pcntl

<?php
/**
 * Created by PhpStorm.
 * User: tioncico
 * Date: 18-5-1
 * Time: 下午7:56
 */

class SphpSocket
{

    private static $_instance;
    public $connect_list = array();//客户端列表
    public $connect_callback, $receive_callback, $close_callback;//回调函数
    public $server;//socket服务
    public $is_run=true;//是否运行
    public $config = array(//各种配置
        'debug'=>true,

        'host' => '0.0.0.0',
        'port' => '9501',

        'domain'   => AF_INET,
        'type'     => SOCK_STREAM,
        'protocol' => SOL_TCP,

        'accept' => 511,

        'option_level' => SOL_SOCKET,
        'optname'      => SO_REUSEADDR,
        'optval'       => 1,

        'read_length'=>1024,
        'read_type'=>PHP_NORMAL_READ

    );

    public $error_log=array();

    public static function getInstance($host, $port)
    {
        if (!(self::$_instance instanceof self)) {
            self::$_instance = new static($host, $port);
        }
        return self::$_instance;
    }

    public function __construct($host, $port)
    {
        $this->config['host'] = $host;
        $this->config['port'] = $port;
    }

    /**
     * 绑定事件
     * @param $type connect|receive|close
     * @param callable $function
     */
    public function on($type, callable $function)
    {
        switch (strtolower($type)) {
            case 'connect':
                $this->connect_callback = $function;
                break;
            case 'receive':
                $this->receive_callback = $function;
                break;
            case 'close':
                $this->close_callback = $function;
                break;
        }
        return $this;
    }

    public function onConnect($connection){
        if (is_callable($this->connect_callback)) {
            call_user_func($this->connect_callback,$connection);
        }
    }
    public function onReceive($connection,$data){

        if (is_callable($this->receive_callback)) {
            call_user_func($this->receive_callback,$connection,$data);
        }
    }
    public function onClose($connection){
        if (is_callable($this->close_callback)) {
            call_user_func($this->close_callback,$connection);
        }
    }

    /**
     *
     */
    public function start()
    {
        $this->createSocket();
        echo '创建socket成功!'.PHP_EOL;
        $this->bindSocket();
        echo '绑定端口成功!'.PHP_EOL;
        $this->listenSocket();
        echo '监听端口成功!'.PHP_EOL;
        $this->setOptionSocket();
        $this->acceptSocket();
        return $this;
    }

    /**
     * 创建socket
     * @return $this
     * @throws Exception
     */
    protected function createSocket()
    {
        $this->server = socket_create($this->config['domain'], $this->config['type'], $this->config['protocol']);
        if ($this->server === false) {
            throw new Exception('创建socket失败!');
        }
        return $this;
    }

    /**
     * 绑定端口
     * @return $this
     * @throws Exception
     */
    protected function bindSocket()
    {
        $this->server === false && $this->createSocket();

        $result = socket_bind($this->server, $this->config['host'], $this->config['port']);
        if ($result === false) {
            throw new Exception('绑定端口失败!');
        }
        return $this;
    }

    /**
     * 监听端口
     * @param null $accept
     * @return $this
     * @throws Exception
     */
    protected function listenSocket($accept = null)
    {
        $this->server === false && $this->createSocket();
        $accept || $accept = $this->config['accept'];
        $result = socket_listen($this->server, $accept);
        if ($result === false) {
            throw new Exception('监听端口失败!');
        }
        return $this;
    }

    /**
     * 配置socket
     * @return $this
     * @throws Exception
     */
    protected function setOptionSocket()
    {
        $this->server === false && $this->createSocket();
        $result = socket_set_option($this->server, $this->config['option_level'], $this->config['optname'], $this->config['optval']);
        if ($result === false) {
            throw new Exception('配置socket失败!');
        }
        return $this;
    }

    /**
     * 接收socket连接
     */
    protected function acceptSocket(){
        $this->server === false && $this->createSocket();
        while(true&&$this->is_run===true){
            $connection = socket_accept($this->server);
            if($connection===false){

            }else{
                $this->addConnectionList($connection);
                $this->onConnect($connection);
                $this->forkProcess($connection);
            }

        }
    }

    /**
     * 写入客户端信息
     * @param $connection
     * @return $this
     */
    protected function addConnectionList($connection){
//        $fd =
        $this->connect_list[(string)$connection]['fd']=$connection;
        return $this;
    }

    /**
     * 写入客户端进程id
     * @param $connection
     * @param $pid
     * @return $this
     */
    protected function addConnectionListProcess($connection,$pid){
        $this->connect_list[(string)$connection]['pid']=$pid;
        return $this;
    }

    /**
     * 派生进程处理
     * @param $connection
     */
    protected function forkProcess($connection){
   
        $pid = pcntl_fork();
        if($pid>0){//使用主进程处理客户端其他请求,子进程继续监听连接请求
            $this->addConnectionListProcess($connection,$pid);
            $this->readSocket($connection);
        }else{
        }
    }

    /**
     * 读取socket信息
     * @param $connection
     */
    protected function readSocket($connection){
        while(true&&isset($this->connect_list[(string)$connection])&&$this->is_run){
            $data = @socket_read($connection,$this->config['read_length'],$this->config['read_type']);
            if($data===false){
                $this->close($connection);
            }else{
                $this->onReceive($connection,$data);
            }
        }
   }

    /**
     * 发送消息给客户端
     * @param $connection
     * @param $msg
     * @return int
     */
   public function send($connection,$msg){
       $result = socket_write($connection, $msg,strlen($msg));
       return $result;
   }

    /**
     * 主动关闭客户端
     * @param $connection
     */
   public function close($connection){
       $this->onClose($connection);
       //先关掉子进程
       posix_kill($this->connect_list[(string)$connection]['pid'],SIGTERM);
       $result = socket_close($connection);
       unset($this->connect_list[(string)$connection]);
       return $result;
   }
}

开启例子:

$socket = SphpSocket::getInstance('0.0.0.0',9501);
$socket->on('connect',function ($connection)use($socket){
    $socket->send($connection,'恭喜您连接成功!');
});
$socket->on('receive',function ($connection,$data)use($socket){
    $result = $socket->send($connection,'您发送的消息是:'.$data);
    var_dump($data);
    if(trim($data)=='关闭'){
        $socket->close($connection);
    }
    echo "发送消息成功";

});
$socket->on('close',function ($connection)use($socket){
    var_dump($connection.'已经关闭连接');
});
$socket->start();

仙士可博客


仙士可博客
请先登录后发表评论
  • 最新评论
  • 总共2条评论
仙士可博客

x:close 方法错了

2018-05-06 14:51:20 回复

仙士可博客
  • 仙士可 回复 x:有啥错?麻烦指出下咯
  • 2018-05-06 15:48:07 回复
仙士可博客
  • x 回复 仙士可:1. pid没有,fork之后父进程和子进程数据分开2. 子进程kill自己?
  • 2018-05-06 15:54:22 回复
仙士可博客
  • 仙士可 回复 x仙士可博客多谢提醒,之前的主进程,子进程搞混了,所以代码运行一直没问题,现在改成子进程处理连接,就出错了,我再研究研究
  • 2018-05-06 16:01:01 回复
仙士可博客
  • 仙士可 回复 x:刚刚研究了下,之前在主进程A fork之后,留给主进程A 处理连接,然后子进程成为了"主进程"B 响应其他客户端请求,然后"主进程"B再次处理下一个请求,fork出一个子进程成为了下一个"主进程"C 进行响应,这样的话,close是没有问题的
  • 2018-05-06 16:23:39 回复
仙士可博客
  • x 回复 仙士可:你看你close方法kill掉了子进程,你同时两个连接会出现问题。然后你当前处理的进程连接断开后需要退出,不然又会去监听。还有,父进程退出,子进程会变成孤儿进程,好像是这样。。
  • 2018-05-06 16:43:46 回复
仙士可博客
  • 仙士可 回复 x:恩,之前是考虑不够,正在重新写,分出进程管理和socket处理
  • 2018-05-06 18:57:11 回复
仙士可博客

x:不应该是子进程处理新来的连接吗?主进程处理有bug吧?

2018-05-05 18:31:45 回复

仙士可博客
  • 仙士可 回复 x:写错了,哈哈
  • 2018-05-06 08:28:40 回复
  • 本站由白俊遥博客程序搭建
    © 2017-1-17 php20.cn 版权所有 ICP证:闽ICP备17001387号
  • 联系邮箱:1067197739@qq.com