<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\VarDumper\Tests\Caster;
use PHPUnit\Framework\TestCase;
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
use RdKafka\Producer;
use RdKafka\TopicConf;
use Symfony\Component\VarDumper\Test\VarDumperTestTrait;
/**
* @requires extension rdkafka
*
* @group integration
*/
class RdKafkaCasterTest extends TestCase
{
use VarDumperTestTrait;
private const TOPIC = 'test-topic';
private const GROUP_ID = 'test-group-id';
private $hasBroker = false;
private $broker;
protected function setUp(): void
{
if (!$this->hasBroker && getenv('KAFKA_BROKER')) {
$this->broker = getenv('KAFKA_BROKER');
$this->hasBroker = true;
}
}
public function testDumpConf()
{
$conf = new Conf();
$conf->setErrorCb(function ($kafka, $err, $reason) {});
$conf->setDrMsgCb(function () {});
$conf->setRebalanceCb(function () {});
// BC with earlier version of extension rdkafka
foreach (['setLogCb', 'setOffsetCommitCb', 'setStatsCb', 'setConsumeCb'] as $method) {
if (method_exists($conf, $method)) {
$conf->{$method}(function () {});
}
}
$expectedDump = <<<EODUMP
RdKafka\Conf {
builtin.features: "gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins%S"
client.id: "rdkafka"
%A
dr_msg_cb: "0x%x"
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $conf);
}
public function testDumpProducer()
{
if (!$this->hasBroker) {
$this->markTestSkipped('Test requires an active broker');
}
$producer = new Producer(new Conf());
$producer->addBrokers($this->broker);
$expectedDump = <<<EODUMP
RdKafka\Producer {
%Aout_q_len: %d
orig_broker_id: 1001
orig_broker_name: "$this->broker/1001"
brokers: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Broker {
id: 1001
host: "%s"
port: %d
}
}
topics: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Topic {
name: "%s"
partitions: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Partition {
id: 0
err: 0
leader: 1001
}%A
}
}%A
}
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $producer);
}
public function testDumpTopicConf()
{
$topicConf = new TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$expectedDump = <<<EODUMP
RdKafka\TopicConf {
request.required.acks: "%i"
request.timeout.ms: "5000"
message.timeout.ms: "300000"
%A
auto.commit.enable: "true"
auto.commit.interval.ms: "60000"
auto.offset.reset: "smallest"
offset.store.path: "."
offset.store.sync.interval.ms: "-1"
offset.store.method: "broker"
consume.callback.max.messages: "0"
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $topicConf);
}
public function testDumpKafkaConsumer()
{
if (!$this->hasBroker) {
$this->markTestSkipped('Test requires an active broker');
}
$conf = new Conf();
$conf->set('metadata.broker.list', $this->broker);
$conf->set('group.id', self::GROUP_ID);
$consumer = new KafkaConsumer($conf);
$consumer->subscribe([self::TOPIC]);
$expectedDump = <<<EODUMP
RdKafka\KafkaConsumer {
%Asubscription: array:1 [
0 => "test-topic"
]
assignment: []
orig_broker_id: %i
orig_broker_name: "$this->broker/%s"
brokers: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Broker {
id: 1001
host: "%s"
port: %d
}
}
topics: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Topic {
name: "%s"
partitions: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Partition {
id: 0
err: 0
leader: 1001
}%A
}
}%A
}
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $consumer);
}
public function testDumpProducerTopic()
{
$producer = new Producer(new Conf());
$producer->addBrokers($this->broker);
$topic = $producer->newTopic('test');
$topic->produce(\RD_KAFKA_PARTITION_UA, 0, '{}');
$expectedDump = <<<EODUMP
RdKafka\ProducerTopic {
name: "test"
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $topic);
}
public function testDumpMessage()
{
$conf = new Conf();
$conf->set('metadata.broker.list', $this->broker);
$conf->set('group.id', self::GROUP_ID);
$consumer = new KafkaConsumer($conf);
$consumer->subscribe([self::TOPIC]);
// Force timeout
$message = $consumer->consume(0);
$expectedDump = <<<EODUMP
RdKafka\Message {
+err: -185
+topic_name: null
+timestamp: null
+partition: 0
+payload: null
+len: null
+key: null
+offset: 0%A
errstr: "Local: Timed out"
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $message);
}
}