0,0
рейтинг
22 января 2014 в 13:56

Разработка → Делаем вебсокеты на PHP с нуля из песочницы

Некоторое время назад я выбирал библиотеку для работы с вебсокетами. На просторах интернета я натыкался на статьи по интеграции node.js с yii, а почти все статьи о вебсокетах на хабре ограничивались инструкциями к тому, как использовать phpdaemon.

Я изучал библиотеки phpdaemon и ratchet, они достаточно монструозны (причём используя ratchet для отправки сообщения конкретному пользователю рекомендовано дополнительно использовать wamp). Мне не совсем было понятно для чего использовать таких монстров, которые требуют установку других монстров. Почитав исходники этих, а также других библиотек, я разобрался как всё устроено и мне захотелось написать простой вебсокет-сервер на php самостоятельно. Это помогло мне закрепить изученный материал и наткнуться на некоторые подводные камни, о которых я не имел представления.

Так я решил написать необходимый для меня функционал с нуля.

Получившийся код и ссылка на демонстрационный чат в конце статьи.

Поставленные цели:


1) разобраться с серверными сокетами в php
2) разобраться с протоколом вебсокетов
3) написать с нуля простой сервер вебсокетов

1) Серверные сокеты в php


До этого момента я имел смутные представления о серверных сокетах. Почитав исходники нескольких библиотек для работы с вебсокетами я столкнулся с двумя схемами их реализаций:

используя расширение php «socket»:
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);//создаём сокет
socket_bind($socket, '127.0.0.1', 8000);//привязываем его к указанным ip и порту
socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);//разрешаем использовать один порт для нескольких соединений
socket_listen($socket);//слушаем сокет

или используя расширение php «stream»:
$socket = stream_socket_server("tcp://127.0.0.1:8000", $errno, $errstr);

Я предпочёл второй вариант ввиду его краткости.

Итак, мы создали серверный сокет и теперь хотим обрабатывать новые соединения к нему, для этого опять же есть два варианта
while ($connect = stream_socket_accept($socket, -1)) {//ожидаем новое соединение (без таймаута)
    ...обрабатываем $connect
}

Пример простого http сервера, который на все запросы отвечает: Привет
#!/usr/bin/env php
<?php

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

if (!$socket) {
    die("$errstr ($errno)\n");
}

while ($connect = stream_socket_accept($socket, -1)) {
    fwrite($connect, "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\nПривет");
    fclose($connect);
}

fclose($socket);



или с использованием stream_select

$connects = array();
while (true) {
    //формируем массив прослушиваемых сокетов:
    $read = $connects;
    $read[] = $socket;
    $write = $except = null;
    
    if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
        break;
    }

    if (in_array($socket, $read)) {//есть новое соединение
        $connect = stream_socket_accept($socket, -1);//принимаем новое соединение
        $connects[] = $connect;//добавляем его в список необходимых для обработки
        unset($read[ array_search($socket, $read) ]);
    }

    foreach($read as $connect) {//обрабатываем все соединения
        ...обрабатываем $connect
        unset($connects[ array_search($connect, $connects) ]);
    }
}

Пример простого http сервера с использованием stream_select, который на все запросы отвечает: Привет
#!/usr/bin/env php
<?php

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

if (!$socket) {
    die("$errstr ($errno)\n");
}

$connects = array();
while (true) {
    //формируем массив прослушиваемых сокетов:
    $read = $connects;
    $read []= $socket;
    $write = $except = null;

    if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
        break;
    }

    if (in_array($socket, $read)) {//есть новое соединение
        $connect = stream_socket_accept($socket, -1);//принимаем новое соединение
        $connects[] = $connect;//добавляем его в список необходимых для обработки
        unset($read[ array_search($socket, $read) ]);
    }

    foreach($read as $connect) {//обрабатываем все соединения
        $headers = '';
        while ($buffer = rtrim(fgets($connect))) {
            $headers .= $buffer;
        }
        fwrite($connect, "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\nПривет");
        fclose($connect);
        unset($connects[ array_search($connect, $connects) ]);
    }
}

fclose($server);


Т.к. нам в дальнейшем нужно будет одновременно обрабатывать и серверный сокет на предмет новых соединений, и уже существующие подключения, на предмет новых сообщений, то остановимся на втором варианте.

2) Протокол вебсокетов


В этой статье хорошо описан протокол взаимодействия.
Нас интересует два момента:
«Рукопожатие» или handshake:

Считываем значение Sec-WebSocket-Key из пришедшего заголовка от клиента, рассчитываем на его основе Sec-WebSocket-Accept и отправляем итоговый ответ:
$SecWebSocketAccept = base64_encode(pack('H*', sha1($SecWebSocketKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
$response = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
    "Upgrade: websocket\r\n" .
    "Connection: Upgrade\r\n" .
    "Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";

Пример функции, которая это делает
function handshake($connect) {
    $info = array();

    $line = fgets($connect);
    $header = explode(' ', $line);
    $info['method'] = $header[0];
    $info['uri'] = $header[1];

    //считываем заголовки из соединения
    while ($line = rtrim(fgets($connect))) {
        if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) {
            $info[$matches[1]] = $matches[2];
        } else {
            break;
        }
    }

    $address = explode(':', stream_socket_get_name($connect, true)); //получаем адрес клиента
    $info['ip'] = $address[0];
    $info['port'] = $address[1];

    if (empty($info['Sec-WebSocket-Key'])) {
        return false;
    }

    //отправляем заголовок согласно протоколу вебсокета
    $SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
    $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
	"Upgrade: websocket\r\n" .
	"Connection: Upgrade\r\n" .
	"Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
    fwrite($connect, $upgrade);

    return $info;
}



обмен сообщениями

После получения данных из вебсокета нам нужно их раскодировать, а при отправке закодировать.
Всё в той же статье хорошо описано кодирование сообщений, но нам по-сути нужны только две функции: decode и encode.
Пример реализации функций decode и encode
function decode($data)
{
    $unmaskedPayload = '';
    $decodedData = array();

    // estimate frame type:
    $firstByteBinary = sprintf('%08b', ord($data[0]));
    $secondByteBinary = sprintf('%08b', ord($data[1]));
    $opcode = bindec(substr($firstByteBinary, 4, 4));
    $isMasked = ($secondByteBinary[0] == '1') ? true : false;
    $payloadLength = ord($data[1]) & 127;

    // unmasked frame is received:
    if (!$isMasked) {
        return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
    }

    switch ($opcode) {
        // text frame:
        case 1:
            $decodedData['type'] = 'text';
            break;

        case 2:
            $decodedData['type'] = 'binary';
            break;

        // connection close frame:
        case 8:
            $decodedData['type'] = 'close';
            break;

        // ping frame:
        case 9:
            $decodedData['type'] = 'ping';
            break;

        // pong frame:
        case 10:
            $decodedData['type'] = 'pong';
            break;

        default:
            return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
    }

    if ($payloadLength === 126) {
        $mask = substr($data, 4, 4);
        $payloadOffset = 8;
        $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
    } elseif ($payloadLength === 127) {
        $mask = substr($data, 10, 4);
        $payloadOffset = 14;
        $tmp = '';
        for ($i = 0; $i < 8; $i++) {
            $tmp .= sprintf('%08b', ord($data[$i + 2]));
        }
        $dataLength = bindec($tmp) + $payloadOffset;
        unset($tmp);
    } else {
        $mask = substr($data, 2, 4);
        $payloadOffset = 6;
        $dataLength = $payloadLength + $payloadOffset;
    }

    /**
     * We have to check for large frames here. socket_recv cuts at 1024 bytes
     * so if websocket-frame is > 1024 bytes we have to wait until whole
     * data is transferd.
     */
    if (strlen($data) < $dataLength) {
        return false;
    }

    if ($isMasked) {
        for ($i = $payloadOffset; $i < $dataLength; $i++) {
            $j = $i - $payloadOffset;
            if (isset($data[$i])) {
                $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
            }
        }
        $decodedData['payload'] = $unmaskedPayload;
    } else {
        $payloadOffset = $payloadOffset - 4;
        $decodedData['payload'] = substr($data, $payloadOffset);
    }

    return $decodedData;
}

function encode($payload, $type = 'text', $masked = false)
{
    $frameHead = array();
    $payloadLength = strlen($payload);

    switch ($type) {
        case 'text':
            // first byte indicates FIN, Text-Frame (10000001):
            $frameHead[0] = 129;
            break;

        case 'close':
            // first byte indicates FIN, Close Frame(10001000):
            $frameHead[0] = 136;
            break;

        case 'ping':
            // first byte indicates FIN, Ping frame (10001001):
            $frameHead[0] = 137;
            break;

        case 'pong':
            // first byte indicates FIN, Pong frame (10001010):
            $frameHead[0] = 138;
            break;
    }

    // set mask and payload length (using 1, 3 or 9 bytes)
    if ($payloadLength > 65535) {
        $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
        $frameHead[1] = ($masked === true) ? 255 : 127;
        for ($i = 0; $i < 8; $i++) {
            $frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
        }
        // most significant bit MUST be 0
        if ($frameHead[2] > 127) {
            return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
        }
    } elseif ($payloadLength > 125) {
        $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
        $frameHead[1] = ($masked === true) ? 254 : 126;
        $frameHead[2] = bindec($payloadLengthBin[0]);
        $frameHead[3] = bindec($payloadLengthBin[1]);
    } else {
        $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
    }

    // convert frame-head to string:
    foreach (array_keys($frameHead) as $i) {
        $frameHead[$i] = chr($frameHead[$i]);
    }
    if ($masked === true) {
        // generate a random mask:
        $mask = array();
        for ($i = 0; $i < 4; $i++) {
            $mask[$i] = chr(rand(0, 255));
        }

        $frameHead = array_merge($frameHead, $mask);
    }
    $frame = implode('', $frameHead);

    // append payload to frame:
    for ($i = 0; $i < $payloadLength; $i++) {
        $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
    }

    return $frame;
}



Простой сервер вебсокетов


Итак, у нас есть вся необходимая информация.
Используя код простого http сервера из первой части, а также функции handshake, decode и encode из второй мы можем собрать простой сервер вебсокетов.

Пример реализации простого сервера вебсокетов
#!/usr/bin/env php
<?php

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

if (!$socket) {
    die("$errstr ($errno)\n");
}

$connects = array();
while (true) {
    //формируем массив прослушиваемых сокетов:
    $read = $connects;
    $read []= $socket;
    $write = $except = null;

    if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
        break;
    }

    if (in_array($socket, $read)) {//есть новое соединение
        //принимаем новое соединение и производим рукопожатие:
        if (($connect = stream_socket_accept($socket, -1)) && $info = handshake($connect)) {
            $connects[] = $connect;//добавляем его в список необходимых для обработки
            onOpen($connect, $info);//вызываем пользовательский сценарий
        }
        unset($read[ array_search($socket, $read) ]);
    }

    foreach($read as $connect) {//обрабатываем все соединения
        $data = fread($connect, 100000);

        if (!$data) { //соединение было закрыто
            fclose($connect);
            unset($connects[ array_search($connect, $connects) ]);
            onClose($connect);//вызываем пользовательский сценарий
            continue;
        }

        onMessage($connect, $data);//вызываем пользовательский сценарий
    }
}

fclose($server);

function handshake($connect) {
    $info = array();

    $line = fgets($connect);
    $header = explode(' ', $line);
    $info['method'] = $header[0];
    $info['uri'] = $header[1];

    //считываем заголовки из соединения
    while ($line = rtrim(fgets($connect))) {
        if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) {
            $info[$matches[1]] = $matches[2];
        } else {
            break;
        }
    }

    $address = explode(':', stream_socket_get_name($connect, true)); //получаем адрес клиента
    $info['ip'] = $address[0];
    $info['port'] = $address[1];

    if (empty($info['Sec-WebSocket-Key'])) {
        return false;
    }

    //отправляем заголовок согласно протоколу вебсокета
    $SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
    $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
        "Upgrade: websocket\r\n" .
        "Connection: Upgrade\r\n" .
        "Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
    fwrite($connect, $upgrade);

    return $info;
}

function encode($payload, $type = 'text', $masked = false)
{
    $frameHead = array();
    $payloadLength = strlen($payload);

    switch ($type) {
        case 'text':
            // first byte indicates FIN, Text-Frame (10000001):
            $frameHead[0] = 129;
            break;

        case 'close':
            // first byte indicates FIN, Close Frame(10001000):
            $frameHead[0] = 136;
            break;

        case 'ping':
            // first byte indicates FIN, Ping frame (10001001):
            $frameHead[0] = 137;
            break;

        case 'pong':
            // first byte indicates FIN, Pong frame (10001010):
            $frameHead[0] = 138;
            break;
    }

    // set mask and payload length (using 1, 3 or 9 bytes)
    if ($payloadLength > 65535) {
        $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
        $frameHead[1] = ($masked === true) ? 255 : 127;
        for ($i = 0; $i < 8; $i++) {
            $frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
        }
        // most significant bit MUST be 0
        if ($frameHead[2] > 127) {
            return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
        }
    } elseif ($payloadLength > 125) {
        $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
        $frameHead[1] = ($masked === true) ? 254 : 126;
        $frameHead[2] = bindec($payloadLengthBin[0]);
        $frameHead[3] = bindec($payloadLengthBin[1]);
    } else {
        $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
    }

    // convert frame-head to string:
    foreach (array_keys($frameHead) as $i) {
        $frameHead[$i] = chr($frameHead[$i]);
    }
    if ($masked === true) {
        // generate a random mask:
        $mask = array();
        for ($i = 0; $i < 4; $i++) {
            $mask[$i] = chr(rand(0, 255));
        }

        $frameHead = array_merge($frameHead, $mask);
    }
    $frame = implode('', $frameHead);

    // append payload to frame:
    for ($i = 0; $i < $payloadLength; $i++) {
        $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
    }

    return $frame;
}

function decode($data)
{
    $unmaskedPayload = '';
    $decodedData = array();

    // estimate frame type:
    $firstByteBinary = sprintf('%08b', ord($data[0]));
    $secondByteBinary = sprintf('%08b', ord($data[1]));
    $opcode = bindec(substr($firstByteBinary, 4, 4));
    $isMasked = ($secondByteBinary[0] == '1') ? true : false;
    $payloadLength = ord($data[1]) & 127;

    // unmasked frame is received:
    if (!$isMasked) {
        return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
    }

    switch ($opcode) {
        // text frame:
        case 1:
            $decodedData['type'] = 'text';
            break;

        case 2:
            $decodedData['type'] = 'binary';
            break;

        // connection close frame:
        case 8:
            $decodedData['type'] = 'close';
            break;

        // ping frame:
        case 9:
            $decodedData['type'] = 'ping';
            break;

        // pong frame:
        case 10:
            $decodedData['type'] = 'pong';
            break;

        default:
            return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
    }

    if ($payloadLength === 126) {
        $mask = substr($data, 4, 4);
        $payloadOffset = 8;
        $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
    } elseif ($payloadLength === 127) {
        $mask = substr($data, 10, 4);
        $payloadOffset = 14;
        $tmp = '';
        for ($i = 0; $i < 8; $i++) {
            $tmp .= sprintf('%08b', ord($data[$i + 2]));
        }
        $dataLength = bindec($tmp) + $payloadOffset;
        unset($tmp);
    } else {
        $mask = substr($data, 2, 4);
        $payloadOffset = 6;
        $dataLength = $payloadLength + $payloadOffset;
    }

    /**
     * We have to check for large frames here. socket_recv cuts at 1024 bytes
     * so if websocket-frame is > 1024 bytes we have to wait until whole
     * data is transferd.
     */
    if (strlen($data) < $dataLength) {
        return false;
    }

    if ($isMasked) {
        for ($i = $payloadOffset; $i < $dataLength; $i++) {
            $j = $i - $payloadOffset;
            if (isset($data[$i])) {
                $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
            }
        }
        $decodedData['payload'] = $unmaskedPayload;
    } else {
        $payloadOffset = $payloadOffset - 4;
        $decodedData['payload'] = substr($data, $payloadOffset);
    }

    return $decodedData;
}

//пользовательские сценарии:

function onOpen($connect, $info) {
    echo "open\n";
    fwrite($connect, encode('Привет'));
}

function onClose($connect) {
    echo "close\n";
}

function onMessage($connect, $data) {
    echo decode($data)['payload'] . "\n";
}



В приведённом примере можно менять пользовательские сценарии onOpen, onClose и onMessage для реализации необходимого функционала.

Поставленные цели достигнуты.
Если этот материал вам покажется интересным, то в следующей статье я опишу как можно запускать несколько процессов для обработки соединений (один мастер и несколько воркеров), межпроцессное взаимодействие, интеграцию с вашим фреймворком на примере компонента yii.

демонстрационный чат с вышеописанными функциями

Код демонстрационного чата
#!/usr/bin/env php
<?php

class WebsocketServer
{
    public function __construct($config) {
        $this->config = $config;
    }

    public function start() {
        //открываем серверный сокет
        $server = stream_socket_server("tcp://{$this->config['host']}:{$this->config['port']}", $errorNumber, $errorString);

        if (!$server) {
            die("error: stream_socket_server: $errorString ($errorNumber)\r\n");
        }

        list($pid, $master, $workers) = $this->spawnWorkers();//создаём дочерние процессы

        if ($pid) {//мастер
            fclose($server);//мастер не будет обрабатывать входящие соединения на основном сокете
            $WebsocketMaster = new WebsocketMaster($workers);//мастер будет пересылать сообщения между воркерами
            $WebsocketMaster->start();
        } else {//воркер
            $WebsocketHandler = new WebsocketHandler($server, $master);
            $WebsocketHandler->start();
        }
    }

