PhP-WebSocket Server

const MAX_FRAME_RECV = 100000; const MAX_TIMEOUT = 25;

const TIMEOUT_PONG = 5; const TIMEOUT_RECV = 10;

const OPCODE_CONTINUATION = 0; const OPCODE_TEXT = 1; const OPCODE_BINARY = 2; const OPCODE_CLOSE = 8; const OPCODE_PING = 9; const OPCODE_PONG = 10;

const READY_STATE_CONNECTING = 0; const READY_STATE_OPEN = 1; const READY_STATE_CLOSING = 2; const READY_STATE_CLOSED = 3;

const PAYLOAD_LENGTH_16 = 126; const PAYLOAD_LENGTH_63 = 127;

const STATUS_PROTOCOL_ERROR = 1002; const STATUS_MESSAGE_TOO_BIG = 1004;

const FIN = 128; const MASK = 128;

/** * @var array */ public $clients = array (); /** * @var array */ public $ws = array ();

/** * @var array */ public $events = array (); /** * @var int */ public $clientsCount = 0;

/** * @var null */ public static $_instance = null;

/** * @return WebSocketApi|null */ public static function getInstance (){ if (self::$_instance === null){ self::$_instance = new self (); }

return self::$_instance; }

/** * @param string $host * @param int $port * @return bool */ public function startWServer ($host = '127.0.0.1', $port = 8000){ if (isset ($this→ws[0])) return false;

if (phpversion () >= 5.5){ cli_set_process_title («WebSockets Server 1.0»); }

if (!$this→ws[0] = socket_create (AF_INET, SOCK_STREAM, SOL_TCP)) { return false; } if (! socket_set_option ($this→ws[0], SOL_SOCKET, SO_REUSEADDR, 1)) { socket_close ($this→ws[0]); return false; } if (! socket_bind ($this→ws[0], $host, $port)) { socket_close ($this→ws[0]); return false; } if (! socket_listen ($this→ws[0], 10)) { socket_close ($this→ws[0]); return false; }

$write = null; $except = null;

$nextPingCheck = time () + 1; while (isset ($this→ws[0])){ $read = $this→ws; $count = socket_select ($read, $write, $except, 1); if ($count === false){ socket_close ($this→ws[0]); return false; }elseif ($count > 0){ foreach ($read as $clientId => $socket){ if ($clientId!= 0){ $buffer = ''; $bytes = @socket_recv ($socket, $buffer, 4096, 0);

if ($bytes === false){ $this→_closeClient ($clientId); }elseif ($bytes > 0){ if (!$this→_checkClient ($clientId, $buffer, $bytes)){ $this→_closeClient ($clientId); } }else{ $this→_removeClient ($clientId); } }else{ $client = socket_accept ($this→ws[0]); if ($client!== false){ // TODO: Добавить ограничения по макс. количеству клиентов $ip = ''; $result = socket_getpeername ($client, $ip); $ip = ip2long ($ip); if ($result!== false){ $this→_addClient ($client, $ip); }else{ socket_close ($client); } } } } } if (time () >= $nextPingCheck){ $nextPingCheck = time () + 1; $this→_checkActiveClients (); } } return true; }

/** * */ private function _checkActiveClients (){ $time = time (); foreach ($this→clients as $clientId => $socket){ if ($socket→state!= self: READY_STATE_CLOSED){ if ($socket→ready_state!== false){ if ($time >= $socket→ready_state + self: TIMEOUT_PONG){ $this→_closeClient ($clientId); $this→_removeClient ($clientId); } }elseif ($time >= $socket→time + self: TIMEOUT_RECV){ if ($socket→state!= self: READY_STATE_CONNECTING){ $this→clients[$clientId]→ready_state = time (); $this→sendClientMsg ($clientId, self: OPCODE_PING, ''); }else{ $this→_removeClient ($clientId); } } } } }

/** * @param $socket * @param string $clientIp */ private function _addClient ($socket, $clientIp = ''){ $this→clientsCount += 1; $clientId = $this→_nextId ();

$this→clients[$clientId] = (object) array ( 'socket' => $socket, 'msg_buffer' => '', 'state' => self: READY_STATE_CONNECTING, 'time' => time (), 'ready_state' => false, 'close_status' => 0, 'client_ip' => $clientIp, 'header_length' => false, 'buffer_length' => 0, 'buffer' => '', 'msg_opcode' => 0, 'msg_data_len' => 0 ); $this→ws[$clientId] = $socket; }

/** * @return int */ private function _nextId (){ $i = 1; while (isset ($this→ws[$i])) $i++; return $i; }

/** * @param $clientId */ private function _removeClient ($clientId){ if (array_key_exists ('onClose', $this→events)){ foreach ($this→events['onClose'] as $function => $args){ if (is_array ($args)){ call_user_func_array ($function, $args); }else{ call_user_func ($function, $args); } } }

socket_close ($this→clients[$clientId]→socket); unset ($this→ws[$clientId], $this→clients[$clientId]); $this→clientsCount -= 1; }

/** * @param $clientId * @return bool */ private function _closeClient ($clientId){ if ($this→clients[$clientId]→state == self: READY_STATE_CLOSING || $this→clients[$clientId]→state == self: READY_STATE_CLOSED){ return true; }

$this→clients[$clientId]→close_status = self: STATUS_PROTOCOL_ERROR; $this→sendClientMsg ($clientId, self: OPCODE_CLOSE, pack ('n', self: STATUS_PROTOCOL_ERROR)); $this→clients[$clientId]→state = self: READY_STATE_CLOSING; }

/** * @param $clientId * @param $opCode * @param $msg * @return bool */ public function sendClientMsg ($clientId, $opCode, $msg){ if ($this→clients[$clientId]→state == self: READY_STATE_CLOSING || $this→clients[$clientId]→state == self: READY_STATE_CLOSED){ return true; }

$msgLength = strlen ($msg); $buffSize = 4096;

$frameCount = ceil ($msgLength / $buffSize); if ($frameCount == 0){ $frameCount = 1; }

$maxFrame = $frameCount — 1; $lastFrameBuffLength = ($msgLength % $buffSize) != 0? $msgLength % $buffSize: ($msgLength!= 0? $buffSize: 0);

for ($i=0; $i<$frameCount; $i++){ $final = $i != $maxFrame ? 0 : self::FIN; $opCode = $i != 0 ? self::OPCODE_CONTINUATION : $opCode; $buffLength = $i != $maxFrame ? $buffSize : $lastFrameBuffLength;

if ($buffLength <= 125){ $payloadLength = $buffLength; $payloadLengthExt = ''; $payloadLengthExtL = 0; }elseif($buffLength <= 65535){ $payloadLength = self::PAYLOAD_LENGTH_16; $payloadLengthExt = pack('n', $buffLength); $payloadLengthExtL = 2; }else{ $payloadLength = self::PAYLOAD_LENGTH_63; $payloadLengthExt = pack('xxxxN', $buffLength); $payloadLengthExtL = 8; }

$buffer = pack ('n', (($final | $opCode) << 8) | $payloadLength) . $payloadLengthExt . substr($msg, $i * $buffSize, $buffLength); $socket = $this->clients[$clientId]→socket; $left = 2 + $payloadLengthExtL + $buffLength;

do{ $sent = @socket_send ($socket, $buffer, $left, 0); if ($sent === false){ return false; } $left -= $sent; if ($sent > 0){ $buffer = substr ($buffer, $sent); } } while ($left > 0); }

return true; }

/** * @param $clientId * @param $buffer * @param $bLength * @return bool */ private function _checkClient ($clientId, &$buffer, $bLength){ if ($this→clients[$clientId]→state == self: READY_STATE_OPEN){ $result = $this→_buildClientFrame ($clientId, $buffer, $bLength); }elseif ($this→clients[$clientId]→state == self: READY_STATE_CONNECTING){ $result = $this→_makeHandShake ($clientId, $buffer); if ($result){ $this→clients[$clientId]→state = self: READY_STATE_OPEN; if (array_key_exists ('onOpen', $this→events)){ foreach ($this→events['onOpen'] as $function => $args){ if (is_array ($args)){ call_user_func_array ($function, $args); }else{ call_user_func ($function, $args); } } } } }else{ $result = false; }

return $result; }

/** * @param $clientId * @param $buffer * @param $bufferLength * @return bool */ private function _buildClientFrame ($clientId, &$buffer, $bufferLength){ $this→clients[$clientId]→buffer_length += $bufferLength; $this→clients[$clientId]→buffer .= $buffer;

if ($this→clients[$clientId]→header_length!== false || $this→_checkSizeClientFrame ($clientId) == true){ $headerLength = ($this→clients[$clientId]→header_length <= 125 ? 0 : ($this->clients[$clientId]→header_length <= 65535 ? 2 : 8)) + 6; $frameLength = $this->clients[$clientId]→header_length + $headerLength; if ($this→clients[$clientId]→buffer_length >= $frameLength){ $nextFrameLength = $this→clients[$clientId]→buffer_length — $frameLength; if ($nextFrameLength > 0){ $this→clients[$clientId]→buffer_length -= $nextFrameLength; $nextFrameBytes = substr ($this→clients[$clientId]→buffer, $frameLength); $this→clients[$clientId]→buffer = substr ($this→clients[$clientId]→buffer, 0, $frameLength); }

$result = $this→_processClientFrame ($clientId);

if (isset ($this→clients[$clientId])){ $this→clients[$clientId]→header_length = false; $this→clients[$clientId]→buffer_length = 0; $this→clients[$clientId]→buffer = ''; }

if ($nextFrameLength <= 0 || !$result){ return $result; }

return $this→_buildClientFrame ($clientId, $nextFrameBytes, $nextFrameLength); } }

return true; }

/** * @param $clientId * @return bool */ private function _processClientFrame ($clientId){ $this→clients[$clientId]→time = time ();

$buffer = &$this→clients[$clientId]→buffer; if (substr ($buffer, 5, 1) === false) return false;

$a1 = ord (substr ($buffer, 0, 1)); $a2 = ord (substr ($buffer, 1, 1));

$f = $a1 & self: FIN; $opCode = $a1 & 15; $mask = $a2 & self: MASK;

if (!$mask) return false;

$seek = $this→clients[$clientId]→header_length <= 125 ? 2 : ($this->clients[$clientId]→header_length <= 65535 ? 4 : 10); $maskKey = substr($buffer, $seek, 4);

$array = unpack ('Na', $maskKey); $maskKey = $array['a']; $maskKey = array ( $maskKey >> 24, ($maskKey >> 16) & 255, ($maskKey >> 8) & 255, $maskKey & 255 ); $seek += 4;

if (substr ($buffer, $seek, 1) !== false) { $data = str_split (substr ($buffer, $seek)); foreach ($data as $key => $byte){ $data[$key] = chr (ord ($byte) ^ ($maskKey[$key % 4])); } $data = implode ('', $data); }else{ $data = ''; }

if ($opCode!= self: OPCODE_CONTINUATION && $this→clients[$clientId]→msg_data_len > 0) { $this→clients[$clientId]→msg_data_len = 0; $this→clients[$clientId]→msg_buffer = ''; }

if ($f == self: FIN){ if ($opCode!= self: OPCODE_CONTINUATION){ return $this→_processClientMsg ($clientId, $opCode, $data, $this→clients[$clientId]→header_length); }else{ $this→clients[$clientId]→msg_data_len += $this→clients[$clientId]→header_length; $this→clients[$clientId]→msg_buffer .= $data;

$result = $this→_processClientMsg ($clientId, $this→clients[$clientId]→msg_opcode, $this→clients[$clientId]→msg_buffer, $this→clients[$clientId]→msg_data_len);

if (isset ($this→clients[$clientId])){ $this→clients[$clientId]→msg_buffer = ''; $this→clients[$clientId]→msg_opcode = 0; $this→clients[$clientId]→msg_data_len = 0; }

return $result; } }else{ if ($opCode & 8) return false; $this→clients[$clientId]→msg_data_len += $this→clients[$clientId]→header_length; $this→clients[$clientId]→msg_buffer .= $data;

if ($opCode!= self: OPCODE_CONTINUATION){ $this→clients[$clientId]→msg_opcode = $opCode; } } return true; }

/** * @param $clientId * @param $opCode * @param $data * @param $dataLength * @return bool */ private function _processClientMsg ($clientId, $opCode, &$data, $dataLength){ if ($opCode == self: OPCODE_PING){ $this→sendClientMsg ($clientId, self: OPCODE_PONG, $data); }elseif ($opCode == self: OPCODE_PONG){ if ($this→clients[$clientId]→ready_state!== false){ $this→clients[$clientId]→ready_state = false; } }elseif ($opCode == self: OPCODE_CLOSE){ if ($this→clients[$clientId]→state == self: READY_STATE_CLOSING){ $this→clients[$clientId]→state = self: READY_STATE_CLOSED; }else{ // TODO: добавить типы закрытия $this→_closeClient ($clientId); } $this→_removeClient ($clientId); }elseif ($opCode == self: OPCODE_TEXT || $opCode == self: OPCODE_BINARY){ if (array_key_exists ('onMsg', $this→events)){ foreach ($this→events['onMsg'] as $function => $args){ if (is_array ($args)){ call_user_func_array ($function, $args); }else{ call_user_func ($function, $args); } } } }else{ return false; }

return true; }

/** * @param $clientId * @return bool */ private function _checkSizeClientFrame ($clientId){ if ($this→clients[$clientId]→buffer_length > 1){ $payloadLength = ord (substr ($this→clients[$clientId]→buffer, 1, 1)) & 127; if ($payloadLength <= 125) { $this->clients[$clientId]→header_length = $payloadLength; }elseif ($payloadLength == 126){ if (substr ($this→clients[$clientId]→buffer, 3, 1) !== false) { $payloadLengthExtended = substr ($this→clients[$clientId]→buffer, 2, 2); $array = unpack ('na', $payloadLengthExtended); $this→clients[$clientId]→header_length = $array['a']; } }else{ if (substr ($this→clients[$clientId]→buffer, 9, 1) !== false) { $payloadLengthExtended = substr ($this→clients[$clientId]→buffer, 2, 8); $payloadLengthExtended32_1 = substr ($payloadLengthExtended, 0, 4); $array = unpack ('Na', $payloadLengthExtended32_1);

if ($array['a'] != 0 || ord (substr ($payloadLengthExtended, 4, 1)) & 128) { $this→_closeClient ($clientId, self: STATUS_MESSAGE_TOO_BIG); return false; }

$payloadLengthExtended32_2 = substr ($payloadLengthExtended, 4, 4); $array = unpack ('Na', $payloadLengthExtended32_2);

if ($array['a'] > 2147479538) { $this→_closeClient ($clientId, self: STATUS_MESSAGE_TOO_BIG); return false; }

$this→clients[$clientId]→header_length = $array['a']; } }

if ($this→clients[$clientId]→header_length!== false) { if ($this→clients[$clientId]→header_length > self: MAX_FRAME_RECV) { $this→clients[$clientId]→header_length = false; $this→_closeClient ($clientId, self: STATUS_MESSAGE_TOO_BIG); return false; }

$controlFrame = (ord (substr ($this→clients[$clientId]→buffer, 0, 1)) & 8) == 8; if (!$controlFrame) { $newMessagePayloadLength = $this→clients[$clientId]→msg_data_len + $this→clients[$clientId]→header_length; if ($newMessagePayloadLength > self: MAX_FRAME_RECV || $newMessagePayloadLength > 2147483647) { $this→_closeClient ($clientId, self: STATUS_MESSAGE_TOO_BIG); return false; } }

return true; } } return false; }

/** * @param $clientId * @param $buffer * @return bool */ private function _makeHandShake ($clientId, $buffer){ $sep = strpos ($buffer,»\r\n\r\n»); if (!$sep) return false;

$headers = explode (»\r\n», substr ($buffer, 0, $sep)); $headersCount = sizeof ($headers); if ($headersCount < 1) return false;

$request = &$headers[0]; $requestParts = explode (' ', $request); $requestPartsSize = sizeof ($requestParts); if ($requestPartsSize < 3) return false;

if (strtoupper ($requestParts[0]) != 'GET') return false;

$httpPart = &$requestParts[$requestPartsSize — 1]; $httpParts = explode ('/', $httpPart); if (! isset ($httpParts[1]) || (float) $httpParts[1] < 1.1) return false;

$headersKeyed = array (); for ($i=1; $i<$headersCount; $i++) { $parts = explode(':', $headers[$i]); if (!isset($parts[1])) return false;

$headersKeyed[trim ($parts[0])] = trim ($parts[1]); }

if (! isset ($headersKeyed['Host'])) return false;

if (! isset ($headersKeyed['Sec-WebSocket-Key'])) return false;

$key = $headersKeyed['Sec-WebSocket-Key']; if (strlen (base64_decode ($key)) != 16) return false;

if (! isset ($headersKeyed['Sec-WebSocket-Version']) || (int) $headersKeyed['Sec-WebSocket-Version'] < 7) return false;

$hash = base64_encode (sha1($key.'258EAFA5-E914–47DA-95CA-C5AB0DC85B11', true));

$headers = array ( 'HTTP/1.1 101 Switching Protocols', 'Upgrade: websocket', 'Connection: Upgrade', 'Sec-WebSocket-Accept: '.$hash ); $headers = implode (»\r\n», $headers).»\r\n\r\n»; $socket = $this→clients[$clientId]→socket;

$left = strlen ($headers); do{ $sent = @socket_send ($socket, $headers, $left, 0); if ($sent === false) return false;

$left -= $sent; if ($sent > 0) $headers = substr ($headers, $sent); } while ($left > 0);

return true; } }

© Habrahabr.ru