hyperf-message-push/hyperf-skeleton/app/Amqp/Consumer/DemoConsumer.php

37 lines
888 B
PHP

<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\ServerFactory;
/**
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
*/
class DemoConsumer extends ConsumerMessage
{
/**
* RabbitMQ消费端代码
* @param array $data
* @return string
*/
public function consume($data): string
{
// 获取集合中所有的value
$redis = $this->container->get(\Redis::class);
$fdList = $redis->sMembers('websocket_sjd_1');
$server = $this->container->get(ServerFactory::class)->getServer()->getServer();
foreach ($fdList as $fd) {
if (!empty($fd)) {
$server->push((int)$fd, $data);
}
}
return Result::ACK;
}
}