    protected function spawnWorkers() {
        $master = null;
        $workers = array();
        $i = 0;
        while ($i < $this->config['workers']) {
            $i++;
            //создаём парные сокеты, через них будут связываться мастер и воркер
            $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

            $pid = pcntl_fork();//создаём форк
            if ($pid == -1) {
                die("error: pcntl_fork\r\n");
            } elseif ($pid) { //мастер
                fclose($pair[0]);
                $workers[$pid] = $pair[1];//один из пары будет в мастере
            } else { //воркер
                fclose($pair[1]);
                $master = $pair[0];//второй в воркере
                break;
            }
        }

        return array($pid, $master, $workers);
    }
}

class WebsocketMaster
{
    protected $workers = array();
    protected $clients = array();

    public function __construct($workers) {
        $this->clients = $this->workers = $workers;
    }

    public function start() {
        while (true) {
            //подготавливаем массив всех сокетов, которые нужно обработать
            $read = $this->clients;

            stream_select($read, $write, $except, null);//обновляем массив сокетов, которые можно обработать

            if ($read) {//пришли данные от подключенных клиентов
                foreach ($read as $client) {
                    $data = fread($client, 1000);

                    if (!$data) { //соединение было закрыто
                        unset($this->clients[intval($client)]);
                        @fclose($client);
                        continue;
                    }

                    foreach ($this->workers as $worker) {//пересылаем данные во все воркеры
                        if ($worker !== $client) {
                            fwrite($worker, $data);
                        }
                    }
                }
            }
        }
    }
}

abstract class WebsocketWorker
{
    protected $clients = array();
    protected $server;
    protected $master;
    protected $pid;
    protected $handshakes = array();
    protected $ips = array();

    public function __construct($server, $master) {
        $this->server = $server;
        $this->master = $master;
        $this->pid = posix_getpid();
    }

    public function start() {
        while (true) {
            //подготавливаем массив всех сокетов, которые нужно обработать
            $read = $this->clients;
            $read[] = $this->server;
            $read[] = $this->master;

            $write = array();
            if ($this->handshakes) {
                foreach ($this->handshakes as $clientId => $clientInfo) {
                    if ($clientInfo) {
                        $write[] = $this->clients[$clientId];
                    }
                }
            }

            stream_select($read, $write, $except, null);//обновляем массив сокетов, которые можно обработать

            if (in_array($this->server, $read)) { //на серверный сокет пришёл запрос от нового клиента
                //подключаемся к нему и делаем рукопожатие, согласно протоколу вебсокета
                if ($client = stream_socket_accept($this->server, -1)) {
                    $address = explode(':', stream_socket_get_name($client, true));
                    if (isset($this->ips[$address[0]]) && $this->ips[$address[0]] > 5) {//блокируем более пяти соединий с одного ip
                        @fclose($client);
                    } else {
                        @$this->ips[$address[0]]++;

                        $this->clients[intval($client)] = $client;
                        $this->handshakes[intval($client)] = array();//отмечаем, что нужно сделать рукопожатие
                    }
                }

                //удаляем сервеный сокет из массива, чтобы не обработать его в этом цикле ещё раз
                unset($read[array_search($this->server, $read)]);
            }

            if (in_array($this->master, $read)) { //пришли данные от мастера
                $data = fread($this->master, 1000);

                $this->onSend($data);//вызываем пользовательский сценарий

                //удаляем мастера из массива, чтобы не обработать его в этом цикле ещё раз
                unset($read[array_search($this->master, $read)]);
            }

            if ($read) {//пришли данные от подключенных клиентов
                foreach ($read as $client) {
                    if (isset($this->handshakes[intval($client)])) {
                        if ($this->handshakes[intval($client)]) {//если уже было получено рукопожатие от клиента
                            continue;//то до отправки ответа от сервера читать здесь пока ничего не надо
                        }

                        if (!$this->handshake($client)) {
                            unset($this->clients[intval($client)]);
                            unset($this->handshakes[intval($client)]);
                            $address = explode(':', stream_socket_get_name($client, true));
                            if (isset($this->ips[$address[0]]) && $this->ips[$address[0]] > 0) {
                                @$this->ips[$address[0]]--;
                            }
                            @fclose($client);
                        }
                    } else {
                        $data = fread($client, 1000);

                        if (!$data) { //соединение было закрыто
                            unset($this->clients[intval($client)]);
                            unset($this->handshakes[intval($client)]);
                            $address = explode(':', stream_socket_get_name($client, true));
                            if (isset($this->ips[$address[0]]) && $this->ips[$address[0]] > 0) {
                                @$this->ips[$address[0]]--;
                            }
                            @fclose($client);
                            $this->onClose($client);//вызываем пользовательский сценарий
                            continue;
                        }

                        $this->onMessage($client, $data);//вызываем пользовательский сценарий
                    }
                }
            }

            if ($write) {
                foreach ($write as $client) {
                    if (!$this->handshakes[intval($client)]) {//если ещё не было получено рукопожатие от клиента
                        continue;//то отвечать ему рукопожатием ещё рано
                    }
                    $info = $this->handshake($client);
                    $this->onOpen($client, $info);//вызываем пользовательский сценарий
                }
            }
        }
    }

