Find this useful? Enter your email to receive occasional updates for securing PHP code.
Signing you up...
Thank you for signing up!
PHP Decode
<?php declare(strict_types=1); namespace Kafka; use const STREAM_CLIENT_CONNECT; use fun..
Decoded Output download
<?php
declare(strict_types=1);
namespace Kafka;
use const STREAM_CLIENT_CONNECT;
use function feof;
use function fread;
use function fwrite;
use function is_null;
use function is_resource;
use function sprintf;
use function stream_context_create;
use function stream_get_meta_data;
use function stream_select;
use function stream_socket_client;
use function strlen;
use function substr;
use function trim;
abstract class CommonSocket
{
public const READ_MAX_LENGTH = 5242880; // read socket max length 5MB
/**
* max write socket buffer
* fixed:send of 8192 bytes failed with errno=11 Resource temporarily
* fixed:'fwrite(): send of ???? bytes failed with errno=35 Resource temporarily unavailable'
* unavailable error info
*/
public const MAX_WRITE_BUFFER = 2048;
/**
* Send timeout in seconds.
*
* @var int
*/
protected $sendTimeoutSec = 0;
/**
* Send timeout in microseconds.
*
* @var int
*/
protected $sendTimeoutUsec = 100000;
/**
* Recv timeout in seconds
*
* @var int
*/
protected $recvTimeoutSec = 0;
/**
* Recv timeout in microseconds
*
* @var int
*/
protected $recvTimeoutUsec = 750000;
/**
* @var resource
*/
protected $stream;
/**
* @var string|null
*/
protected $host;
/**
* @var int
*/
protected $port = -1;
/**
* @var int
*/
protected $maxWriteAttempts = 3;
/**
* @var Config|null
*/
protected $config;
/**
* @var SaslMechanism|null
*/
private $saslProvider;
public function __construct(string $host, int $port, ?Config $config = null, ?SaslMechanism $saslProvider = null)
{
$this->host = $host;
$this->port = $port;
$this->config = $config;
$this->saslProvider = $saslProvider;
}
public function setSendTimeoutSec(int $sendTimeoutSec): void
{
$this->sendTimeoutSec = $sendTimeoutSec;
}
public function setSendTimeoutUsec(int $sendTimeoutUsec): void
{
$this->sendTimeoutUsec = $sendTimeoutUsec;
}
public function setRecvTimeoutSec(int $recvTimeoutSec): void
{
$this->recvTimeoutSec = $recvTimeoutSec;
}
public function setRecvTimeoutUsec(int $recvTimeoutUsec): void
{
$this->recvTimeoutUsec = $recvTimeoutUsec;
}
public function setMaxWriteAttempts(int $number): void
{
$this->maxWriteAttempts = $number;
}
/**
* @throws Exception
*/
protected function createStream(): void
{
if (trim($this->host) === '') {
throw new Exception('Cannot open null host.');
}
if ($this->port <= 0) {
throw new Exception('Cannot open without port.');
}
$remoteSocket = sprintf('tcp://%s:%s', $this->host, $this->port);
$context = stream_context_create([]);
if ($this->config !== null && $this->config->getSslEnable()) { // ssl connection
$remoteSocket = sprintf('ssl://%s:%s', $this->host, $this->port);
$context = $this->createSslStreamContext();
}
$this->stream = $this->createSocket($remoteSocket, $context, $errno, $errstr);
if (! is_resource($this->stream)) {
throw new Exception(
sprintf('Could not connect to %s:%d (%s [%d])', $this->host, $this->port, $errstr, $errno)
);
}
// SASL auth
if ($this->saslProvider !== null) {
$this->saslProvider->authenticate($this);
}
}
/**
* Ecapsulation of stream_context_create for SSL connections
*
* @return resource $context
*/
protected function createSslStreamContext()
{
if (is_null($this->config)) {
throw new Exception('Cannot build SSL context without configuration');
}
if ($this->config->getSslEnableAuthentication()) {
return stream_context_create(
[
'ssl' => [
'local_cert' => $this->config->getSslLocalCert(),
'local_pk' => $this->config->getSslLocalPk(),
'verify_peer' => $this->config->getSslVerifyPeer(),
'passphrase' => $this->config->getSslPassphrase(),
'cafile' => $this->config->getSslCafile(),
'peer_name' => $this->config->getSslPeerName(),
],
]
);
}
return stream_context_create(
[
'ssl' => [
'verify_peer' => $this->config->getSslVerifyPeer(),
'cafile' => $this->config->getSslCafile(),
'peer_name' => $this->config->getSslPeerName(),
],
]
);
}
/**
* Encapsulation of stream_socket_client
*
* Because `stream_socket_client` in stream wrapper mock no effect, if don't create this function will never be testable
*
* @codeCoverageIgnore
*
* @param resource $context
*
* @return resource
*/
protected function createSocket(string $remoteSocket, $context, ?int &$errno, ?string &$errstr)
{
return stream_socket_client(
$remoteSocket,
$errno,
$errstr,
$this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000),
STREAM_CLIENT_CONNECT,
$context
);
}
/**
* @return resource
*/
public function getSocket()
{
return $this->stream;
}
/**
* Encapsulation of stream_select
*
* Because `stream_select` in stream wrapper mock no effect, if don't create this function will never be testable
*
* @codeCoverageIgnore
*
* @param resource[] $sockets
*
* @return int|bool
*/
protected function select(array $sockets, int $timeoutSec, int $timeoutUsec, bool $isRead = true)
{
$null = null;
if ($isRead) {
return @stream_select($sockets, $null, $null, $timeoutSec, $timeoutUsec);
}
return @stream_select($null, $sockets, $null, $timeoutSec, $timeoutUsec);
}
/**
* Encapsulation of stream_get_meta_data
*
* Because `stream_get_meta_data` in stream wrapper mock no effect, if don't create this function will never be testable
*
* @codeCoverageIgnore
*
* @return mixed[]
*/
protected function getMetaData(): array
{
return stream_get_meta_data($this->stream);
}
/**
* Read from the socket at most $len bytes.
*
* This method will not wait for all the requested data, it will return as
* soon as any data is received.
*
* @throws Exception
*/
public function readBlocking(int $length): string
{
if ($length > self::READ_MAX_LENGTH) {
throw Exception\Socket::invalidLength($length, self::READ_MAX_LENGTH);
}
$readable = $this->select([$this->stream], $this->recvTimeoutSec, $this->recvTimeoutUsec);
if ($readable === false) {
$this->close();
throw Exception\Socket::notReadable($length);
}
if ($readable === 0) { // select timeout
$res = $this->getMetaData();
$this->close();
if (! empty($res['timed_out'])) {
throw Exception\Socket::timedOut($length);
}
throw Exception\Socket::notReadable($length);
}
$remainingBytes = $length;
$data = $chunk = '';
while ($remainingBytes > 0) {
$chunk = fread($this->stream, $remainingBytes);
if ($chunk === false || strlen($chunk) === 0) {
// Zero bytes because of EOF?
if (feof($this->stream)) {
$this->close();
throw Exception\Socket::unexpectedEOF($length);
}
// Otherwise wait for bytes
$readable = $this->select([$this->stream], $this->recvTimeoutSec, $this->recvTimeoutUsec);
if ($readable !== 1) {
throw Exception\Socket::timedOutWithRemainingBytes($length, $remainingBytes);
}
continue; // attempt another read
}
$data .= $chunk;
$remainingBytes -= strlen($chunk);
}
return $data;
}
/**
* Write to the socket.
*
* @throws Exception
*/
public function writeBlocking(string $buffer): int
{
// fwrite to a socket may be partial, so loop until we
// are done with the entire buffer
$failedAttempts = 0;
$bytesWritten = 0;
$bytesToWrite = strlen($buffer);
while ($bytesWritten < $bytesToWrite) {
// wait for stream to become available for writing
$writable = $this->select([$this->stream], $this->sendTimeoutSec, $this->sendTimeoutUsec, false);
if ($writable === false) {
throw new Exception\Socket('Could not write ' . $bytesToWrite . ' bytes to stream');
}
if ($writable === 0) {
$res = $this->getMetaData();
if (! empty($res['timed_out'])) {
throw new Exception('Timed out writing ' . $bytesToWrite . ' bytes to stream after writing ' . $bytesWritten . ' bytes');
}
throw new Exception\Socket('Could not write ' . $bytesToWrite . ' bytes to stream');
}
if ($bytesToWrite - $bytesWritten > self::MAX_WRITE_BUFFER) {
// write max buffer size
$wrote = fwrite($this->stream, substr($buffer, $bytesWritten, self::MAX_WRITE_BUFFER));
} else {
// write remaining buffer bytes to stream
$wrote = fwrite($this->stream, substr($buffer, $bytesWritten));
}
if ($wrote === -1 || $wrote === false) {
throw new Exception\Socket('Could not write ' . strlen($buffer) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes');
}
if ($wrote === 0) {
// Increment the number of times we have failed
$failedAttempts++;
if ($failedAttempts > $this->maxWriteAttempts) {
throw new Exception\Socket('After ' . $failedAttempts . ' attempts could not write ' . strlen($buffer) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes');
}
} else {
// If we wrote something, reset our failed attempt counter
$failedAttempts = 0;
}
$bytesWritten += $wrote;
}
return $bytesWritten;
}
abstract public function close(): void;
abstract public function connect(): void;
/**
* @return void|int
*/
abstract public function write(?string $data = null);
/**
* @param string|int $data
*
* @return mixed
*/
abstract public function read($data);
}
?>
Did this file decode correctly?
Original Code
<?php
declare(strict_types=1);
namespace Kafka;
use const STREAM_CLIENT_CONNECT;
use function feof;
use function fread;
use function fwrite;
use function is_null;
use function is_resource;
use function sprintf;
use function stream_context_create;
use function stream_get_meta_data;
use function stream_select;
use function stream_socket_client;
use function strlen;
use function substr;
use function trim;
abstract class CommonSocket
{
public const READ_MAX_LENGTH = 5242880; // read socket max length 5MB
/**
* max write socket buffer
* fixed:send of 8192 bytes failed with errno=11 Resource temporarily
* fixed:'fwrite(): send of ???? bytes failed with errno=35 Resource temporarily unavailable'
* unavailable error info
*/
public const MAX_WRITE_BUFFER = 2048;
/**
* Send timeout in seconds.
*
* @var int
*/
protected $sendTimeoutSec = 0;
/**
* Send timeout in microseconds.
*
* @var int
*/
protected $sendTimeoutUsec = 100000;
/**
* Recv timeout in seconds
*
* @var int
*/
protected $recvTimeoutSec = 0;
/**
* Recv timeout in microseconds
*
* @var int
*/
protected $recvTimeoutUsec = 750000;
/**
* @var resource
*/
protected $stream;
/**
* @var string|null
*/
protected $host;
/**
* @var int
*/
protected $port = -1;
/**
* @var int
*/
protected $maxWriteAttempts = 3;
/**
* @var Config|null
*/
protected $config;
/**
* @var SaslMechanism|null
*/
private $saslProvider;
public function __construct(string $host, int $port, ?Config $config = null, ?SaslMechanism $saslProvider = null)
{
$this->host = $host;
$this->port = $port;
$this->config = $config;
$this->saslProvider = $saslProvider;
}
public function setSendTimeoutSec(int $sendTimeoutSec): void
{
$this->sendTimeoutSec = $sendTimeoutSec;
}
public function setSendTimeoutUsec(int $sendTimeoutUsec): void
{
$this->sendTimeoutUsec = $sendTimeoutUsec;
}
public function setRecvTimeoutSec(int $recvTimeoutSec): void
{
$this->recvTimeoutSec = $recvTimeoutSec;
}
public function setRecvTimeoutUsec(int $recvTimeoutUsec): void
{
$this->recvTimeoutUsec = $recvTimeoutUsec;
}
public function setMaxWriteAttempts(int $number): void
{
$this->maxWriteAttempts = $number;
}
/**
* @throws Exception
*/
protected function createStream(): void
{
if (trim($this->host) === '') {
throw new Exception('Cannot open null host.');
}
if ($this->port <= 0) {
throw new Exception('Cannot open without port.');
}
$remoteSocket = sprintf('tcp://%s:%s', $this->host, $this->port);
$context = stream_context_create([]);
if ($this->config !== null && $this->config->getSslEnable()) { // ssl connection
$remoteSocket = sprintf('ssl://%s:%s', $this->host, $this->port);
$context = $this->createSslStreamContext();
}
$this->stream = $this->createSocket($remoteSocket, $context, $errno, $errstr);
if (! is_resource($this->stream)) {
throw new Exception(
sprintf('Could not connect to %s:%d (%s [%d])', $this->host, $this->port, $errstr, $errno)
);
}
// SASL auth
if ($this->saslProvider !== null) {
$this->saslProvider->authenticate($this);
}
}
/**
* Ecapsulation of stream_context_create for SSL connections
*
* @return resource $context
*/
protected function createSslStreamContext()
{
if (is_null($this->config)) {
throw new Exception('Cannot build SSL context without configuration');
}
if ($this->config->getSslEnableAuthentication()) {
return stream_context_create(
[
'ssl' => [
'local_cert' => $this->config->getSslLocalCert(),
'local_pk' => $this->config->getSslLocalPk(),
'verify_peer' => $this->config->getSslVerifyPeer(),
'passphrase' => $this->config->getSslPassphrase(),
'cafile' => $this->config->getSslCafile(),
'peer_name' => $this->config->getSslPeerName(),
],
]
);
}
return stream_context_create(
[
'ssl' => [
'verify_peer' => $this->config->getSslVerifyPeer(),
'cafile' => $this->config->getSslCafile(),
'peer_name' => $this->config->getSslPeerName(),
],
]
);
}
/**
* Encapsulation of stream_socket_client
*
* Because `stream_socket_client` in stream wrapper mock no effect, if don't create this function will never be testable
*
* @codeCoverageIgnore
*
* @param resource $context
*
* @return resource
*/
protected function createSocket(string $remoteSocket, $context, ?int &$errno, ?string &$errstr)
{
return stream_socket_client(
$remoteSocket,
$errno,
$errstr,
$this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000),
STREAM_CLIENT_CONNECT,
$context
);
}
/**
* @return resource
*/
public function getSocket()
{
return $this->stream;
}
/**
* Encapsulation of stream_select
*
* Because `stream_select` in stream wrapper mock no effect, if don't create this function will never be testable
*
* @codeCoverageIgnore
*
* @param resource[] $sockets
*
* @return int|bool
*/
protected function select(array $sockets, int $timeoutSec, int $timeoutUsec, bool $isRead = true)
{
$null = null;
if ($isRead) {
return @stream_select($sockets, $null, $null, $timeoutSec, $timeoutUsec);
}
return @stream_select($null, $sockets, $null, $timeoutSec, $timeoutUsec);
}
/**
* Encapsulation of stream_get_meta_data
*
* Because `stream_get_meta_data` in stream wrapper mock no effect, if don't create this function will never be testable
*
* @codeCoverageIgnore
*
* @return mixed[]
*/
protected function getMetaData(): array
{
return stream_get_meta_data($this->stream);
}
/**
* Read from the socket at most $len bytes.
*
* This method will not wait for all the requested data, it will return as
* soon as any data is received.
*
* @throws Exception
*/
public function readBlocking(int $length): string
{
if ($length > self::READ_MAX_LENGTH) {
throw Exception\Socket::invalidLength($length, self::READ_MAX_LENGTH);
}
$readable = $this->select([$this->stream], $this->recvTimeoutSec, $this->recvTimeoutUsec);
if ($readable === false) {
$this->close();
throw Exception\Socket::notReadable($length);
}
if ($readable === 0) { // select timeout
$res = $this->getMetaData();
$this->close();
if (! empty($res['timed_out'])) {
throw Exception\Socket::timedOut($length);
}
throw Exception\Socket::notReadable($length);
}
$remainingBytes = $length;
$data = $chunk = '';
while ($remainingBytes > 0) {
$chunk = fread($this->stream, $remainingBytes);
if ($chunk === false || strlen($chunk) === 0) {
// Zero bytes because of EOF?
if (feof($this->stream)) {
$this->close();
throw Exception\Socket::unexpectedEOF($length);
}
// Otherwise wait for bytes
$readable = $this->select([$this->stream], $this->recvTimeoutSec, $this->recvTimeoutUsec);
if ($readable !== 1) {
throw Exception\Socket::timedOutWithRemainingBytes($length, $remainingBytes);
}
continue; // attempt another read
}
$data .= $chunk;
$remainingBytes -= strlen($chunk);
}
return $data;
}
/**
* Write to the socket.
*
* @throws Exception
*/
public function writeBlocking(string $buffer): int
{
// fwrite to a socket may be partial, so loop until we
// are done with the entire buffer
$failedAttempts = 0;
$bytesWritten = 0;
$bytesToWrite = strlen($buffer);
while ($bytesWritten < $bytesToWrite) {
// wait for stream to become available for writing
$writable = $this->select([$this->stream], $this->sendTimeoutSec, $this->sendTimeoutUsec, false);
if ($writable === false) {
throw new Exception\Socket('Could not write ' . $bytesToWrite . ' bytes to stream');
}
if ($writable === 0) {
$res = $this->getMetaData();
if (! empty($res['timed_out'])) {
throw new Exception('Timed out writing ' . $bytesToWrite . ' bytes to stream after writing ' . $bytesWritten . ' bytes');
}
throw new Exception\Socket('Could not write ' . $bytesToWrite . ' bytes to stream');
}
if ($bytesToWrite - $bytesWritten > self::MAX_WRITE_BUFFER) {
// write max buffer size
$wrote = fwrite($this->stream, substr($buffer, $bytesWritten, self::MAX_WRITE_BUFFER));
} else {
// write remaining buffer bytes to stream
$wrote = fwrite($this->stream, substr($buffer, $bytesWritten));
}
if ($wrote === -1 || $wrote === false) {
throw new Exception\Socket('Could not write ' . strlen($buffer) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes');
}
if ($wrote === 0) {
// Increment the number of times we have failed
$failedAttempts++;
if ($failedAttempts > $this->maxWriteAttempts) {
throw new Exception\Socket('After ' . $failedAttempts . ' attempts could not write ' . strlen($buffer) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes');
}
} else {
// If we wrote something, reset our failed attempt counter
$failedAttempts = 0;
}
$bytesWritten += $wrote;
}
return $bytesWritten;
}
abstract public function close(): void;
abstract public function connect(): void;
/**
* @return void|int
*/
abstract public function write(?string $data = null);
/**
* @param string|int $data
*
* @return mixed
*/
abstract public function read($data);
}
Function Calls
None |
Stats
MD5 | 14cf922126f15a0e15b9b94d64538e45 |
Eval Count | 0 |
Decode Time | 104 ms |