PHP

индекс
206,76

Параллельный импорт данных

Представим, что у нас есть некоторый набор задач, допускающих возможность параллельного выполнения. К примеру, нам нужно организовать RSS-агрегатор, обновляющий через заданный промежуток времени все свои ленты. Понятно, что основное и при этом вполне ощутимое время будет уходить на загрузку данных с удалённого источника. Учитывая это, организация такого импорта путём последовательной загрузки лент лишена смысла, так в случае сколь-либо большого количества лент, импорт не будет укладываться в отведённые ему сроки.

Варианта решения проблема тут можеть быть два. Первый — реализовать параллельную загрузку лент с помощью CURL'a. Наприме так:

// Подготовка данных для асинхронного запроса
$rMultiHandler = curl_multi_init();
$aResources = array();
foreach( $aFeedUrls as $sFeedUrl ) {
  $rResource = curl_init();

  curl_setopt($rResource, CURLOPT_RETURNTRANSFER, 1);
  curl_setopt($rResource, CURLOPT_URL, $sFeedUrl );
  curl_setopt($rResource, CURLOPT_FOLLOWLOCATION, true);
  curl_setopt($rResource, CURLOPT_TIMEOUT, 60);

  curl_multi_add_handle( $multi_handler, $rResource );
  $aResources[] = array(
    'url'  => $sFeedUrl,
    'client' => $rResource
  );
}

// Асинхронный запрос данных через CURL
$iRunningProcesses = null;
do {
  usleep( 1000000 );
  curl_multi_exec( $rMultiHandler, $iRunningProcesses );
} while( $iRunningProcesses > 0 );

// Обработка полученной информации
foreach( $aResources as $aResource ) {
  $aHeaders = curl_getinfo( $aResource['client'] );
  $sBody  = curl_multi_getcontent( $aResource['client'] );
}




Этот вариант вариант отчасти решит проблему с простоем при ожидании загрузки ленты, однако дальнейший разбор ленты придётся осуществлять в последовательном режиме. Кроме того, он не очень удобен, если в Вашей предметной моделе лента рассматривается как объект некоторого класса.

Второй вариант заключается в создании некоторого множества (пула) дочерних процессов — по каждому на ленту. Сделать это можно, к примеру, с помощью семейства фукций proc_*. Так же разумно было бы ограничить множество одновременно запущенных процессов (размер пула) некоторым числом, чтобы контролировать нагрузку сервера (в принципе это утверждение справедливо и для первого варианта). Для этого придётся сэмулировать диспетчер, который будет следить за состоянием пула и добавлять в него новые процессы по мере завершения работы процессов в пуле.

Ниже приведён самодокументированный пример реализации пула для параллельного выполнения задачи импорта RSS-лент:

/**
* Импорт лент
*/
class Import {

  /**
   * Размера пула процессов импорта
   * @var int
   */
  const POOL_SIZE = 10;

  /**
   * Максимальное время выполнения скрипта импорта заданной ленты
   * @var int
   */
  const POOL_PROC_EXEC_TIME = 180;

