Skip to content

Commit

Permalink
bug fix and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
morozovsk committed Mar 2, 2016
1 parent 1542b6c commit 8dbc716
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 179 deletions.
57 changes: 52 additions & 5 deletions Daemon.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,28 @@ protected function close($connectionId) {
unset($this->_handshakes[$connectionId]);
} elseif (isset($this->clients[$connectionId])) {
$this->onClose($connectionId);//вызываем пользовательский сценарий
} elseif (isset($this->services[$connectionId])) {
$this->onServiceClose($connectionId);//вызываем пользовательский сценарий
} elseif ($this->getIdByConnection($this->_master) == $connectionId) {
$this->onMasterClose($connectionId);//вызываем пользовательский сценарий
}

parent::close($connectionId);

if (isset($this->clients[$connectionId])) {
unset($this->clients[$connectionId]);
} elseif (isset($this->services[$connectionId])) {
unset($this->services[$connectionId]);
} elseif ($this->getIdByConnection($this->_server) == $connectionId) {
$this->_server = null;
} elseif ($this->getIdByConnection($this->_service) == $connectionId) {
$this->_service = null;
} elseif ($this->getIdByConnection($this->_master) == $connectionId) {
$this->_master = null;
}

unset($this->_write[$connectionId]);
unset($this->_read[$connectionId]);
}

protected function sendToClient($connectionId, $data, $type = 'text') {
Expand Down Expand Up @@ -146,6 +165,8 @@ protected function _decode($connectionId)
{
$data = $this->_read[$connectionId];

if (strlen($data) < 2) return false;

$unmaskedPayload = '';
$decodedData = array();

Expand Down Expand Up @@ -186,18 +207,17 @@ protected function _decode($connectionId)
}

if ($payloadLength === 126) {
$mask = substr($data, 4, 4);
if (strlen($data) < 4) return false;
$payloadOffset = 8;
$dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
} elseif ($payloadLength === 127) {
$mask = substr($data, 10, 4);
if (strlen($data) < 10) return false;
$payloadOffset = 14;
for ($tmp = '', $i = 0; $i < 8; $i++) {
$tmp .= sprintf('%08b', ord($data[$i + 2]));
}
$dataLength = bindec($tmp) + $payloadOffset;
} else {
$mask = substr($data, 2, 4);
$payloadOffset = 6;
$dataLength = $payloadLength + $payloadOffset;
}
Expand All @@ -209,6 +229,14 @@ protected function _decode($connectionId)
}

