Pull to refresh

Параллельный импорт данных

Reading time8 min
Views3.6K
Представим, что у нас есть некоторый набор задач, допускающих возможность параллельного выполнения. К примеру, нам нужно организовать RSS-агрегатор, обновляющий через заданный промежуток времени все свои ленты. Понятно, что основное и при этом вполне ощутимое время будет уходить на загрузку данных с удалённого источника. Учитывая это, организация такого импорта путём последовательной загрузки лент лишена смысла, так в случае сколь-либо большого количества лент, импорт не будет укладываться в отведённые ему сроки.

Варианта решения проблема тут можеть быть два. Первый — реализовать параллельную загрузку лент с помощью CURL'a. Наприме так:

// Подготовка данных для асинхронного запроса
$rMultiHandler = curl_multi_init();
$aResources = array();
foreach( $aFeedUrls as $sFeedUrl ) {
  $rResource = curl_init();

  curl_setopt($rResource, CURLOPT_RETURNTRANSFER, 1);
  curl_setopt($rResource, CURLOPT_URL, $sFeedUrl );
  curl_setopt($rResource, CURLOPT_FOLLOWLOCATION, true);
  curl_setopt($rResource, CURLOPT_TIMEOUT, 60);

  curl_multi_add_handle( $multi_handler, $rResource );
  $aResources[] = array(
    'url'  => $sFeedUrl,
    'client' => $rResource
  );
}

// Асинхронный запрос данных через CURL
$iRunningProcesses = null;
do {
  usleep( 1000000 );
  curl_multi_exec( $rMultiHandler, $iRunningProcesses );
} while( $iRunningProcesses > 0 );

// Обработка полученной информации
foreach( $aResources as $aResource ) {
  $aHeaders = curl_getinfo( $aResource['client'] );
  $sBody  = curl_multi_getcontent( $aResource['client'] );
}




Этот вариант вариант отчасти решит проблему с простоем при ожидании загрузки ленты, однако дальнейший разбор ленты придётся осуществлять в последовательном режиме. Кроме того, он не очень удобен, если в Вашей предметной моделе лента рассматривается как объект некоторого класса.

Второй вариант заключается в создании некоторого множества (пула) дочерних процессов — по каждому на ленту. Сделать это можно, к примеру, с помощью семейства фукций proc_*. Так же разумно было бы ограничить множество одновременно запущенных процессов (размер пула) некоторым числом, чтобы контролировать нагрузку сервера (в принципе это утверждение справедливо и для первого варианта). Для этого придётся сэмулировать диспетчер, который будет следить за состоянием пула и добавлять в него новые процессы по мере завершения работы процессов в пуле.

Ниже приведён самодокументированный пример реализации пула для параллельного выполнения задачи импорта RSS-лент:

/**
* Импорт лент
*/
class Import {

  /**
   * Размера пула процессов импорта
   * @var int
   */
  const POOL_SIZE = 10;

  /**
   * Максимальное время выполнения скрипта импорта заданной ленты
   * @var int
   */
  const POOL_PROC_EXEC_TIME = 180;

  /**
   * Запуск пула процессов импорта лент
   */
  public function startPool() {

    file_put_contents('import.log', "[*] Запуск процесса импорта " .
      PHP_EOL, FILE_APPEND );

    // Инициализация счётчиков
    $iSuccess = 0;
    $iFailure = 0;
    $iUpdated = 0;

    // Формирование списка идентификаторов лент, подлежащих импорту
    $aFeedId = array( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
      13, 14, 15 );

    // Инициализация пула
    $aPool = array();
    for( $iIter = 0; $iIter < self::POOL_SIZE && !empty( $aFeedId );
      $iIter++ ) {
      $iFeedId = array_shift( $aFeedId );

      $this->startProcess( $aPool, $iFeedId );
    }

    // Обработка процессов импорта в рамках пула
    while( !empty( $aPool ) ) {

      // Ожидание работы процессов из пула (1 секунда)
      usleep(1000000);

      // Обработка завершённых процессов
      foreach( $aPool as $iKey => &$aProcess ) {

        // Получение информации о процессе
        $aProcStatus = proc_get_status( $aProcess['handler'] );

        // Процесс завершён
        if( false === $aProcStatus['running'] ) {

          // Получение данных от процесса
          $iResponse = fgets( $aProcess['pipes'][1] );

          // Окончание работы с процессом
          fclose( $aProcess['pipes'][1] );
          fclose( $aProcess['pipes'][2] );
          proc_close( $aProcess['handler'] );

          // Процесс отработал успешно
          // www.php.net/manual/en/function.proc-get-status.php#92145
          if( 0 === $aProcStatus['exitcode']
            && is_numeric( $iResponse ) ) {
            $iSuccess++;
            $iUpdated += $iResponse;

          // В ходе работы процесса возникла ошибка
          } else
            $iFailure++;

          // Замена текущего процесса новым
          unset( $aPool[ $iKey ] );
          if( !empty( $aFeedId ) ) {
            $iFeedId = array_shift( $aFeedId );

            $bIsLaunched = $this->startProcess( $aPool,
              $iFeedId );
            if( !$bIsLaunched )
              $iFailure++;
          }

        // Процесс работает
        } else {

          // Процесс завис
          if( time() - $aProcess['iTimeStart'] >
            self::POOL_PROC_EXEC_TIME ) {
            file_put_contents('import.log', "[!] Процесс импорта ".
              " ленты {$aProcess['iFeedId']} завис и будет " .
              " завершён принудительно" . PHP_EOL, FILE_APPEND );
            $iSingnalCode = 15;
            proc_terminate( $aProcess['handler'], $iSingnalCode );
          }
        }
      }
      unset( $aProcess );
    }

    file_put_contents('import.log', "[*] Процесс импорта завершён: " .
      " успешно {$iSuccess}, неуспешно {$iFailure}, " .
      " добавлено {$iUpdated}" . PHP_EOL, FILE_APPEND );
  }

