Using Swoole and Redis for Message Queue and PubSub

Published:

By Bruce Dou @ Swoole Labs

Redis provides the commands for message queue and pubsub: RPUSH, BLPOP, PUBLISH, SUBSCRIBE.

Messages can be ingested into Redis with Redis Client API. But how to consume and process, broadcast these messages?

These are some examples of processing messages from Redis Queue or Redis PubSub with Swoole and PHP:

Coroutine Swoole Redis style is as simple as normal PHP style

<?php
go(function () {
    $redis = new Swoole\Coroutine\Redis();
    $redis->connect("127.0.0.1", 6379);
    $msg = $redis->subscribe(array("msg_1"));
    while ($msg = $redis->recv())
    {
      var_dump($msg);
    }
});

Listen to a blocking Queue on Redis at workerStart callback function in a Swoole worker process

<?php
$server->on("workerStart", function($serv, $worker_id) {

 while(1) {
   $data = \Redis::blPop(["msg_queue"], 3);
   // process data only
 }

});

Redis API is used in this example, use this when you only have to process the data like in a daemon process.

Subscribe to a Redis PubSub channel at workerStart callback function in a Swoole worker process

<?php
$server = new swoole_websocket_server("0.0.0.0", 9501);

$server->on("workerStart", function ($server, $workerId) {
   $client = new \swoole_redis;
   $client->on("message", function (\swoole_redis $client, $data) use ($server) {
       // process data, broadcast to websocket clients
        if ($result[0] == 'message') {
            foreach($server->connections as $fd) {
                $server->push($fd, $result[1]);
            }
        }
   });
   $client->connect("127.0.0.1", 6379, function (swoole_redis $client, $result) {
       $client->subscribe("msg_queue");
   });
});

$server->on("open", function ($server, $request) {

});

$server->on("message", function (swoole_websocket_server $server, $frame) {
    $server->push($frame->fd, "hello");
});

$server->on("close", function ($serv, $fd) {

});

$server->start();

\swoole_redis or \Swoole\Redis is the non-blocking style Redis API provided by Swoole, allowing you subscribe to a Redis pubsub channel and also communicate with Swoole server.

References: