【有附件】thinkphp5直接整合workerman的gateway,做websocket推送、IM通讯等服务,写了个完整demo

为了更好推送消息、做IM客服,在tp5引入workerman作为常驻内存的工具,经过整合后让workerman很好地融合在tp下面,不用独立部署和独立引入数据库操作类。

由于自己的框架是5.0.*版本,和5.1有区别,所以下载的是"topthink/think-worker": "1.0.*"(根据自己框架选择即可)。


操作步骤:

  1. 用composer安装worker(tp5.1以上不用加版本号)

 composer require topthink/think-worker=1.0.*

  2.用composer安装gateway

composer require workerman/gateway-worker

3.在网站根目录新增目录workman(自定义就行,里面的文件路径加载正常即可)

在workman新增service_all.php,内容如下

#!/usr/bin/env php
<?php
define('APP_PATH', __DIR__ . '/../application/');
#绑定模块,这是一个总模块,在linux直接启动这个就好了,windows需要用另外的独立打开(用bat那个文件启动也行)
define('BIND_MODULE','push/Run');
# 加载框架引导文件
require __DIR__ . '/../thinkphp/start.php';


4.在application下面新增目录push/controller并新增Run.php类,内容如下:

<?php
/**
 * 全部启动功能
 * linux 下使用,全部运行功能,对应workerman/service_all.php
 */

namespace app\push\controller;

use GatewayWorker\BusinessWorker;
use Workerman\Worker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;

class Run extends Worker
{
    public function __construct()
    {
        $this->start_register();
        $this->start_business();
        $this->start_gateway();
        //运行所有Worker;
        Worker::runAll();
    }

    //开始注册
    private function start_register()
    {
        $register = new Register('text://0.0.0.0:8001');
        $register->name = 'BS_SHOPRegister';
    }

    //初始化 bussinessWorker 进程,注意最后的绑定事件,这样可以融合在自定义的模块
    private function start_business()
    {
        $worker = new BusinessWorker();
        $worker->name = 'BS_SHOPBusinessWorker';
        $worker->count = 4;
        $worker->registerAddress = '127.0.0.1:8001';
        //绑定事件监听类
        $worker->eventHandler = '\app\push\controller\Events';
        //异常日志目录
                $worker::$logFile = RUNTIME_PATH.'log/workman.log';
    }

    //初始化gateway
    private function start_gateway()
    {
        $gateway = new Gateway("websocket://0.0.0.0:8002");
        $gateway->name = 'BS_SHOPGateway';
        $gateway->count = 2;
        //lanIp是Gateway所在服务器的内网IP,默认填写127.0.0.1即可。多服务器分布式部署的时候需要填写真实的内网ip,不能填写127.0.0.1。注意:lanIp只能填写真实ip,不能填写域名或者其它字符串,无论如何都不能写0.0.0.0

        $gateway->lanIp = '127.0.0.1';
        $gateway->startPort = 2900;
        $gateway->registerAddress = '127.0.0.1:8001';
    }


}

注意:windows下不允许一个php文件运行多个进程,所以我拆开多个start服务在此下面,在workerman绑定多个service同时启动,文件打包在附件中

微信截图_20220319143039.png


5.启动服务(linux),建议先看6,创建好event文件先。

php service_all.php start -d


注意:windows下不允许一个php文件运行多个进程,所以我拆开多个service服务脚本在workerman下面,用bat命令启动多个用于在windows测试,文件打包在附件中

微信截图_20220319142658.png


6.创建event事件接收文件,在application/push/controller新增Events.php类,内容如下:

<?php
namespace app\push\controller;

use think\Controller;
use think\Db;
use GatewayWorker\Lib\Gateway;
/**

 * 主要是处理 onConnect onMessage onClose 三个方法
 */
class Events extends Controller
{
    /**
     * 当客户端发来消息时触发
     * @param int $client_id 连接id
     * @param mixed $message 具体消息
     */
    public static function onMessage($client_id, $message)
    {
        $message =  @json_decode($message, true);
        if (empty($message['type'])){
            return ;
        }
        $type = $message['type'];
        switch ($type) {
            case 'start':
                if (empty($message['bsid'])){
                    return ;
                }
                //需要绑定的ID,可以是用户登录的ID、uid、特殊标识
                $bsid = $message['bsid'];
                //保存
                $_SESSION['bs_id'] = $bsid;
                $_SESSION['bs_name'] = 'BS_SHOP_'.rand(1000,10000);
                //绑定UID到当前登录的id,方便推送消息
                // 将当前链接与uid绑定
                Gateway::bindUid($client_id, $bsid);
                // 通知当前客户端初始化
                $message_data = array(
                    'type' => 'login',
                    'code' => 0,
                    'message' =>'success',
                    'time'=>time(),
                    'data'=>[

                    ]
                );
                //给当前登录的id发送消息
               // Gateway::sendToClient($client_id, json_encode($message_data,JSON_UNESCAPED_UNICODE));

                //给所有在线用户发送消息
                /*$message_data = array(
                    'type' => 'info',
                    'code' => 0,
                    'message' =>'欢迎新用户登录:'.$bsid,
                    'time'=>time(),
                    'data'=>[

                    ]
                );
                Gateway::sendToAll(json_encode($message_data,JSON_UNESCAPED_UNICODE));*/
                break;
                return;
            case 'ping':
                //心跳检测,返回ok
                Gateway::sendToClient($client_id, 'pong');
                return;
            case 'test':
                //随机返回句子
                $arr = [
                    1 => '机遇对于有准备的头脑有特别的亲和力。', 2 => '不求与人相比,但求超越自己,要哭就哭出激动的泪水,要笑就笑出成长的性格!', 3 => '在你内心深处,还有无穷的潜力,有一天当你回首看时,你就会知道这绝对是真的。', 4 => '无论你觉得自己多么的了不起,也永远有人比你更强;无论你觉得自己多么的不幸,永远有人比你更加不幸。', 5 => '不要浪费你的生命,在你一定会后悔的地方上。', 6 => '放弃该放弃的是无奈,放弃不该放弃的是无能;不放弃该放弃的是无知,不放弃不该放弃的是执着。', 7 => '不要轻易用过去来衡量生活的幸与不幸!每个人的生命都是可以绽放美丽的,只要你珍惜。', 8 => '千万别迷恋网络游戏,要玩就玩好人生这场大游戏。', 9 => '过错是暂时的遗憾,而错过则是永远的遗憾!', 10 => '人生是个圆,有的人走了一辈子也没有走出命运画出的圆圈,其实,圆上的每一个点都有一条腾飞的切线。'
                ];
                $k = array_rand($arr);
                $message_data = array(
                    'type' => 'text',
                    'code' => 0,
                    'message' =>'success',
                    'time'=>time(),
                    'data'=>[
                        'content'=>$arr[$k]
                    ]
                );
                //给当前登录的id发送消息
                Gateway::sendToClient($client_id, json_encode($message_data,JSON_UNESCAPED_UNICODE));
                return;
            default:

                Gateway::sendToClient($client_id, json_encode(self::other_logic($message),JSON_UNESCAPED_UNICODE));
                return;

        }
        return ;
    }

