原创

php实现多进程socket服务器class

温馨提示:
本文最后更新于 2018年05月02日,已超过 2,331 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

该类实现了多进程的socket服务,目前只写了关于TCP协议,待完善其他内容

所需扩展:socket  pcntl

<?php
/**
 * Created by PhpStorm.
 * User: tioncico
 * Date: 18-5-1
 * Time: 下午7:56
 */

class SphpSocket
{

    private static $_instance;
    public $connect_list = array();//客户端列表
    public $connect_callback, $receive_callback, $close_callback;//回调函数
    public $server;//socket服务
    public $is_run=true;//是否运行
    public $config = array(//各种配置
        'debug'=>true,

        'host' => '0.0.0.0',
        'port' => '9501',

        'domain'   => AF_INET,
        'type'     => SOCK_STREAM,
        'protocol' => SOL_TCP,

        'accept' => 511,

        'option_level' => SOL_SOCKET,
        'optname'      => SO_REUSEADDR,
        'optval'       => 1,

        'read_length'=>1024,
        'read_type'=>PHP_NORMAL_READ

    );

    public $error_log=array();

    public static function getInstance($host, $port)
    {
        if (!(self::$_instance instanceof self)) {
            self::$_instance = new static($host, $port);
        }
        return self::$_instance;
    }

    public function __construct($host, $port)
    {
        $this->config['host'] = $host;
        $this->config['port'] = $port;
    }

    /**
     * 绑定事件
     * @param $type connect|receive|close
     * @param callable $function
     */
    public function on($type, callable $function)
    {
        switch (strtolower($type)) {
            case 'connect':
                $this->connect_callback = $function;
                break;
            case 'receive':
                $this->receive_callback = $function;
                break;
            case 'close':
                $this->close_callback = $function;
                break;
        }
        return $this;
    }

    public function onConnect($connection){
        if (is_callable($this->connect_callback)) {
            call_user_func($this->connect_callback,$connection);
        }
    }
    public function onReceive($connection,$data){

        if (is_callable($this->receive_callback)) {
            call_user_func($this->receive_callback,$connection,$data);
        }
    }
    public function onClose($connection){
        if (is_callable($this->close_callback)) {
            call_user_func($this->close_callback,$connection);
        }
    }

    /**
     *
     */
    public function start()
    {
        $this->createSocket();
        echo '创建socket成功!'.PHP_EOL;
        $this->bindSocket();
        echo '绑定端口成功!'.PHP_EOL;
        $this->listenSocket();
        echo '监听端口成功!'.PHP_EOL;
        $this->setOptionSocket();
        $this->acceptSocket();
        return $this;
    }

    /**
     * 创建socket
     * @return $this
     * @throws Exception
     */
    protected function createSocket()
    {
        $this->server = socket_create($this->config['domain'], $this->config['type'], $this->config['protocol']);
        if ($this->server === false) {
            throw new Exception('创建socket失败!');
        }
        return $this;
    }

    /**
     * 绑定端口
     * @return $this
     * @throws Exception
     */
    protected function bindSocket()
    {
        $this->server === false && $this->createSocket();

        $result = socket_bind($this->server, $this->config['host'], $this->config['port']);
        if ($result === false) {
            throw new Exception('绑定端口失败!');
        }
        return $this;
    }

    /**
     * 监听端口
     * @param null $accept
     * @return $this
     * @throws Exception
     */
    protected function listenSocket($accept = null)
    {
        $this->server === false && $this->createSocket();
        $accept || $accept = $this->config['accept'];
        $result = socket_listen($this->server, $accept);
        if ($result === false) {
            throw new Exception('监听端口失败!');
        }
        return $this;
    }

    /**
     * 配置socket
     * @return $this
     * @throws Exception
     */
    protected function setOptionSocket()
    {
        $this->server === false && $this->createSocket();
        $result = socket_set_option($this->server, $this->config['option_level'], $this->config['optname'], $this->config['optval']);
        if ($result === false) {
            throw new Exception('配置socket失败!');
        }
        return $this;
    }

    /**
     * 接收socket连接
     */
    protected function acceptSocket(){
        $this->server === false && $this->createSocket();
        while(true&&$this->is_run===true){
            $connection = socket_accept($this->server);
            if($connection===false){

            }else{
                $this->addConnectionList($connection);
                $this->onConnect($connection);
                $this->forkProcess($connection);
            }

        }
    }

    /**
     * 写入客户端信息
     * @param $connection
     * @return $this
     */
    protected function addConnectionList($connection){
//        $fd =
        $this->connect_list[(string)$connection]['fd']=$connection;
        return $this;
    }

    /**
     * 写入客户端进程id
     * @param $connection
     * @param $pid
     * @return $this
     */
    protected function addConnectionListProcess($connection,$pid){
        $this->connect_list[(string)$connection]['pid']=$pid;
        return $this;
    }

    /**
     * 派生进程处理
     * @param $connection
     */
    protected function forkProcess($connection){
   
        $pid = pcntl_fork();
        if($pid>0){//使用主进程处理客户端其他请求,子进程继续监听连接请求
            $this->addConnectionListProcess($connection,$pid);
            $this->readSocket($connection);
        }else{
        }
    }

    /**
     * 读取socket信息
     * @param $connection
     */
    protected function readSocket($connection){
        while(true&&isset($this->connect_list[(string)$connection])&&$this->is_run){
            $data = @socket_read($connection,$this->config['read_length'],$this->config['read_type']);
            if($data===false){
                $this->close($connection);
            }else{
                $this->onReceive($connection,$data);
            }
        }
   }

    /**
     * 发送消息给客户端
     * @param $connection
     * @param $msg
     * @return int
     */
   public function send($connection,$msg){
       $result = socket_write($connection, $msg,strlen($msg));
       return $result;
   }

    /**
     * 主动关闭客户端
     * @param $connection
     */
   public function close($connection){
       $this->onClose($connection);
       //先关掉子进程
       posix_kill($this->connect_list[(string)$connection]['pid'],SIGTERM);
       $result = socket_close($connection);
       unset($this->connect_list[(string)$connection]);
       return $result;
   }
}

开启例子:

$socket = SphpSocket::getInstance('0.0.0.0',9501);
$socket->on('connect',function ($connection)use($socket){
    $socket->send($connection,'恭喜您连接成功!');
});
$socket->on('receive',function ($connection,$data)use($socket){
    $result = $socket->send($connection,'您发送的消息是:'.$data);
    var_dump($data);
    if(trim($data)=='关闭'){
        $socket->close($connection);
    }
    echo "发送消息成功";

});
$socket->on('close',function ($connection)use($socket){
    var_dump($connection.'已经关闭连接');
});
$socket->start();

仙士可博客

正文到此结束
本文目录