分享一个workerman实时推送实例复制粘贴就可用
<?php
use WorkermanWorker;
use WorkermanLibTimer;
use ChannelServer as ChannelServer;
use ChannelClient as ChannelClient;
require_once __DIR__ . '/vendor/autoload.php';
// 1. 读取 .env 文件
function parseEnv($file) {
$env = [];
if (!is_file($file)) return $env;
foreach (file($file, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES) as $line) {
$line = trim($line);
if ($line === '' || strpos($line, '#') === 0) continue;
$parts = explode('=', $line, 2);
if (count($parts) == 2) {
$env[trim($parts[0])] = trim($parts[1]);
}
}
return $env;
}
$env = parseEnv(__DIR__ . '/.env');
define('DB_TYPE', $env['DB_TYPE'] ?? 'mysql');
define('DB_HOST', $env['DB_HOST'] ?? '127.0.0.1');
define('DB_NAME', $env['DB_NAME'] ?? '');
define('DB_USER', $env['DB_USER'] ?? '');
define('DB_PWD', $env['DB_PASS'] ?? '');
define('DB_PORT', $env['DB_PORT'] ?? '3306');
define('DB_CHARSET', $env['DB_CHARSET'] ?? 'utf8mb4');
define('DB_PREFIX', $env['DB_PREFIX'] ?? '');
define('DB_TABLE', DB_PREFIX . 'broadcast_msgs');
// 2. 获取数据库中最新N条消息
function getLatestMsgs($limit = 10) {
static $pdo = null;
$dsn = DB_TYPE . ':host=' . DB_HOST . ';port=' . DB_PORT . ';dbname=' . DB_NAME . ';charset=' . DB_CHARSET;
if (!$pdo) {
$pdo = new PDO($dsn, DB_USER, DB_PWD, [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]);
}
$sql = "SELECT msg FROM " . DB_TABLE . " WHERE status=1 ORDER BY created_at DESC, id DESC LIMIT {$limit}";
$stmt = $pdo->query($sql);
$rows = $stmt->fetchAll(PDO::FETCH_ASSOC);
return array_reverse(array_column($rows, 'msg'));
}
// 3. 启动 Channel 服务
$channel_server = new ChannelServer('0.0.0.0', 2206);
// 4. WS 服务
$ws_worker = new Worker("websocket://0.0.0.0:2346");
$ws_worker->name = 'BroadcastWS';
// 5. HTTP 服务
$http_worker = new Worker("http://127.0.0.1:2347");
$http_worker->name = 'BroadcastHttp';
// ========== WS服务内订阅 channel ==========
// 实现轮播和实时推送的完整逻辑
$ws_worker->onWorkerStart = function($worker) {
ChannelClient::connect('127.0.0.1', 2206);
// 全局缓存轮播用的消息和下标
$worker->msg_list = getLatestMsgs(10); // 初始取10条
$worker->msg_index = 0;
// 1. 实时推送新消息
ChannelClient::on('broadcast', function($data) use ($worker) {
$msg = $data['msg'];
// 插入头部并保持10条
array_unshift($worker->msg_list, $msg);
$worker->msg_list = array_slice($worker->msg_list, 0, 10);
$worker->msg_index = 0; // 重置为最新
foreach ($worker->connections as $conn) {
$conn->send(json_encode(['msg' => $msg]));
}
});
// 2. 定时轮播(每3秒一条)
Timer::add(3, function() use ($worker) {
// 没有消息就重新获取
if (empty($worker->msg_list)) {
$worker->msg_list = getLatestMsgs(10);
$worker->msg_index = 0;
}
if (!empty($worker->msg_list)) {
$msg = $worker->msg_list[$worker->msg_index];
foreach ($worker->connections as $conn) {
$conn->send(json_encode(['msg' => $msg]));
}
// 下一个,下标轮回
$worker->msg_index++;
if ($worker->msg_index >= count($worker->msg_list)) {
// 每轮播完一遍,重新拉取最新的10条
$worker->msg_list = getLatestMsgs(10);
$worker->msg_index = 0;
}
}
});
};
// 新连接:推送历史10条
$ws_worker->onConnect = function($connection) {
$msgs = getLatestMsgs(10);
foreach($msgs as $msg) {
$connection->send(json_encode(['msg' => $msg]));
}
};
// 收到前端消息(一般不用处理)
$ws_worker->onMessage = function($connection, $data) {
$connection->send(json_encode(['msg' => '服务端已收到: '.$data]));
};
// HTTP服务:接收后台POST推送
$http_worker->onMessage = function($connection, $request) {
$msg = '';
// Workerman 的 request->post
if (method_exists($request, 'post')) {
$msg = $request->post('msg', '');
}
// 再兼容 JSON
if (!$msg) {
$body = $request->rawBody();
if ($body) {
$arr = @json_decode($body, true);
if (isset($arr['msg'])) $msg = $arr['msg'];
if (!$msg && strpos($body, 'msg=') !== false) {
parse_str($body, $arr2);
if (isset($arr2['msg'])) $msg = $arr2['msg'];
}
}
}
if (!$msg && method_exists($request, 'get')) {
$msg = $request->get('msg', '');
}
if ($msg) {
ChannelClient::connect('127.0.0.1', 2206);
ChannelClient::publish('broadcast', ['msg' => $msg]);
$connection->send('{"code":200,"msg":"ok"}');
} else {
$connection->send('{"code":400,"msg":"empty msg"}');
}
};
// 启动所有服务
Worker::runAll();
版权说明
文章采用: 《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权。版权声明:未标注转载均为本站原创,转载时请以链接形式注明文章出处。如有侵权、不妥之处,请联系站长删除。敬请谅解!