By Bruce Dou @ Swoole Labs
Redis provides commands for message queues and pub/sub: RPUSH, BLPOP, PUBLISH, and SUBSCRIBE.
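For context, the producing side of these commands might look like the sketch below. It assumes the phpredis extension (the `\Redis` class); `encodeMessage()` and `produce()` are hypothetical helpers introduced only for illustration, and the JSON envelope is an assumption rather than anything Redis requires.

```php
<?php
// Hypothetical helper (not part of Redis or Swoole): wraps a message in a
// JSON envelope so that consumers can decode it in a uniform way.
function encodeMessage(string $type, array $payload): string
{
    return json_encode(['type' => $type, 'payload' => $payload], JSON_THROW_ON_ERROR);
}

// Pushes one message onto a list (queue) and publishes another to a channel.
// Assumes a connected phpredis client; queue and channel names are illustrative.
function produce(\Redis $redis): void
{
    // RPUSH appends to the tail of the list; BLPOP consumers read from the head.
    $redis->rPush('msg_queue', encodeMessage('job', ['id' => 1]));
    // PUBLISH delivers only to clients that are subscribed at this moment.
    $redis->publish('msg_1', encodeMessage('chat', ['text' => 'hi']));
}
```

To try it, create a client with `new \Redis()`, call `connect("127.0.0.1", 6379)` on it, and pass it to `produce()`. Note that a queued message waits in the list until a consumer pops it, while a published message is lost if nobody is subscribed.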
Messages can be ingested into Redis with any Redis client API. But how do you consume, process, and broadcast these messages?
These are some examples of processing messages from Redis Queue or Redis PubSub with Swoole and PHP:
<?php
go(function () {
    $redis = new Swoole\Coroutine\Redis();
    $redis->connect("127.0.0.1", 6379);
    // Subscribe to the channel, then read messages as they arrive.
    $redis->subscribe(array("msg_1"));
    while ($msg = $redis->recv())
    {
        var_dump($msg);
    }
});
workerStart callback function in a Swoole worker process:
<?php
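$server->on("workerStart", function($serv, $worker_id) {
    // blPop is an instance method, so the worker needs its own connection.
    $redis = new \Redis();
    $redis->connect("127.0.0.1", 6379);
    while(1) {
        // Blocks for up to 3 seconds; the result is empty on timeout.
        $data = $redis->blPop(["msg_queue"], 3);
        if (empty($data)) continue;
        // process data only
    }
});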
This example uses the blocking \Redis API. Use it when the worker only has to process the data, as a daemon process would.
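The return value of the blocking pop needs some care in such a loop. The sketch below assumes phpredis, where a successful `blPop` returns the list name and the value, and a timeout yields an empty result. `extractJob()` and `consume()` are hypothetical names, and `$handler` is a caller-supplied callable.

```php
<?php
// Hypothetical helper: normalizes the return value of \Redis::blPop().
// On success the result is [listName, value]; on timeout it is empty
// (or not an array at all), which is mapped to null here.
function extractJob($result): ?string
{
    if (!is_array($result) || count($result) < 2) {
        return null;
    }
    return (string) $result[1];
}

// Consumer loop sketch: pops from "msg_queue" and hands each value to $handler.
function consume(\Redis $redis, callable $handler): void
{
    while (true) {
        $job = extractJob($redis->blPop(['msg_queue'], 3));
        if ($job !== null) {
            $handler($job);
        }
    }
}
```

Keeping the normalization in its own function makes the timeout case explicit and lets the parsing be checked without a running Redis server.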
workerStart callback function in a Swoole worker process:
<?php
$server = new swoole_websocket_server("0.0.0.0", 9501);
$server->on("workerStart", function ($server, $workerId) {
    $client = new \swoole_redis;
    $client->on("message", function (\swoole_redis $client, $data) use ($server) {
        // process data, broadcast to websocket clients
        // $data is the pub/sub reply: [type, channel, payload]
        if ($data[0] == 'message') {
            foreach($server->connections as $fd) {
                $server->push($fd, $data[2]);
            }
        }
    });
    $client->connect("127.0.0.1", 6379, function (swoole_redis $client, $result) {
        $client->subscribe("msg_queue");
    });
});
$server->on("open", function ($server, $request) {
});
$server->on("message", function (swoole_websocket_server $server, $frame) {
    $server->push($frame->fd, "hello");
});
$server->on("close", function ($serv, $fd) {
});
$server->start();
\swoole_redis (or \Swoole\Redis) is the non-blocking Redis API provided by Swoole. It allows you to subscribe to a Redis pub/sub channel and also communicate with the Swoole server.
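When handling the message callback, it can help to separate real messages from subscription confirmations. The sketch below assumes the callback receives the raw Redis pub/sub reply as an array, where a published message has the form ["message", channel, payload]. `parsePubSubReply()` is a hypothetical helper, not part of Swoole.

```php
<?php
// Hypothetical helper: extracts channel and payload from a pub/sub reply.
// Published messages arrive as ["message", channel, payload]; other replies,
// such as the ["subscribe", channel, count] confirmation, yield null.
function parsePubSubReply($reply): ?array
{
    if (!is_array($reply) || count($reply) !== 3 || $reply[0] !== 'message') {
        return null;
    }
    return ['channel' => $reply[1], 'payload' => $reply[2]];
}
```

Inside the callback, a non-null result carries the payload to push to the websocket clients, while a null result can simply be ignored.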