Пользователь
0,0
рейтинг
28 декабря 2012 в 10:01

Разработка → Coroutines в PHP и работа с неблокирующими функциями перевод tutorial

PHP*
Одним из самых больших нововведений в PHP 5.5 будет поддержка генераторов и корутин (сопрограмм). Генераторы уже достаточно были освещены в документации и в нескольких других постах (например в этом или в этом). Сопрограммы же получили очень мало внимания. Это гораздо более мощный, но и более сложный для понимания и объяснения, инструмент.

В этой статье я покажу как реализовать планировщик задач с использованием корутин, чтобы вы поняли, что с ними можно делать и как их применять. Начнем с нескольких вступительных слов. Если вы считаете, что вы уже достаточно хорошо знаете как работают генераторы и корутины, тогда можете сразу перейти к разделу «Совместная многозадачность».

Генераторы


Суть генератора в том, что это функция, которая возвращает не просто одно значение, а последовательность значений, где каждое значение выброшено одно за другим. Или, другими словами, генераторы позволяют вам реализовать итератор, без лишнего кода.

Очень простым примером может послужить функция xrange():
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}


Эта функция делает то же, что и встроенная в PHP, range(). Единственная разница заключается в том, что range() вернула бы вам массив из миллиона чисел, а xrange() вернет итератор, который будет выбрасывать эти числа, но никогда не будет создавать массив со всеми ними.

Преимущества такого подхода должны быть очевидны. Таким образом вы можете работать с очень большими наборами данных, не загружая их всех в память. Вы даже можете работать с бесконечными потоками данных.

Все это также может быть сделано без генераторов, создав класс имплементирующий интерфейс Iterator. Генераторы просто делают это (гораздо) более простым занятием, потому что вам не нужно имплементировать пять разных методов для каждого итератора.

Генераторы как прерываемые функции


Чтобы перейти от генераторов к корутинам, важно понять, как они устроены изнутри: генераторы — это прерываемые функции, в которых для прерываний служит ключевое слово yield.

Возвращаясь к предыдущему примеру, когда вы вызываете xrange(1, 1000000) — ничего из тела функции xrange на самом деле не будет вызвано. Вместо этого, PHP просто возвратит объект класса Generator, который имплементирует интерфейс Iterator:
<?php

$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)


Код будет запущен только тогда, когда вы запустите определенные методы итератора. Например если вы вызовете $range->rewind() код функции xrange() будет выполнен до первого yield. В этом случае это значит, что сначала будет выполнен $i = $start и затем yield $i. Что бы мы ни передали в yield, это можно будет получить вызовом $range->current().

Чтобы продолжить исполнение кода, вы должны вызвать $range->next(). Это опять заставит генератор исполнить код до следующего yield. Таким образом, используя последовательные вызовы next() и current(), вы можете получить все значения из генератора, пока он не достигнет той точки, когда код просто закончится. В случае с xrange() это произойдет, когда $i достигнет $end. В этом случае поток исполнения достигнет конца функции, не оставив больше кода. После того, как это произойдет, valid() будет возвращать false и итерирование прекратится.

Сопрограммы


Главное, что корутины добавляют к вышеописанной функциональности — это возможность отправлять значения генератору. Это делает из привычного монолога генератора полноценный диалог, где вызывающая сторона может также что-то сообщать генератору.

Значения передаются корутине вызовом метода send() вместо next(). Примером того, как это работает, послужит вот эта корутина:
function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        fwrite($fileHandle, yield . "\n");
    }
}

$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar');


Как вы видите, здесь yield не как statement (как например return или echo), а как выражение, то есть он возвращает какое-то значение. Он возвратит то, что было послано через send(). В данном примере yield сначала возвратит "Foo", а потом "Bar".

В этом примере было представлено как yield может выступать в качестве простого получателя. Но вы также можете комбинировать оба типа использования, таким образом вы сможете и посылать и получать значения. Вот пример того, как это работает:
function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}

$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (the first var_dump in gen)
                              // string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2"   (again from within gen)
                              // NULL               (the return value of ->send())


Точный порядок вывода может показаться немного сложным для понимания на первый взгляд, так что перечитайте код и попробуйте запустить сами, чтобы разобраться почему все происходит именно в таком порядке. Здесь есть две вещи, на которые я бы хотел обратить ваше внимание: первое, использование скобок вокруг yield — это не случайность. Эти скобки нужны нам по техническим причинам (я даже подумывал добавить выброс исключения на прямое присваивание). Во-вторых, вы должно быть заметили, что current() был использован без вызова rewind(). rewind(), на самом деле, в таком случае вызывается неявно.

Совместная многозадачность


Если читая пример функции logger(), вы подумали «Зачем я буду использовать для этого корутину? Почему бы не сделать для этого обычный класс?», тогда вы были абсолютно правы. Тот пример лишь демонстрирует, как этим можно пользоваться, но в нем нет никаких причин, чтобы использовать корутины. Как было сказано выше, во введении, корутины — это очень мощная штука, но их применение очень редко и часто сильно усложнено, что делает задачу придумать простые и не надуманные примеры довольно сложной.

Я решил показать вам реализацию совместной мультизадачности, используя корутины. Суть в том, что у нас есть несколько задач, которые надо запустить параллельно. Но процессор (прим. пер. сферический и в вакууме) может исполнять лишь одну задачу в один момент времени. Таким образом, процессору нужно переключаться между разными задачами и давать каждой «немного поработать».

«Совместная» эта многозадачность потому, что она подразумевает добровольную передачу контроля исполнения планировщику, чтобы тот мог запустить другую задачу. Есть также вытесняющая многозадачность, где планировщик сам может прервать задачу. Совместная многозадачность использовалась в ранних версиях Windows (до Win95) и Mac OS, но потом они переключились на вытесняющую. Причина очевидна — если вы полагаетесь на какую-либо программу, чтобы она добровольно отдала поток управления, то любая программа может просто оккупировать весь CPU.

Сейчас вы уже должны видеть связь между корутинами и планировщиком задач: yield дает возможность прерываться задаче самостоятельно, чтобы отдать поток управления планировщику и он мог бы запустить другую задачу. Помимо этого, yield может быть использован для коммуникации задачи с планировщиком.

В нашем случае, задача будет тонкой оберткой вокруг функции-генератора:
class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

    public function getTaskId() {
        return $this->taskId;
    }

    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    public function isFinished() {
        return !$this->coroutine->valid();
    }
}


У каждой задачи будет свой идентификатор (taskId). Используя метод setSendValue() вы можете указать, какое значение послать задаче на следующем запуске (зачем это нужно узнаете позже). Метод run() на самом деле всего лишь вызывает метод send() корутины.
Чтобы понять, зачем нам нужен дополнительный флаг beforeFirstYield, посмотрите на следующий код:
function gen() {
    yield 'foo';
    yield 'bar';
}

$gen = gen();
var_dump($gen->send('something'));

// Когда вызывается send(), перед первым yield произойдет неявный вызов rewind()
// Вот что на самом деле будет происходить:
$gen->rewind();
var_dump($gen->send('something'));

// rewind() перейдет к первому yield (и опустит его значение), send()
// перейдет ко второму yield (и var_dump-нет его значение).
// Таким образом мы потеряли первое значение


С помощью beforeFirstYield мы будем знать, был ли уже возвращен первый yield.

Планировщику теперь придется сделать чуть больше, чем просто пройтись по всем таскам:
class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

    public function __construct() {
        $this->taskQueue = new SplQueue();
    }

    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }

    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}


Метод newTask() создает новую задачу и кладет ее в taskMap. Более того, он добавляет ее в очередь taskQueue. Метод run() потом проходит по этой очереди и запускает задачи. Если задача завершена — она удаляется, иначе добавляется в конец очереди.

Давайте попробуем планировщик с двумя простыми (и довольно бессмысленными) задачами:
function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}

function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task1());
$scheduler->newTask(task2());

$scheduler->run();


Обе задачи просто выводят сообщение, и отдают поток управления назад планировщику. Вот что будет выведено:
This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.


Взаимодействие с планировщиком


Итак, наш планировщик работает и мы можем перейти к следующему пункту в повестке дня: взаимодействие между задачами и планировщиком. Мы будем использовать тот же способ, что используют процессы чтобы общаться с операционной системой: системные вызовы (syscall, сискол). Нам нужны сисколы, так как операционная система находится на другом уровне привилегей, нежели сами процессы. Поэтому, чтобы делать какие-то действия, требующие больших привилегий (например убивать другие процессы) должен быть способ отдать контроль ядру, чтобы оно могло произвести те действия. Изнутри это реализовано с помощью прерывающих инструкций.

Наш планировщик будет придерживаться этой архитектуры: вместо того, чтобы передавать планировщик задаче (которая может с ним сделать все что угодно), мы будем взаимодействовать с помощью сисколов, переданных через yield. В данном случае, yield будет выступать и как прерыватель и как способ передать информацию (и получить) планировщику.

Чтобы представить сискол, я буду использовать небольшую обертку над callable:
class SystemCall {
    protected $callback;

    public function __construct(callable $callback) {
        $this->callback = $callback;
    }

    public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback; // Can't call it directly in PHP :/
        return $callback($task, $scheduler);
    }
}


Она будет вести себя, как любая другая callable, но будет принимать в аргументы задачу и планировщик. Чтобы работать с этим, нам надо немного модифицировать метод run() планировщика:
public function run() {
    while (!$this->taskQueue->isEmpty()) {
        $task = $this->taskQueue->dequeue();
        $retval = $task->run();

        if ($retval instanceof SystemCall) {
            $retval($task, $this);
            continue;
        }

        if ($task->isFinished()) {
            unset($this->taskMap[$task->getTaskId()]);
        } else {
            $this->schedule($task);
        }
    }
}


Наш первый сискол просто вернет идентификатор задачи:
function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {
        $task->setSendValue($task->getTaskId());
        $scheduler->schedule($task);
    });
}


Это происходят потому, что мы ставим значение для отправки и ставим задачу обратно в планировщик. Для сисколов планировщик не добавляет задачи в очередь автоматически, нам нужно делать это вручную (вы узнаете почему чуть позже).
Используя этот новый сискол, мы можем переписать предыдущий пример:
function task($max) {
    $tid = (yield getTaskId()); // <-- here's the syscall!
    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task $tid iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task(10));
$scheduler->newTask(task(5));

$scheduler->run();


Этот код даст нам тот же вывод, что и предыдущий пример. Еще пара сисколов, для создания новых и убивания задач:
function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($coroutine) {
            $task->setSendValue($scheduler->newTask($coroutine));
            $scheduler->schedule($task);
        }
    );
}

function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            $task->setSendValue($scheduler->killTask($tid));
            $scheduler->schedule($task);
        }
    );
}


Для функции killTask нам понадобится дополнительный метод в планировщкие:
public function killTask($tid) {
    if (!isset($this->taskMap[$tid])) {
        return false;
    }

    unset($this->taskMap[$tid]);

    // Это немного криво и можно было бы оптимизировать,
    // чтобы не итерироваться по всей очереди, но я оставлю это на потом
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $tid) {
            unset($this->taskQueue[$i]);
            break;
        }
    }

    return true;
}


Маленький скрипт для теста этой функциональности:
function childTask() {
    $tid = (yield getTaskId());
    while (true) {
        echo "Child task $tid still alive!\n";
        yield;
    }
}

function task() {
    $tid = (yield getTaskId());
    $childTid = (yield newTask(childTask()));

    for ($i = 1; $i <= 6; ++$i) {
        echo "Parent task $tid iteration $i.\n";
        yield;

        if ($i == 3) yield killTask($childTid);
    }
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();


Вывод будет следующим:
Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.


Дочерняя задача убивается после трех итераций, тогда и завершаются сообщения «Child task still alive!». Стоит отметить, что это не настоящие родитель и ребенок, так как дочерняя задача может продолжать исполняться даже когда родительская уже завершилась. Или дочерняя может даже убить родительскую. Мы бы могли модифицировать планировщик, чтобы получить нормальное отношение между родителем и ребенком, но не в этой статье.

Есть еще довольно много разных типов вызовов, которые можно реализовать, например wait (ждать пока задача не завершится полностью), exec (который задает, какую задачу сейчас нужно исполнить) и fork (создание клона задачи). Клонирование довольная классная фича и вы можете ее имплементировать с корутинами, так как они поддерживают клонирование.

Неблокирующее взаимодействие


Действительно классное применения нашего планировщика очевидно — веб-сервер. Может быть одна задача, слушающая сокет на предмет наличия новых соединений и каждый раз как новое соединение сделано, будет создана новая задача для обработки этого соединения.

Сложность заключается в том, что операции на сокетах, как например чтение данных в PHP блокирующие, то есть PHP будет ждать пока клиент не завершит отправку данных. Для веб-сервера это, очевидно, совсем не хорошо: это означает, что он сможет обрабатывать лишь один запрос в момент времени.

В качестве решения нам надо спрашивать у сокета, «готов» ли он, перед тем как читать или писать данные в него. Чтобы узнать, какие сокеты готовы для передачи или получения данных, мы будем использовать функцию stream_select().

Для начала давайте добавим пару новых сисколов, которые будут отправлять определенный сокет на ожидание либо чтения, либо записи:
function waitForRead($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForRead($socket, $task);
        }
    );
}

function waitForWrite($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForWrite($socket, $task);
        }
    );
}


Эти сисколы просто прокси для соответствующих методов планировщика:
<?php

// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];

public function waitForRead($socket, Task $task) {
    if (isset($this->waitingForRead[(int) $socket])) {
        $this->waitingForRead[(int) $socket][1][] = $task;
    } else {
        $this->waitingForRead[(int) $socket] = [$socket, [$task]];
    }
}

public function waitForWrite($socket, Task $task) {
    if (isset($this->waitingForWrite[(int) $socket])) {
        $this->waitingForWrite[(int) $socket][1][] = $task;
    } else {
        $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
    }
}


Поля waitingForRead и waitingForWrite просто массивы содержащие ожидающие сокеты и относящиеся к ним задачи. Самая интересная часть это этот метод, который проверяет готовы ли сокеты и перепланирует их таски:
protected function ioPoll($timeout) {
    $rSocks = [];
    foreach ($this->waitingForRead as list($socket)) {
        $rSocks[] = $socket;
    }

    $wSocks = [];
    foreach ($this->waitingForWrite as list($socket)) {
        $wSocks[] = $socket;
    }

    $eSocks = []; // dummy

    if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
        return;
    }

    foreach ($rSocks as $socket) {
        list(, $tasks) = $this->waitingForRead[(int) $socket];
        unset($this->waitingForRead[(int) $socket]);

        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }

    foreach ($wSocks as $socket) {
        list(, $tasks) = $this->waitingForWrite[(int) $socket];
        unset($this->waitingForWrite[(int) $socket]);

        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
}


Функция stream_select берет на вход массивы сокетов ожидающих чтения, записи и исключения (проигнорируем последний). Массивы передаются по ссылке и эта функция оставит в них лишь те элементы, состояние которых изменилось. После чего мы можем пройтись по всем этим массивам и перепланировать все задачи связанные с данными сокетами.

Чтобы произвести все эти действия, мы добавим в планировщик следующий метод:
protected function ioPollTask() {
    while (true) {
        if ($this->taskQueue->isEmpty()) {
            $this->ioPoll(null);
        } else {
            $this->ioPoll(0);
        }
        yield;
    }
}


Эту задачу надо зарегистрировать в какой-то момент, например вы можете добавить $this->newTask($this->ioPollTask()) в начало метода run(). Тогда она будет работать также, как любая другая задача, проверяя доступные готовые сокеты на каждое переключение между задачами. Метод ioPollTask вызовет ioPoll с нулевым таймаутом, значит stream_select вернет результат сразу, без ожидания.

Только если очередь задач пуста, мы используем null в качестве таймаута, в таком случае stream_select будет ожидать, пока какой-либо из сокетов не будет готов. Если бы мы этого не делали, то съели бы весь CPU (по крайней мере ядро), т.к. эта задача выполнялась бы в цикле раз за разом, пока кто-нибудь не подключился бы.

Сам сервер выглядит довольно просто:
function server($port) {
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);

    stream_set_blocking($socket, 0);

    while (true) {
        yield waitForRead($socket);
        $clientSocket = stream_socket_accept($socket, 0);
        yield newTask(handleClient($clientSocket));
    }
}

function handleClient($socket) {
    yield waitForRead($socket);
    $data = fread($socket, 8192);

    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);

    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

    yield waitForWrite($socket);
    fwrite($socket, $response);

    fclose($socket);
}

$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();


Он будет принимать соединения на 8000 порт и просто отправлять в ответе содержимое самого запроса. Сделать что-либо «настоящее» было бы гораздо сложнее (правильная обработка HTTP-запросов — не тема этой статьи).

Теперь вы можете испытать этот сервер с помощью чего-то вроде ab -n 10000 -c 100 localhost:8000/. Так мы отправим 10000 запросов, 100 из которых будут слаться одновременно. Используя этот бенчмарк я получил средний ответ за 10 миллисекунд. Но была проблема с некоторыми запросами, которые обрабатывались очень долго (в райное 5 секунд), поэтому общая пропускная способность всего 2000 запросов в секунду. С более высокой конкуренцией (например -c 500) скрипт работает также неплохо, но некоторые соединения выбрасывают ошибку «Connections reset by peer».

Вынесенные сопрограммы


Если вы начнете делать большую систему, используя наш планировщик, вы вскоре наткнетесь на одну проблему: мы часто разделяли код, вынося какие-то куски в отдельные функции и вызывая их. Но с сопрограммами это невозможно. Представим такой код:
function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}

function task() {
    echoTimes('foo', 10); // print foo ten times
    echo "---\n";
    echoTimes('bar', 5); // print bar five times
    yield; // force it to be a coroutine
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();


В этом коде мы попытались вынести сопрограмму из task()-а и выполнить ее. Но это не сработает. Как было сказано в самом начале статьи, вызов генератора не исполнит никакого кода в нем, вместо этого только вернет объект класса Generator. В нашем случае именно это и происходит, вызовы echoTimes() ничего не сделают, только возвратят объект.

Чтобы у нас это получилось, нам нужно написать небольшую обертку для сопрограмм. Таким образом так мы сможем вызывать под-сопрограммы:
$retval = (yield someCoroutine($foo, $bar));

Под-сопрограммы тоже могут возвращать значение с помощью yield:
yield retval("I'm return value!");

Функция retval() ничего не делает, кроме как возвращает обертку над значением, которая говорит нам о том, что это возвращаемое значение:
class CoroutineReturnValue {
    protected $value;

    public function __construct($value) {
        $this->value = $value;
    }

    public function getValue() {
        return $this->value;
    }
}

function retval($value) {
    return new CoroutineReturnValue($value);
}


Чтобы сделать из обычной сопрограммы составную (с под-сопрограммами) нам придется написать еще одну функцию (которая очевидно — еще одна сопрограмма):
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;

    for (;;) {
        $value = $gen->current();

        if ($value instanceof Generator) {
            $stack->push($gen);
            $gen = $value;
            continue;
        }

        $isReturnValue = $value instanceof CoroutineReturnValue;
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }

            $gen = $stack->pop();
            $gen->send($isReturnValue ? $value->getValue() : NULL);
            continue;
        }

        $gen->send(yield $gen->key() => $value);
    }
}


Эта функция работает как простая прокси между вызывающим и исполняющейся под-сопрограммой. Также, она проверяет является ли возвращаемое значение также генератором и, если да, запускает его. Когда она получает объект CoroutineReturnValue, она возьмет родительскую сопрограмму и продолжит ее выполнение.

Чтобы составные сопрограммы можно было использовать в задачах, строку $this->coroutine = $coroutine; в конструкторе класса Task, нужно заменить на $this->coroutine = stackedCoroutine($coroutine);.

Теперь мы можем улучшить наш веб-сервер немного, сгруппировав функции ожидания с функциями чтения, записи и приема нового соединения. Для этого воспользуемся таким классом:
class CoSocket {
    protected $socket;

    public function __construct($socket) {
        $this->socket = $socket;
    }

    public function accept() {
        yield waitForRead($this->socket);
        yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
    }

    public function read($size) {
        yield waitForRead($this->socket);
        yield retval(fread($this->socket, $size));
    }

    public function write($string) {
        yield waitForWrite($this->socket);
        fwrite($this->socket, $string);
    }

    public function close() {
        @fclose($this->socket);
    }
}


Теперь сервер можно немного переписать:
function server($port) {
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);

    stream_set_blocking($socket, 0);

    $socket = new CoSocket($socket);
    while (true) {
        yield newTask(
            handleClient(yield $socket->accept())
        );
    }
}

function handleClient($socket) {
    $data = (yield $socket->read(8192));

    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);

    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

    yield $socket->write($response);
    yield $socket->close();
}


Обработка ошибок


Как хороший программист, вы очевидно заметили, что вышеприведенным примерам кода не хватает обработки ошибок. Любая операция с сокетами всегда может вывалиться в ошибку. Я сделал так, потому что обработка ошибок для примеров очень утомительна (особенно в случае с сокетами) и это раздуло бы наш код в несколько раз.

Но, тем не менее, я бы хотел рассказать про обработку ошибок для сопрограмм в целом: сопрограммы позволяют выбрасывать исключения внутрь них используя метод throw().

Метод throw() берет исключение в первый аргумент и выбрасывает его в том месте, где стоит текущий yield (чье значение можно получить с помощью метода current()):
function gen() {
    echo "Foo\n";
    try {
        yield;
    } catch (Exception $e) {
        echo "Exception: {$e->getMessage()}\n";
    }
    echo "Bar\n";
}

$gen = gen();
$gen->rewind();                     // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
                                    // and "Bar"


Это очень классная вещь, т.к. мы можем, например, дать возможность через сисколы и под-сопрограммы выбрасывать исключения. Для сисколов, метод Scheduler::run() надо немного изменить:
if ($retval instanceof SystemCall) {
    try {
        $retval($task, $this);
    } catch (Exception $e) {
        $task->setException($e);
        $this->schedule($task);
    }
    continue;
}


И класс Task должен обрабатывать вызовы throw():
class Task {
    // ...
    protected $exception = null;

    public function setException($exception) {
        $this->exception = $exception;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } elseif ($this->exception) {
            $retval = $this->coroutine->throw($this->exception);
            $this->exception = null;
            return $retval;
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    // ...
}


Теперь мы можем начать выбрасывать исключения из сисколов! Например для killTask, давайте выбросим исключение, если переданный идентификатор задачи невалиден:
function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            if ($scheduler->killTask($tid)) {
                $scheduler->schedule($task);
            } else {
                throw new InvalidArgumentException('Invalid task ID!');
            }
        }
    );
}


Теперь попробуем:
function task() {
    try {
        yield killTask(500);
    } catch (Exception $e) {
        echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
    }
}


