Почти настоящая многопоточность средствами php 5

Reading time5 min
В очередной раз читал про многопоточность в php точнее полное её отсутствие и всевозможные костыли в виде не блокируемых сокетов. Вот как раз там наткнулся на интересную статью в которой описывался очень простой и эффективный способ распараллеливания потоков. На основе этого материала написал небольшой класс что бы облегчить себе работу в будущем.

Итак код класса threads.php:

class Threads {
    public $phpPath = 'php';
    private $lastId = 0;
    private $descriptorSpec = array(
        0 => array('pipe''r'),
        1 => array('pipe''w')
    private $handles = array();
    private $streams = array();
    private $results = array();
    private $pipes = array();
    private $timeout = 10;
    public function newThread($filename$params=array()) {
        if (!file_exists($filename)) {
            throw new ThreadsException('FILE_NOT_FOUND');
        $params = addcslashes(serialize($params), '"');
        $command = $this->phpPath.' -q '.$filename.' --params "'.$params.'"';
        $this->handles[$this->lastId= proc_open($command$this->descriptorSpec$pipes);
        $this->streams[$this->lastId= $pipes[1];
        $this->pipes[$this->lastId= $pipes;
        return $this->lastId;
    public function iteration() {
        if (!count($this->streams)) {
            return false;
        $read = $this->streams;
            Здесь береться только один поток для удобства обработки 
            на самом деле в массиве $read их зачастую несколько

        $stream = current($read);
        $id = array_search($stream$this->streams);
        $result = stream_get_contents($this->pipes[$id][1]);
        if (feof($stream)) {
        return $result;
        Статичный метод для получения параметров из 
        параметров командной строки

    public static function getParams() {
        foreach ($_SERVER['argv'as $key => $argv) {
            if ($argv == '--params' && isset($_SERVER['argv'][$key + 1])) {
                return unserialize($_SERVER['argv'][$key + 1]);
        return false;

class ThreadsException extends Exception {


Теперь для примера создадим test.php:
$start = microtime(true);

require './threads.php';

$threads = new Threads;

for ($i=0;$i<10;$i++) {
    $threads->newThread('./delay.php'array('delay' => rand(15)));

while (false !== ($result = $threads->iteration())) {
    if (!empty($result)) {
        echo $result."\r\n";

$end = microtime(true);
echo "Execution time ".round($end - $start2)."\r\n";


И delay.php который он вызывает:

require './threads.php';

if ($params = Threads::getParams()) {
    echo 'Wait for '.$params['delay'].' s.';


В результате выполнения test.php получаем следующее:
Microsoft Windows [Version 6.1.7100]
Copyright (c) 2009 Microsoft Corporation. All rights reserved.

Z:\home\labs\www\threads>php test.php
Wait for 1 s.
Wait for 5 s.
Wait for 4 s.
Wait for 5 s.
Wait for 5 s.
Wait for 5 s.
Wait for 3 s.
Wait for 3 s.
Wait for 2 s.
Wait for 4 s.
Execution time 5.58