if ($isMasked) {
if ($payloadLength === 126) {
$mask = substr($data, 4, 4);
} elseif ($payloadLength === 127) {
$mask = substr($data, 10, 4);
} else {
$mask = substr($data, 2, 4);
}

for ($i = $payloadOffset; $i < $dataLength; $i++) {
$j = $i - $payloadOffset;
if (isset($data[$i])) {
Expand All @@ -224,15 +252,34 @@ protected function _decode($connectionId)
return $decodedData;
}

protected function getConnectionById($connectionId) {
if (isset($this->clients[$connectionId])) {
return $this->clients[$connectionId];
} elseif (isset($this->services[$connectionId])) {
return $this->services[$connectionId];
} elseif ($this->getIdByConnection($this->_server) == $connectionId) {
return $this->_server;
} elseif ($this->getIdByConnection($this->_service) == $connectionId) {
return $this->_service;
} elseif ($this->getIdByConnection($this->_master) == $connectionId) {
return $this->_master;
}
}

protected function getIdByConnection($connection) {
return intval($connection);
}

protected function onOpen($connectionId, $info) {}
protected function onClose($connectionId) {}
protected function onMessage($connectionId, $packet, $type) {}

protected function onServiceMessage($connectionId, $data) {}
protected function onServiceOpen($data) {}
protected function onServiceClose($data) {}
protected function onServiceOpen($connectionId) {}
protected function onServiceClose($connectionId) {}

protected function onMasterMessage($data) {}
protected function onMasterClose($connectionId) {}

protected function onStart() {}
}
78 changes: 15 additions & 63 deletions GenericEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,50 +24,47 @@ abstract class GenericEvent
public function start() {
$this->onStart();

$this->base = new EventBase();
$this->base = new \EventBase();

if ($this->_server) {
$this->event = new EventListener($this->base, array($this, "accept"), $this->base, EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE, -1, $this->_server);//EventListener($this->base, array($this, "accept"), null, EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE, -1, $this->_server);
$this->event = new \EventListener($this->base, array($this, "accept"), $this->base, \EventListener::OPT_CLOSE_ON_FREE | \EventListener::OPT_REUSEABLE, -1, $this->_server);//EventListener($this->base, array($this, "accept"), null, \EventListener::OPT_CLOSE_ON_FREE | \EventListener::OPT_REUSEABLE, -1, $this->_server);
}

if ($this->_service) {
$this->service_event = new EventListener($this->base, array($this, "service"), $this->base, EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE, -1, $this->_service);//EventListener($this->base, array($this, "accept"), null, EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE, -1, $this->_server);
$this->service_event = new \EventListener($this->base, array($this, "service"), $this->base, \EventListener::OPT_CLOSE_ON_FREE | \EventListener::OPT_REUSEABLE, -1, $this->_service);//EventListener($this->base, array($this, "accept"), null, \EventListener::OPT_CLOSE_ON_FREE | \EventListener::OPT_REUSEABLE, -1, $this->_server);
}

if ($this->_master) {
$connectionId = $this->getIdByConnection($this->_master);
$buffer = new EventBufferEvent($this->base, $this->_master, EventBufferEvent::OPT_CLOSE_ON_FREE);
$buffer = new \EventBufferEvent($this->base, $this->_master, \EventBufferEvent::OPT_CLOSE_ON_FREE);
$buffer->setCallbacks(array($this, "onRead"), array($this, "onWrite"), array($this, "onError"), $connectionId);
$buffer->enable(Event::READ | Event::WRITE | Event::PERSIST);
$buffer->enable(\Event::READ | \Event::WRITE | \Event::PERSIST);
$this->buffers[$connectionId] = $buffer;
}

if ($this->timer) {
$timer = Event::timer($this->base, function() use (&$timer) {$timer->addTimer($this->timer);$this->onTimer();});
$timer = \Event::timer($this->base, function() use (&$timer) {$timer->addTimer($this->timer);$this->onTimer();});
$timer->addTimer($this->timer);
}

$this->base->dispatch();
}

public function accept($listener, $connection, $address, $id) {
var_dump([$listener, $connection, $address, $id]);
$connectionId = $this->getIdByConnection($connection);
$buffer = new EventBufferEvent($this->base, $connection, EventBufferEvent::OPT_CLOSE_ON_FREE);
public function accept($listener, $connectionId, $address, $id) {
$buffer = new \EventBufferEvent($this->base, $connectionId, \EventBufferEvent::OPT_CLOSE_ON_FREE);
$buffer->setCallbacks(array($this, "onRead"), array($this, "onWrite"), array($this, "onError"), $connectionId);
$buffer->enable(Event::READ | Event::WRITE | Event::PERSIST);
$this->clients[$connectionId] = $connection;//var_dump($connection);
$buffer->enable(\Event::READ | \Event::WRITE | \Event::PERSIST);
$this->clients[$connectionId] = $connectionId;//var_dump($connection);
$this->buffers[$connectionId] = $buffer;

$this->_onOpen($connectionId);
}

public function service($listener, $connection, $address, $id) {
$connectionId = $this->getIdByConnection($connection);
$buffer = new EventBufferEvent($this->base, $connection, EventBufferEvent::OPT_CLOSE_ON_FREE);
public function service($listener, $connectionId, $address, $id) {
$buffer = new \EventBufferEvent($this->base, $connectionId, \EventBufferEvent::OPT_CLOSE_ON_FREE);
$buffer->setCallbacks(array($this, "onRead"), array($this, "onWrite"), array($this, "onError"), $connectionId);
$buffer->enable(Event::READ | Event::WRITE | Event::PERSIST);
$this->services[$connectionId] = $connection;//var_dump($connection);
$buffer->enable(\Event::READ | \Event::WRITE | \Event::PERSIST);
$this->services[$connectionId] = $connectionId;//var_dump($connectionId);
$this->buffers[$connectionId] = $buffer;

$this->onServiceOpen($connectionId);
Expand Down Expand Up @@ -113,22 +110,7 @@ public function onError($buffer, $error, $connectionId) {
protected function close($connectionId) {
//fclose($this->getConnectionById($connectionId));

if (isset($this->clients[$connectionId])) {
unset($this->clients[$connectionId]);
} elseif (isset($this->services[$connectionId])) {
unset($this->services[$connectionId]);
} elseif($this->getIdByConnection($this->_server) == $connectionId) {
unset($this->_server);
} elseif ($this->getIdByConnection($this->_service) == $connectionId) {
unset($this->_service);
} elseif ($this->getIdByConnection($this->_master) == $connectionId) {
unset($this->_master);
}

unset($this->_write[$connectionId]);
unset($this->_read[$connectionId]);

$this->buffers[$connectionId]->disable(Event::READ | Event::WRITE);
$this->buffers[$connectionId]->disable(\Event::READ | \Event::WRITE);
unset($this->buffers[$connectionId]);
}

Expand All @@ -155,34 +137,4 @@ protected function _read($connectionId) {
@$this->_read[$connectionId] .= $data;//добавляем полученные данные в буфер чтения
return strlen($this->_read[$connectionId]) < self::MAX_SOCKET_BUFFER_SIZE;
}

protected function getConnectionById($connectionId) {
if (isset($this->clients[$connectionId])) {
return $this->clients[$connectionId];
} elseif (isset($this->services[$connectionId])) {
return $this->services[$connectionId];
} elseif ($this->getIdByConnection($this->_server) == $connectionId) {
return $this->_server;
} elseif ($this->getIdByConnection($this->_service) == $connectionId) {
return $this->_service;
} elseif ($this->getIdByConnection($this->_master) == $connectionId) {
return $this->_master;
}
}

protected function getIdByConnection($connection) {
return intval($connection);
}

abstract protected function _onOpen($connectionId);

abstract protected function _onMessage($connectionId);

abstract protected function onServiceMessage($connectionId, $data);

abstract protected function onMasterMessage($data);

abstract protected function onServiceOpen($connectionId);

abstract protected function onServiceClose($connectionId);
}
55 changes: 2 additions & 53 deletions GenericLibevent.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private function service($socket, $flag, $base) {
$this->services[$connectionId] = $connection;
$this->buffers[$connectionId] = $buffer;

$this->_onOpen($connectionId);
$this->onServiceOpen($connectionId);
}

private function master($connection, $flag, $base) {
Expand Down Expand Up @@ -147,28 +147,7 @@ private function onError($buffer, $error, $connectionId) {
}

protected function close($connectionId) {
fclose($this->getConnectionById($connectionId));

if (isset($this->clients[$connectionId])) {
unset($this->clients[$connectionId]);
} elseif (isset($this->services[$connectionId])) {
unset($this->services[$connectionId]);
} elseif($this->getIdByConnection($this->_server) == $connectionId) {
unset($this->_server);
/*event_del($this->event);
event_free($this->event);
unset($this->event);*/
} elseif ($this->getIdByConnection($this->_service) == $connectionId) {
unset($this->_service);
/*event_del($this->service_event);
event_free($this->service_event);
unset($this->service_event);*/
} elseif ($this->getIdByConnection($this->_master) == $connectionId) {
unset($this->_master);
}

unset($this->_write[$connectionId]);
unset($this->_read[$connectionId]);
@fclose($this->getConnectionById($connectionId));

event_buffer_disable($this->buffers[$connectionId], EV_READ | EV_WRITE | EV_PERSIST);
event_buffer_free($this->buffers[$connectionId]);
Expand Down Expand Up @@ -198,34 +177,4 @@ protected function _read($connectionId) {
@$this->_read[$connectionId] .= $data;//добавляем полученные данные в буфер чтения
return strlen($this->_read[$connectionId]) < self::MAX_SOCKET_BUFFER_SIZE;
}

protected function getConnectionById($connectionId) {
if (isset($this->clients[$connectionId])) {
return $this->clients[$connectionId];
} elseif (isset($this->services[$connectionId])) {
return $this->services[$connectionId];
} elseif ($this->getIdByConnection($this->_server) == $connectionId) {
return $this->_server;
} elseif ($this->getIdByConnection($this->_service) == $connectionId) {
return $this->_service;
} elseif ($this->getIdByConnection($this->_master) == $connectionId) {
return $this->_master;
}
}

protected function getIdByConnection($connection) {
return intval($connection);
}

abstract protected function _onOpen($connectionId);

abstract protected function _onMessage($connectionId);

abstract protected function onServiceMessage($connectionId, $data);

abstract protected function onMasterMessage($data);

abstract protected function onServiceOpen($connectionId);

abstract protected function onServiceClose($connectionId);
}
45 changes: 0 additions & 45 deletions GenericSelect.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,6 @@ protected function _onError($connectionId) {

protected function close($connectionId) {
@fclose($this->getConnectionById($connectionId));

if (isset($this->clients[$connectionId])) {
unset($this->clients[$connectionId]);
} elseif (isset($this->services[$connectionId])) {
unset($this->services[$connectionId]);
} elseif($this->getConnectionById($connectionId) == $this->_server) {
unset($this->_server);
} elseif ($this->getConnectionById($connectionId) == $this->_service) {
unset($this->_service);
} elseif ($this->getConnectionById($connectionId) == $this->_master) {
unset($this->_master);
}

unset($this->_write[$connectionId]);
unset($this->_read[$connectionId]);
}

protected function _write($connectionId, $data, $delimiter = '') {
Expand Down Expand Up @@ -185,24 +170,6 @@ protected function _read($connectionId) {
return strlen($this->_read[$connectionId]) < self::MAX_SOCKET_BUFFER_SIZE;
}

protected function getConnectionById($connectionId) {
if (isset($this->clients[$connectionId])) {
return $this->clients[$connectionId];
} elseif (isset($this->services[$connectionId])) {
return $this->services[$connectionId];
} elseif ($this->getIdByConnection($this->_server) == $connectionId) {
return $this->_server;
} elseif ($this->getIdByConnection($this->_service) == $connectionId) {
return $this->_service;
} elseif ($this->getIdByConnection($this->_master) == $connectionId) {
return $this->_master;
}
}

protected function getIdByConnection($connection) {
return intval($connection);
}

protected function _createTimer() {
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

Expand All @@ -224,16 +191,4 @@ protected function _createTimer() {
}
}
}

abstract protected function _onOpen($connectionId);

abstract protected function _onMessage($connectionId);

abstract protected function onServiceMessage($connectionId, $data);

abstract protected function onMasterMessage($data);

abstract protected function onServiceOpen($connectionId);

abstract protected function onServiceClose($connectionId);
}
Loading

0 comments on commit 8dbc716

Please sign in to comment.