多语言翻译
待翻译
统计
  • 建站日期:2021-03-10
  • 文章总数:9 篇
  • 评论总数:1 条
  • 分类总数:8 个
  • 最后更新:7月24日
文章 技术

分享一个workerman实时推送实例复制粘贴就可用

知序博客
首页 技术 正文
广告

分享一个workerman实时推送实例复制粘贴就可用

分享一个workerman实时推送实例复制粘贴就可用
-知序博客
-第1
张图片

<?php
use WorkermanWorker;
use WorkermanLibTimer;
use ChannelServer as ChannelServer;
use ChannelClient as ChannelClient;

require_once __DIR__ . '/vendor/autoload.php';

// 1. 读取 .env 文件
function parseEnv($file) {
    $env = [];
    if (!is_file($file)) return $env;
    foreach (file($file, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES) as $line) {
        $line = trim($line);
        if ($line === '' || strpos($line, '#') === 0) continue;
        $parts = explode('=', $line, 2);
        if (count($parts) == 2) {
            $env[trim($parts[0])] = trim($parts[1]);
        }
    }
    return $env;
}

$env = parseEnv(__DIR__ . '/.env');
define('DB_TYPE',     $env['DB_TYPE'] ?? 'mysql');
define('DB_HOST',     $env['DB_HOST'] ?? '127.0.0.1');
define('DB_NAME',     $env['DB_NAME'] ?? '');
define('DB_USER',     $env['DB_USER'] ?? '');
define('DB_PWD',      $env['DB_PASS'] ?? '');
define('DB_PORT',     $env['DB_PORT'] ?? '3306');
define('DB_CHARSET',  $env['DB_CHARSET'] ?? 'utf8mb4');
define('DB_PREFIX',   $env['DB_PREFIX'] ?? '');
define('DB_TABLE',    DB_PREFIX . 'broadcast_msgs');

// 2. 获取数据库中最新N条消息
function getLatestMsgs($limit = 10) {
    static $pdo = null;
    $dsn = DB_TYPE . ':host=' . DB_HOST . ';port=' . DB_PORT . ';dbname=' . DB_NAME . ';charset=' . DB_CHARSET;
    if (!$pdo) {
        $pdo = new PDO($dsn, DB_USER, DB_PWD, [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]);
    }
    $sql = "SELECT msg FROM " . DB_TABLE . " WHERE status=1 ORDER BY created_at DESC, id DESC LIMIT {$limit}";
    $stmt = $pdo->query($sql);
    $rows = $stmt->fetchAll(PDO::FETCH_ASSOC);
    return array_reverse(array_column($rows, 'msg'));
}

// 3. 启动 Channel 服务
$channel_server = new ChannelServer('0.0.0.0', 2206);

// 4. WS 服务
$ws_worker = new Worker("websocket://0.0.0.0:2346");
$ws_worker->name = 'BroadcastWS';

// 5. HTTP 服务
$http_worker = new Worker("http://127.0.0.1:2347");
$http_worker->name = 'BroadcastHttp';

// ========== WS服务内订阅 channel ==========
// 实现轮播和实时推送的完整逻辑
$ws_worker->onWorkerStart = function($worker) {
    ChannelClient::connect('127.0.0.1', 2206);

    // 全局缓存轮播用的消息和下标
    $worker->msg_list = getLatestMsgs(10); // 初始取10条
    $worker->msg_index = 0;

    // 1. 实时推送新消息
    ChannelClient::on('broadcast', function($data) use ($worker) {
        $msg = $data['msg'];
        // 插入头部并保持10条
        array_unshift($worker->msg_list, $msg);
        $worker->msg_list = array_slice($worker->msg_list, 0, 10);
        $worker->msg_index = 0; // 重置为最新
        foreach ($worker->connections as $conn) {
            $conn->send(json_encode(['msg' => $msg]));
        }
    });

    // 2. 定时轮播(每3秒一条)
    Timer::add(3, function() use ($worker) {
        // 没有消息就重新获取
        if (empty($worker->msg_list)) {
            $worker->msg_list = getLatestMsgs(10);
            $worker->msg_index = 0;
        }
        if (!empty($worker->msg_list)) {
            $msg = $worker->msg_list[$worker->msg_index];
            foreach ($worker->connections as $conn) {
                $conn->send(json_encode(['msg' => $msg]));
            }
            // 下一个,下标轮回
            $worker->msg_index++;
            if ($worker->msg_index >= count($worker->msg_list)) {
                // 每轮播完一遍,重新拉取最新的10条
                $worker->msg_list = getLatestMsgs(10);
                $worker->msg_index = 0;
            }
        }
    });
};

// 新连接:推送历史10条
$ws_worker->onConnect = function($connection) {
    $msgs = getLatestMsgs(10);
    foreach($msgs as $msg) {
        $connection->send(json_encode(['msg' => $msg]));
    }
};

// 收到前端消息(一般不用处理)
$ws_worker->onMessage = function($connection, $data) {
    $connection->send(json_encode(['msg' => '服务端已收到: '.$data]));
};

// HTTP服务:接收后台POST推送
$http_worker->onMessage = function($connection, $request) {
    $msg = '';
    // Workerman 的 request->post
    if (method_exists($request, 'post')) {
        $msg = $request->post('msg', '');
    }
    // 再兼容 JSON
    if (!$msg) {
        $body = $request->rawBody();
        if ($body) {
            $arr = @json_decode($body, true);
            if (isset($arr['msg'])) $msg = $arr['msg'];
            if (!$msg && strpos($body, 'msg=') !== false) {
                parse_str($body, $arr2);
                if (isset($arr2['msg'])) $msg = $arr2['msg'];
            }
        }
    }
    if (!$msg && method_exists($request, 'get')) {
        $msg = $request->get('msg', '');
    }
    if ($msg) {
        ChannelClient::connect('127.0.0.1', 2206);
        ChannelClient::publish('broadcast', ['msg' => $msg]);
        $connection->send('{"code":200,"msg":"ok"}');
    } else {
        $connection->send('{"code":400,"msg":"empty msg"}');
    }
};

// 启动所有服务
Worker::runAll();

版权说明
文章采用: 《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权。
版权声明:未标注转载均为本站原创,转载时请以链接形式注明文章出处。如有侵权、不妥之处,请联系站长删除。敬请谅解!

《万相AI:让你的艺术细胞不再"饿死"在襁褓中》
下一篇 »

发表评论

HI ! 请登录
注册会员,享受下载全站资源特权。

最新评论

1111游客
5小时前

导航内部广告插件