Find this useful? Enter your email to receive occasional updates for securing PHP code.

Signing you up...

Thank you for signing up!

PHP Decode

<?php declare(strict_types=1); namespace Kafka\Protocol; use Kafka\Exception\NotSupporte..

Decoded Output download

<?php
declare(strict_types=1);

namespace Kafka\Protocol;

use Kafka\Exception\NotSupported;
use Kafka\Exception\Protocol as ProtocolException;
use Lcobucci\Clock\Clock;
use Lcobucci\Clock\SystemClock;
use function crc32;
use function is_array;
use function substr;

class Produce extends Protocol
{
    /**
     * Specifies the mask for the compression code. 3 bits to hold the compression codec.
     * 0 is reserved to indicate no compression
     */
    public const COMPRESSION_CODEC_MASK = 0x07;

    /**
     * Specify the mask of timestamp type: 0 for CreateTime, 1 for LogAppendTime.
     */
    private const TIMESTAMP_TYPE_MASK = 0x08;

    private const TIMESTAMP_NONE            = -1;
    private const TIMESTAMP_CREATE_TIME     = 0;
    private const TIMESTAMP_LOG_APPEND_TIME = 1;

    /**
     * @var Clock
     */
    private $clock;

    public function __construct(string $version = self::DEFAULT_BROKER_VERION, ?Clock $clock = null)
    {
        parent::__construct($version);

        $this->clock = $clock ?: new SystemClock();
    }

    /**
     * @param mixed[] $payloads
     *
     * @throws NotSupported
     * @throws ProtocolException
     */
    public function encode(array $payloads = []): string
    {
        if (! isset($payloads['data'])) {
            throw new ProtocolException('given procude data invalid. `data` is undefined.');
        }

        $header = $this->requestHeader('kafka-php', 0, self::PRODUCE_REQUEST);
        $data   = self::pack(self::BIT_B16, (string) ($payloads['required_ack'] ?? 0));
        $data  .= self::pack(self::BIT_B32, (string) ($payloads['timeout'] ?? 100));
        $data  .= self::encodeArray(
            $payloads['data'],
            [$this, 'encodeProduceTopic'],
            $payloads['compression'] ?? self::COMPRESSION_NONE
        );

        return self::encodeString($header . $data, self::PACK_INT32);
    }

    /**
     * @return mixed[]
     *
     * @throws ProtocolException
     */
    public function decode(string $data): array
    {
        $offset       = 0;
        $version      = $this->getApiVersion(self::PRODUCE_REQUEST);
        $ret          = $this->decodeArray(substr($data, $offset), [$this, 'produceTopicPair'], $version);
        $offset      += $ret['length'];
        $throttleTime = 0;

        if ($version === self::API_VERSION2) {
            $throttleTime = self::unpack(self::BIT_B32, substr($data, $offset, 4));
        }

        return ['throttleTime' => $throttleTime, 'data' => $ret['data']];
    }

    /**
     * encode message set
     * N.B., MessageSets are not preceded by an int32 like other array elements
     * in the protocol.
     *
     * @param string[]|string[][] $messages
     *
     * @throws NotSupported
     */
    protected function encodeMessageSet(array $messages, int $compression = self::COMPRESSION_NONE): string
    {
        $data = '';
        $next = 0;

        foreach ($messages as $message) {
            $encodedMessage = $this->encodeMessage($message);

            $data .= self::pack(self::BIT_B64, (string) $next)
                   . self::encodeString($encodedMessage, self::PACK_INT32);

            ++$next;
        }

        if ($compression === self::COMPRESSION_NONE) {
            return $data;
        }

        return self::pack(self::BIT_B64, '0')
             . self::encodeString($this->encodeMessage($data, $compression), self::PACK_INT32);
    }

    /**
     * @param string[]|string $message
     *
     * @throws NotSupported
     */
    protected function encodeMessage($message, int $compression = self::COMPRESSION_NONE): string
    {
        $magic      = $this->computeMagicByte();
        $attributes = $this->computeAttributes($magic, $compression, $this->computeTimestampType($magic));

        $data  = self::pack(self::BIT_B8, (string) $magic);
        $data .= self::pack(self::BIT_B8, (string) $attributes);

        if ($magic >= self::MESSAGE_MAGIC_VERSION1) {
            $data .= self::pack(self::BIT_B64, $this->clock->now()->format('Uv'));
        }

        $key = '';

        if (is_array($message)) {
            $key     = $message['key'];
            $message = $message['value'];
        }

        // message key
        $data .= self::encodeString($key, self::PACK_INT32);

        // message value
        $data .= self::encodeString($message, self::PACK_INT32, $compression);

        $crc = (string) crc32($data);

        // int32 -- crc code  string data
        $message = self::pack(self::BIT_B32, $crc) . $data;

        return $message;
    }

    private function computeMagicByte(): int
    {
        if ($this->getApiVersion(self::PRODUCE_REQUEST) === self::API_VERSION2) {
            return self::MESSAGE_MAGIC_VERSION1;
        }

        return self::MESSAGE_MAGIC_VERSION0;
    }

    public function computeTimestampType(int $magic): int
    {
        if ($magic === self::MESSAGE_MAGIC_VERSION0) {
            return self::TIMESTAMP_NONE;
        }

        return self::TIMESTAMP_CREATE_TIME;
    }

    private function computeAttributes(int $magic, int $compression, int $timestampType): int
    {
        $attributes = 0;

        if ($compression !== self::COMPRESSION_NONE) {
            $attributes |= self::COMPRESSION_CODEC_MASK & $compression;
        }

        if ($magic === self::MESSAGE_MAGIC_VERSION0) {
            return $attributes;
        }

        if ($timestampType === self::TIMESTAMP_LOG_APPEND_TIME) {
            $attributes |= self::TIMESTAMP_TYPE_MASK;
        }

        return $attributes;
    }

    /**
     * encode signal part
     *
     * @param mixed[] $values
     *
     * @throws NotSupported
     * @throws ProtocolException
     */
    protected function encodeProducePartition(array $values, int $compression): string
    {
        if (! isset($values['partition_id'])) {
            throw new ProtocolException('given produce data invalid. `partition_id` is undefined.');
        }

        if (! isset($values['messages']) || empty($values['messages'])) {
            throw new ProtocolException('given produce data invalid. `messages` is undefined.');
        }

        $data  = self::pack(self::BIT_B32, (string) $values['partition_id']);
        $data .= self::encodeString(
            $this->encodeMessageSet((array) $values['messages'], $compression),
            self::PACK_INT32
        );

        return $data;
    }

    /**
     * encode signal topic
     *
     * @param mixed[] $values
     *
     * @throws NotSupported
     * @throws ProtocolException
     */
    protected function encodeProduceTopic(array $values, int $compression): string
    {
        if (! isset($values['topic_name'])) {
            throw new ProtocolException('given produce data invalid. `topic_name` is undefined.');
        }

        if (! isset($values['partitions']) || empty($values['partitions'])) {
            throw new ProtocolException('given produce data invalid. `partitions` is undefined.');
        }

        $topic      = self::encodeString($values['topic_name'], self::PACK_INT16);
        $partitions = self::encodeArray($values['partitions'], [$this, 'encodeProducePartition'], $compression);

        return $topic . $partitions;
    }

    /**
     * decode produce topic pair response
     *
     * @return mixed[]
     *
     * @throws ProtocolException
     */
    protected function produceTopicPair(string $data, int $version): array
    {
        $offset    = 0;
        $topicInfo = $this->decodeString($data, self::BIT_B16);
        $offset   += $topicInfo['length'];
        $ret       = $this->decodeArray(substr($data, $offset), [$this, 'producePartitionPair'], $version);
        $offset   += $ret['length'];

        return [
            'length' => $offset,
            'data'   => [
                'topicName'  => $topicInfo['data'],
                'partitions' => $ret['data'],
            ],
        ];
    }

    /**
     * decode produce partition pair response
     *
     * @return mixed[]
     *
     * @throws ProtocolException
     */
    protected function producePartitionPair(string $data, int $version): array
    {
        $offset          = 0;
        $partitionId     = self::unpack(self::BIT_B32, substr($data, $offset, 4));
        $offset         += 4;
        $errorCode       = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2));
        $offset         += 2;
        $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
        $offset         += 8;
        $timestamp       = 0;

        if ($version === self::API_VERSION2) {
            $timestamp = self::unpack(self::BIT_B64, substr($data, $offset, 8));
            $offset   += 8;
        }

        return [
            'length' => $offset,
            'data'   => [
                'partition' => $partitionId,
                'errorCode' => $errorCode,
                'offset'    => $offset,
                'timestamp' => $timestamp,
            ],
        ];
    }
}
 ?>