    //其它逻辑
    private static function other_logic($message)
    {
        $data = array(
            'type' => 'unknown',
            'code' => 0,
            'message' =>'未知消息类型',
            'time'=>time(),
            'data'=>[
               'content' =>'abcccc'
            ]
        );
        return $data;
    }

    /**
     * 当客户端连接时触发
     * @param int $client_id 连接id
     */
    public static function onConnect($client_id)
    {

    }

    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public static function onClose($client_id)
    {
        $message_data = array(
            'type' => 'info',
            'code' => 0,
            'message' =>'用户退出:'.$_SESSION['bs_id'],
            'data'=>[
                'time'=>time()
            ]
        );
        Gateway::sendToAll(json_encode($message_data,JSON_UNESCAPED_UNICODE));
    }

    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public static function onError($client_id, $code, $msg)
    {
        echo "error $code $msg\r\n";
    }

    /**
     * 每个进程启动
     * @param $worker
     */
    public static function onWorkerStart($worker)
    {

    }
}

7.新建test.html(任意地方),内容如下:

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>测试wesocket</title>
    <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
    <style>
        .abc{
            padding: 8px;
        }
    </style>
</head>
<body>
<div class="btn-group">
    <button class="btn abc">发送消息1</button>

    <button class="send2" style="padding: 8px;">发送消息2</button>
    <div class="res">
        <p>下面的返回的消息</p>
        <div class="bs-data" style="color: red;"></div>
    </div>
</div>
<script>
    var ws;//websocket实例
    var lockReconnect = false;//避免重复连接
    var wsUrl = 'ws:127.0.0.1:8002';

    function createWebSocket(url) {
        try {
            ws = new WebSocket(url);
            initEventHandle();
        } catch (e) {
            reconnect(url);
        }
    }


    function initEventHandle() {
        ws.onclose = function () {
            reconnect(wsUrl);
        };
        ws.onerror = function () {
            reconnect(wsUrl);
        };
        ws.onopen = function () {
            //心跳检测重置
            heartCheck.reset().start();
            login();
        };
        ws.onmessage = function (event) {
            //如果获取到消息,心跳检测重置
            //拿到任何消息都说明当前连接是正常的
            heartCheck.reset().start();
            onmessage(event)
        }
    }

    function reconnect(url) {
        if(lockReconnect) return;
        lockReconnect = true;
        //没连接上会一直重连,设置延迟避免请求过多
        setTimeout(function () {
            createWebSocket(url);
            lockReconnect = false;
        }, 2000);
    }


    //心跳检测
    var heartCheck = {
        timeout: 10000,//10秒
        timeoutObj: null,
        reset: function(){
            clearTimeout(this.timeoutObj);
            return this;
        },
        start: function(){
            this.timeoutObj = setTimeout(function(){
                //这里发送一个心跳,后端收到后,返回一个心跳消息,
                //onmessage拿到返回的心跳就说明连接正常
                ws.send('{"type":"ping"}');
            }, this.timeout)
        }
    }

    //初始化登录
    function login(){
        ws.send('{"type":"start","bsid":"bsshoptest"}');
    }

    //消息接收
    function onmessage(e){
        console.log('收到消息啦',e.data)
        var data = e.data;
        if (data==='pong'){
            return ;
        }
        data = $.parseJSON(data);
        var content = '';
        switch (data.type){
            case  'text':
                content = data.data.content;
                break;
            default:
                content = data.data.content?data.data.content: '未知';
                break;
        }

        $(".bs-data").html(content)
    }

    createWebSocket(wsUrl);


    $(function (){
        $(".btn").on('click',function () {
           return ws.send('{"type":"test"}');
        });
        $(".send2").on('click',function () {
            return  ws.send('{"type":"abc"}');
        });
    })
</script>

</body>
</html>

8.打开test.html在浏览器测试

微信截图_20220319143503.png

附件里面包括了上面截图的核心文件,即workerman目录和push目录下面的文件,不包括composer下载的workerman组件和tp框架。

tpsocketdemo.zip



注意事项:

仅限测试,使用前请自己修改端口、功能逻辑。

评论/留言