Пока что это не будет работать, т.к. функция stackedCoroutine не обрабатывает исключения. Чтобы поправить это, модифицируем ее немного:
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    $exception = null;

    for (;;) {
        try {
            if ($exception) {
                $gen->throw($exception);
                $exception = null;
                continue;
            }

            $value = $gen->current();

            if ($value instanceof Generator) {
                $stack->push($gen);
                $gen = $value;
                continue;
            }

            $isReturnValue = $value instanceof CoroutineReturnValue;
            if (!$gen->valid() || $isReturnValue) {
                if ($stack->isEmpty()) {
                    return;
                }

                $gen = $stack->pop();
                $gen->send($isReturnValue ? $value->getValue() : NULL);
                continue;
            }

            try {
                $sendValue = (yield $gen->key() => $value);
            } catch (Exception $e) {
                $gen->throw($e);
                continue;
            }

            $gen->send($sendValue);
        } catch (Exception $e) {
            if ($stack->isEmpty()) {
                throw $e;
            }

            $gen = $stack->pop();
            $exception = $e;
        }
    }
}


Подводя итог


В этой статье мы построили планировщик задач, используя совместную мультизадачность, возможность слать сисколы, поработали с неблокирующими операциями и обработкой ошибок. Самая классная вещь во всем этом, это то, что конечный код выглядит абсолютно синхронным, даже не смотря на то, что он делает много асинхронных операций. Если вам нужно прочитать данные из сокета, вам не надо передавать коллбэк или регистрировать листенер. Вместо этого вы пишите yield $socket->read().

Когда я впервые услышал об этом, я понял, какая это крутая вещь, это и мотивировало меня имплементировать их в PHP. В то же время я нахожу сопрограммы действительно пугающими. Есть тонкая грань между классным кодом и жуткой грязью, и я думаю, сопрограммы стоят на этой грани. Мне сложно сказать, является ли использование сопрограмм в таком ключе, в каком я описал выше, каким-то выигрышным.

В любом случае, я думаю это очень интересная тема и надеюсь вы тоже :)

ПС от переводчика: статья очень большая и сложная для восприятия, поэтому могут быть ошибки, в т.ч. смысловые. если увидели такую — отправляйте пожалуйста.
Перевод: nikic
Никита Нефедов @nikita2206
карма
137,2
рейтинг 0,0
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Спецпроект

Самое читаемое Разработка