Did this file decode correctly?

Original Code

<?php
declare(strict_types=1);

namespace Kafka\Protocol;

use Kafka\Exception\NotSupported;
use Kafka\Exception\Protocol as ProtocolException;
use Lcobucci\Clock\Clock;
use Lcobucci\Clock\SystemClock;
use function crc32;
use function is_array;
use function substr;

class Produce extends Protocol
{
    /**
     * Specifies the mask for the compression code. 3 bits to hold the compression codec.
     * 0 is reserved to indicate no compression
     */
    public const COMPRESSION_CODEC_MASK = 0x07;

    /**
     * Specify the mask of timestamp type: 0 for CreateTime, 1 for LogAppendTime.
     */
    private const TIMESTAMP_TYPE_MASK = 0x08;

    private const TIMESTAMP_NONE            = -1;
    private const TIMESTAMP_CREATE_TIME     = 0;
    private const TIMESTAMP_LOG_APPEND_TIME = 1;

    /**
     * @var Clock
     */
    private $clock;

    public function __construct(string $version = self::DEFAULT_BROKER_VERION, ?Clock $clock = null)
    {
        parent::__construct($version);

        $this->clock = $clock ?: new SystemClock();
    }

    /**
     * @param mixed[] $payloads
     *
     * @throws NotSupported
     * @throws ProtocolException
     */
    public function encode(array $payloads = []): string
    {
        if (! isset($payloads['data'])) {
            throw new ProtocolException('given procude data invalid. `data` is undefined.');
        }

        $header = $this->requestHeader('kafka-php', 0, self::PRODUCE_REQUEST);
        $data   = self::pack(self::BIT_B16, (string) ($payloads['required_ack'] ?? 0));
        $data  .= self::pack(self::BIT_B32, (string) ($payloads['timeout'] ?? 100));
        $data  .= self::encodeArray(
            $payloads['data'],
            [$this, 'encodeProduceTopic'],
            $payloads['compression'] ?? self::COMPRESSION_NONE
        );

        return self::encodeString($header . $data, self::PACK_INT32);
    }

    /**
     * @return mixed[]
     *
     * @throws ProtocolException
     */
    public function decode(string $data): array
    {
        $offset       = 0;
        $version      = $this->getApiVersion(self::PRODUCE_REQUEST);
        $ret          = $this->decodeArray(substr($data, $offset), [$this, 'produceTopicPair'], $version);
        $offset      += $ret['length'];
        $throttleTime = 0;

        if ($version === self::API_VERSION2) {
            $throttleTime = self::unpack(self::BIT_B32, substr($data, $offset, 4));
        }

        return ['throttleTime' => $throttleTime, 'data' => $ret['data']];
    }

    /**
     * encode message set
     * N.B., MessageSets are not preceded by an int32 like other array elements
     * in the protocol.
     *
     * @param string[]|string[][] $messages
     *
     * @throws NotSupported
     */
    protected function encodeMessageSet(array $messages, int $compression = self::COMPRESSION_NONE): string
    {
        $data = '';
        $next = 0;

        foreach ($messages as $message) {
            $encodedMessage = $this->encodeMessage($message);

            $data .= self::pack(self::BIT_B64, (string) $next)
                   . self::encodeString($encodedMessage, self::PACK_INT32);

            ++$next;
        }

        if ($compression === self::COMPRESSION_NONE) {
            return $data;
        }

        return self::pack(self::BIT_B64, '0')
             . self::encodeString($this->encodeMessage($data, $compression), self::PACK_INT32);
    }

    /**
     * @param string[]|string $message
     *
     * @throws NotSupported
     */
    protected function encodeMessage($message, int $compression = self::COMPRESSION_NONE): string
    {
        $magic      = $this->computeMagicByte();
        $attributes = $this->computeAttributes($magic, $compression, $this->computeTimestampType($magic));

        $data  = self::pack(self::BIT_B8, (string) $magic);
        $data .= self::pack(self::BIT_B8, (string) $attributes);

        if ($magic >= self::MESSAGE_MAGIC_VERSION1) {
            $data .= self::pack(self::BIT_B64, $this->clock->now()->format('Uv'));
        }

        $key = '';

        if (is_array($message)) {
            $key     = $message['key'];
            $message = $message['value'];
        }

        // message key
        $data .= self::encodeString($key, self::PACK_INT32);

        // message value
        $data .= self::encodeString($message, self::PACK_INT32, $compression);

        $crc = (string) crc32($data);

        // int32 -- crc code  string data
        $message = self::pack(self::BIT_B32, $crc) . $data;

        return $message;
    }

