37 lines
888 B
PHP
37 lines
888 B
PHP
|
<?php
|
||
|
declare(strict_types=1);
|
||
|
|
||
|
namespace App\Amqp\Consumer;
|
||
|
|
||
|
use Hyperf\Amqp\Annotation\Consumer;
|
||
|
use Hyperf\Amqp\Message\ConsumerMessage;
|
||
|
use Hyperf\Amqp\Result;
|
||
|
use Hyperf\Server\ServerFactory;
|
||
|
|
||
|
/**
|
||
|
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
|
||
|
*/
|
||
|
class DemoConsumer extends ConsumerMessage
|
||
|
{
|
||
|
/**
|
||
|
* RabbitMQ消费端代码
|
||
|
* @param array $data
|
||
|
* @return string
|
||
|
*/
|
||
|
public function consume($data): string
|
||
|
{
|
||
|
// 获取集合中所有的value
|
||
|
$redis = $this->container->get(\Redis::class);
|
||
|
$fdList = $redis->sMembers('websocket_sjd_1');
|
||
|
|
||
|
$server = $this->container->get(ServerFactory::class)->getServer()->getServer();
|
||
|
foreach ($fdList as $fd) {
|
||
|
if (!empty($fd)) {
|
||
|
$server->push((int)$fd, $data);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return Result::ACK;
|
||
|
}
|
||
|
}
|