  /**
   * Запуск пула процессов импорта лент
   */
  public function startPool() {

    file_put_contents('import.log', "[*] Запуск процесса импорта " .
      PHP_EOL, FILE_APPEND );

    // Инициализация счётчиков
    $iSuccess = 0;
    $iFailure = 0;
    $iUpdated = 0;

    // Формирование списка идентификаторов лент, подлежащих импорту
    $aFeedId = array( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
      13, 14, 15 );

    // Инициализация пула
    $aPool = array();
    for( $iIter = 0; $iIter < self::POOL_SIZE && !empty( $aFeedId );
      $iIter++ ) {
      $iFeedId = array_shift( $aFeedId );

      $this->startProcess( $aPool, $iFeedId );
    }

    // Обработка процессов импорта в рамках пула
    while( !empty( $aPool ) ) {

      // Ожидание работы процессов из пула (1 секунда)
      usleep(1000000);

      // Обработка завершённых процессов
      foreach( $aPool as $iKey => &$aProcess ) {

        // Получение информации о процессе
        $aProcStatus = proc_get_status( $aProcess['handler'] );

        // Процесс завершён
        if( false === $aProcStatus['running'] ) {

          // Получение данных от процесса
          $iResponse = fgets( $aProcess['pipes'][1] );

          // Окончание работы с процессом
          fclose( $aProcess['pipes'][1] );
          fclose( $aProcess['pipes'][2] );
          proc_close( $aProcess['handler'] );

          // Процесс отработал успешно
          // www.php.net/manual/en/function.proc-get-status.php#92145
          if( 0 === $aProcStatus['exitcode']
            && is_numeric( $iResponse ) ) {
            $iSuccess++;
            $iUpdated += $iResponse;

          // В ходе работы процесса возникла ошибка
          } else
            $iFailure++;

          // Замена текущего процесса новым
          unset( $aPool[ $iKey ] );
          if( !empty( $aFeedId ) ) {
            $iFeedId = array_shift( $aFeedId );

            $bIsLaunched = $this->startProcess( $aPool,
              $iFeedId );
            if( !$bIsLaunched )
              $iFailure++;
          }

        // Процесс работает
        } else {

          // Процесс завис
          if( time() - $aProcess['iTimeStart'] >
            self::POOL_PROC_EXEC_TIME ) {
            file_put_contents('import.log', "[!] Процесс импорта ".
              " ленты {$aProcess['iFeedId']} завис и будет " .
              " завершён принудительно" . PHP_EOL, FILE_APPEND );
            $iSingnalCode = 15;
            proc_terminate( $aProcess['handler'], $iSingnalCode );
          }
        }
      }
      unset( $aProcess );
    }

    file_put_contents('import.log', "[*] Процесс импорта завершён: " .
      " успешно {$iSuccess}, неуспешно {$iFailure}, " .
      " добавлено {$iUpdated}" . PHP_EOL, FILE_APPEND );
  }

  /**
   * Запуск процесса импорта ленты в рамках пула
   * @param array $aPool Пул процессов
   * @param int $iFeedId Идентификатор ленты
   */
  public function startProcess( array &$aPool, $iFeedId ) {

    // Инициализация данных для запуска процесса
    // www.php.net/manual/en/function.proc-get-status.php#93382
    $sCmd = "exec php -f " . __FILE__ . " {$iFeedId}";

    $aDescriptors = array(
      1 => array("pipe", "w"),
      2 => array("pipe", "w")
    );
    $aPipes = array();

    // Запуск процесса импорта ленты
    $bSuccess = true;
    $rProcess = proc_open( $sCmd, $aDescriptors, $aPipes );
    if( is_resource( $rProcess ) ) {
      $aPool[] = array(
        'handler'  => $rProcess,
        'pipes'   => $aPipes,
        'iFeedId'  => $iFeedId,
        'iTimeStart'=> time()
      );
    } else {
      $bSuccess = false;
      file_put_contents('import.log', "[!] Не удалось запустить " .
        " процесс импорта ленты {$iFeedId}", FILE_APPEND );
    }

    return $bSuccess;
  }

  /**
   * Импорт ленты
   * @param $iFeedId Идентификатор ленты
   * @return int Количество обновлённых элементов в ленте
   */
  public function doImport( $iFeedId ) {

    file_put_contents('import.log', "[+] Импорт ленты {$iFeedId}" .
      PHP_EOL, FILE_APPEND);

    // Имитация импорта
    $iExecTime = rand( 1, 10 );
    usleep( $iExecTime * 1000000 );
    $iUpdated = rand( 0,10 );

    file_put_contents('import.log', "[-] Импорт ленты {$iFeedId}" .
      " за {$iExecTime} секунд" . PHP_EOL, FILE_APPEND);

    // Отправка количества добавленных элементов родительскому процессу
    echo $iUpdated;

    return $iUpdated;
  }
}

/**
* Контроллер
*/
$oImport = new Import();

// Запуск пула
if( 1 === $argc ) {
  $oImport->startPool();

// Запуск импорта конкретной ленты
} else {
  $iFeedId = $argv[1];
  $oImport->doImport( $iFeedId );
}