    protected function handshake($client) {
        $key = $this->handshakes[intval($client)];

        if (!$key) {
            //считываем загаловки из соединения
            $headers = fread($client, 10000);
            preg_match("/Sec-WebSocket-Key: (.*)\r\n/", $headers, $match);

            if (empty($match[1])) {
                return false;
            }

            $key = $match[1];

            $this->handshakes[intval($client)] = $key;
        } else {
            //отправляем заголовок согласно протоколу вебсокета
            $SecWebSocketAccept = base64_encode(pack('H*', sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
            $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
                "Upgrade: websocket\r\n" .
                "Connection: Upgrade\r\n" .
                "Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
            fwrite($client, $upgrade);
            unset($this->handshakes[intval($client)]);
        }

        return $key;
    }

    protected function encode($payload, $type = 'text', $masked = false)
    {
        $frameHead = array();
        $payloadLength = strlen($payload);

        switch ($type) {
            case 'text':
                // first byte indicates FIN, Text-Frame (10000001):
                $frameHead[0] = 129;
                break;

            case 'close':
                // first byte indicates FIN, Close Frame(10001000):
                $frameHead[0] = 136;
                break;

            case 'ping':
                // first byte indicates FIN, Ping frame (10001001):
                $frameHead[0] = 137;
                break;

            case 'pong':
                // first byte indicates FIN, Pong frame (10001010):
                $frameHead[0] = 138;
                break;
        }

        // set mask and payload length (using 1, 3 or 9 bytes)
        if ($payloadLength > 65535) {
            $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 255 : 127;
            for ($i = 0; $i < 8; $i++) {
                $frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
            }
            // most significant bit MUST be 0
            if ($frameHead[2] > 127) {
                return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
            }
        } elseif ($payloadLength > 125) {
            $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 254 : 126;
            $frameHead[2] = bindec($payloadLengthBin[0]);
            $frameHead[3] = bindec($payloadLengthBin[1]);
        } else {
            $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
        }

        // convert frame-head to string:
        foreach (array_keys($frameHead) as $i) {
            $frameHead[$i] = chr($frameHead[$i]);
        }
        if ($masked === true) {
            // generate a random mask:
            $mask = array();
            for ($i = 0; $i < 4; $i++) {
                $mask[$i] = chr(rand(0, 255));
            }

            $frameHead = array_merge($frameHead, $mask);
        }
        $frame = implode('', $frameHead);

        // append payload to frame:
        for ($i = 0; $i < $payloadLength; $i++) {
            $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
        }

        return $frame;
    }

    protected function decode($data)
    {
        $unmaskedPayload = '';
        $decodedData = array();

        // estimate frame type:
        $firstByteBinary = sprintf('%08b', ord($data[0]));
        $secondByteBinary = sprintf('%08b', ord($data[1]));
        $opcode = bindec(substr($firstByteBinary, 4, 4));
        $isMasked = ($secondByteBinary[0] == '1') ? true : false;
        $payloadLength = ord($data[1]) & 127;

        // unmasked frame is received:
        if (!$isMasked) {
            return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
        }

        switch ($opcode) {
            // text frame:
            case 1:
                $decodedData['type'] = 'text';
                break;

            case 2:
                $decodedData['type'] = 'binary';
                break;

            // connection close frame:
            case 8:
                $decodedData['type'] = 'close';
                break;

            // ping frame:
            case 9:
                $decodedData['type'] = 'ping';
                break;

            // pong frame:
            case 10:
                $decodedData['type'] = 'pong';
                break;

            default:
                return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
        }

        if ($payloadLength === 126) {
            $mask = substr($data, 4, 4);
            $payloadOffset = 8;
            $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
        } elseif ($payloadLength === 127) {
            $mask = substr($data, 10, 4);
            $payloadOffset = 14;
            $tmp = '';
            for ($i = 0; $i < 8; $i++) {
                $tmp .= sprintf('%08b', ord($data[$i + 2]));
            }
            $dataLength = bindec($tmp) + $payloadOffset;
            unset($tmp);
        } else {
            $mask = substr($data, 2, 4);
            $payloadOffset = 6;
            $dataLength = $payloadLength + $payloadOffset;
        }

        /**
         * We have to check for large frames here. socket_recv cuts at 1024 bytes
         * so if websocket-frame is > 1024 bytes we have to wait until whole
         * data is transferd.
         */
        if (strlen($data) < $dataLength) {
            return false;
        }

        if ($isMasked) {
            for ($i = $payloadOffset; $i < $dataLength; $i++) {
                $j = $i - $payloadOffset;
                if (isset($data[$i])) {
                    $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
                }
            }
            $decodedData['payload'] = $unmaskedPayload;
        } else {
            $payloadOffset = $payloadOffset - 4;
            $decodedData['payload'] = substr($data, $payloadOffset);
        }

        return $decodedData;
    }

    abstract protected function onOpen($client, $info);

    abstract protected function onClose($client);

    abstract protected function onMessage($client, $data);

    abstract protected function onSend($data);

    abstract protected function send($data);
}

//пример реализации чата
class WebsocketHandler extends WebsocketWorker
{
    protected function onOpen($client, $info) {//вызывается при соединении с новым клиентом

    }

    protected function onClose($client) {//вызывается при закрытии соединения клиентом

    }

    protected function onMessage($client, $data) {//вызывается при получении сообщения от клиента
        $data = $this->decode($data);

        if (!$data['payload']) {
            return;
        }

        if (!mb_check_encoding($data['payload'], 'utf-8')) {
            return;
        }
        //var_export($data);
        //шлем всем сообщение, о том, что пишет один из клиентов
        $message = 'пользователь #' . intval($client) . ' (' . $this->pid . '): ' . strip_tags($data['payload']);
        $this->send($message);

        $this->sendHelper($message);
    }

    protected function onSend($data) {//вызывается при получении сообщения от мастера
        $this->sendHelper($data);
    }

    protected function send($message) {//отправляем сообщение на мастер, чтобы он разослал его на все воркеры
        @fwrite($this->master, $message);
    }

    private function sendHelper($data) {
        $data = $this->encode($data);

        $write = $this->clients;
        if (stream_select($read, $write, $except, 0)) {
            foreach ($write as $client) {
                @fwrite($client, $data);
            }
        }
    }
}

$config = array(
    'host' => '0.0.0.0',
    'port' => 8000,
    'workers' => 1,
);

$WebsocketServer = new WebsocketServer($config);
$WebsocketServer->start();


Update (лучшее из комментариев):


  • на одно соединение уходит около 9кб памяти
  • если при работе с открытыми сокетами использовать fgets(), то можно получить «зависание» (функция будет ожидать конца строки или таймаута), потому что по протоколу вебсокета сообщение не заканчивается переносом строки. используйте fread()
  • при записи ответа в сокет функцией fwrite() (функция возвращает количество записанных байт) необходимо проверять, что в сокет были записаны все данные
  • при ответе с сервера нужно проверять готовность клиента получать данные при помощи функции stream_socket_accept()
  • если на сервере в сокет писать символы, отсутствующие в utf-8, то клиент будет разрывать соединение с ошибкой:
    WebSocket connection to 'ws://sharoid.ru:8000/' failed: Could not decode a text frame as UTF-8.
  • при проверке, что клиент не прислал никаких данных и нужно закрывать сокет используйте !strlen($data), а не !$data
  • перед сервером вебсокетов можно поставить nginx

Вторая часть статьи: IPC. Межпроцессное взаимодействие
Третья часть статьи: От чата до игры: Battle City
Исходный код библиотеки и примеров лежит на гитхабе и доступен под лицензией MIT
Владимир Гончаров @morozovsk
карма
27,7
рейтинг 0,0
Backend developer
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Реклама

Самое читаемое Разработка

Комментарии (74)

  • +2
    Спасибо за код!
    может пригодится) Но я бы добавил хотя бы strip_tags)
  • 0
    Чат очень сильно глючит, это из-за нагрузок?
    • 0
      Не думаю. Скорее всего это связано с настройками окружения на сервере. Локально я тестировал, открывая 10к сокетов одновременно и проблем не было. Сейчас разбираюсь, в чём проблема на сервере.
      • 0
        то что вы открыли 10К соединений никак не покажет вам как сервер держит нагрузку.
      • 0
        Будет круто, если расскажите сколько памяти и процессора «съел» PHP и остальное при 10K подключиях.
        • 0
          около 90 мб на 10к соединений, т.е. 9кб на одно соединение
          • 0
            Хороший показатель с памятью! А что с процессором?
            • 0
              если сообщений нет, то процессор вообще не жрёт.
              если отправить сообщение с одного клиента на остальные 10к, то на долю секунды скачёк до 20% процессора.
              если в чате зажимаю и не отпускаю enter, то нагрузка до 50% процессора.
              • 0
                Если в линуксе iptables будет настраивать криворукий админ, тогда показатели процессора могут стать печальней)
              • +1
                50% процессора циферка довольно относительная. Подозреваю что у вас двухядерная система и выжирается ядро целиком.
  • 0
    Пока что (на момент написания комментария!), увы, демонстрационный чат небезопасен — спокойно выполняет любой пришедший JS.
    • +1
      Автор не дремлет, уже не выполняется.
  • +1
    Через рандомные промежутки отключает с сообщением в консоль (Хром 28.0.1500.95 убунта):
    WebSocket connection to 'ws://sharoid.ru:8000/' failed: Could not decode a text frame as UTF-8.
    • 0
      спасибо. как временное решение: на закрытие повесил перезапуск соекта, теперь не выбрасывает.
  • 0
    Как приятно, что не перевелись еще Кулибины! ) Так держать
    В данной реализации (если я ничего не пропустил) не обрабатывается вариант, когда одно сообщение содержит более 100000 байт. Или же по каким-то причинам одновременно прочиталось два сообщения. Я на этот случай обычно использую входящий буфер на каждый коннект. Сервер складывает все, что пришло от клиента, в этот буфер, а дальше происходит уже анализ содержимого этого буфера.
    • 0
      опять же в дополнение к такой обработке появляется потребность убирать блокировку сокета, чтобы (пока ничего не пришло) могли обрабатываться сообщения, уже имеющиеся в буффере, либо могли выполняться какие-то внутренние действия
    • 0
      Какой смысл в своем буфере, если такой буфер уже есть в операционной системе? Если вам не хватает дефолтного размера буфера чтения/записи, никто не помешает вам указать свой размер.
      • +1
        Смысл в том, сообщение может приходить кусками. Пришедший кусок нельзя не читать — иначе он так и будет сигналить о готовности чтения каждый раз — но и обработать прочитанное еще нельзя.

        Отсюда, кстати, и валятся периодически ошибки у автора.
  • +11
    if (($connect = stream_socket_accept($socket, -1)) && $info = handshake($connect)) {
        $connects[] = $connect;//добавляем его в список необходимых для обработки
        onOpen($connect, $info);//вызываем пользовательский сценарий
    }
    $connects[] = $connect;//добавляем его в список необходимых для обработки
    

    2 раза добавляете один и тот же сокет.

     $data = fread($connect, 100000);
    if (!$data) { //соединение было закрыто
    

    А вот и нет. Клиент может прислать «0».
    Используйте strlen($data).

    Еще ваш код, как и 90% мануалов по сокетам, не учитывает 3 вещи:
    1. при чтении данные могли прийти не полностью
    2. блокирующая запись (сокет может быть не готов)
    3. данные записались не полностью

    Лечить так:
    1. буфер чтения
    2. разобраться с использованием параметра $write для stream_select
    3. буфер записи

    Хорошие исходники для медитации:
    github.com/reactphp/event-loop/blob/master/StreamSelectLoop.php
    github.com/igorw/webserver-zceu
    • 0
      я для чтения из сокетов предпочитаю использовать более «низкоуровневые» вызовы *_recv
      тогда отключение можно поймать, когда после селекта мы из сокета вычитываем 0 байт.
      • +1
        strlen($data) — это и есть количество байт, если там 0, значит клиент отключился.
        Вы используете socket_recv? Оно работает с врапперами (ssl://)?
        • 0
          strlen — зачем нужен лишний вызов?
          врапперы — очевидно, что нет. Тут уже все зависит от задачи, есть ли необходимость во врапперах или нет.
          • 0
            strlen — зачем нужен лишний вызов?

            Вы правы (я и не спорил). В случае с socket_recv можно сэкономить на вызове strlen.

            врапперы — очевидно, что нет

            Мне было не очевидно, т.к. я socket_recv не использовал. Поэтому Вам и задал вопрос.
          • 0
            именно по этой причине (зависимость от использования сетевого протокола, ssl и т.д.) стоит использовать потоки, а не низкоуровневое api. Хотя все опять же зависит от задачи.
      • +1
        Всегда казалось что предпочтительнее использовать потоки…
    • 0
      спасибо, код в статье подправил.
      я сначала хотел в статье написать ещё пример с использованием libevent, но решил, что для статьи, в которой «делаем простой сервер вебсокетов» этого будет слишком много.
      • 0
        Советую взглянуть на pecl/event, он предоставляет более высокоуровневый и объектный интерфейс. Документации, правда, толком нет, но примеров из bitbucket-а вполне достаточно, если есть опыт работы с либевентом.
        • 0
          Да, я его тоже пробовал, написал примеры, но в эту статью не стал впихивать.
          В следующей статье приведу пример реализации на event и libevent для сравнения, кому что больше понравится.
    • 0
      клиент не может прислать 0, данные кодируются по протоколу вебсокета — если клиент отправит 0, то придёт ������
      по-этому можно не делать strlen($data), а достаточно if (!$data)
      • +2
        Первое правило безопасности программ — данные, полученные из внешнего источника, могут быть какими угодно.
        • +1
          UPD: второе правило безопасности — на любую, даже самую мелкую, ошибку всегда найдется хакер, который положит все нафиг с ее помощью.
          • 0
            if (!$data) {@fclose($client);}
            куда уже безопаснее? если пользователь в обход протокола запишет «0», то соединение будет разорвано.
            если бы хакер прислал «0», то он не положил бы чат, а разорвал соединение.
            чат зависал из-за того что использовалась fgets($client);, а она ожидает конец строки или таймаут.
            т.к. конца строки вы не приходило, то она «зависала».
            • 0
              А если этот нолик случайно окажется пересланным в первом байте между мастером и воркером? Кончено, для создания такой ситуации нужно гнать ОЧЕНЬ много нулей через чат — но и эффект будет интересным…
      • +2
        клиент не может прислать 0, данные кодируются по протоколу вебсокета — если клиент отправит 0, то придёт ������
        по-этому можно не делать strlen($data), а достаточно if (!$data)

        В Вашей конкретной реализации — возможно.

        В общем случае сервер состоит из:
        1. цикл обработки событий (event loop) (абстрактный, не зависящий от протокола)
        2. обработчик конкретного протокола

        Цикл читает данные и передает в обработчик. Обработка закрытия/обрыва соединения находится в цикле (в общем случае).
        Исходя из этих пунктов, !$data в цикле обработки использовать не стоит.
        Т.к. цикл обработки использует stream_select, то сервер может получать данные по кускам.
        Т.е. сообщение

        Это стоит 1000$
        

        Может прийти в виде

        Эт
        о с
        тоит 1
        0
        00$
        

        И на предпоследнем куске обработчик, проверяющий !$data, ошибочно закроет соединение. Каким бы способом сообщение не кодировалось, если там встречается «0» — соединение будет обработано неверно.
  • 0
    Про несколько процессов-воркеров будет крайне интересно увидеть статью.
    • 0
      алгоритмы prefork-серверов слабо зависят от языка реализации. Суть везде та же.
  • +5
    Не забывайте, что Вам нужно следить за жизнью этого демона. Рестартовать при падении, перезагружать, собирать логи и т.д. Тут у Вас, опять же, 2 пути:
    1. Написать обвязку самому.
    2. Использовать готовые решения.

    Я Вам рекомендую использовать supervisord.org/ — это Вас лишит лишней головной боли.
    Пример конфига можно увидеть тут: webadvent.org/2009/daemonize-your-php-by-sean-coates
  • +3
    Монстрообразность указанных в начале статьи библиотек обусловлена, как минимум наличием 4х протоколов вебсокетов (в вашем примере только один) и возможностью завести библиотеки вместе с libevent и pcntl (форки\многопроцессовость).
  • +3
    Мы используем HttpPushStreamModule для работы с сокетами из пхп. Поддерживает, к слову, много чего, включая деградацию до long-pooling.
    • 0
      А где можно поглядеть на пример взаимодействия php с этим модулем?
      • 0
        Где на пример взаимодействия с php посмотреть, не подскажу. Могу сказать, что взаимодействие сводится к http-запросам (например, записать что-то в канал = отправить запрос), но это, наверное, и так понятно.
        • 0
          Меня интересует вот что:

          Publisher отправляет сообщение. Я его получаю из:
          $_POST
          php://input
          STDIN
          Какая-то библиотека

          Откуда?

          Дальше. Есть несколько subscribers. Я хочу отправить им это сообщение. Я пишу его в:
          Сокет
          Пайп
          STDOUT
          echo

          Куда?
          • 0
            Хотите получить сообщение (сообщения) из канала — отправляете запрос.
            Хотите отправить сообщение подписчикам — отправляете запрос.
            Как именно отправить http-запрос из приложения — вам решать.
            • 0
              Хотите получить сообщение (сообщения) из канала — отправляете запрос.

              Это polling получается. Я имел ввиду использование WebSockets… Что-то я запутался. Надо пробовать на практике.

              В любом случае, спасибо за наводку.
              • 0
                Нет, клиент (из браузера) коннектится через веб-сокеты к каналу, открытому nginx. Приложение (бэк-энд) читает и пишет в канал, используя http запросы.
  • +1
    не будет ли проблемой то, что handshake выполняется сразу же после accept? Не будет ли fgets блокировать поток? Я имею в виду ситуацию, при которой любой сможет положить ваш comet-сервер просто установив соединение с сервером но не посылая никаких данных…
    • +1
      Будет и блокирует. Я проверил.
      • +1
        Чат замер. Уже весь Хабр проверяет.
        • 0
          На самом деле это легко чинится. Просто нужно вынести handshake в цикл и разруливать необходимость оного через socket_select. А еще не плохо вводить таймауты, что бы отбрасывать долговисящие пустые соединения.
          • 0
            сделал рукопожатие неблокирующим, как вы и написали.
            итоговый код в статье обновил.
  • 0
    имеет смысл открыть для себя библиотеки libev/libevent/libevent2
  • +13
    $isMasked = ($secondByteBinary[0] == '1') ? true : false;
    

    wat?
  • 0
    от «Войны и Мира» упал. Upd: а нет, уже всё ок
  • –2
    Вешается чат довольно таки просто — telnet sharoid.ru 8000…
    • 0
      добавил неблокирующую запись и неблокирующее рукопожатие.
      итоговый код в статье обновил.
      теперь ваша команда не сработает.
      • 0
        Только я записываю в сокет нуль-символ — возвращаемся к началу.
        • 0
          это потому что использовалась функция fgets($client);, а она ожидает конец строки или таймаут.
          т.к. конца строки вы не передавали, то она «зависала».
          поменял на fread? теперь это не сработает, исправил итоговый код в статье.
          • 0
            … а проверку на пустоту массива вы так и не исправили…
  • +1
    while true; do nc sharoid.ru 8000 < /dev/zero ; done
    
    phpdaemon и пр. не от хорошей жизни такие «монструозные», а из за того, что, в том числе, над безопасностью думают.
    • +1
      это статья про то как работают вебсокеты, вы слишком ответственно отнеслись к тестированию :)
    • 0
      мне казалось, что часа вам будет достаточно, чтобы убедиться в своей правоте и перестать глушить мой сервер, который я использую не только для тестов, но видимо я ошибался.
      • +1
        В смысле? Так я один раз попробовал, отправил комментарий и забыл. Кто то ещё запустил что-ли? Ну извините, возможно стоило в личку написать.
        Поставьте Nginx на фронт — он с недавних пор хорошо websocket проксирует и такой флуд легко отсекает. Плюс сможете websocket на 80 порту держать.
    • 0
      Причем тут phpdaemon? На самом деле, вы успешно провели атаку на замеченную ранее ошибку с некорректной проверкой длины.
  • 0
    А вот Ratchet реализация
  • 0
    Если вы хотите довести этот пример до возможности использования в реальной жизни — то настоятельно рекомендую прочитать вот эту статью: www.kegel.com/c10k.html
  • 0
    Вот Вам еще пара ссылок по сокетам и select:
    linux.die.net/man/3/select
    beej.us/guide/bgnet/output/html/singlepage/bgnet.html

    Там примеры на C под UNIX, но они здорово помогают глубже и полнее понять асинхронную работу с select.

    P.S.
    Путь «велосипеда» — это правильный путь. Правильный для обучения.
    Но чем больше начинаешь понимать всяких нюансов, тем чаще тебя начинает посещать мысль, что не такие уже и монструозные, на самом деле, те готовые решения, использования которых ты решил избежать.
    • 0
      Рекоммендовал бы еще рассмотреть случай, когда в сетевой поток могла бы прекратиться запись до того, как будет записано все.
      Пример расписан в оффициальной документации us2.php.net/manual/ru/function.fwrite.php
  • 0
    Я с PHP года 4 уже не работал, так что могу не знать нюансов, сужу по косвенным признакам… Вы написали, что переделали работу с сокетами на неблокирующую, но я нигде в коде не вижу вызовов stream_set_blocking да и read/write/fgets у вас используются так, как использовались бы с блокирующими сокетами. Мне кажется они у вас блокирующие…
    Пример — вызов fgets (его вы уже выпилили) возвращает в 3-х случаях: сокет закрыт, достигнут лимит или найден "\n". Как она работает с неблокирующими сокетами вообще не представляю, но совершенно точно что нужно удостовериться, что последний символ в считанных данных это "\n".

    Дальше fwrite: на неблокирующих сокетах он не обязан записывать всё, что ему сказали записать — сколько он запишет зависит от размера буферов, так что нужно делать что-то вроде
    if (length($data) != ($written = fwrite($data))) {
        $data = substr($data, $written),
    }
    
    и дописывать остаток когда сокет снова станет записываемым.

    Насчёт read та же фигня — в неблокирующем режиме он может вернуть хоть один байт (привет if(!$data)) хоть всё вплоть до $length. Так что всегда нужно проверять достаточно ли данных считалось и если нет — сохранять в буфер и дожидаться, пока сокет снова станет readable.

    Тот факт, что сокеты у вас всё-же блокирующие, позволяет, по идее, заблокировать воркера послав, например, всего один байт, в то время как сервер делает read($fd, 10000). Хотя насчёт этого не совсем уверен, возможно в PHP там промежуточные буферы какие то есть.
  • –3
    NodeJS + socket.io, нет?
  • 0
    foreach($read as $connect) {//обрабатываем все соединения ...обрабатываем $connect
    разве не foreach($read as $connects)?
    • +1
      $read это массив сокетов, по сути соединений. Так что у автора все правильно.
      • 0
        Понял. Благодарю за разъяснение, мозги в первой половине дня не в ту сторону повёрнуты)
    • 0
      $read — это массив сокетов, готовых для чтения
  • 0
    Как запускать сторонние скрипты из функций onMessage?
    fwrite($connect, encode(Test::prt(decode($data)['payload'])));
    


    Метод prt делает конкатенацию входящего сообщения с заданной строкой.
    public function prt($m){
         return $m . 'work!';
    }
    


    В результате корректно работает раз от раза:
    work!
    work!
    fdwork!
    work!
    work!
    fdwork!
    


    В чем причина?

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.