Find this useful? Enter your email to receive occasional updates for securing PHP code.

Signing you up...

Thank you for signing up!

PHP Decode

<?php /* * This file is part of the Symfony package. * * (c) Fabien Potencier <fabien@..

Decoded Output download

<?php

/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <fabien@symfony.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Symfony\Component\Messenger\Bridge\Redis\Transport;

use Relay\Relay;
use Relay\Sentinel;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\TransportException;

/**
 * A Redis connection.
 *
 * @author Alexander Schranz <[email protected]>
 * @author Antoine Bluchet <[email protected]>
 * @author Robin Chalas <[email protected]>
 *
 * @internal
 *
 * @final
 */
class Connection
{
    /**
     * Default value of every option understood by this connection.
     *
     * The constructor merges these defaults under the options it is given.
     * fromDsn() additionally uses the PHP type of each default (int, bool,
     * float) to decide how a user-supplied value is validated/cast, and
     * rejects option names that are not listed here.
     */
    private const DEFAULT_OPTIONS = [
        'host' => '127.0.0.1',
        'port' => 6379,
        'stream' => 'messages',
        'group' => 'symfony',
        'consumer' => 'consumer',
        'auto_setup' => true,
        'delete_after_ack' => true,
        'delete_after_reject' => true,
        'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
        'dbindex' => 0,
        'redeliver_timeout' => 3600, // Timeout before redeliver messages still in pending state (seconds)
        'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked (milliseconds)
        'lazy' => false,
        'auth' => null,
        'serializer' => 1, // see \Redis::SERIALIZER_PHP,
        'sentinel_master' => null, // String, master to look for (optional, default is NULL meaning Sentinel support is disabled)
        'redis_sentinel' => null, // String, alias for 'sentinel_master'
        'timeout' => 0.0, // Float, value in seconds (optional, default is 0 meaning unlimited)
        'read_timeout' => 0.0, //  Float, value in seconds (optional, default is 0 meaning unlimited)
        'retry_interval' => 0, //  Int, value in milliseconds (optional, default is 0)
        'persistent_id' => null, // String, persistent connection id (optional, default is NULL meaning not persistent)
        'ssl' => null, // see https://php.net/context.ssl
    ];

    // Either the client itself or a closure that creates it; getRedis() resolves the closure on first use.
    private \Redis|Relay|\RedisCluster|\Closure $redis;
    // Name of the Redis stream messages are added to and read from.
    private string $stream;
    // Key used for delayed messages: the stream name suffixed with "__queue".
    private string $queue;
    // Consumer group name.
    private string $group;
    // Consumer name inside the group.
    private string $consumer;
    // When true, add() and get() call setup() first; setup() switches it off once done.
    private bool $autoSetup;
    // Passed to XADD as the maximum stream length when non-zero.
    private int $maxEntries;
    // In milliseconds (the "redeliver_timeout" option multiplied by 1000).
    private int $redeliverTimeout;
    // microtime(true) value before which get() does not call claimOldPendingMessages() again.
    private float $nextClaim = 0.0;
    // In seconds (the "claim_interval" option divided by 1000).
    private float $claimInterval;
    // When true, ack() also deletes the entry from the stream.
    private bool $deleteAfterAck;
    // When true, reject() also deletes the entry from the stream.
    private bool $deleteAfterReject;
    // When true, get() reads this consumer's pending entries (ID "0") instead of new ones (ID ">").
    private bool $couldHavePendingMessages = true;

    /**
     * Validates the options and prepares the Redis client.
     *
     * The client is wrapped in a closure so that the actual connection (and,
     * when a sentinel master is configured, the master lookup) only happens on
     * the first getRedis() call. Unless the "lazy" option is set, that call is
     * made right away from this constructor.
     *
     * @param array                          $options Connection options; missing keys fall back to DEFAULT_OPTIONS
     * @param \Redis|Relay|\RedisCluster|null $redis   Optional pre-built client to use instead of creating one
     *
     * @throws LogicException           When the installed redis extension is older than 4.3.0
     * @throws InvalidArgumentException When the options are contradictory, sentinel support is unavailable,
     *                                  the sentinel lookup fails, or "stream"/"group"/"consumer" is an empty string
     */
    public function __construct(array $options, \Redis|Relay|\RedisCluster|null $redis = null)
    {
        // NOTE(review): phpversion('redis') returns false when ext-redis is not loaded (e.g. Relay only);
        // confirm this check behaves as intended in that setup.
        if (version_compare(phpversion('redis'), '4.3.0', '<')) {
            throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
        }

        // Array union: keys already present in $options win over the defaults.
        $options += self::DEFAULT_OPTIONS;
        $host = $options['host'];
        $port = $options['port'];
        $auth = $options['auth'];

        if (isset($options['redis_sentinel']) && isset($options['sentinel_master'])) {
            throw new InvalidArgumentException('Cannot use both "redis_sentinel" and "sentinel_master" at the same time.');
        }

        $sentinelMaster = $options['sentinel_master'] ?? $options['redis_sentinel'] ?? null;

        if (null !== $sentinelMaster && !class_exists(\RedisSentinel::class) && !class_exists(Sentinel::class)) {
            throw new InvalidArgumentException('Redis Sentinel support requires ext-redis>=5.2, or ext-relay.');
        }

        if (null !== $sentinelMaster && $redis instanceof \RedisCluster) {
            throw new InvalidArgumentException('Cannot configure Redis Sentinel and Redis Cluster instance at the same time.');
        }

        // SSL context options listed here are converted from string to bool below.
        $booleanStreamOptions = [
            'allow_self_signed',
            'capture_peer_cert',
            'capture_peer_cert_chain',
            'disable_compression',
            'SNI_enabled',
            'verify_peer',
            'verify_peer_name',
        ];

        foreach ($options['ssl'] ?? [] as $streamOption => $value) {
            if (\in_array($streamOption, $booleanStreamOptions, true) && \is_string($value)) {
                $options['ssl'][$streamOption] = filter_var($value, \FILTER_VALIDATE_BOOL);
            }
        }

        // Cluster mode: several hosts without a sentinel master, or an injected RedisCluster instance.
        if ((\is_array($host) && null === $sentinelMaster) || $redis instanceof \RedisCluster) {
            $hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
            $this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
        } else {
            $this->redis = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
                if (null !== $sentinelMaster) {
                    $sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
                    $hostIndex = 0;
                    $hosts = \is_array($host) ? $host : [['scheme' => 'tcp', 'host' => $host, 'port' => $port]];
                    // Ask each sentinel in turn until one returns the master address.
                    do {
                        $host = $hosts[$hostIndex]['host'];
                        $port = $hosts[$hostIndex]['port'] ?? 0;
                        $tls = 'tls' === $hosts[$hostIndex]['scheme'];
                        $address = false;

                        if (isset($hosts[$hostIndex]['host']) && $tls) {
                            $host = 'tls://'.$host;
                        }

                        try {
                            // From ext-redis 6.0.0 on, RedisSentinel is built from a single options array.
                            if (\extension_loaded('redis') && version_compare(phpversion('redis'), '6.0.0', '>=')) {
                                $params = [
                                    'host' => $host,
                                    'port' => $port,
                                    'connectTimeout' => $options['timeout'],
                                    'persistent' => $options['persistent_id'],
                                    'retryInterval' => $options['retry_interval'],
                                    'readTimeout' => $options['read_timeout'],
                                ];

                                $sentinel = new \RedisSentinel($params);
                            } else {
                                $sentinel = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
                            }

                            // On success $host/$port now designate the master rather than the sentinel.
                            if ($address = $sentinel->getMasterAddrByName($sentinelMaster)) {
                                [$host, $port] = $address;
                            }
                        } catch (\RedisException|\Relay\Exception $redisException) {
                            // Deliberately ignored: the next sentinel is tried; the last exception is attached below if all fail.
                        }
                    } while (++$hostIndex < \count($hosts) && !$address);

                    if (!$address) {
                        throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $sentinelMaster), previous: $redisException ?? null);
                    }
                }