Результатом работы этого скрипта будет следующий лог:
[*] Запуск процесса импорта
[+] Импорт ленты 1
[+] Импорт ленты 2
[+] Импорт ленты 3
[+] Импорт ленты 4
[+] Импорт ленты 5
[+] Импорт ленты 8
[+] Импорт ленты 6
[+] Импорт ленты 7
[+] Импорт ленты 9
[+] Импорт ленты 10
[-] Импорт ленты 7 за 1 секунд
[+] Импорт ленты 11
[-] Импорт ленты 1 за 5 секунд
[+] Импорт ленты 12
[-] Импорт ленты 10 за 5 секунд
[-] Импорт ленты 2 за 6 секунд
[-] Импорт ленты 12 за 1 секунд
[+] Импорт ленты 13
[+] Импорт ленты 14
[-] Импорт ленты 3 за 7 секунд
[-] Импорт ленты 6 за 7 секунд
[+] Импорт ленты 15
[-] Импорт ленты 9 за 7 секунд
[-] Импорт ленты 14 за 1 секунд
[-] Импорт ленты 11 за 6 секунд
[-] Импорт ленты 4 за 9 секунд
[-] Импорт ленты 5 за 10 секунд
[-] Импорт ленты 8 за 10 секунд
[-] Импорт ленты 13 за 7 секунд
[-] Импорт ленты 15 за 6 секунд
[*] Процесс импорта завершён: успешно 15, неуспешно 0, добавлено 89


Метод опробован в боевых условиях и на данный момент никаких нариканий не вызывал.
+35
9 апреля 2010, 13:25
107

комментарии (28)

+2
aleks_raiden #
почему бы не использовать для этого специальные инструменты — gearmand
+14
develop7 #
использовать правильные готовые решения вместо написания «велосипедов» — это же не труЪ.
+8
tushev #
Не всегда есть возможность установить заказчику «правильное решение». Иногда все приходится решать на голом PHP.

А еще я полагаю, что если никто не будет изобретать усовершенствованные велосипеды, в нашем мире не будет появляться ничего нового. Так и будем все делать, на «старом и проверенном».
0
Lemoor #
Согласен. Не так давно пришлось писать свою реализацию БД на голом PHP. Извращение? А это ведь было условие, обязательное к выполнению.
+3
develop7 #
Не всегда есть возможность установить заказчику «правильное решение». Иногда все приходится решать на голом PHP.
самый элементарный VPS стоит, кажется, 300р/мес. время разработчика стоит порядка 300р/час. Время, потраченное на разработку велосипеда — в лучшем случае 20-30 часов. Посчитаете сами? И да, установка gearman — 30 минут (если что-то пойдёт не так).
если никто не будет изобретать усовершенствованные велосипеды
Извините, я лично такое видел года три назад и лично же его допиливал. Я бы отнёс именно приведённое решение к «старым и проверенным», так как качественного прорыва здесь нет. Чем это решение отличается в лучшую сторону от стапятисот подобных поделий с phpclasses.org, я затрудняюсь сказать (хоть и практически уверен, что ничем).
0
fantaseour #
это где это VPS за 300р/мес?

если пройти мимо скидок при покупке на год, то стоить будет 1000р. Народ жмется, правда.
+2
Vit228 #
не сочтите за рекламу
я от «шаред» отказался в пользу vds от 1гб. ру
пока доволен.
0
fantaseour #
интересно почему сильно различаются праметры OpenVZ и Hyper-V?