    private function computeMagicByte(): int
    {
        if ($this->getApiVersion(self::PRODUCE_REQUEST) === self::API_VERSION2) {
            return self::MESSAGE_MAGIC_VERSION1;
        }

        return self::MESSAGE_MAGIC_VERSION0;
    }

    public function computeTimestampType(int $magic): int
    {
        if ($magic === self::MESSAGE_MAGIC_VERSION0) {
            return self::TIMESTAMP_NONE;
        }

        return self::TIMESTAMP_CREATE_TIME;
    }

    private function computeAttributes(int $magic, int $compression, int $timestampType): int
    {
        $attributes = 0;

        if ($compression !== self::COMPRESSION_NONE) {
            $attributes |= self::COMPRESSION_CODEC_MASK & $compression;
        }

        if ($magic === self::MESSAGE_MAGIC_VERSION0) {
            return $attributes;
        }

        if ($timestampType === self::TIMESTAMP_LOG_APPEND_TIME) {
            $attributes |= self::TIMESTAMP_TYPE_MASK;
        }

        return $attributes;
    }

    /**
     * encode signal part
     *
     * @param mixed[] $values
     *
     * @throws NotSupported
     * @throws ProtocolException
     */
    protected function encodeProducePartition(array $values, int $compression): string
    {
        if (! isset($values['partition_id'])) {
            throw new ProtocolException('given produce data invalid. `partition_id` is undefined.');
        }

        if (! isset($values['messages']) || empty($values['messages'])) {
            throw new ProtocolException('given produce data invalid. `messages` is undefined.');
        }

        $data  = self::pack(self::BIT_B32, (string) $values['partition_id']);
        $data .= self::encodeString(
            $this->encodeMessageSet((array) $values['messages'], $compression),
            self::PACK_INT32
        );

        return $data;
    }

    /**
     * encode signal topic
     *
     * @param mixed[] $values
     *
     * @throws NotSupported
     * @throws ProtocolException
     */
    protected function encodeProduceTopic(array $values, int $compression): string
    {
        if (! isset($values['topic_name'])) {
            throw new ProtocolException('given produce data invalid. `topic_name` is undefined.');
        }

        if (! isset($values['partitions']) || empty($values['partitions'])) {
            throw new ProtocolException('given produce data invalid. `partitions` is undefined.');
        }

        $topic      = self::encodeString($values['topic_name'], self::PACK_INT16);
        $partitions = self::encodeArray($values['partitions'], [$this, 'encodeProducePartition'], $compression);

        return $topic . $partitions;
    }

    /**
     * decode produce topic pair response
     *
     * @return mixed[]
     *
     * @throws ProtocolException
     */
    protected function produceTopicPair(string $data, int $version): array
    {
        $offset    = 0;
        $topicInfo = $this->decodeString($data, self::BIT_B16);
        $offset   += $topicInfo['length'];
        $ret       = $this->decodeArray(substr($data, $offset), [$this, 'producePartitionPair'], $version);
        $offset   += $ret['length'];

        return [
            'length' => $offset,
            'data'   => [
                'topicName'  => $topicInfo['data'],
                'partitions' => $ret['data'],
            ],
        ];
    }

    /**
     * decode produce partition pair response
     *
     * @return mixed[]
     *
     * @throws ProtocolException
     */
    protected function producePartitionPair(string $data, int $version): array
    {
        $offset          = 0;
        $partitionId     = self::unpack(self::BIT_B32, substr($data, $offset, 4));
        $offset         += 4;
        $errorCode       = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2));
        $offset         += 2;
        $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
        $offset         += 8;
        $timestamp       = 0;

        if ($version === self::API_VERSION2) {
            $timestamp = self::unpack(self::BIT_B64, substr($data, $offset, 8));
            $offset   += 8;
        }

        return [
            'length' => $offset,
            'data'   => [
                'partition' => $partitionId,
                'errorCode' => $errorCode,
                'offset'    => $offset,
                'timestamp' => $timestamp,
            ],
        ];
    }
}

Function Calls

None

Variables

None

Stats

MD5 3fa78c79c884e2c4bdb998d993d4b51c
Eval Count 0
Decode Time 90 ms