PhP-WebSocket Server
const MAX_FRAME_RECV = 100000; const MAX_TIMEOUT = 25;
const TIMEOUT_PONG = 5; const TIMEOUT_RECV = 10;
const OPCODE_CONTINUATION = 0; const OPCODE_TEXT = 1; const OPCODE_BINARY = 2; const OPCODE_CLOSE = 8; const OPCODE_PING = 9; const OPCODE_PONG = 10;
const READY_STATE_CONNECTING = 0; const READY_STATE_OPEN = 1; const READY_STATE_CLOSING = 2; const READY_STATE_CLOSED = 3;
const PAYLOAD_LENGTH_16 = 126; const PAYLOAD_LENGTH_63 = 127;
const STATUS_PROTOCOL_ERROR = 1002; const STATUS_MESSAGE_TOO_BIG = 1004;
const FIN = 128; const MASK = 128;
/** * @var array */ public $clients = array (); /** * @var array */ public $ws = array ();
/** * @var array */ public $events = array (); /** * @var int */ public $clientsCount = 0;
/** * @var null */ public static $_instance = null;
/** * @return WebSocketApi|null */ public static function getInstance (){ if (self::$_instance === null){ self::$_instance = new self (); }
return self::$_instance; }
/** * @param string $host * @param int $port * @return bool */ public function startWServer ($host = '127.0.0.1', $port = 8000){ if (isset ($this→ws[0])) return false;
if (phpversion () >= 5.5){ cli_set_process_title («WebSockets Server 1.0»); }
if (!$this→ws[0] = socket_create (AF_INET, SOCK_STREAM, SOL_TCP)) { return false; } if (! socket_set_option ($this→ws[0], SOL_SOCKET, SO_REUSEADDR, 1)) { socket_close ($this→ws[0]); return false; } if (! socket_bind ($this→ws[0], $host, $port)) { socket_close ($this→ws[0]); return false; } if (! socket_listen ($this→ws[0], 10)) { socket_close ($this→ws[0]); return false; }
$write = null; $except = null;
$nextPingCheck = time () + 1; while (isset ($this→ws[0])){ $read = $this→ws; $count = socket_select ($read, $write, $except, 1); if ($count === false){ socket_close ($this→ws[0]); return false; }elseif ($count > 0){ foreach ($read as $clientId => $socket){ if ($clientId!= 0){ $buffer = ''; $bytes = @socket_recv ($socket, $buffer, 4096, 0);
if ($bytes === false){ $this→_closeClient ($clientId); }elseif ($bytes > 0){ if (!$this→_checkClient ($clientId, $buffer, $bytes)){ $this→_closeClient ($clientId); } }else{ $this→_removeClient ($clientId); } }else{ $client = socket_accept ($this→ws[0]); if ($client!== false){ // TODO: Добавить ограничения по макс. количеству клиентов $ip = ''; $result = socket_getpeername ($client, $ip); $ip = ip2long ($ip); if ($result!== false){ $this→_addClient ($client, $ip); }else{ socket_close ($client); } } } } } if (time () >= $nextPingCheck){ $nextPingCheck = time () + 1; $this→_checkActiveClients (); } } return true; }
/** * */ private function _checkActiveClients (){ $time = time (); foreach ($this→clients as $clientId => $socket){ if ($socket→state!= self: READY_STATE_CLOSED){ if ($socket→ready_state!== false){ if ($time >= $socket→ready_state + self: TIMEOUT_PONG){ $this→_closeClient ($clientId); $this→_removeClient ($clientId); } }elseif ($time >= $socket→time + self: TIMEOUT_RECV){ if ($socket→state!= self: READY_STATE_CONNECTING){ $this→clients[$clientId]→ready_state = time (); $this→sendClientMsg ($clientId, self: OPCODE_PING, ''); }else{ $this→_removeClient ($clientId); } } } } }
/** * @param $socket * @param string $clientIp */ private function _addClient ($socket, $clientIp = ''){ $this→clientsCount += 1; $clientId = $this→_nextId ();
$this→clients[$clientId] = (object) array ( 'socket' => $socket, 'msg_buffer' => '', 'state' => self: READY_STATE_CONNECTING, 'time' => time (), 'ready_state' => false, 'close_status' => 0, 'client_ip' => $clientIp, 'header_length' => false, 'buffer_length' => 0, 'buffer' => '', 'msg_opcode' => 0, 'msg_data_len' => 0 ); $this→ws[$clientId] = $socket; }
/** * @return int */ private function _nextId (){ $i = 1; while (isset ($this→ws[$i])) $i++; return $i; }
/** * @param $clientId */ private function _removeClient ($clientId){ if (array_key_exists ('onClose', $this→events)){ foreach ($this→events['onClose'] as $function => $args){ if (is_array ($args)){ call_user_func_array ($function, $args); }else{ call_user_func ($function, $args); } } }
socket_close ($this→clients[$clientId]→socket); unset ($this→ws[$clientId], $this→clients[$clientId]); $this→clientsCount -= 1; }
/** * @param $clientId * @return bool */ private function _closeClient ($clientId){ if ($this→clients[$clientId]→state == self: READY_STATE_CLOSING || $this→clients[$clientId]→state == self: READY_STATE_CLOSED){ return true; }
$this→clients[$clientId]→close_status = self: STATUS_PROTOCOL_ERROR; $this→sendClientMsg ($clientId, self: OPCODE_CLOSE, pack ('n', self: STATUS_PROTOCOL_ERROR)); $this→clients[$clientId]→state = self: READY_STATE_CLOSING; }
/** * @param $clientId * @param $opCode * @param $msg * @return bool */ public function sendClientMsg ($clientId, $opCode, $msg){ if ($this→clients[$clientId]→state == self: READY_STATE_CLOSING || $this→clients[$clientId]→state == self: READY_STATE_CLOSED){ return true; }
$msgLength = strlen ($msg); $buffSize = 4096;
$frameCount = ceil ($msgLength / $buffSize); if ($frameCount == 0){ $frameCount = 1; }
$maxFrame = $frameCount — 1; $lastFrameBuffLength = ($msgLength % $buffSize) != 0? $msgLength % $buffSize: ($msgLength!= 0? $buffSize: 0);
for ($i=0; $i<$frameCount; $i++){ $final = $i != $maxFrame ? 0 : self::FIN; $opCode = $i != 0 ? self::OPCODE_CONTINUATION : $opCode; $buffLength = $i != $maxFrame ? $buffSize : $lastFrameBuffLength;
if ($buffLength <= 125){ $payloadLength = $buffLength; $payloadLengthExt = ''; $payloadLengthExtL = 0; }elseif($buffLength <= 65535){ $payloadLength = self::PAYLOAD_LENGTH_16; $payloadLengthExt = pack('n', $buffLength); $payloadLengthExtL = 2; }else{ $payloadLength = self::PAYLOAD_LENGTH_63; $payloadLengthExt = pack('xxxxN', $buffLength); $payloadLengthExtL = 8; }
$buffer = pack ('n', (($final | $opCode) << 8) | $payloadLength) . $payloadLengthExt . substr($msg, $i * $buffSize, $buffLength); $socket = $this->clients[$clientId]→socket; $left = 2 + $payloadLengthExtL + $buffLength;
do{ $sent = @socket_send ($socket, $buffer, $left, 0); if ($sent === false){ return false; } $left -= $sent; if ($sent > 0){ $buffer = substr ($buffer, $sent); } } while ($left > 0); }
return true; }
/** * @param $clientId * @param $buffer * @param $bLength * @return bool */ private function _checkClient ($clientId, &$buffer, $bLength){ if ($this→clients[$clientId]→state == self: READY_STATE_OPEN){ $result = $this→_buildClientFrame ($clientId, $buffer, $bLength); }elseif ($this→clients[$clientId]→state == self: READY_STATE_CONNECTING){ $result = $this→_makeHandShake ($clientId, $buffer); if ($result){ $this→clients[$clientId]→state = self: READY_STATE_OPEN; if (array_key_exists ('onOpen', $this→events)){ foreach ($this→events['onOpen'] as $function => $args){ if (is_array ($args)){ call_user_func_array ($function, $args); }else{ call_user_func ($function, $args); } } } } }else{ $result = false; }
return $result; }
/** * @param $clientId * @param $buffer * @param $bufferLength * @return bool */ private function _buildClientFrame ($clientId, &$buffer, $bufferLength){ $this→clients[$clientId]→buffer_length += $bufferLength; $this→clients[$clientId]→buffer .= $buffer;
if ($this→clients[$clientId]→header_length!== false || $this→_checkSizeClientFrame ($clientId) == true){ $headerLength = ($this→clients[$clientId]→header_length <= 125 ? 0 : ($this->clients[$clientId]→header_length <= 65535 ? 2 : 8)) + 6; $frameLength = $this->clients[$clientId]→header_length + $headerLength; if ($this→clients[$clientId]→buffer_length >= $frameLength){ $nextFrameLength = $this→clients[$clientId]→buffer_length — $frameLength; if ($nextFrameLength > 0){ $this→clients[$clientId]→buffer_length -= $nextFrameLength; $nextFrameBytes = substr ($this→clients[$clientId]→buffer, $frameLength); $this→clients[$clientId]→buffer = substr ($this→clients[$clientId]→buffer, 0, $frameLength); }
$result = $this→_processClientFrame ($clientId);
if (isset ($this→clients[$clientId])){ $this→clients[$clientId]→header_length = false; $this→clients[$clientId]→buffer_length = 0; $this→clients[$clientId]→buffer = ''; }
if ($nextFrameLength <= 0 || !$result){ return $result; }
return $this→_buildClientFrame ($clientId, $nextFrameBytes, $nextFrameLength); } }
return true; }
/** * @param $clientId * @return bool */ private function _processClientFrame ($clientId){ $this→clients[$clientId]→time = time ();
$buffer = &$this→clients[$clientId]→buffer; if (substr ($buffer, 5, 1) === false) return false;
$a1 = ord (substr ($buffer, 0, 1)); $a2 = ord (substr ($buffer, 1, 1));
$f = $a1 & self: FIN; $opCode = $a1 & 15; $mask = $a2 & self: MASK;
if (!$mask) return false;
$seek = $this→clients[$clientId]→header_length <= 125 ? 2 : ($this->clients[$clientId]→header_length <= 65535 ? 4 : 10); $maskKey = substr($buffer, $seek, 4);
$array = unpack ('Na', $maskKey); $maskKey = $array['a']; $maskKey = array ( $maskKey >> 24, ($maskKey >> 16) & 255, ($maskKey >> 8) & 255, $maskKey & 255 ); $seek += 4;
if (substr ($buffer, $seek, 1) !== false) { $data = str_split (substr ($buffer, $seek)); foreach ($data as $key => $byte){ $data[$key] = chr (ord ($byte) ^ ($maskKey[$key % 4])); } $data = implode ('', $data); }else{ $data = ''; }
if ($opCode!= self: OPCODE_CONTINUATION && $this→clients[$clientId]→msg_data_len > 0) { $this→clients[$clientId]→msg_data_len = 0; $this→clients[$clientId]→msg_buffer = ''; }
if ($f == self: FIN){ if ($opCode!= self: OPCODE_CONTINUATION){ return $this→_processClientMsg ($clientId, $opCode, $data, $this→clients[$clientId]→header_length); }else{ $this→clients[$clientId]→msg_data_len += $this→clients[$clientId]→header_length; $this→clients[$clientId]→msg_buffer .= $data;
$result = $this→_processClientMsg ($clientId, $this→clients[$clientId]→msg_opcode, $this→clients[$clientId]→msg_buffer, $this→clients[$clientId]→msg_data_len);
if (isset ($this→clients[$clientId])){ $this→clients[$clientId]→msg_buffer = ''; $this→clients[$clientId]→msg_opcode = 0; $this→clients[$clientId]→msg_data_len = 0; }
return $result; } }else{ if ($opCode & 8) return false; $this→clients[$clientId]→msg_data_len += $this→clients[$clientId]→header_length; $this→clients[$clientId]→msg_buffer .= $data;
if ($opCode!= self: OPCODE_CONTINUATION){ $this→clients[$clientId]→msg_opcode = $opCode; } } return true; }
/** * @param $clientId * @param $opCode * @param $data * @param $dataLength * @return bool */ private function _processClientMsg ($clientId, $opCode, &$data, $dataLength){ if ($opCode == self: OPCODE_PING){ $this→sendClientMsg ($clientId, self: OPCODE_PONG, $data); }elseif ($opCode == self: OPCODE_PONG){ if ($this→clients[$clientId]→ready_state!== false){ $this→clients[$clientId]→ready_state = false; } }elseif ($opCode == self: OPCODE_CLOSE){ if ($this→clients[$clientId]→state == self: READY_STATE_CLOSING){ $this→clients[$clientId]→state = self: READY_STATE_CLOSED; }else{ // TODO: добавить типы закрытия $this→_closeClient ($clientId); } $this→_removeClient ($clientId); }elseif ($opCode == self: OPCODE_TEXT || $opCode == self: OPCODE_BINARY){ if (array_key_exists ('onMsg', $this→events)){ foreach ($this→events['onMsg'] as $function => $args){ if (is_array ($args)){ call_user_func_array ($function, $args); }else{ call_user_func ($function, $args); } } } }else{ return false; }
return true; }
/** * @param $clientId * @return bool */ private function _checkSizeClientFrame ($clientId){ if ($this→clients[$clientId]→buffer_length > 1){ $payloadLength = ord (substr ($this→clients[$clientId]→buffer, 1, 1)) & 127; if ($payloadLength <= 125) { $this->clients[$clientId]→header_length = $payloadLength; }elseif ($payloadLength == 126){ if (substr ($this→clients[$clientId]→buffer, 3, 1) !== false) { $payloadLengthExtended = substr ($this→clients[$clientId]→buffer, 2, 2); $array = unpack ('na', $payloadLengthExtended); $this→clients[$clientId]→header_length = $array['a']; } }else{ if (substr ($this→clients[$clientId]→buffer, 9, 1) !== false) { $payloadLengthExtended = substr ($this→clients[$clientId]→buffer, 2, 8); $payloadLengthExtended32_1 = substr ($payloadLengthExtended, 0, 4); $array = unpack ('Na', $payloadLengthExtended32_1);
if ($array['a'] != 0 || ord (substr ($payloadLengthExtended, 4, 1)) & 128) { $this→_closeClient ($clientId, self: STATUS_MESSAGE_TOO_BIG); return false; }
$payloadLengthExtended32_2 = substr ($payloadLengthExtended, 4, 4); $array = unpack ('Na', $payloadLengthExtended32_2);
if ($array['a'] > 2147479538) { $this→_closeClient ($clientId, self: STATUS_MESSAGE_TOO_BIG); return false; }
$this→clients[$clientId]→header_length = $array['a']; } }
if ($this→clients[$clientId]→header_length!== false) { if ($this→clients[$clientId]→header_length > self: MAX_FRAME_RECV) { $this→clients[$clientId]→header_length = false; $this→_closeClient ($clientId, self: STATUS_MESSAGE_TOO_BIG); return false; }
$controlFrame = (ord (substr ($this→clients[$clientId]→buffer, 0, 1)) & 8) == 8; if (!$controlFrame) { $newMessagePayloadLength = $this→clients[$clientId]→msg_data_len + $this→clients[$clientId]→header_length; if ($newMessagePayloadLength > self: MAX_FRAME_RECV || $newMessagePayloadLength > 2147483647) { $this→_closeClient ($clientId, self: STATUS_MESSAGE_TOO_BIG); return false; } }
return true; } } return false; }
/** * @param $clientId * @param $buffer * @return bool */ private function _makeHandShake ($clientId, $buffer){ $sep = strpos ($buffer,»\r\n\r\n»); if (!$sep) return false;
$headers = explode (»\r\n», substr ($buffer, 0, $sep)); $headersCount = sizeof ($headers); if ($headersCount < 1) return false;
$request = &$headers[0]; $requestParts = explode (' ', $request); $requestPartsSize = sizeof ($requestParts); if ($requestPartsSize < 3) return false;
if (strtoupper ($requestParts[0]) != 'GET') return false;
$httpPart = &$requestParts[$requestPartsSize — 1]; $httpParts = explode ('/', $httpPart); if (! isset ($httpParts[1]) || (float) $httpParts[1] < 1.1) return false;
$headersKeyed = array (); for ($i=1; $i<$headersCount; $i++) { $parts = explode(':', $headers[$i]); if (!isset($parts[1])) return false;
$headersKeyed[trim ($parts[0])] = trim ($parts[1]); }
if (! isset ($headersKeyed['Host'])) return false;
if (! isset ($headersKeyed['Sec-WebSocket-Key'])) return false;
$key = $headersKeyed['Sec-WebSocket-Key']; if (strlen (base64_decode ($key)) != 16) return false;
if (! isset ($headersKeyed['Sec-WebSocket-Version']) || (int) $headersKeyed['Sec-WebSocket-Version'] < 7) return false;
$hash = base64_encode (sha1($key.'258EAFA5-E914–47DA-95CA-C5AB0DC85B11', true));
$headers = array ( 'HTTP/1.1 101 Switching Protocols', 'Upgrade: websocket', 'Connection: Upgrade', 'Sec-WebSocket-Accept: '.$hash ); $headers = implode (»\r\n», $headers).»\r\n\r\n»; $socket = $this→clients[$clientId]→socket;
$left = strlen ($headers); do{ $sent = @socket_send ($socket, $headers, $left, 0); if ($sent === false) return false;
$left -= $sent; if ($sent > 0) $headers = substr ($headers, $sent); } while ($left > 0);
return true; } }