<?php
/**
* Hoa
*
*
* @license
*
* New BSD License
*
* Copyright © 2007-2017, Hoa community. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the Hoa nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
namespace Psy\Readline\Hoa;
/**
* Class \Hoa\Stream.
*
* Static register for all streams (files, sockets etc.).
*/
abstract class Stream implements IStream, EventListenable
{
use EventListens;
/**
* Name index in the stream bucket.
*/
const NAME = 0;
/**
* Handler index in the stream bucket.
*/
const HANDLER = 1;
/**
* Resource index in the stream bucket.
*/
const RESOURCE = 2;
/**
* Context index in the stream bucket.
*/
const CONTEXT = 3;
/**
* Default buffer size.
*/
const DEFAULT_BUFFER_SIZE = 8192;
/**
* Current stream bucket.
*/
protected $_bucket = [];
/**
* Static stream register.
*/
private static $_register = [];
/**
* Buffer size (default is 8Ko).
*/
protected $_bufferSize = self::DEFAULT_BUFFER_SIZE;
/**
* Original stream name, given to the stream constructor.
*/
protected $_streamName = null;
/**
* Context name.
*/
protected $_context = null;
/**
* Whether the opening has been deferred.
*/
protected $_hasBeenDeferred = false;
/**
* Whether this stream is already opened by another handler.
*/
protected $_borrowing = false;
/**
* Set the current stream.
* If not exists in the register, try to call the
* `$this->_open()` method. Please, see the `self::_getStream()` method.
*/
public function __construct(string $streamName, string $context = null, bool $wait = false)
{
$this->_streamName = $streamName;
$this->_context = $context;
$this->_hasBeenDeferred = $wait;
$this->setListener(
new EventListener(
$this,
[
'authrequire',
'authresult',
'complete',
'connect',
'failure',
'mimetype',
'progress',
'redirect',
'resolve',
'size',
]
)
);
if (true === $wait) {
return;
}
$this->open();
return;
}
/**
* Get a stream in the register.
* If the stream does not exist, try to open it by calling the
* $handler->_open() method.
*/
private static function &_getStream(
string $streamName,
self $handler,
string $context = null
): array {
$name = \md5($streamName);
if (null !== $context) {
if (false === StreamContext::contextExists($context)) {
throw new StreamException('Context %s was not previously declared, cannot retrieve '.'this context.', 0, $context);
}
$context = StreamContext::getInstance($context);
}
if (!isset(self::$_register[$name])) {
self::$_register[$name] = [
self::NAME => $streamName,
self::HANDLER => $handler,
self::RESOURCE => $handler->_open($streamName, $context),
self::CONTEXT => $context,
];
Event::register(
'hoa://Event/Stream/'.$streamName,
$handler
);
// Add :open-ready?
Event::register(
'hoa://Event/Stream/'.$streamName.':close-before',
$handler
);
} else {
$handler->_borrowing = true;
}
if (null === self::$_register[$name][self::RESOURCE]) {
self::$_register[$name][self::RESOURCE]
= $handler->_open($streamName, $context);
}
return self::$_register[$name];
}
/**
* Open the stream and return the associated resource.
* Note: This method is protected, but do not forget that it could be
* overloaded into a public context.
*/
abstract protected function &_open(string $streamName, StreamContext $context = null);
/**
* Close the current stream.
* Note: this method is protected, but do not forget that it could be
* overloaded into a public context.
*/
abstract protected function _close(): bool;
/**
* Open the stream.
*/
final public function open(): self
{
$context = $this->_context;
if (true === $this->hasBeenDeferred()) {
if (null === $context) {
$handle = StreamContext::getInstance(\uniqid());
$handle->setParameters([
'notification' => [$this, '_notify'],
]);
$context = $handle->getId();
} elseif (true === StreamContext::contextExists($context)) {
$handle = StreamContext::getInstance($context);
$parameters = $handle->getParameters();
if (!isset($parameters['notification'])) {
$handle->setParameters([
'notification' => [$this, '_notify'],
]);
}
}
}
$this->_bufferSize = self::DEFAULT_BUFFER_SIZE;
$this->_bucket = self::_getStream(
$this->_streamName,
$this,
$context
);
return $this;
}
/**
* Close the current stream.
*/
final public function close()
{
$streamName = $this->getStreamName();
if (null === $streamName) {
return;
}
$name = \md5($streamName);
if (!isset(self::$_register[$name])) {
return;
}
Event::notify(
'hoa://Event/Stream/'.$streamName.':close-before',
$this,
new EventBucket()
);
if (false === $this->_close()) {
return;
}
unset(self::$_register[$name]);
$this->_bucket[self::HANDLER] = null;
Event::unregister(
'hoa://Event/Stream/'.$streamName
);
Event::unregister(
'hoa://Event/Stream/'.$streamName.':close-before'
);
return;
}
/**
* Get the current stream name.
*/
public function getStreamName()
{
if (empty($this->_bucket)) {
return null;
}
return $this->_bucket[self::NAME];
}
/**
* Get the current stream.
*/
public function getStream()
{
if (empty($this->_bucket)) {
return null;
}
return $this->_bucket[self::RESOURCE];
}
/**
* Get the current stream context.
*/
public function getStreamContext()
{
if (empty($this->_bucket)) {
return null;
}
return $this->_bucket[self::CONTEXT];
}
/**
* Get stream handler according to its name.
*/
public static function getStreamHandler(string $streamName)
{
$name = \md5($streamName);
if (!isset(self::$_register[$name])) {
return null;
}
return self::$_register[$name][self::HANDLER];
}
/**
* Set the current stream. Useful to manage a stack of streams (e.g. socket
* and select). Notice that it could be unsafe to use this method without
* taking time to think about it two minutes. Resource of type “Unknown” is
* considered as valid.
*/
public function _setStream($stream)
{
if (false === \is_resource($stream) &&
('resource' !== \gettype($stream) ||
'Unknown' !== \get_resource_type($stream))) {
throw new StreamException('Try to change the stream resource with an invalid one; '.'given %s.', 1, \gettype($stream));
}
$old = $this->_bucket[self::RESOURCE];
$this->_bucket[self::RESOURCE] = $stream;
return $old;
}
/**
* Check if the stream is opened.
*/
public function isOpened(): bool
{
return \is_resource($this->getStream());
}
/**
* Set the timeout period.
*/
public function setStreamTimeout(int $seconds, int $microseconds = 0): bool
{
return \stream_set_timeout($this->getStream(), $seconds, $microseconds);
}
/**
* Whether the opening of the stream has been deferred.
*/
protected function hasBeenDeferred()
{
return $this->_hasBeenDeferred;
}
/**
* Check whether the connection has timed out or not.
* This is basically a shortcut of `getStreamMetaData` + the `timed_out`
* index, but the resulting code is more readable.
*/
public function hasTimedOut(): bool
{
$metaData = $this->getStreamMetaData();
return true === $metaData['timed_out'];
}
/**
* Set blocking/non-blocking mode.
*/
public function setStreamBlocking(bool $mode): bool
{
return \stream_set_blocking($this->getStream(), $mode);
}
/**
* Set stream buffer.
* Output using fwrite() (or similar function) is normally buffered at 8 Ko.
* This means that if there are two processes wanting to write to the same
* output stream, each is paused after 8 Ko of data to allow the other to
* write.
*/
public function setStreamBuffer(int $buffer): bool
{
// Zero means success.
$out = 0 === \stream_set_write_buffer($this->getStream(), $buffer);
if (true === $out) {
$this->_bufferSize = $buffer;
}
return $out;
}
/**
* Disable stream buffering.
* Alias of $this->setBuffer(0).
*/
public function disableStreamBuffer(): bool
{
return $this->setStreamBuffer(0);
}
/**
* Get stream buffer size.
*/
public function getStreamBufferSize(): int
{
return $this->_bufferSize;
}
/**
* Get stream wrapper name.
*/
public function getStreamWrapperName(): string
{
if (false === $pos = \strpos($this->getStreamName(), '://')) {
return 'file';
}
return \substr($this->getStreamName(), 0, $pos);
}
/**
* Get stream meta data.
*/
public function getStreamMetaData(): array
{
return \stream_get_meta_data($this->getStream());
}
/**
* Whether this stream is already opened by another handler.
*/
public function isBorrowing(): bool
{
return $this->_borrowing;
}
/**
* Notification callback.
*/
public function _notify(
int $ncode,
int $severity,
$message,
$code,
$transferred,
$max
) {
static $_map = [
\STREAM_NOTIFY_AUTH_REQUIRED => 'authrequire',
\STREAM_NOTIFY_AUTH_RESULT => 'authresult',
\STREAM_NOTIFY_COMPLETED => 'complete',
\STREAM_NOTIFY_CONNECT => 'connect',
\STREAM_NOTIFY_FAILURE => 'failure',
\STREAM_NOTIFY_MIME_TYPE_IS => 'mimetype',
\STREAM_NOTIFY_PROGRESS => 'progress',
\STREAM_NOTIFY_REDIRECTED => 'redirect',
\STREAM_NOTIFY_RESOLVE => 'resolve',
\STREAM_NOTIFY_FILE_SIZE_IS => 'size',
];
$this->getListener()->fire($_map[$ncode], new EventBucket([
'code' => $code,
'severity' => $severity,
'message' => $message,
'transferred' => $transferred,
'max' => $max,
]));
}
/**
* Call the $handler->close() method on each stream in the static stream
* register.
* This method does not check the return value of $handler->close(). Thus,
* if a stream is persistent, the $handler->close() should do anything. It
* is a very generic method.
*/
final public static function _Hoa_Stream()
{
foreach (self::$_register as $entry) {
$entry[self::HANDLER]->close();
}
return;
}
/**
* Transform object to string.
*/
public function __toString(): string
{
return $this->getStreamName();
}
/**
* Close the stream when destructing.
*/
public function __destruct()
{
if (false === $this->isOpened()) {
return;
}
$this->close();
return;
}
}
/**
* Class \Hoa\Stream\_Protocol.
*
* The `hoa://Library/Stream` node.
*
* @license New BSD License
*/
class _Protocol extends ProtocolNode
{
/**
* Component's name.
*
* @var string
*/
protected $_name = 'Stream';
/**
* ID of the component.
*
* @param string $id ID of the component
*
* @return mixed
*/
public function reachId(string $id)
{
return Stream::getStreamHandler($id);
}
}
/*
* Shutdown method.
*/
\register_shutdown_function([Stream::class, '_Hoa_Stream']);
/**
* Add the `hoa://Library/Stream` node. Should be use to reach/get an entry
* in the stream register.
*/
$protocol = Protocol::getInstance();
$protocol['Library'][] = new _Protocol();