ETJava Beta | Java    注册   登录
  • 搜索:
  • 【workerman】uniapp+thinkPHP5使用GatewayWorker实现实时通讯

    发表于      阅读(1)     博客类别:Crawler     转自:https://www.cnblogs.com/nothavebug/p/18286356
    如有侵权 请联系我们删除  (页面底部联系我们)  

    前言

    之前公司需要一个内部的通讯软件,就叫我做一个。通讯软件嘛,就离不开通讯了,然后我就想到了长连接。这里本人用的是GatewayWorker框架。

    什么是GatewayWorker框架?

    GatewayWorker是基于Workerman开发的一套TCP长连接的应用框架,实现了单发、群发、广播等接口,内置了mysql类库,GatewayWorker分为Gateway进程和Worker进程,支持分布式部署,能够支持大量的连接数。

    GatewayWorker的工作原理

    image

    1、启动所有进程(GatewayWorker、business、register)
    
    2、GatewayWorker和business进程启动后向register请求注册
    
    3、register服务收到注册请求后,把所有Gateway的通讯地址保存在内存中同时把内存中所有的Gateway的通讯地址发给business
    
    4、business进程得到所有的Gateway内部通讯地址后进行连接GatewayWorker
    
    5、如果有新的GatewayWorker服务进行register,则将新的Gateway内部通讯地址列表将广播给所有buiness并建立连接
    
    6、如果有GatewayWorker下线,则Register服务会收到通知,会将该GatewayWorker内部通讯地址删除,然后广播新的内部通讯地址列表给所有business
    
    7、此时GatewayWorker与buiness已经建立起长连接
    
    8、客户端的事件及接受的数据全部由GatewayWorker转发给business进行处理。
    

    目录结构

    ├── Applications // 项目应用目录
    │   └── YourAppGateway  // 建立一个存放workman的目录,名字随意
    │       ├── Events.php // 处理主逻辑业务的文件,管理onConnect onMessage onClose 等方法
    │       ├── start_gateway.php // gateway进程启动脚本、配置服务注册地址、端口号、进程数等参数
    │       ├── start_businessworker.php // 用户进程的启动脚本
    │       └── start_register.php // 注册服务的启动脚本
    │
    ├── start.php // 全局启动脚本,此脚本会依次加载Applications/YourAppGateway/start*.php对所有脚本进行启动
    │
    └── vendor    // GatewayWorker框架和Workerman框架源码目录
    
    

    GatewayWorker实现

    以宝塔为例

    1.安装composer

    登录SSH终端,使用以下命令下载Composer的安装脚本:

    php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');"
    

    运行下面的命令来安装Composer:

    php composer-setup.php --install-dir=/usr/local/bin --filename=composer
    

    检查composer版本

    composer -v //检查composer版本
    

    2.安装workerman

    在项目根目录打开宝塔终端,输入以下命令安装workman

    composer require topthink/think-worker
    

    3.安装GatewayWorker

    在项目根目录打开宝塔终端,输入以下命令安装GatewayWorker

    composer require workerman/gateway-worker
    

    4.实现代码

    可以选择官方提供的demo 链接:http://www.workerman.net/download/GatewayWorker.zip

    或者使用我根据demo改编而来的

    先在项目应用目录(一般是Applications)下新建一个文件存储以下四个进程文件

    start_gateway.php

    <?php 
    use \Workerman\Worker;
    use \Workerman\WebServer;
    use \GatewayWorker\Gateway;
    use \GatewayWorker\BusinessWorker;
    use \Workerman\Autoloader;
    
    // 自动加载类
    require_once __DIR__ . '/../../vendor/autoload.php';
    
    // gateway 进程,这里使用Text协议,可以用telnet测试
    $gateway = new Gateway("websocket://0.0.0.0:8283");
    // gateway名称,status方便查看
    $gateway->name = 'YourAppGateway';
    // gateway进程数
    $gateway->count = 200;
    // 本机ip,分布式部署时使用内网ip
    $gateway->lanIp = '127.0.0.1';
    // 内部通讯起始端口,假如$gateway->count=4,起始端口为4000
    // 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口 
    $gateway->startPort = 2900;
    // 服务注册地址、端口
    $gateway->registerAddress = '127.0.0.1:1237';
    
    // 心跳间隔
    //$gateway->pingInterval = 10;
    // 心跳数据
    //$gateway->pingData = '{"type":"ping"}';
    
    /* 
    // 当客户端连接上来时,设置连接的onWebSocketConnect,即在websocket握手时的回调
    $gateway->onConnect = function($connection)
    {
        $connection->onWebSocketConnect = function($connection , $http_header)
        {
            // 可以在这里判断连接来源是否合法,不合法就关掉连接
            // $_SERVER['HTTP_ORIGIN']标识来自哪个站点的页面发起的websocket链接
            if($_SERVER['HTTP_ORIGIN'] != 'http://kedou.workerman.net')
            {
                $connection->close();
            }
            // onWebSocketConnect 里面$_GET $_SERVER是可用的
            // var_dump($_GET, $_SERVER);
        };
    }; 
    */
    
    // 如果不是在根目录启动,则运行runAll方法
    if(!defined('GLOBAL_START'))
    {
        Worker::runAll();
    }
    

    start_businessworker.php

    <?php 
    use Workerman\Worker;
    use Workerman\WebServer;
    use GatewayWorker\Gateway;
    use GatewayWorker\BusinessWorker;
    use Workerman\Autoloader;
    
    // 自动加载类
    require_once __DIR__ . '/../../vendor/autoload.php';
    
    // bussinessWorker 进程
    $worker = new BusinessWorker();
    // worker名称
    $worker->name = 'YourAppBusinessWorker';
    // bussinessWorker进程数量
    $worker->count = 200;
    // 服务注册地址、端口
    $worker->registerAddress = '127.0.0.1:1237';
    
    // 如果不是在根目录启动,则运行runAll方法
    if(!defined('GLOBAL_START'))
    {
        Worker::runAll();
    }
    

    start_register.php

    <?php 
    use \Workerman\Worker;
    use \GatewayWorker\Register;
    
    // 自动加载类
    require_once __DIR__ . '/../../vendor/autoload.php';
    
    // register 必须是text协议 1237为端口
    $register = new Register('text://0.0.0.0:1237');
    
    // 如果不是在根目录启动,则运行runAll方法
    if(!defined('GLOBAL_START'))
    {
        Worker::runAll();
    }
    

    Events.php

    <?php
    
    use \GatewayWorker\Lib\Gateway;
    
    /**
     * 主逻辑
     * 主要是处理 onConnect onMessage onClose 三个方法
     */
    class Events
    {
        /**
         * 当客户端连接时触发
         * 
         * @param int $client_id 连接id
         */
        public static function onConnect($client_id)
        {
            echo "【新的客户端链接】:client_id:".$client_id.PHP_EOL;
            // 向当前client_id发送数据 
            Gateway::sendToClient($client_id, "");
            
            // 向所有人发送
            $data=[
                'client_id'=>$client_id,
                'message'=>'欢迎'.$client_id.'登录!',
                'data'=>[]
            ];
            Gateway::sendToAll(json_encode($data));
            // Gateway::sendToAll("$client_id login\r\n");
        }
        
       /**
        * 当客户端发来消息时触发
        * @param int $client_id 连接id
        * @param mixed $message 具体消息
        */
       public static function onMessage($client_id, $message){
           
           $data=[
                'client_id'=>$client_id,
                'message'=>$client_id.'说:'.$result['message'],
                'data'=>$message
            ];
            Gateway::sendToAll(json_encode($data));
            // 向所有人发送 
            // Gateway::sendToAll("$client_id said $message\r\n");
       }
       
       /**
        * 当用户断开连接时触发
        * @param int $client_id 连接id
        */
       public static function onClose($client_id)
       {
           // 向所有人发送 
        //   GateWay::sendToAll("$client_id 退出了!\r\n");
       }
    }
    

    随后在项目的根目录下新建一个启动文件

    start_all_workman.php

    <?php
    
    ini_set('display_errors', 'on');
    use Workerman\Worker;
    
    if(strpos(strtolower(PHP_OS), 'win') === 0)
    {
        exit("start.php not support windows, please use start_for_win.bat\n");
    }
    
    // 检查扩展
    if(!extension_loaded('pcntl'))
    {
        exit("Please install pcntl extension. See http://doc3.workerman.net/appendices/install-extension.html\n");
    }
    
    if(!extension_loaded('posix'))
    {
        exit("Please install posix extension. See http://doc3.workerman.net/appendices/install-extension.html\n");
    }
    
    // 标记是全局启动
    define('GLOBAL_START', 1);
    
    require_once __DIR__ . '/vendor/autoload.php';
    
    // 加载所有Applications/*/start.php,以便启动所有服务
    foreach(glob(__DIR__.'/application/此处请改成你自己命名存放workman的目录名/start*.php') as $start_file)
    {
        require_once $start_file;
    }
    // 运行所有服务
    Worker::runAll();
    
    

    注意开启端口后记得去放行端口!!!除了宝塔放行以外,你的服务器(阿里云/腾讯云等等)也记得要去放行!!!

    启动workman

    在项目根目录下打开终端,输入php start_all_workman.php start -d ,开启守护进程,如果出现一下页面即成功开启

    image

    如想关闭workman进程则输入php start_all_workman.php stop 进行关闭

    GatewayWorker使用

    如果你的网站使用的是Https协议的话,WebSocket必须使用wss协议
    但是wss协议不支持IP:端口的形式,而是只能写域名+url
    所以为了解决使用https协议而WebSocket不能连接的问题,可以使用Nginx进行反向代理
    在网站配置文件的server下加入以下代码

    location /connectWorkman(名字随你取,别跟其他反向代理重名就行)
      {
        proxy_pass http://127.0.0.1:8283;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
        proxy_set_header X-Real-IP $remote_addr;
       }
    

    前端使用(uniapp)

    init() {
    	SocketTask = uni.connectSocket({
    		url: 'wss://chat.gdpaimaihui.com/auction', //正式
    		header: {
    			'content-type': 'application/json'
    		},
    		success: function(res) {
    			console.log('WebSocket连接创建', res);
    		},
    		fail: function(err) {
    			uni.showToast({
    				title: '网络异常!',
    				icon: 'none'
    			});
    			console.log(err);
    		}
    	});
    
    	//websocket监听事件
    	SocketTask.onOpen((res) => {
    		socketOpen = true
    		canReconnect = true
    		console.log('监听 WebSocket 连接打开事件。', res);
    		//websocket连接后可以启动个定时器,每隔一段时间进行心跳一次,以防心跳停止断开连接
    		this.timer = setInterval(() => {
    			SocketTask.send({
    				data: '心跳',
    				success() {
    					// console.log('发送心跳成功');
    				}
    			})
    		}, 2000)
    	});
    	SocketTask.onError((onError) => {
    		console.log('监听 WebSocket 错误。错误信息', onError);
    		socketOpen = false;
    		if (canReconnect) {
    			this.reconnect()
    			canReconnect = false
    		}
    	});
    
    	SocketTask.onMessage((res) => {
    		console.log('监听WebSocket接受到服务器的消息事件。服务器返回的消息', res);
    	});
    },
    
    //重新连接
    reconnect() {
    	if (!socketOpen) {
    		let count = 0;
    		reconnectInterval = setInterval(() => {
    			console.log("正在尝试重连")
    			uni.showToast({
    				title: '正在尝试重连',
    				icon: 'none'
    			})
    			this.init();
    			count++
    			console.log();
    			//重连一定次数后就不再重连
    			if (count >= reconnectTimes) {
    				clearInterval(reconnectInterval)
    				uni.showToast({
    					title: '网络异常或服务器错误',
    					icon: 'none'
    				})
    			}
    		}, reconnectDelay)
    	}
    }
    

    上述为之前给公司做内部通讯软件时个人整理内容,水平有限,如有错误之处,望各位园友不吝赐教!如果觉得不错,请点击推荐和关注!谢谢~๑•́₃•̀๑ [鲜花][鲜花][鲜花]