Комментарии (55)

  • +1
    Вопрос. Вы тот Никита или тезка?
    • +2
      Тезка я :)
      • +5
        Все равно оба молодцы :)
  • +6
    Радует, что PHP становится более универсален.
    • +5
      Подождите сейчас набегут рубисты скажут «похапе говно, я на нем писцал 5 лет назад, ничего не изменилось» :)
      • +9
        Уже ↓ :)
        • +2
          С точностью до наоборот.
          Я не рубист. Похапэ не говно. Писал я на нем последний раз не 5, а 2,5 года назад.
          И в нем все изменилось с того момента как я первый раз пытался разобраться со скриптом ( лет 7 назад, тогда еще php 4 был очень распространен). Раньше можно было подобрать десяток различных функций для выполнения одной операции с каким угодно порядком аргументов и любым методом именований функций. А теперь задачи можно решать еще и десятком различных парадигм и с помощью десятка способов записи.

          p.s. прошу менч извинить тех, чьи религиозные чувства были задеты
          • +5
            Какие религиозные чувства? Здесь по-вашему фанатики PHP собрались? Тут сидят умные люди, работающие головой, и понимающие, что это просто инструмент… надеюсь.

            Сейчас PHP действительно очень изменился. Я бы даже это назвал другим языком (PHP4 != PHP5 != PHP5.4 — это не булева алгебра:) ), настолько серьезные изменения в подходах.
            • 0
              >Какие религиозные чувства?
              Не знаю какие, но народ оценил)) -17 за пост и -8 в карму

              p.s. ухожу, ухожу, ухожу, а то совсем тухлыми помидорами закидают))
    • –14
      Кухонный комбайн со встроенной подводной лодкой на солнечных батареях
  • –15
    <sarcasm>Теперь на PHP можно написать ОС уровня Windows 3.11</sarcasm>
    • +9
      Товарищи, я сам пишу на PHP, если что, причём очень давно и считаю его неплохим языком :). Если кто-то вдруг не понял, корутины добавляют поддержку сохранения _состояния_ выполнения функции, что дает возможность написать что-то вроде ОС, как например Windows 3.11. Если правильно реализовать все блокирующие вызовы, то оно даже будет работать. Если кто-то не согласен, что PHP не годится для написания более сложных ОС, прошу привести аргументы :).
      • +1
        Ну, ОС можно написать и на JS, если суметь его скомпилировать (вместе с интерпретатором:) ) так, чтобы он ловил прерывания.
      • +2
        Выкрутился.
  • +1
    На самом деле генераторы действительно очень крутая штука, первый раз столкнулся с ними в питоне и сразу не совсем понял зачем они мне вообще нужны пока не научился ими пользоваться.
    • 0
      Кстати замечу что код (понимаю что это пример, но все-же):

      $data = (yield $socket->read(8192));

      может все-таки зависнуть. Сокет может получить менее 8192 байт и вернет ошибку.
      • 0
        Вроде как нет:

        Reading stops as soon as one of the following conditions is met:
        — length bytes have been read
        — EOF (end of file) is reached
        — a packet becomes available or the socket timeout occurs (for network streams)
        — if the stream is read buffered and it does not represent a plain file, at most one read of up to a number of bytes equal to the chunk size (usually 8192) is made; depending on the previously buffered data, the size of the returned data may be larger than the chunk size.

        fread()
      • 0
        Не зависнет, а вернёт меньше чем 8192 байт. Поэтому при работе с неблокирующими сокетами нужно проверять сколько данных считалось и если меньше, чем просили — снова вставать в очередь на чтение.
  • +1
    в настоящее время внедряю использование сопрограмм в разных своих сервисах (на Си)
    спасибо, что показали нам эту возможность для РНР…

    думаю пригодится
  • +1
    А кто-то может объяснить, зачем нужны (в практическом применении) корутины, ну помимо написания таскменеджера, который, почему-то видится в php как немного чуждая вещь?
    • +1
      Итератор в меньшее количество строк, например. И с двусторонней связью.
      • +3
        Это вы сейчас про корутины или генераторы? С генераторами-то всё понятно, я их сразу оценил. Не понятно с корутинами.
    • 0
      корутины применяются для псевдо-распараллеливания кода, так как пхп не поддерживает треды…
      распараллеливание выгодно там, где есть операции в/в:
      пока одна коротина занята приемом/отправлением данных — другая коротина может какие-то другие данные обработать.
      один из вариантов использования — это мультиплексный асинхронный сервер

      единственное, что хотелось бы увидеть от автора: это реализация двух серверов — один на libevent, воторй на коротинах и измерить их эффективность ( память, кол-во обработанных соединений за сек… )

      я понимаю что это тема отдельной статьи, но све же может кто возмется???
      • 0
        Только в один момент времени работает только одна корутина, в нашем случае сокеты сами отрабатывали в буффер, а потом считывались.
      • 0
        Т.е., речь-таки не об обычном применении php, а об узкоспециализированном.
        • +2
          да, как и применение либэвент — чисто в узкоспециализированном применении
      • +2
        Действительно не поддерживает? Или я в чём-то ошибся?
        • +1
          • 0
            скажите мне, как можно организовать паралельность,
            если в исходниках ни одной phtread функции.
            • +1
              Что-то плохо вы смотрели, есть они там, вот например.
              • 0
                нда… слона-то я и не увидел…
                рад за многопоточность в РНР!
                спасибо за подсказку использования этого расширения…
            • –1

    • 0
      достоинство коротин (не в пхп) в том, что они позволяют распараллелить код, без накладных расходов в однопоточной модели. При этом всегда будет занято только одно ядро процессора, в отличие от тредовой модели. При реализации многотредовой модели появляются проблемы в синхронизации, затраты ресурсов на переключении контекста приложения между тредами и т.п…
      считается что в ряде задач, решения на коротинах — эффективнее.
    • 0
      еще корутины могут быть нужны при распределение вычислений между несколькими серверами задач:
      родили несколько коротин, которые отправили данные на разные сервера задач: например обработка машинный перевод или распределенный поиск… далее переключая контекст коротин мы смотрим статусы задач и отдаем уже результат
  • +4
    Один к одному yield из питона (даже api генераторов то же). Странно, что об этом не упомянули в статье.
    Более того, отсутствует поддержка yield from, который позволяет не дублировать код, а выносить части генераторов в отдельные генераторы.
    • +2
      Как бы, генератор — это довольно общий концепт и он есть в крайне большом количестве языков. Да и весь implementation-specific апи тут — это метод send, не сильно много сходств.
  • +1
    Очень полезная статья. И в целом PHP движется в верном направлении :)
    Надеюсь выход php5.5 сподвигнет к обновлению хостеров хотя бы до 5.4
    • 0
      Ленивые shared-хостеры, которым западло давать последнюю версию php клиентам в век тотальных облаков и VPS/VDS? Они там живы еще, что ли?
      • 0
        Ну справедливости ради стоит сказать, что многие сейчас боятся обновляться до 5.4, т.к. APC в ногу с трейтами любят падать сегфолтами.
  • 0
    У меня есть два вопроса:
    Генератор обязательно должен быть глобальной функцией? Он может быть замыканием?
    Когда релиз php 5.5? :)
    • 0
      Релиз обещали в феврале-марте, если сроки не сдвигались.
    • 0
      Нет, насколько я помню генератором может быть любая фукнция (метод, анонимная функция).
  • +4
    А я все жду нормальные getters & setters, текущее извращение с __* надоело :(
  • +2
    Что за «корутины»? «Сопрограммы» нормальный же перевод. Почему только в заголовке.
    • 0
      Я знаю, таких как я не любят, но я не люблю многие русские слова. Некоторые из них просто до смешного ужасно звучат и их очень неудобно использовать в тех-литературе, например «сопрограмма», «область видимости» (scope — куда проще), «поток исполнения» (нифига не понятно, что имеется ввиду), и т.д..
      • +2
        Ясно, что не удобно использовать русские термины, но если пользоваться английскими, то их можно уж и оригинально писать, а не делать транслитерацию.
        • 0
          Скакать с раскладки на раскладку очень не приятное занятие, а еще мне нравится слово «корутина», напоминает мне «аскорутин» — вкусные витаминки, которые я люблю.
      • +4
        «Сопрограмма» ужасно звучит? О_О
        Странно. А «подпрограмма», «сопроцессор» как, нормально? Приступов паники не вызывают? :)
      • +6
        Адекватные аналоги на русском можно и нужно использовать, как бы ужасно, по чьему-либо мнению, они не звучали. Чего не стоит делать, так это мешать «coroutines», «сопрограммы» и «корутины» в одном месте.

        Калькирование «корутины» уже плохо тем, что провоцирует лингвистически неискушенного человека на ошибки при склонении.

        В остальном перевод читается довольно гладко. Спасибо.
  • –4
    ОМГ, да вы посмотрите на это. Парсер PHP исчерпал себя. Для объявления джоин поинтов был внедрён отдельный синтаксис. А что будем делать с клиентом? Ах да, давайте теперь функция, которая вроде бы функция вдруг станет специальным объектом со специальным интерфейсом.

    Кто мешал заимплементить операторы <</>>? Ой, так они же уже используются для бинарного сдвига. А что делать? Так давай в лоб сделаем, как получается. Думать некогда.
    • +4
      Ради этого ранта я даже компьютер включил.
      Кстати не в первый раз тебя вижу в треде, но, как и в прошлый раз, больше чем просто побросаться словами ты не можешь.

      > Парсер PHP исчерпал себя
      Открыл америку. Парсер пхп делает все на лету, конечно он паршивый до нельзя, но он никуда не исчерпывал себя, он всегда таким был. В будущем собираются сделать AST-based парсер.

      > Для объявления джоин поинтов был внедрён отдельный синтаксис
      Каких джоин поинтов? где отдельный синтаксис? Джоин поинты относятся к АОП, причем тут генераторы?

      > Ах да, давайте теперь функция, которая вроде бы функция вдруг станет специальным объектом со специальным интерфейсом.
      Функция — не объект. Функция при вызове возвращает объект. Также сделано, например, в шарпе. Это очень даже логично.

      >> Кто мешал заимплементить операторы <</>>?
      Для чего? Я бы не хотел, чтобы эти операторы использовались для чего-то другого, кроме сдвига. Впрочем да, перегрузки операторов в юзерлэнде пхп не хватает.
      • 0
        Не туда ответил. смотри комментарий ниже.
  • –1
    Каких джоин поинтов? где отдельный синтаксис? Джоин поинты относятся к АОП, причем тут генераторы?
    А чем yeild не отдельный синтаксис, так же как echo к примеру? Уж не знал я как назвать точки приёма и отправки сообщений.
    Для чего? Я бы не хотел, чтобы эти операторы использовались для чего-то другого, кроме сдвига.
    Да, для того же для чего был реализован yeild. Целостность. А так с одной стороны получается один подход, а с другой — другой.
  • 0
    Планировщик, для которого был написан пример, может быть выполнен
    при помощи расширения ev:

    <?php
    $w1 = new EvIdle(function ($w) {
    	(--$w->data[0] <= 0) and $w->stop();
    	
    	echo "Task 1, i = ", $w->data[0], PHP_EOL;
    }, array (10));
    
    $w2 = new EvIdle(function ($w) {
    	(--$w->data[0] <= 0) and $w->stop();
    	
    	echo "Task 2, i = ", $w->data[0], PHP_EOL;
    }, array (5));
    
    Ev::run();
    ?>
    


    Небольшие примеры.

    Слежение за доступностью сокета, файла или другого узла может быть выполнено с помощью
    EvIo. Асинхронные файловые операции
    можно также выполнять, например, через eio_read, eio_write, eio_sendfile и др.
    • 0
      Классно, на самом деле под пхп есть много хороших расширений, но многие обходятся вниманием. Видимо осадок у людей остался от шаред хостингов, мол расширения там не будет — не запустишь приложение.

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.