я сижу на виртуоззо и что-то получается даже 1500р в мес. Мелколавочного клиента на такое трудно уговорить. А на 300-400р вполне можно.
0
develop7 #
Ну вот у 1gb.ru нижняя планка как раз 300RUR/мес. Для многопоточного граббера интернетов — за глаза, мягко говоря.
0
fantaseour #
Хостеры оказывают медвежью услугу предоставляя шаред на самом деле. Ибо разница не столь велика, но в глазах клиента она в три раза. Ну и все — возникает куча костылей, которые потом только боком выйдут.
0
m00t #
Есть шаред хостинг за 10$, все рюшечки имеются — вплоть до SSH. Есть VDS за 20-30$ или около того (64 оперативки, 400 проц) от ПервыхВДС. И есть веб-приложение на Zend Framework. На шареде летает. На VDS — 3-4 сек генерация страницы. Чтобы на ВДС тоже работало нормально, нужно купить далеко не за 20-30$ сервер.
Я не говорю про нагрузки от большого количества посетителей — это отдельная тема.
0
m00t #
Я к тому, что дешевый VDS != шаред + root access в общем случае.
+2
SilentImp #
За разработку «велосипеда» заказчик заплатит 1 раз.
За VPS платит на протяжении всего времени жизни проекта.
Часто первое — дешевле.
Кроме того, многие проекты разрабатываются в рамках фиксированного бюджета и набора условий, а клиента и разработчика разделяет цепочка из N менеджеров, никто из которых не знают даже того, как расшифровывается VPS. Более того, им это не интересно и шанс докричатся до того, кто понимает о чем речь и может принять решения равен нулю. Да, все бывает ТАК печально.
И тогда способность создать такой вот велосипед имеет вес золота.
0
SilentImp #
Кстати… выделенных серверов виртуальных с более менее интересными для меня параметрами дешевле, чем за 50$/месяц я что то не видел.
+1
krestjaninoff #
Честно говоря, не знал про gearman, так что спасибо за ценный комментарий. Не уверен, что его стоило бы применять в данном случае в виду простоты исходной задачи, но в целом данный инструмент может быть весьма полезен.
+1
tushev #
Согласен. Gearman неплох если делаешь сеть краулеров а-ля гугл на нескольких разных физических машинах или даже датацентрах. Когда много разного ПО… А тут автор предложил простенький RSS грабер на несколько десятков источников, и поразмышлял, как бы это поудобнее и попроще реализовать.

Только вот последнее время на хабре у комментаторов пошла тенденция «стрелять из пушки по воробьям». И если вы пытаетесь решить простую задачу простым способом, то вас могут обозвать дилетантом.
0
ruFog #
спасибо! как раз пригодится для пачковой рассылки почты.
0
crocodile2u #
А можно сравнение со stream_select? В плане удобства использования в основном…
+2
kashey #
Имхо правильный вариант — форкать некий универсальный контролер, который далее к тебе приконектиться, а ты ему по сокету передашь нужную информацию, в том числе и более сложную чем просто номер фида…
0
Quiz #
Будет над чем подумать, спасибо.
Одна из задач, в которую я в последнее время упёрся — реализация пусть медленного, но «лёгкого» многопоточного краулера для одного из проектов.
Когда допилю его, надо будет статейку на Хабре награфоманить. =)
Что касается велосипедов, о которых очень часто пишут в таких случаях — иногда от них не удаётся уйти. Это касается, в основном, нетривиальных задач в системах, построенных на нечёткой логике.
+1
m00t #
ИМХО, проблема немного надумана. Первый пример с мультикурлом — ждем выполнения _всех_ загрузок, только потом их обрабатываем. Почему бы не обрабатывать каждую ленту сразу после ее загрузки? Т.е. поставили на загрузку 10, одна из них загрузилась, еще девять грузятся. В это время мы вполне ее успеем распарсить и сложить как надо в базу. Очень редко у меня многопоточный граббер упирался в своей роизводительности в роцессор и mySQL — как правило именно загрузка контента является самым узким местом. Ну так давайте параллелить только ее? Благо мультикурл прекрасно справляется с такой задачей.
+4
tushev #
Мне кажется, что во многих случаях удобно разделить грабер и обработчик.
Есть список URL-ов в БД. Грабер берет очередную порцию URL-ов, и в несколько потоков выкачивает их, загружает сырой результат в ту же БД. Затем берет следующую порцию и т.д.
Совершенно независимо, параллельно с этим работает обработчик (парсер). Когда в базе появляются новые сырые данные он начинает их обрабатывать уже по очереди. Возможно этот обработчик сам добавляет новые URL-лы для грабера, например если это краулер.
0
symbix #
Забыт еще один способ — самостоятельное мультиплексирование. Раскладывается, собственно, на два варианта: 1) socket_select/stream_select, 2) pecl/libevent — если надо обрабатывать действительно большое количество параллельных запросов.
0
nfx #
Для одного проекта занимался паралельной обработкой 200к записей. Работа с несколькими АРІ одновременно. Были свои подводные камни. Построение на основе Dependency Injection не раз спасало. Может как-то написать об етом топик.
+1
m00t #
Можно узнать, какое отношение DI имеет к параллельной обработке данных?
+1
phpdude #
ему просто слово нравится, это как «нанотехнологии» «во всем»
–2
egorinsk #
Ненавижу названия функций типа doImport(). Что за фигня? чем она отличается от import()?
0
dzugaru #
Семантикой )

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.