                return self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
            };
        }

        if (!$options['lazy']) {
            $this->getRedis();
        }

        foreach (['stream', 'group', 'consumer'] as $key) {
            if ('' === $options[$key]) {
                throw new InvalidArgumentException(sprintf('"%s" should be configured, got an empty string.', $key));
            }
        }

        $this->stream = $options['stream'];
        $this->group = $options['group'];
        $this->consumer = $options['consumer'];
        $this->queue = $this->stream.'__queue';
        $this->autoSetup = $options['auto_setup'];
        $this->maxEntries = $options['stream_max_entries'];
        $this->deleteAfterAck = $options['delete_after_ack'];
        $this->deleteAfterReject = $options['delete_after_reject'];
        // Option is given in seconds, stored in milliseconds.
        $this->redeliverTimeout = $options['redeliver_timeout'] * 1000;
        // Option is given in milliseconds, stored in seconds (compared against microtime(true)).
        $this->claimInterval = $options['claim_interval'] / 1000;
    }

    /**
     * Connects and configures a standalone Redis/Relay client.
     *
     * A client that is already connected is returned as is. Otherwise the
     * client is connected (persistently when "persistent_id" is set), the
     * serializer option is applied, then authentication and database
     * selection are performed when configured.
     *
     * @param string|string[]|null $auth
     *
     * @throws InvalidArgumentException When authentication or database selection fails
     */
    private static function initializeRedis(\Redis|Relay $redis, string $host, int $port, string|array|null $auth, array $params): \Redis|Relay
    {
        if ($redis->isConnected()) {
            return $redis;
        }

        $method = isset($params['persistent_id']) ? 'pconnect' : 'connect';

        $arguments = [
            $host,
            $port,
            $params['timeout'],
            $params['persistent_id'],
            $params['retry_interval'],
            $params['read_timeout'],
        ];
        // The stream context argument is only passed when the client is known to accept it.
        if (\defined('Redis::SCAN_PREFIX') || \extension_loaded('relay')) {
            $arguments[] = ['stream' => $params['ssl'] ?? null];
        }
        $redis->{$method}(...$arguments);

        $serializerOption = $redis instanceof \Redis ? \Redis::OPT_SERIALIZER : Relay::OPT_SERIALIZER;
        $redis->setOption($serializerOption, $params['serializer']);

        if (null !== $auth) {
            if (!$redis->auth($auth)) {
                throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
            }
        }

        $dbIndex = $params['dbindex'] ?? false;
        if ($dbIndex && !$redis->select($dbIndex)) {
            throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
        }

        return $redis;
    }

    /**
     * Returns a RedisCluster client with the configured serializer applied,
     * creating the client from the given hosts when none was injected.
     *
     * @param string|string[]|null $auth
     */
    private static function initializeRedisCluster(?\RedisCluster $redis, array $hosts, string|array|null $auth, array $params): \RedisCluster
    {
        if (null === $redis) {
            $arguments = [
                null,
                $hosts,
                $params['timeout'],
                $params['read_timeout'],
                (bool) ($params['persistent'] ?? false),
                $auth,
            ];
            // The SSL context argument is only passed when the extension is known to accept it.
            if (\defined('Redis::SCAN_PREFIX')) {
                $arguments[] = $params['ssl'] ?? null;
            }

            $redis = new \RedisCluster(...$arguments);
        }

        $redis->setOption(\Redis::OPT_SERIALIZER, $params['serializer']);

        return $redis;
    }

    /**
     * Builds a connection from a DSN plus an options array.
     *
     * A DSN containing commas is treated as a list of DSNs (one per host);
     * the resulting "host" option is then an array of "host:port" strings.
     * Query-string parameters found in the DSN(s) are merged into $options by
     * parseDsn(), after which every known option is validated/cast according
     * to the type of its default in DEFAULT_OPTIONS.
     *
     * @param string                          $dsn     One DSN, or several separated by commas
     * @param array                           $options Extra options; DSN query parameters are merged over them
     * @param \Redis|Relay|\RedisCluster|null $redis   Optional pre-built client forwarded to the constructor
     *
     * @throws InvalidArgumentException When a DSN is invalid or a host is missing in a multi-DSN string
     * @throws LogicException           When an unknown option is passed
     */
    public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = [], \Redis|Relay|\RedisCluster|null $redis = null): self
    {
        if (!str_contains($dsn, ',')) {
            $params = self::parseDsn($dsn, $options);

            if (isset($params['host']) && 'rediss' === $params['scheme']) {
                $params['host'] = 'tls://'.$params['host'];
            }
        } else {
            $dsns = explode(',', $dsn);
            // $options is captured by reference so that each DSN's query parameters accumulate in it.
            $paramss = array_map(function ($dsn) use (&$options) {
                return self::parseDsn($dsn, $options);
            }, $dsns);

            // Merge all the URLs, the last one overrides the previous ones
            $params = array_merge(...$paramss);
            $tls = 'rediss' === $params['scheme'];

            // Regroup all the hosts in an array interpretable by RedisCluster
            $params['host'] = array_map(function ($params) use ($tls) {
                if (!isset($params['host'])) {
                    throw new InvalidArgumentException('Missing host in DSN, it must be defined when using Redis Cluster.');
                }
                if ($tls) {
                    $params['host'] = 'tls://'.$params['host'];
                }

                return $params['host'].':'.($params['port'] ?? 6379);
            }, $paramss, $dsns);
        }

        if ($invalidOptions = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS), ['host', 'port'])) {
            throw new LogicException(sprintf('Invalid option(s) "%s" passed to the Redis Messenger transport.', implode('", "', $invalidOptions)));
        }
        // Cast each option according to the type of its default value; options with a null or string default are kept as given.
        foreach (self::DEFAULT_OPTIONS as $k => $v) {
            $options[$k] = match (\gettype($v)) {
                'integer' => filter_var($options[$k] ?? $v, \FILTER_VALIDATE_INT),
                'boolean' => filter_var($options[$k] ?? $v, \FILTER_VALIDATE_BOOL),
                'double' => filter_var($options[$k] ?? $v, \FILTER_VALIDATE_FLOAT),
                default => $options[$k] ?? $v,
            };
        }

        // Credentials from the DSN are only used when no "auth" option was given explicitly.
        $pass = '' !== ($params['pass'] ?? '') ? rawurldecode($params['pass']) : null;
        $user = '' !== ($params['user'] ?? '') ? rawurldecode($params['user']) : null;
        $options['auth'] ??= null !== $pass && null !== $user ? [$user, $pass] : ($pass ?? $user);

        if (isset($params['query'])) {
            parse_str($params['query'], $query);

            // A "host" query parameter must be an array keyed by "host[:port]"; each entry becomes
            // a ['scheme' => ..., 'host' => ..., 'port' => ...] description.
            if (isset($query['host'])) {
                $tls = 'rediss' === $params['scheme'];
                $tcpScheme = $tls ? 'tls' : 'tcp';

                if (!\is_array($hosts = $query['host'])) {
                    throw new InvalidArgumentException(sprintf('Invalid Redis DSN: "%s".', $dsn));
                }
                foreach ($hosts as $host => $parameters) {
                    if (\is_string($parameters)) {
                        parse_str($parameters, $parameters);
                    }
                    if (false === $i = strrpos($host, ':')) {
                        // No colon: default port.
                        $hosts[$host] = ['scheme' => $tcpScheme, 'host' => $host, 'port' => 6379] + $parameters;
                    } elseif ($port = (int) substr($host, 1 + $i)) {
                        // Colon followed by a non-zero number: explicit port.
                        $hosts[$host] = ['scheme' => $tcpScheme, 'host' => substr($host, 0, $i), 'port' => $port] + $parameters;
                    } else {
                        // Colon followed by nothing numeric (or 0): described with the "unix" scheme and no port.
                        $hosts[$host] = ['scheme' => 'unix', 'host' => substr($host, 0, $i)] + $parameters;
                    }
                }
                $params['host'] = array_values($hosts);
            }
        }

        if (isset($params['host'])) {
            $options['host'] = $params['host'] ?? $options['host'];
            $options['port'] = $params['port'] ?? $options['port'];

            // Path segments map to /stream/group/consumer; missing segments keep the option values.
            $pathParts = explode('/', rtrim($params['path'] ?? '', '/'));
            $options['stream'] = $pathParts[1] ?? $options['stream'];
            $options['group'] = $pathParts[2] ?? $options['group'];
            $options['consumer'] = $pathParts[3] ?? $options['consumer'];
        } else {
            // No host in the DSN: the path itself is used as host with port 0
            // (presumably a unix socket path — confirm against the supported DSN formats).
            $options['host'] = $params['path'];
            $options['port'] = 0;
        }

        return new self($options, $redis);
    }

    /**
     * Parses a single "redis:" / "rediss:" DSN into parse_url()-style parts.
     *
     * Credentials are extracted from the DSN with a regular expression before
     * parse_url() runs, and the scheme is temporarily replaced by "file:" for
     * parsing. Query-string parameters are merged into $options (by
     * reference), DSN values overriding existing ones.
     *
     * @param string $dsn     A single DSN
     * @param array  $options Options array that receives the DSN query parameters
     *
     * @return array parse_url() components, with "scheme" set to "redis" or "rediss"
     *               and "user"/"pass" taken from the extracted credentials when present
     *
     * @throws InvalidArgumentException When parse_url() cannot parse the DSN
     */
    private static function parseDsn(string $dsn, array &$options): array
    {
        $url = $dsn;
        $scheme = str_starts_with($dsn, 'rediss:') ? 'rediss' : 'redis';

        // "scheme:///path" without any ":" or "@" after the slashes: only swap the scheme for "file:".
        if (preg_match('#^'.$scheme.':///([^:@])+$#', $dsn)) {
            $url = str_replace($scheme.':', 'file:', $dsn);
        }

        // Strip "scheme:[//][user:]password@" from the start, remembering the credentials in $auth
        // ($auth is created by the by-reference capture and stays null when no credentials are found).
        $url = preg_replace_callback('#^'.$scheme.':(//)?(?:(?:(?<user>[^:@]*+):)?(?<password>[^@]*+)@)?#', function ($m) use (&$auth) {
            if (isset($m['password'])) {
                // An empty user or the literal "default" user is not kept.
                if (!\in_array($m['user'], ['', 'default'], true)) {
                    $auth['user'] = rawurldecode($m['user']);
                }

                $auth['pass'] = rawurldecode($m['password']);
            }

            return 'file:'.($m[1] ?? '');
        }, $url);

        if (false === $params = parse_url($url)) {
            throw new InvalidArgumentException('The given Redis DSN is invalid.');
        }

        if (null !== $auth) {
            unset($params['user']); // parse_url thinks //0@localhost/ is a username of "0"! doh!
            $params += ($auth ?? []); // But don't worry as $auth array will have user, user/pass or pass as needed
        }

        if (isset($params['query'])) {
            parse_str($params['query'], $dsnOptions);
            $options = array_merge($options, $dsnOptions);
        }
        // Restore the real scheme (parse_url() saw "file").
        $params['scheme'] = $scheme;

        return $params;
    }

    /**
     * Inspects the group's pending entries and claims those that have been
     * idle for at least the redeliver timeout.
     *
     * If an inspected entry already belongs to this consumer, the method only
     * flags that pending messages may exist and returns immediately, without
     * scheduling the next claim. Otherwise the next claim time is pushed back
     * by the claim interval.
     *
     * @throws TransportException When a Redis command fails
     */
    private function claimOldPendingMessages(): void
    {
        try {
            // This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
            // https://github.com/antirez/redis/issues/6256
            $pendingEntries = $this->getRedis()->xpending($this->stream, $this->group, '-', '+', 1) ?: [];
        } catch (\RedisException|\Relay\Exception $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        $idsToClaim = [];
        foreach ($pendingEntries as $entry) {
            // $entry[0] is the message ID, $entry[1] the owning consumer, $entry[2] is compared to the timeout in ms.
            if ($this->consumer === $entry[1]) {
                $this->couldHavePendingMessages = true;

                return;
            }

            if ($this->redeliverTimeout <= $entry[2]) {
                $idsToClaim[] = $entry[0];
            }
        }

        if ([] !== $idsToClaim) {
            try {
                $this->getRedis()->xclaim($this->stream, $this->group, $this->consumer, $this->redeliverTimeout, $idsToClaim, ['JUSTID']);
            } catch (\RedisException|\Relay\Exception $exception) {
                throw new TransportException($exception->getMessage(), 0, $exception);
            }

            $this->couldHavePendingMessages = true;
        }

        $this->nextClaim = microtime(true) + $this->claimInterval;
    }

    /**
     * Fetches the next message for this consumer, if any.
     *
     * Steps, in order:
     *  1. run setup() when auto-setup is still enabled;
     *  2. move delayed messages whose time has come from the sorted-set queue
     *     to the stream (via add() with no delay);
     *  3. when no pending message is expected and the claim interval elapsed,
     *     try to claim other consumers' stale pending messages;
     *  4. read one entry from the stream: this consumer's pending entries
     *     first (ID "0"), then new entries (ID ">").
     *
     * @return array|null ['id' => stream entry key, 'data' => entry payload], or null when nothing was read
     *
     * @throws TransportException When a Redis command fails
     */
    public function get(): ?array
    {
        if ($this->autoSetup) {
            $this->setup();
        }

        // Current time in milliseconds as a digit string: microtime() returns "0.xxxxxxxx seconds",
        // so the seconds start at offset 11 and the first three decimals at offset 2.
        $now = microtime();
        $now = substr($now, 11).substr($now, 2, 3);

        $queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now) ?? 0;

        while ($queuedMessageCount--) {
            if (!$message = $this->rawCommand('ZPOPMIN', 1)) {
                break;
            }

            [$queuedMessage, $expiry] = $message;

            // Compare the two digit strings numerically without converting them to numbers:
            // same length => lexical comparison, otherwise the branch is taken when $expiry is the shorter string.
            if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
                // if a future-placed message is popped because of a race condition with
                // another running consumer, the message is readded to the queue

                if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) {
                    throw new TransportException('Could not add a message to the redis stream.');
                }

                break;
            }

            $decodedQueuedMessage = json_decode($queuedMessage, true);
            // Fixed: the call used to read "rray_key_exists", an undefined function (fatal error).
            // The is_array() guard makes a payload that is not a JSON object/array fall back to the
            // raw string instead of raising a TypeError in array_key_exists().
            $hasBody = \is_array($decodedQueuedMessage) && \array_key_exists('body', $decodedQueuedMessage);
            $this->add($hasBody ? $decodedQueuedMessage['body'] : $queuedMessage, $decodedQueuedMessage['headers'] ?? [], 0);
        }

        if (!$this->couldHavePendingMessages && $this->nextClaim <= microtime(true)) {
            $this->claimOldPendingMessages();
        }

        $messageId = '>'; // will receive new messages

        if ($this->couldHavePendingMessages) {
            $messageId = '0'; // will receive consumers pending messages
        }
        $redis = $this->getRedis();

        try {
            $messages = $redis->xreadgroup(
                $this->group,
                $this->consumer,
                [$this->stream => $messageId],
                1,
                1
            );
        } catch (\RedisException|\Relay\Exception $e) {
            throw new TransportException($e->getMessage(), 0, $e);
        }

        if (false === $messages) {
            if ($error = $redis->getLastError() ?: null) {
                $redis->clearLastError();
            }

            throw new TransportException($error ?? 'Could not read messages from the redis stream.');
        }

        if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
            $this->couldHavePendingMessages = false;

            // No pending messages so get a new one
            return $this->get();
        }

        // At most one entry was requested; return the first one found.
        foreach ($messages[$this->stream] ?? [] as $key => $message) {
            return [
                'id' => $key,
                'data' => $message,
            ];
        }

        return null;
    }

    /**
     * Acknowledges a stream entry and, when "delete_after_ack" is enabled,
     * deletes it from the stream.
     *
     * When deletion is enabled, success is judged on the result of the
     * deletion only.
     *
     * @throws TransportException When a Redis command fails or reports no success
     */
    public function ack(string $id): void
    {
        $client = $this->getRedis();

        try {
            $succeeded = $client->xack($this->stream, $this->group, [$id]);
            if ($this->deleteAfterAck) {
                $succeeded = $client->xdel($this->stream, [$id]);
            }
        } catch (\RedisException|\Relay\Exception $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        if ($succeeded) {
            return;
        }

        // Prefer the error reported by the client, resetting it once read.
        $lastError = $client->getLastError() ?: null;
        if (null !== $lastError) {
            $client->clearLastError();
        }

        throw new TransportException($lastError ?? sprintf('Could not acknowledge redis message "%s".', $id));
    }

    /**
     * Rejects a stream entry: acknowledges it and, when "delete_after_reject"
     * is enabled, deletes it from the stream.
     *
     * When deletion is enabled, both the acknowledgement and the deletion
     * must succeed.
     *
     * @throws TransportException When a Redis command fails or reports no success
     */
    public function reject(string $id): void
    {
        $client = $this->getRedis();

        try {
            $succeeded = $client->xack($this->stream, $this->group, [$id]);
            if ($this->deleteAfterReject) {
                // The deletion is always attempted, whatever the acknowledgement returned.
                $removed = $client->xdel($this->stream, [$id]);
                $succeeded = $removed && $succeeded;
            }
        } catch (\RedisException|\Relay\Exception $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        if ($succeeded) {
            return;
        }

        // Prefer the error reported by the client, resetting it once read.
        $lastError = $client->getLastError() ?: null;
        if (null !== $lastError) {
            $client->clearLastError();
        }

        throw new TransportException($lastError ?? sprintf('Could not delete message "%s" from the redis stream.', $id));
    }

    /**
     * Publishes a message, either immediately or after a delay.
     *
     * Without delay the JSON-encoded message is appended to the stream with
     * XADD. With a positive delay it is stored in the sorted-set queue with
     * its due time (milliseconds, as a digit string) as score; get() later
     * moves it to the stream.
     *
     * @param string $body      Message body
     * @param array  $headers   Message headers
     * @param int    $delayInMs Delay in milliseconds; values <= 0 mean "no delay"
     *
     * @return string The stream entry ID returned by XADD, or a generated unique ID for delayed messages
     *
     * @throws TransportException When encoding fails, the delay overflows, or Redis reports a failure
     */
    public function add(string $body, array $headers, int $delayInMs = 0): string
    {
        if ($this->autoSetup) {
            $this->setup();
        }
        $redis = $this->getRedis();

        try {
            if ($delayInMs > 0) { // the delay is <= 0 for queued messages
                $id = uniqid('', true);
                $message = json_encode([
                    'body' => $body,
                    'headers' => $headers,
                    // Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
                    'uniqid' => $id,
                ]);

                if (false === $message) {
                    throw new TransportException(json_last_error_msg());
                }

                // microtime() returns "0.xxxxxxxx seconds": $now[0] is the fraction, $now[1] the seconds.
                $now = explode(' ', microtime(), 2);
                // Add the delay to the current milliseconds, keeping at least three digits.
                $now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
                if (3 < \strlen($now[0])) {
                    // Carry everything above the last three digits over to the seconds part.
                    $now[1] += substr($now[0], 0, -3);
                    $now[0] = substr($now[0], -3);

                    // The addition produced a float: the seconds no longer fit in an int.
                    if (\is_float($now[1])) {
                        throw new TransportException("Message delay is too big: {$delayInMs}ms.");
                    }
                }

                // Score is "<seconds><milliseconds>", i.e. the due time in milliseconds.
                $added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message);
            } else {
                $message = json_encode([
                    'body' => $body,
                    'headers' => $headers,
                ]);

                if (false === $message) {
                    throw new TransportException(json_last_error_msg());
                }

                if ($this->maxEntries) {
                    $added = $redis->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
                } else {
                    $added = $redis->xadd($this->stream, '*', ['message' => $message]);
                }

                $id = $added;
            }
        } catch (\RedisException|\Relay\Exception $e) {
            if ($error = $redis->getLastError() ?: null) {
                $redis->clearLastError();
            }
            throw new TransportException($error ?? $e->getMessage(), 0, $e);
        }

        if (!$added) {
            if ($error = $redis->getLastError() ?: null) {
                $redis->clearLastError();
            }
            throw new TransportException($error ?? 'Could not add a message to the redis stream.');
        }

        return $id;
    }

    /**
     * Creates the consumer group (and the stream if needed) and checks that
     * the delete-after-ack/reject options are safe to use.
     *
     * Auto-setup is switched off once this method completes.
     *
     * @throws TransportException When the group creation command fails
     * @throws LogicException     When deletion is enabled while the stream has more than one group
     */
    public function setup(): void
    {
        $client = $this->getRedis();

        try {
            $client->xgroup('CREATE', $this->stream, $this->group, 0, true);
        } catch (\RedisException|\Relay\Exception $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        // group might already exist, ignore
        if ($client->getLastError()) {
            $client->clearLastError();
        }

        if ($this->deleteAfterAck || $this->deleteAfterReject) {
            $groups = $client->xinfo('GROUPS', $this->stream);

            $conflictingGroups = match (true) {
                // support for Redis extension version 5+
                \is_array($groups) => 1 < \count($groups),
                // support for Redis extension version 4.x
                \is_string($groups) => 0 < substr_count($groups, '"name"'),
                default => false,
            };

            if ($conflictingGroups) {
                throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack and delete_after_reject cannot be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
            }
        }

        $this->autoSetup = false;
    }

    /**
     * Removes the stream and the delayed-messages queue.
     *
     * UNLINK is tried first; once it has returned false or thrown, the
     * outcome is remembered in a static variable and DEL is used instead,
     * for this call and the following ones.
     */
    public function cleanup(): void
    {
        static $canUnlink = true;
        $client = $this->getRedis();

        if ($canUnlink) {
            try {
                $canUnlink = false !== $client->unlink($this->stream, $this->queue);
            } catch (\Throwable) {
                $canUnlink = false;
            }
        }

        if ($canUnlink) {
            return;
        }

        $client->del($this->stream, $this->queue);
    }

    /**
     * Returns the number of stream entries not yet delivered to this group.
     *
     * The "lag" value reported by XINFO GROUPS is returned when available.
     * Otherwise the stream is walked with XRANGE in pages of 100 entries,
     * starting from the group's last delivered ID, and the entries are
     * counted. Returns 0 when the group is not found or has no
     * "last-delivered-id".
     */
    public function getMessageCount(): int
    {
        $redis = $this->getRedis();
        $groups = $redis->xinfo('GROUPS', $this->stream) ?: [];

        $lastDeliveredId = null;
        foreach ($groups as $group) {
            if ($group['name'] !== $this->group) {
                continue;
            }

            // Use "lag" key provided by Redis 7.x. See https://redis.io/commands/xinfo-groups/#consumer-group-lag.
            if (isset($group['lag'])) {
                return $group['lag'];
            }

            if (!isset($group['last-delivered-id'])) {
                return 0;
            }

            $lastDeliveredId = $group['last-delivered-id'];
            break;
        }

        if (null === $lastDeliveredId) {
            return 0;
        }

        // Iterate through the stream. See https://redis.io/commands/xrange/#iterating-a-stream.
        // NOTE(review): the version tested here is the PHP extension's; the branch taken when the flag is
        // true increments the ID while the other one uses the "(" exclusive prefix — confirm this mapping
        // is the intended one.
        $useExclusiveRangeInterval = version_compare(phpversion('redis'), '6.2.0', '>=');
        $total = 0;
        while (true) {
            // NOTE(review): the first call starts at the last delivered ID itself; XRANGE bounds are
            // inclusive by default, so that entry may be counted if it still exists — confirm.
            if (!$range = $redis->xRange($this->stream, $lastDeliveredId, '+', 100)) {
                return $total;
            }

            $total += \count($range);

            // Move the lower bound just past the last entry of this page.
            if ($useExclusiveRangeInterval) {
                // Increment the trailing number (sequence part) of the last ID.
                $lastDeliveredId = preg_replace_callback('#\d+$#', static fn (array $matches) => (int) $matches[0] + 1, array_key_last($range));
            } else {
                $lastDeliveredId = '('.array_key_last($range);
            }
        }
    }

    /**
     * Runs a raw Redis command against the delayed-messages queue key.
     *
     * The queue key is always inserted as the command's first argument. With
     * a RedisCluster client the key is additionally passed as the leading
     * argument of rawCommand().
     *
     * @throws TransportException When the command throws or returns false
     */
    private function rawCommand(string $command, ...$arguments): mixed
    {
        $client = $this->getRedis();

        try {
            $result = $client instanceof \RedisCluster
                ? $client->rawCommand($this->queue, $command, $this->queue, ...$arguments)
                : $client->rawCommand($command, $this->queue, ...$arguments);
        } catch (\RedisException|\Relay\Exception $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        if (false !== $result) {
            return $result;
        }

        // Prefer the error reported by the client, resetting it once read.
        $lastError = $client->getLastError() ?: null;
        if (null !== $lastError) {
            $client->clearLastError();
        }

        throw new TransportException($lastError ?? sprintf('Could not run "%s" on Redis queue.', $command));
    }

    /**
     * Returns the Redis client, creating it on first use when the property
     * still holds the factory closure set by the constructor.
     */
    private function getRedis(): \Redis|Relay|\RedisCluster
    {
        $client = $this->redis;

        if ($client instanceof \Closure) {
            // Replace the factory by the client it produces so it only runs once.
            $this->redis = $client = $client();
        }

        return $client;
    }
}
 ?>

Did this file decode correctly?

Original Code

<?php

/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <[email protected]>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Symfony\Component\Messenger\Bridge\Redis\Transport;

use Relay\Relay;
use Relay\Sentinel;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\TransportException;

/**
 * A Redis connection.
 *
 * @author Alexander Schranz <[email protected]>
 * @author Antoine Bluchet <[email protected]>
 * @author Robin Chalas <[email protected]>
 *
 * @internal
 *
 * @final
 */
class Connection
{
    /**
     * Default value of every supported option.
     *
     * The constructor merges these into the options it receives; fromDsn() also
     * uses the keys as the list of allowed option names and the type of each
     * default to decide how a DSN-provided value is cast.
     */
    private const DEFAULT_OPTIONS = [
        'host' => '127.0.0.1',
        'port' => 6379,
        'stream' => 'messages',
        'group' => 'symfony',
        'consumer' => 'consumer',
        'auto_setup' => true,
        'delete_after_ack' => true,
        'delete_after_reject' => true,
        'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
        'dbindex' => 0,
        'redeliver_timeout' => 3600, // Timeout before redeliver messages still in pending state (seconds)
        'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked (the constructor divides it by 1000)
        'lazy' => false,
        'auth' => null,
        'serializer' => 1, // see \Redis::SERIALIZER_PHP,
        'sentinel_master' => null, // String, master to look for (optional, default is NULL meaning Sentinel support is disabled)
        'redis_sentinel' => null, // String, alias for 'sentinel_master'
        'timeout' => 0.0, // Float, value in seconds (optional, default is 0 meaning unlimited)
        'read_timeout' => 0.0, //  Float, value in seconds (optional, default is 0 meaning unlimited)
        'retry_interval' => 0, //  Int, value in milliseconds (optional, default is 0)
        'persistent_id' => null, // String, persistent connection id (optional, default is NULL meaning not persistent)
        'ssl' => null, // see https://php.net/context.ssl
    ];

    // Either the client itself or a closure creating it; getRedis() swaps the closure for the client on first use.
    private \Redis|Relay|\RedisCluster|\Closure $redis;
    // Name of the Redis stream that holds the messages.
    private string $stream;
    // Key used for delayed messages: the stream name suffixed with "__queue".
    private string $queue;
    private string $group;
    private string $consumer;
    // When true, get() and add() call setup() first; setup() resets it to false.
    private bool $autoSetup;
    private int $maxEntries;
    // The "redeliver_timeout" option multiplied by 1000.
    private int $redeliverTimeout;
    // microtime(true) timestamp before which claimOldPendingMessages() is not run again by get().
    private float $nextClaim = 0.0;
    // The "claim_interval" option divided by 1000; added to microtime(true) to compute $nextClaim.
    private float $claimInterval;
    private bool $deleteAfterAck;
    private bool $deleteAfterReject;
    // When true, get() reads this consumer's pending messages (ID "0") instead of new ones (ID ">").
    private bool $couldHavePendingMessages = true;

    /**
     * Builds the connection from an options array (missing keys fall back to DEFAULT_OPTIONS).
     *
     * The Redis client is never created directly here: a factory closure is stored in
     * $this->redis and resolved by getRedis(), immediately unless the "lazy" option is set.
     *
     * @param array                            $options See DEFAULT_OPTIONS for the supported keys
     * @param \Redis|Relay|\RedisCluster|null  $redis   Optional pre-built client to configure instead of creating one
     *
     * @throws LogicException           When the reported php-redis version is lower than 4.3.0
     * @throws InvalidArgumentException When sentinel options are inconsistent or unsupported, when no sentinel
     *                                  returns the master address, or when stream/group/consumer is an empty string
     */
    public function __construct(array $options, \Redis|Relay|\RedisCluster|null $redis = null)
    {
        // NOTE(review): this check only looks at the "redis" extension; how it behaves when only
        // ext-relay is loaded (phpversion() has no version to report) should be confirmed.
        if (version_compare(phpversion('redis'), '4.3.0', '<')) {
            throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
        }

        // Array union: keys already present in $options win over the defaults.
        $options += self::DEFAULT_OPTIONS;
        $host = $options['host'];
        $port = $options['port'];
        $auth = $options['auth'];

        // "redis_sentinel" is an alias of "sentinel_master"; setting both is ambiguous.
        if (isset($options['redis_sentinel']) && isset($options['sentinel_master'])) {
            throw new InvalidArgumentException('Cannot use both "redis_sentinel" and "sentinel_master" at the same time.');
        }

        $sentinelMaster = $options['sentinel_master'] ?? $options['redis_sentinel'] ?? null;

        if (null !== $sentinelMaster && !class_exists(\RedisSentinel::class) && !class_exists(Sentinel::class)) {
            throw new InvalidArgumentException('Redis Sentinel support requires ext-redis>=5.2, or ext-relay.');
        }

        if (null !== $sentinelMaster && $redis instanceof \RedisCluster) {
            throw new InvalidArgumentException('Cannot configure Redis Sentinel and Redis Cluster instance at the same time.');
        }

        // SSL context options that are booleans; string values (e.g. coming from a DSN) are converted below.
        $booleanStreamOptions = [
            'allow_self_signed',
            'capture_peer_cert',
            'capture_peer_cert_chain',
            'disable_compression',
            'SNI_enabled',
            'verify_peer',
            'verify_peer_name',
        ];

        foreach ($options['ssl'] ?? [] as $streamOption => $value) {
            if (\in_array($streamOption, $booleanStreamOptions, true) && \is_string($value)) {
                $options['ssl'][$streamOption] = filter_var($value, \FILTER_VALIDATE_BOOL);
            }
        }

        // Cluster mode: several hosts without a sentinel master, or an injected \RedisCluster instance.
        if ((\is_array($host) && null === $sentinelMaster) || $redis instanceof \RedisCluster) {
            $hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
            $this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
        } else {
            // Single-node mode, optionally resolving the master address through sentinels first.
            $this->redis = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
                if (null !== $sentinelMaster) {
                    $sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
                    $hostIndex = 0;
                    $hosts = \is_array($host) ? $host : [['scheme' => 'tcp', 'host' => $host, 'port' => $port]];
                    // Try each sentinel in turn until one returns the master address.
                    do {
                        $host = $hosts[$hostIndex]['host'];
                        $port = $hosts[$hostIndex]['port'] ?? 0;
                        $tls = 'tls' === $hosts[$hostIndex]['scheme'];
                        $address = false;

                        if (isset($hosts[$hostIndex]['host']) && $tls) {
                            $host = 'tls://'.$host;
                        }

                        try {
                            // ext-redis 6+ is given a single options array; otherwise positional arguments are used.
                            if (\extension_loaded('redis') && version_compare(phpversion('redis'), '6.0.0', '>=')) {
                                $params = [
                                    'host' => $host,
                                    'port' => $port,
                                    'connectTimeout' => $options['timeout'],
                                    'persistent' => $options['persistent_id'],
                                    'retryInterval' => $options['retry_interval'],
                                    'readTimeout' => $options['read_timeout'],
                                ];

                                $sentinel = new \RedisSentinel($params);
                            } else {
                                $sentinel = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
                            }

                            // On success $host/$port are replaced by the master's address.
                            if ($address = $sentinel->getMasterAddrByName($sentinelMaster)) {
                                [$host, $port] = $address;
                            }
                        } catch (\RedisException|\Relay\Exception $redisException) {
                            // Deliberately ignored here: the next sentinel is tried, and the last
                            // exception is attached as "previous" if none of them answers.
                        }
                    } while (++$hostIndex < \count($hosts) && !$address);

                    if (!$address) {
                        throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $sentinelMaster), previous: $redisException ?? null);
                    }
                }

                return self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
            };
        }

        // Unless lazy, resolve the factory right away so connection problems surface at construction time.
        if (!$options['lazy']) {
            $this->getRedis();
        }

        // Note: this validation runs after the (non-lazy) connection above.
        foreach (['stream', 'group', 'consumer'] as $key) {
            if ('' === $options[$key]) {
                throw new InvalidArgumentException(sprintf('"%s" should be configured, got an empty string.', $key));
            }
        }

        $this->stream = $options['stream'];
        $this->group = $options['group'];
        $this->consumer = $options['consumer'];
        $this->queue = $this->stream.'__queue';
        $this->autoSetup = $options['auto_setup'];
        $this->maxEntries = $options['stream_max_entries'];
        $this->deleteAfterAck = $options['delete_after_ack'];
        $this->deleteAfterReject = $options['delete_after_reject'];
        // The option is documented in seconds; it is stored multiplied by 1000.
        $this->redeliverTimeout = $options['redeliver_timeout'] * 1000;
        // Stored divided by 1000 so it can be added to microtime(true) in claimOldPendingMessages().
        $this->claimInterval = $options['claim_interval'] / 1000;
    }

    /**
     * Connects and configures a single-node client.
     *
     * A client that is already connected is returned untouched. Otherwise the
     * client is connected (persistently when "persistent_id" is set), the
     * serializer option is applied, then authentication and database selection
     * are performed when configured.
     *
     * @param string|string[]|null $auth
     *
     * @throws InvalidArgumentException When authentication or database selection fails
     */
    private static function initializeRedis(\Redis|Relay $redis, string $host, int $port, string|array|null $auth, array $params): \Redis|Relay
    {
        if ($redis->isConnected()) {
            return $redis;
        }

        $connectArguments = [
            $host,
            $port,
            $params['timeout'],
            $params['persistent_id'],
            $params['retry_interval'],
            $params['read_timeout'],
        ];

        // The stream context argument is only passed when Redis::SCAN_PREFIX is defined or ext-relay is loaded.
        if (\defined('Redis::SCAN_PREFIX') || \extension_loaded('relay')) {
            $connectArguments[] = ['stream' => $params['ssl'] ?? null];
        }

        $connectMethod = isset($params['persistent_id']) ? 'pconnect' : 'connect';
        $redis->{$connectMethod}(...$connectArguments);

        $serializerOption = $redis instanceof \Redis ? \Redis::OPT_SERIALIZER : Relay::OPT_SERIALIZER;
        $redis->setOption($serializerOption, $params['serializer']);

        if (null !== $auth) {
            if (!$redis->auth($auth)) {
                throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
            }
        }

        // A falsy index (0, null, missing) means the default database is kept.
        $databaseIndex = $params['dbindex'] ?? false;
        if ($databaseIndex && !$redis->select($databaseIndex)) {
            throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
        }

        return $redis;
    }

    /**
     * Creates a cluster client when none was injected and applies the serializer option.
     *
     * @param string|string[]|null $auth
     */
    private static function initializeRedisCluster(?\RedisCluster $redis, array $hosts, string|array|null $auth, array $params): \RedisCluster
    {
        if (null === $redis) {
            // NOTE(review): this reads a "persistent" key, while the option declared in
            // DEFAULT_OPTIONS is "persistent_id" - confirm which one is intended.
            $persistent = (bool) ($params['persistent'] ?? false);

            // The SSL context argument is only passed when Redis::SCAN_PREFIX is defined.
            $contextArguments = \defined('Redis::SCAN_PREFIX') ? [$params['ssl'] ?? null] : [];

            $redis = new \RedisCluster(null, $hosts, $params['timeout'], $params['read_timeout'], $persistent, $auth, ...$contextArguments);
        }

        $redis->setOption(\Redis::OPT_SERIALIZER, $params['serializer']);

        return $redis;
    }

    /**
     * Creates a connection from a DSN (or a comma-separated list of DSNs) plus extra options.
     *
     * Options found in the DSN query string are merged into $options by parseDsn().
     * Every option is then cast according to the type of its default in DEFAULT_OPTIONS.
     *
     * @param string                          $dsn     A "redis:" / "rediss:" DSN; several comma-separated DSNs describe multiple hosts
     * @param array                           $options Extra options; keys must exist in DEFAULT_OPTIONS (plus "host" and "port")
     * @param \Redis|Relay|\RedisCluster|null $redis   Optional pre-built client forwarded to the constructor
     *
     * @throws InvalidArgumentException When a DSN is invalid or a host is missing in a multi-DSN string
     * @throws LogicException           When an unknown option is passed
     */
    public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = [], \Redis|Relay|\RedisCluster|null $redis = null): self
    {
        if (!str_contains($dsn, ',')) {
            // Single DSN.
            $params = self::parseDsn($dsn, $options);

            if (isset($params['host']) && 'rediss' === $params['scheme']) {
                $params['host'] = 'tls://'.$params['host'];
            }
        } else {
            // Several DSNs: each one is parsed on its own; $options is shared by reference
            // so that query-string options of every DSN accumulate in it.
            $dsns = explode(',', $dsn);
            $paramss = array_map(function ($dsn) use (&$options) {
                return self::parseDsn($dsn, $options);
            }, $dsns);

            // Merge all the URLs, the last one overrides the previous ones
            $params = array_merge(...$paramss);
            $tls = 'rediss' === $params['scheme'];

            // Regroup all the hosts in an array interpretable by RedisCluster
            $params['host'] = array_map(function ($params) use ($tls) {
                if (!isset($params['host'])) {
                    throw new InvalidArgumentException('Missing host in DSN, it must be defined when using Redis Cluster.');
                }
                if ($tls) {
                    $params['host'] = 'tls://'.$params['host'];
                }

                return $params['host'].':'.($params['port'] ?? 6379);
            }, $paramss, $dsns);
        }

        // Reject option names that are neither declared in DEFAULT_OPTIONS nor "host"/"port".
        if ($invalidOptions = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS), ['host', 'port'])) {
            throw new LogicException(sprintf('Invalid option(s) "%s" passed to the Redis Messenger transport.', implode('", "', $invalidOptions)));
        }
        // Cast each option to the scalar type of its default; filter_var() yields false for values it cannot validate.
        foreach (self::DEFAULT_OPTIONS as $k => $v) {
            $options[$k] = match (\gettype($v)) {
                'integer' => filter_var($options[$k] ?? $v, \FILTER_VALIDATE_INT),
                'boolean' => filter_var($options[$k] ?? $v, \FILTER_VALIDATE_BOOL),
                'double' => filter_var($options[$k] ?? $v, \FILTER_VALIDATE_FLOAT),
                default => $options[$k] ?? $v,
            };
        }

        // NOTE(review): parseDsn() already applies rawurldecode() to the credentials it extracts;
        // confirm that decoding them a second time here is intended.
        $pass = '' !== ($params['pass'] ?? '') ? rawurldecode($params['pass']) : null;
        $user = '' !== ($params['user'] ?? '') ? rawurldecode($params['user']) : null;
        // An explicit "auth" option wins; otherwise use [user, pass] when both exist, or whichever one is set.
        $options['auth'] ??= null !== $pass && null !== $user ? [$user, $pass] : ($pass ?? $user);

        if (isset($params['query'])) {
            parse_str($params['query'], $query);

            // "host[...]" query parameters list several hosts; each becomes a scheme/host/port array.
            if (isset($query['host'])) {
                $tls = 'rediss' === $params['scheme'];
                $tcpScheme = $tls ? 'tls' : 'tcp';

                if (!\is_array($hosts = $query['host'])) {
                    throw new InvalidArgumentException(sprintf('Invalid Redis DSN: "%s".', $dsn));
                }
                foreach ($hosts as $host => $parameters) {
                    if (\is_string($parameters)) {
                        parse_str($parameters, $parameters);
                    }
                    if (false === $i = strrpos($host, ':')) {
                        // No colon: host only, default port.
                        $hosts[$host] = ['scheme' => $tcpScheme, 'host' => $host, 'port' => 6379] + $parameters;
                    } elseif ($port = (int) substr($host, 1 + $i)) {
                        // Colon followed by a non-zero number: host and port.
                        $hosts[$host] = ['scheme' => $tcpScheme, 'host' => substr($host, 0, $i), 'port' => $port] + $parameters;
                    } else {
                        // Colon not followed by a usable port number: treated as a unix socket.
                        $hosts[$host] = ['scheme' => 'unix', 'host' => substr($host, 0, $i)] + $parameters;
                    }
                }
                $params['host'] = array_values($hosts);
            }
        }

        if (isset($params['host'])) {
            $options['host'] = $params['host'] ?? $options['host'];
            $options['port'] = $params['port'] ?? $options['port'];

            // Path segments map to /stream/group/consumer; missing segments keep the option value.
            $pathParts = explode('/', rtrim($params['path'] ?? '', '/'));
            $options['stream'] = $pathParts[1] ?? $options['stream'];
            $options['group'] = $pathParts[2] ?? $options['group'];
            $options['consumer'] = $pathParts[3] ?? $options['consumer'];
        } else {
            // No host in the DSN: the path itself is used as the host (socket path) with port 0.
            $options['host'] = $params['path'];
            $options['port'] = 0;
        }

        return new self($options, $redis);
    }

    /**
     * Splits one DSN into parse_url()-style parts.
     *
     * Credentials are extracted by hand before parse_url() runs, query-string
     * values are merged into $options (by reference), and the "scheme" entry of
     * the result is always "redis" or "rediss".
     *
     * @throws InvalidArgumentException When parse_url() cannot parse the DSN
     */
    private static function parseDsn(string $dsn, array &$options): array
    {
        $scheme = str_starts_with($dsn, 'rediss:') ? 'rediss' : 'redis';

        // A DSN of the form "<scheme>:///path" (no ":" or "@" in the path) is rewritten to a file: URL up front.
        $isPathOnlyDsn = 1 === preg_match('#^'.$scheme.':///([^:@])+$#', $dsn);
        $url = $isPathOnlyDsn ? str_replace($scheme.':', 'file:', $dsn) : $dsn;

        // Filled by the callback below; stays null when the DSN carries no credentials.
        $credentials = null;
        $stripCredentials = function ($m) use (&$credentials) {
            if (isset($m['password'])) {
                // An empty or "default" user name is not kept.
                if ('' !== $m['user'] && 'default' !== $m['user']) {
                    $credentials['user'] = rawurldecode($m['user']);
                }

                $credentials['pass'] = rawurldecode($m['password']);
            }

            return 'file:'.($m[1] ?? '');
        };

        $url = preg_replace_callback('#^'.$scheme.':(//)?(?:(?:(?<user>[^:@]*+):)?(?<password>[^@]*+)@)?#', $stripCredentials, $url);

        $params = parse_url($url);
        if (false === $params) {
            throw new InvalidArgumentException('The given Redis DSN is invalid.');
        }

        if (null !== $credentials) {
            unset($params['user']); // parse_url thinks //0@localhost/ is a username of "0"! doh!
            $params += $credentials; // But don't worry as the credentials array will have user, user/pass or pass as needed
        }

        if (isset($params['query'])) {
            parse_str($params['query'], $queryOptions);
            $options = array_merge($options, $queryOptions);
        }

        $params['scheme'] = $scheme;

        return $params;
    }

    /**
     * Looks at the oldest pending entry of the group and claims it for this
     * consumer when it has been idle for at least the redeliver timeout.
     *
     * If the inspected entry already belongs to this consumer, the method only
     * flags that pending messages may exist and returns without scheduling the
     * next claim.
     *
     * @throws TransportException When the Redis client throws
     */
    private function claimOldPendingMessages(): void
    {
        try {
            // This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
            // https://github.com/antirez/redis/issues/6256
            $pendingEntries = $this->getRedis()->xpending($this->stream, $this->group, '-', '+', 1);
        } catch (\RedisException|\Relay\Exception $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        if (!$pendingEntries) {
            $pendingEntries = [];
        }

        $staleIds = [];
        foreach ($pendingEntries as $entry) {
            // Entry layout as used here: [0] message id, [1] owning consumer, [2] value compared to the redeliver timeout.
            if ($entry[1] === $this->consumer) {
                $this->couldHavePendingMessages = true;

                return;
            }

            if ($entry[2] < $this->redeliverTimeout) {
                continue;
            }

            $staleIds[] = $entry[0];
        }

        if ([] !== $staleIds) {
            try {
                $this->getRedis()->xclaim(
                    $this->stream,
                    $this->group,
                    $this->consumer,
                    $this->redeliverTimeout,
                    $staleIds,
                    ['JUSTID']
                );
            } catch (\RedisException|\Relay\Exception $exception) {
                throw new TransportException($exception->getMessage(), 0, $exception);
            }

            // The claimed entries are now pending for this consumer.
            $this->couldHavePendingMessages = true;
        }

        $this->nextClaim = microtime(true) + $this->claimInterval;
    }

    /**
     * Fetches at most one message for this consumer.
     *
     * Steps:
     *  1. run setup() when auto-setup is still enabled;
     *  2. move delayed messages that are due from the queue key to the stream;
     *  3. when no pending message is expected and the claim interval elapsed, try to claim abandoned messages;
     *  4. read one entry from the stream: this consumer's pending entries first (ID "0"), new entries (ID ">") otherwise.
     *
     * @return array{id: string, data: mixed}|null The message, or null when nothing was read
     *
     * @throws TransportException When a Redis command fails
     */
    public function get(): ?array
    {
        if ($this->autoSetup) {
            $this->setup();
        }

        // microtime() returns "0.msec sec"; build a millisecond timestamp string: seconds followed by 3 millisecond digits.
        $now = microtime();
        $now = substr($now, 11).substr($now, 2, 3);

        $queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now) ?? 0;

        while ($queuedMessageCount--) {
            if (!$message = $this->rawCommand('ZPOPMIN', 1)) {
                break;
            }

            [$queuedMessage, $expiry] = $message;

            // Compare the two numeric strings by length first, then by value.
            if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
                // if a future-placed message is popped because of a race condition with
                // another running consumer, the message is readded to the queue

                if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) {
                    throw new TransportException('Could not add a message to the redis stream.');
                }

                break;
            }

            // json_decode() returns null (or a scalar) when the entry is not a JSON object;
            // in that case the raw entry is forwarded as the body instead of failing on a non-array.
            $decodedQueuedMessage = json_decode($queuedMessage, true);
            $isDecodedArray = \is_array($decodedQueuedMessage);
            $body = $isDecodedArray && \array_key_exists('body', $decodedQueuedMessage) ? $decodedQueuedMessage['body'] : $queuedMessage;
            $headers = $isDecodedArray ? ($decodedQueuedMessage['headers'] ?? []) : [];

            $this->add($body, $headers, 0);
        }

        if (!$this->couldHavePendingMessages && $this->nextClaim <= microtime(true)) {
            $this->claimOldPendingMessages();
        }

        $messageId = '>'; // will receive new messages

        if ($this->couldHavePendingMessages) {
            $messageId = '0'; // will receive consumers pending messages
        }
        $redis = $this->getRedis();

        try {
            $messages = $redis->xreadgroup(
                $this->group,
                $this->consumer,
                [$this->stream => $messageId],
                1,
                1
            );
        } catch (\RedisException|\Relay\Exception $e) {
            throw new TransportException($e->getMessage(), 0, $e);
        }

        if (false === $messages) {
            if ($error = $redis->getLastError() ?: null) {
                $redis->clearLastError();
            }

            throw new TransportException($error ?? 'Could not read messages from the redis stream.');
        }

        if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
            $this->couldHavePendingMessages = false;

            // No pending messages so get a new one
            return $this->get();
        }

        // Only the first entry is returned (at most one was requested).
        foreach ($messages[$this->stream] ?? [] as $key => $message) {
            return [
                'id' => $key,
                'data' => $message,
            ];
        }

        return null;
    }

    /**
     * Acknowledges a message and, when delete-after-ack is enabled, deletes it from the stream.
     *
     * When deletion is enabled, only the result of the deletion decides whether the call succeeded.
     *
     * @throws TransportException When the Redis client throws or the final command reports a falsy result
     */
    public function ack(string $id): void
    {
        $client = $this->getRedis();
        $messageIds = [$id];

        try {
            $outcome = $client->xack($this->stream, $this->group, $messageIds);

            if ($this->deleteAfterAck) {
                // The XDEL result replaces the XACK result.
                $outcome = $client->xdel($this->stream, $messageIds);
            }
        } catch (\RedisException|\Relay\Exception $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        if ($outcome) {
            return;
        }

        $lastError = $client->getLastError() ?: null;
        if (null !== $lastError) {
            $client->clearLastError();
        }

        throw new TransportException($lastError ?? sprintf('Could not acknowledge redis message "%s".', $id));
    }

    /**
     * Rejects a message: acknowledges it and, when delete-after-reject is enabled, deletes it from the stream.
     *
     * With deletion enabled, both the acknowledgement and the deletion must report success.
     *
     * @throws TransportException When the Redis client throws or the operation reports a falsy result
     */
    public function reject(string $id): void
    {
        $client = $this->getRedis();
        $messageIds = [$id];

        try {
            $acknowledged = $client->xack($this->stream, $this->group, $messageIds);
            $outcome = $acknowledged;

            if ($this->deleteAfterReject) {
                // XDEL is always issued; its result is then combined with the XACK result.
                $removed = $client->xdel($this->stream, $messageIds);
                $outcome = $removed && $acknowledged;
            }
        } catch (\RedisException|\Relay\Exception $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        if ($outcome) {
            return;
        }

        $lastError = $client->getLastError() ?: null;
        if (null !== $lastError) {
            $client->clearLastError();
        }

        throw new TransportException($lastError ?? sprintf('Could not delete message "%s" from the redis stream.', $id));
    }

    /**
     * Publishes a message, either directly to the stream or, when delayed, to the queue key.
     *
     * @param string $body      Message body
     * @param array  $headers   Message headers
     * @param int    $delayInMs Delay in milliseconds; values <= 0 publish to the stream immediately
     *
     * @return string The generated unique id for a delayed message, otherwise the value returned by XADD
     *
     * @throws TransportException When the payload cannot be JSON-encoded, the delay is too big,
     *                            or the Redis command fails
     */
    public function add(string $body, array $headers, int $delayInMs = 0): string
    {
        if ($this->autoSetup) {
            $this->setup();
        }
        $redis = $this->getRedis();

        try {
            if ($delayInMs > 0) { // the delay is <= 0 for queued messages
                $id = uniqid('', true);
                $message = json_encode([
                    'body' => $body,
                    'headers' => $headers,
                    // Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
                    'uniqid' => $id,
                ]);

                if (false === $message) {
                    throw new TransportException(json_last_error_msg());
                }

                // microtime() returns "0.msec sec": $now[0] is the fractional part, $now[1] the seconds.
                $now = explode(' ', microtime(), 2);
                // Current milliseconds (3 digits after "0.") plus the delay, left-padded to at least 3 digits.
                $now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
                if (3 < \strlen($now[0])) {
                    // Carry everything above the last 3 digits over to the seconds part.
                    $now[1] += substr($now[0], 0, -3);
                    $now[0] = substr($now[0], -3);

                    // The addition turned the seconds into a float: the delay does not fit in an integer.
                    if (\is_float($now[1])) {
                        throw new TransportException("Message delay is too big: {$delayInMs}ms.");
                    }
                }

                // Score = seconds followed by the 3 millisecond digits; get() compares it with the current time built the same way.
                $added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message);
            } else {
                $message = json_encode([
                    'body' => $body,
                    'headers' => $headers,
                ]);

                if (false === $message) {
                    throw new TransportException(json_last_error_msg());
                }

                // With a maximum configured, the stream is trimmed on insert (the option is documented as an approximate maximum).
                if ($this->maxEntries) {
                    $added = $redis->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
                } else {
                    $added = $redis->xadd($this->stream, '*', ['message' => $message]);
                }

                $id = $added;
            }
        } catch (\RedisException|\Relay\Exception $e) {
            // Prefer the error reported by the client (and reset it) over the exception message.
            if ($error = $redis->getLastError() ?: null) {
                $redis->clearLastError();
            }
            throw new TransportException($error ?? $e->getMessage(), 0, $e);
        }

        if (!$added) {
            if ($error = $redis->getLastError() ?: null) {
                $redis->clearLastError();
            }
            throw new TransportException($error ?? 'Could not add a message to the redis stream.');
        }

        return $id;
    }

    /**
     * Creates the consumer group (and the stream if missing) and checks that the
     * delete-after-ack / delete-after-reject options are safe to use.
     *
     * On success auto-setup is switched off so that the work is not repeated.
     *
     * @throws TransportException When the Redis client throws while creating the group
     * @throws LogicException     When message deletion is enabled but more than one group exists on the stream
     */
    public function setup(): void
    {
        $redis = $this->getRedis();

        try {
            $redis->xgroup('CREATE', $this->stream, $this->group, 0, true);
        } catch (\RedisException|\Relay\Exception $e) {
            throw new TransportException($e->getMessage(), 0, $e);
        }

        // group might already exist, ignore
        if ($redis->getLastError()) {
            $redis->clearLastError();
        }

        if ($this->deleteAfterAck || $this->deleteAfterReject) {
            $groups = $redis->xinfo('GROUPS', $this->stream);
            if (
                // support for Redis extension version 5+
                (\is_array($groups) && 1 < \count($groups))
                // support for Redis extension version 4.x: the reply is a string; like the array
                // branch, only complain when it describes more than one group
                // (assumes "name" appears once per group - TODO confirm against a 4.x reply)
                || (\is_string($groups) && 1 < substr_count($groups, '"name"'))
            ) {
                throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack and delete_after_reject cannot be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
            }
        }

        $this->autoSetup = false;
    }

    /**
     * Removes the stream key and the queue key.
     *
     * UNLINK is tried first; once it has failed (false result or any throwable)
     * the failure is remembered in a static variable and DEL is used instead,
     * for this call and the following ones.
     */
    public function cleanup(): void
    {
        static $unlinkUsable = true;

        $client = $this->getRedis();
        $keys = [$this->stream, $this->queue];

        if ($unlinkUsable) {
            try {
                $unlinkUsable = false !== $client->unlink(...$keys);
            } catch (\Throwable) {
                $unlinkUsable = false;
            }
        }

        if ($unlinkUsable) {
            return;
        }

        $client->del(...$keys);
    }

    /**
     * Returns the number of stream entries counted from the group's last delivered id.
     *
     * The "lag" value reported for the group is returned when available. Otherwise
     * the stream is walked in pages of 100 entries starting at the group's
     * "last-delivered-id". 0 is returned when the group is not found or has no
     * last delivered id.
     */
    public function getMessageCount(): int
    {
        $redis = $this->getRedis();
        $groups = $redis->xinfo('GROUPS', $this->stream) ?: [];

        $lastDeliveredId = null;
        foreach ($groups as $group) {
            if ($group['name'] !== $this->group) {
                continue;
            }

            // Use "lag" key provided by Redis 7.x. See https://redis.io/commands/xinfo-groups/#consumer-group-lag.
            if (isset($group['lag'])) {
                return $group['lag'];
            }

            if (!isset($group['last-delivered-id'])) {
                return 0;
            }

            $lastDeliveredId = $group['last-delivered-id'];
            break;
        }

        if (null === $lastDeliveredId) {
            return 0;
        }

        // Iterate through the stream. See https://redis.io/commands/xrange/#iterating-a-stream.
        $total = 0;
        while (true) {
            if (!$range = $redis->xRange($this->stream, $lastDeliveredId, '+', 100)) {
                return $total;
            }

            $total += \count($range);

            // Move the cursor just past the last entry of this page by incrementing the
            // trailing sequence number of its id. Unlike the "(" exclusive-range prefix,
            // this form does not depend on the Redis server version, and the PHP extension
            // version says nothing about what the server supports.
            $lastDeliveredId = preg_replace_callback(
                '#\d+$#',
                static fn (array $matches): string => (string) ((int) $matches[0] + 1),
                array_key_last($range)
            );
        }
    }

    /**
     * Runs a raw Redis command whose first argument is the queue key ($this->queue).
     *
     * With a \RedisCluster client the queue key is additionally passed as the
     * leading argument of the client's rawCommand() call.
     *
     * @throws TransportException When the client throws or the command returns false
     */
    private function rawCommand(string $command, ...$arguments): mixed
    {
        $client = $this->getRedis();

        $leadingArguments = $client instanceof \RedisCluster
            ? [$this->queue, $command, $this->queue]
            : [$command, $this->queue];

        try {
            $reply = $client->rawCommand(...$leadingArguments, ...$arguments);
        } catch (\RedisException|\Relay\Exception $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        if (false !== $reply) {
            return $reply;
        }

        // Prefer the error reported by the client (and reset it) over the generic message.
        $lastError = $client->getLastError() ?: null;
        if (null !== $lastError) {
            $client->clearLastError();
        }

        throw new TransportException($lastError ?? sprintf('Could not run "%s" on Redis queue.', $command));
    }

    /**
     * Returns the Redis client, resolving the lazy factory closure on first use.
     *
     * The created client replaces the closure in $this->redis, so later calls reuse it.
     */
    private function getRedis(): \Redis|Relay|\RedisCluster
    {
        $current = $this->redis;

        if (!$current instanceof \Closure) {
            return $current;
        }

        return $this->redis = $current();
    }
}

Function Calls

None

Variables

None

Stats

MD5 d5dce5432c830f0b715ad62c9686e32f
Eval Count 0
Decode Time 115 ms