  /**
   * Запуск процесса импорта ленты в рамках пула
   * @param array $aPool Пул процессов
   * @param int $iFeedId Идентификатор ленты
   */
  public function startProcess( array &$aPool, $iFeedId ) {

    // Инициализация данных для запуска процесса
    // www.php.net/manual/en/function.proc-get-status.php#93382
    $sCmd = "exec php -f " . __FILE__ . " {$iFeedId}";

    $aDescriptors = array(
      1 => array("pipe", "w"),
      2 => array("pipe", "w")
    );
    $aPipes = array();

    // Запуск процесса импорта ленты
    $bSuccess = true;
    $rProcess = proc_open( $sCmd, $aDescriptors, $aPipes );
    if( is_resource( $rProcess ) ) {
      $aPool[] = array(
        'handler'  => $rProcess,
        'pipes'   => $aPipes,
        'iFeedId'  => $iFeedId,
        'iTimeStart'=> time()
      );
    } else {
      $bSuccess = false;
      file_put_contents('import.log', "[!] Не удалось запустить " .
        " процесс импорта ленты {$iFeedId}", FILE_APPEND );
    }

    return $bSuccess;
  }

  /**
   * Импорт ленты
   * @param $iFeedId Идентификатор ленты
   * @return int Количество обновлённых элементов в ленте
   */
  public function doImport( $iFeedId ) {

    file_put_contents('import.log', "[+] Импорт ленты {$iFeedId}" .
      PHP_EOL, FILE_APPEND);

    // Имитация импорта
    $iExecTime = rand( 1, 10 );
    usleep( $iExecTime * 1000000 );
    $iUpdated = rand( 0,10 );

    file_put_contents('import.log', "[-] Импорт ленты {$iFeedId}" .
      " за {$iExecTime} секунд" . PHP_EOL, FILE_APPEND);

    // Отправка количества добавленных элементов родительскому процессу
    echo $iUpdated;

    return $iUpdated;
  }
}

/**
* Контроллер
*/
$oImport = new Import();

// Запуск пула
if( 1 === $argc ) {
  $oImport->startPool();

// Запуск импорта конкретной ленты
} else {
  $iFeedId = $argv[1];
  $oImport->doImport( $iFeedId );
}




Результатом работы этого скрипта будет следующий лог:
[*] Запуск процесса импорта
[+] Импорт ленты 1
[+] Импорт ленты 2
[+] Импорт ленты 3
[+] Импорт ленты 4
[+] Импорт ленты 5
[+] Импорт ленты 8
[+] Импорт ленты 6
[+] Импорт ленты 7
[+] Импорт ленты 9
[+] Импорт ленты 10
[-] Импорт ленты 7 за 1 секунд
[+] Импорт ленты 11
[-] Импорт ленты 1 за 5 секунд
[+] Импорт ленты 12
[-] Импорт ленты 10 за 5 секунд
[-] Импорт ленты 2 за 6 секунд
[-] Импорт ленты 12 за 1 секунд
[+] Импорт ленты 13
[+] Импорт ленты 14
[-] Импорт ленты 3 за 7 секунд
[-] Импорт ленты 6 за 7 секунд
[+] Импорт ленты 15
[-] Импорт ленты 9 за 7 секунд
[-] Импорт ленты 14 за 1 секунд
[-] Импорт ленты 11 за 6 секунд
[-] Импорт ленты 4 за 9 секунд
[-] Импорт ленты 5 за 10 секунд
[-] Импорт ленты 8 за 10 секунд
[-] Импорт ленты 13 за 7 секунд
[-] Импорт ленты 15 за 6 секунд
[*] Процесс импорта завершён: успешно 15, неуспешно 0, добавлено 89


Метод опробован в боевых условиях и на данный момент никаких нариканий не вызывал.
Tags:
Hubs:
Total votes 57: ↑46 and ↓11+35
Comments29

Articles