31 октября 2015 в 21:15

Параллельная обработка большого селекта в нескольких сессиях

Представьте: есть селект, который возвращает записи, каждую из которых нужно обработать, и то ли много записей, то ли обработка каждой записи занимает много времени, а процесс обработки одной записи не зависит от процессов других записей.
Классический пример для того, чтобы задействовать многопоточность или в случае баз данных выполнять обработку в нескольких сессиях. В Оракле для этого используется hint /*+ parallel() */ и pipelined functions. Это здорово, но если у вас Oracle standard edition(где parallel не работает) или вы хотите обработать не каждую запись по отдельности(из соображений, что лучше накопить работу, а потом в bulk, одним ударом, выполнить), а поделить весь вывод селекта на куски и каждый обработать отдельно?

Задача ставится так:

Написать Java stored procedure, которая получает следующие параметры:
  • Текст селекта
  • Имя процедуры, которая будет работать с порцией данных
  • Колличество потоков(Thread)
  • Данные, необходимые для подключения к базе

Сначала посмотрим, что можно сделать с pipelined функцией.

Java откроет по тексту селекта result set в default connection.
Первым делом надо выполнить
select count(*) from («Текст селекта»);
Создадим connection pool с размерностью, заданной в 3-м параметре.
Создадим отдельные сессии, присоединившись через jdbc connection.

Данные для этого возьмем из 4-го параметра, нам, по большому счету нужен только пароль, все остальное получим сами(может еще порт, если он отличен от 1521).

Будем получать данные из селекта в default connection и переписывать их в сессию из пула. Как только решим, что накопили достаточно, создадим thread, передадим ему эту connection как параметр и пусть работает, а мы продолжим со следующей сессией или, если все уже прочитано, подождем окончания всех потоков.

Напишем функцию обработки. Она получает все поля селекта как параметры.

Будет удобно, чтобы, например, первые два параметра были бы номер в порции и ее размерность. Это даст возможность в dbms info выводить процент выполнения в потоке.

По метадате селекта будем конструировать ее вызов в виде примерно так:

begin proc1(23,14000,'a1',3,'tratata',35,48); end;

Хранить будем только такую строку.

Вначале это был 2-х мерный массив (i,j), где i — это номер потока(в дальнейшем...). Потом я увидел, что при большом числе записей, затраты Oracle на поддержку большого массива становятся чрезмерными и решил пользоваться также временной таблицей(temporary table).

Я положил границу в 200,000 записей. Если селект count(*) возвращает меньше 200,000 Java в-runtime использует 2-х мерный String массив, если больше — пишет во временную таблицу parallel_calls_tmp с одним полем varchar2(4000).

Итак, в PL/SQL пакете создаем функцию
FUNCTION run_pipe_parallel(pi_Select_Txt VARCHAR2,
pi_Proc_Name VARCHAR2,
pi_Parallel_Count VARCHAR2,
Pi_Password VARCHAR2) RETURN VARCHAR2 AS
LANGUAGE JAVA NAME 'com.samtrest.ParallelRunner.run_parallel(java.lang.String, java.lang.String,java.lang.String, java.lang.String) return java.lang.String';

Создаем таблицу
-- Create table
create global temporary table parallel_calls_tmp
(
  call_str varchar2(4000)
)
on commit preserve rows;

На стороне Java есть функция
  public static String run_parallel(String selectTxt, 
      String procedureName, 
      String threadCount,
      String password) throws NumberFormatException, SQLException, ClassNotFoundException {
    String rc = "OK";
    ParallelRunner parallelRunner = new  ParallelRunner(selectTxt,procedureName,Integer.parseInt(threadCount),password);

    try {
      parallelRunner.runProc();
    } catch (SQLException e) {
      e.printStackTrace();
      rc = e.getMessage();
    } catch (ClassNotFoundException e) {
      e.printStackTrace();
      rc = e.getMessage();
    }   
    return rc;
  }


Получение массива типов данных полей селекта
    res = stm.executeQuery();
    ResultSetMetaData meta = res.getMetaData();
    columnCount = meta.getColumnCount();
    int [] types = new int[columnCount];
    for (int k = 0; k < columnCount; k++) {
      types[k] = meta.getColumnType(k+1);     
    }


Так строим строку вызова:
    while (res.next()){
      callStr =  "begin "+procedureName+"("+processSeq+","+(j+1)+","+chunkCount;
      for (int k = 0; k < columnCount; k++) {
        callStr = callStr+",";
        String value = "";
        if ( types[k] == java.sql.Types.VARCHAR || types[k] == 1){
          value = res.getString(k+1);
          if (value == null){
            value = "null";
          }else{
            value = "'"+value+"'";
          }
        }else if (types[k] == java.sql.Types.NUMERIC){
          BigDecimal number  = res.getBigDecimal(k+1);
          if (number == null){
            value = "null";
          }else{
            value = number.toString();
          }
        }else if (types[k] == java.sql.Types.DATE || types[k] == java.sql.Types.TIMESTAMP){
          Timestamp date  = res.getTimestamp(k+1);
          if (date == null){
            value = "null";
          }else{
            value = "to_date('"+date.toString().substring(0,date.toString().indexOf('.'))+ "','yyyy-mm-dd hh24:mi:ss')";
          }
        }else{
          System.out.println(""+types[k]);
        }
        callStr = callStr + value;
      }

      callStr = callStr + "); end;";


Накапливаем в массиве или таблице
      if (rowCount > CHUNK_LIMIT){
        insert.setString(1, callStr);
        insert.executeUpdate();
      }else{
        chunks[i][j] = callStr;
      }


А теперь весь класс, который нужно загрузить в базу.
create or replace and compile java source named "ParallelRunner" as
package com.samtrest;

import java.math.BigDecimal;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;

public class ParallelRunner {

  String selectTxt;
  String procedureName;
  String tableName = "";
  String additional;

  int threadCount,processSeq;
  String host="127.0.0.1",instance,port="1521",userName,password;
  Connection [] connPool;
  Connection defaultConn;
  ChunkRunner [] chunkRunners;
  String [][] chunks;
  int  CHUNK_LIMIT = 200000;

  public ParallelRunner(String selectTxt, String procedureName, int threadCount,String psw) throws SQLException, ClassNotFoundException {
    super();
    this.selectTxt = selectTxt;
    this.procedureName = procedureName;
    this.threadCount = threadCount;
    this.port = "1521";
    this.password=psw;
    connPool = new Connection[threadCount];
    chunkRunners = new ChunkRunner[threadCount];
  }

  public static String run_parallel(String selectTxt, 
      String procedureName, 
      String threadCount,
      String psw) throws NumberFormatException, SQLException, ClassNotFoundException {
    String rc = "OK";
    ParallelRunner parallelRunner = new  ParallelRunner(selectTxt,procedureName,Integer.parseInt(threadCount),psw);

    try {
      parallelRunner.runProc();
    } catch (SQLException e) {
      e.printStackTrace();
      rc = e.getMessage();
    } catch (ClassNotFoundException e) {
      e.printStackTrace();
      rc = e.getMessage();
    }   
    return rc;
  }

  public void populateConnectionPool() throws SQLException, ClassNotFoundException{
    int siteNumber = 0;
    String siteStatus ="T";
    ResultSet res;
    Class.forName("oracle.jdbc.driver.OracleDriver");
    defaultConn = DriverManager.getConnection("jdbc:default:connection:");
    PreparedStatement stm = defaultConn.prepareStatement("SELECT  SYS_CONTEXT('USERENV','SESSION_USER') from dual");
    res = stm.executeQuery();
    if (res.next()){
      userName = res.getString(1);
    }
    res.close();

    stm = defaultConn.prepareStatement("SELECT  SYS_CONTEXT('USERENV','DB_NAME') from dual");
    res = stm.executeQuery();
    if (res.next()){
      instance = res.getString(1);
    }
    res.close();

    for (int i = 0; i < connPool.length; i++) {
      connPool[i] = DriverManager.getConnection("jdbc:oracle:thin:@"+host+":"+port+":"+instance, userName, password);
      //connPool[i] = DriverManager.getConnection("jdbc:oracle:thin:@10.28.28.101:1521:orc1", userName, password);
      connPool[i].setAutoCommit(false);
      stm = connPool[i].prepareStatement("begin dbms_application_info.set_module('Java Parallel; process:'||"+
          processSeq+",'"+connPool.length+" threads'); end;");
      stm.executeUpdate();
    }
    stm.close();
  }

  public void runProc() throws SQLException, ClassNotFoundException{
    int rowCount = 0,columnCount, chunkCount,i,j;
    String callStr="";
    PreparedStatement stm;
    Statement info;
    PreparedStatement insert = null;

    populateConnectionPool();
    info = defaultConn.createStatement();
    info.executeUpdate("begin dbms_application_info.set_module('Parallel process:'||"+
        processSeq+",'"+threadCount+" threads'); end;");

    System.out.println(selectTxt);

    stm = defaultConn.prepareStatement("select count(*) from ("+selectTxt+")");
    ResultSet res = stm.executeQuery();

    if ( res.next())
      rowCount  = res.getInt(1);
    res.close();
    stm.close();

    chunkCount = rowCount/threadCount;
    if (chunkCount*threadCount < rowCount){
      chunkCount++;
    }
    i = 0;
    j = 0;
    //    System.out.println("Count of parallel threads: "+connPool.length);
    //    System.out.println("Count of processing rows: "+rowCount);
    //    System.out.println("Chunk length: "+chunkCount);

    info.executeUpdate("begin dbms_application_info.set_action('"+connPool.length+","+rowCount+","+chunkCount+"'); end;");

    stm = defaultConn.prepareStatement(selectTxt);

    res = stm.executeQuery();
    ResultSetMetaData meta = res.getMetaData();
    columnCount = meta.getColumnCount();
    int [] types = new int[columnCount];
    for (int k = 0; k < columnCount; k++) {
      types[k] = meta.getColumnType(k+1);     
    }

    if (rowCount > CHUNK_LIMIT){
      insert = connPool[i].prepareStatement("insert into parallel_calls_tmp values (?)");
    }else{
      chunks = new String[threadCount][chunkCount];
    }
    while (res.next()){
      callStr =  "begin "+procedureName+"("+processSeq+","+(j+1)+","+chunkCount;
      for (int k = 0; k < columnCount; k++) {
        callStr = callStr+",";
        String value = "";
        if ( types[k] == java.sql.Types.VARCHAR || types[k] == 1){
          value = res.getString(k+1);
          if (value == null){
            value = "null";
          }else{
            value = "'"+value+"'";
          }
        }else if (types[k] == java.sql.Types.NUMERIC){
          BigDecimal number  = res.getBigDecimal(k+1);
          if (number == null){
            value = "null";
          }else{
            value = number.toString();
          }
        }else if (types[k] == java.sql.Types.DATE || types[k] == java.sql.Types.TIMESTAMP){
          Timestamp date  = res.getTimestamp(k+1);
          if (date == null){
            value = "null";
          }else{
            value = "to_date('"+date.toString().substring(0,date.toString().indexOf('.'))+ "','yyyy-mm-dd hh24:mi:ss')";
          }
        }else{
          System.out.println(""+types[k]);
        }
        callStr = callStr + value;
      }

      callStr = callStr + "); end;";
      if (i == 0){
      if( j == 0 ){
        System.out.println(callStr);
      }
      }
      if (rowCount > CHUNK_LIMIT){
        insert.setString(1, callStr);
        insert.executeUpdate();
      }else{
        chunks[i][j] = callStr;
      }
      j++;
      if (j == chunkCount){
        connPool[i].commit();
        if (rowCount > CHUNK_LIMIT){
          chunkRunners[i] = new ChunkRunner(connPool[i],processSeq);
        }else{
          chunkRunners[i] = new ChunkRunner(connPool[i],processSeq,chunks[i]);
        }
        chunkRunners[i].start();
        i++;
        if (i < connPool.length ){
          if (rowCount > CHUNK_LIMIT){
            insert = connPool[i].prepareStatement("insert into parallel_calls_tmp values (?)");
          }
          j = 0;
        }
      }
      info.executeUpdate("begin dbms_application_info.set_action('"+connPool.length+","+rowCount+","+chunkCount+" threads "+i+","+j+"'); end;");
    }
    res.close();
    stm.close();
    info.close();

    connPool[i].commit();
    if (j < chunkCount){
      if (rowCount > CHUNK_LIMIT){
        chunkRunners[i] = new ChunkRunner(connPool[i],processSeq);
      }else{
        chunkRunners[i] = new ChunkRunner(connPool[i],processSeq,chunks[i]);
      }
      chunkRunners[i].start();
    }
    for (int k = 0; k < chunkRunners.length; k++) {
      if (chunkRunners[k] != null){
        try {
          chunkRunners[k].join();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
    for (int k = 0; k < chunkRunners.length; k++) {
      if (chunkRunners[k] != null){
        if (!connPool[k].isClosed()){
          connPool[k].close();
        }
        if (!"".equals(chunkRunners[k].errorMsg)){
          throw(new SQLException(chunkRunners[k].errorMsg));
        }
      }   
    }
    defaultConn.close();
  }

  public class ChunkRunner extends Thread {

    Connection conn;
    String errorMsg = "";
    String [] chunk;
    String internal;
    int processSeq;

    public ChunkRunner(Connection conn, int process) {
      super();
      this.conn = conn;
      this.processSeq = process;
    }

    public ChunkRunner(Connection conn, int process,String []chunk) {
      super();
      this.conn = conn;
      this.chunk = chunk;
      this.processSeq = process;
    }

    public ChunkRunner(Connection conn, int process,String inter) {
      super();
      this.conn = conn;
      this.processSeq = process;
      this.internal = inter;
    }

    public void run(){
      Statement stm = null;
      PreparedStatement select = null;
      String stmt="";

      try {
        stm = conn.createStatement();
        if ("".equals(tableName)){
          if( chunk == null){
            select = conn.prepareStatement("select * from parallel_calls_tmp");
            ResultSet res = select.executeQuery();

            while (res.next()){
              stmt = res.getString(1);
              if (stmt != null){
              if ( stmt != ""){
                stm.executeUpdate(stmt);
              }
              }
            }       
          }else{
            for (int i = 0; i < chunk.length; i++) {
              stmt = chunk[i];
              if (stmt != null){
              if (stmt != ""){
                stm.executeUpdate(stmt);
              }
              }
            }
          }
          stm.close();

        }

      } catch (SQLException e) {
        System.out.println(stmt);
        e.printStackTrace();
        errorMsg = e.getMessage();
        e.printStackTrace();
      } finally {
        try {
          conn.commit();
          conn.close();
        } catch (SQLException e) {
          e.printStackTrace();
        }         
      }
    }
  }
}
/

Пример функции-обработчика
  PROCEDURE close_row_process(pi_CurrentInChunk      NUMBER,
                              pi_ChunkLength         NUMBER,
                              pi_activity_folder_seq fr_activity_folders.activity_folder_seq%TYPE,
                              pi_action_seq          fr_folder_actions.action_seq%TYPE,
                              pi_demand_sum_min      NUMBER,
                              pi_action_code         fr_folder_actions.action_code%TYPE,
                              pi_demand_sum          fr_folder_actions.demand_sum%TYPE,
                              pi_contract_acc_num    fr_activity_folders.contract_acc_num%TYPE,
                              pi_debt_reduction_sum  NUMBER,
                              pi_response_type       NUMBER,
                              pi_section_num         NUMBER,
                              pi_client_id           fr_activity_folders.client_id%TYPE,
                              pi_client_type         fr_activity_folders.client_type%TYPE);


Пример использования
    dbms_application_info.set_module(module_name => 'close_folders_parallel,Process: ' ||
                                                    l_ProcessSeq,
                                     action_name => '');
    v_Step      := 1;
    l_SelectTxt := 'SELECT af.activity_folder_seq,' ||
                   '       fa.action_seq,' ||
                   '       CASE WHEN fa.action_code = 3' THEN' ||
                   '          rs.folder_closing_sum' || '         ELSE' ||
                   '          fac.demand_sum_min' ||
                   '       END demand_sum_min,' || '       fa.action_code,' ||
                   '       fa.demand_sum,' || '       af.contract_acc_num,' ||
                   '       nvl(rs.debt_reduction_sum,0) debt_reduction_sum,' ||
                   '       fab.response_type,rca.section_num,af.client_id,af.client_type' ||
                   '  FROM fr_activity_folders af,' ||
                   '       fr_folder_actions   fa,' ||
                   '       rm_contract_acc     rca,' ||
                   '       fr_actions_codes    fac,' ||
                   '       rm_section          rs,' ||
                   '       fr_action_bank_answers fab' ||
                   ' WHERE af.current_action_seq = fa.action_seq' ||
                   '   AND rca.contract_acc_num = af.contract_acc_num' ||
                   '   AND fa.action_code = fac.action_code' ||
                   '   AND rca.section_num = rs.section_num(+)' ||
                   '   AND fa.action_seq= fab.action_seq(+)' ||
                   '   AND af.folder_status = ' ||
                   fr_check_potential_pkg.c_FOLDER_OPEN_STATUS --||'    AND rownum <= 1000'
     ;
    l_ProcName  := 'fr_support_pkg.close_row_process';

    -- Get amount limits for future actions
    ------------------------------------------------------
    -- populate temporary table with calc balance results in parallel
    dbms_application_info.set_action(action_name => 'Parallel Java');
    dbms_java.set_output(1000000);

    SELECT t.parallel_num
      INTO v_ParallelCount
      FROM sr_task_codes t
     WHERE t.task_seq = 203;

    v_Msg := run_pipe_parallel(pi_Select_Txt     => l_SelectTxt,
                               pi_Proc_Name      => l_ProcName,
                               pi_Parallel_Count => v_ParallelCount,
                               Pi_Password    => 'psw');
    IF v_Msg <> 'OK' THEN
      RAISE ErrException;
    END IF;
    ------------------------------------------------------


В принципе, я сейчас подумал, что сработает не только с Oracle, а с любой базой…

Если кому интересно, могу рассказать, что я добавил для того, чтобы работать не с симуляцией pipelined функции, а с выполнением отдельных batches…

Могу сказать, что я в результате получил выигрыш во времени: 12 часов в одной сессии против часа с половиной в 25 сессиях. При этом все 16 процессоров сервера были нагружены под 100%.
Автор: @jvs
Samtrest
рейтинг 10,85
Поиск зависимостей объектов базы данных в коде

Комментарии (9)

  • 0
    почему число сессий не равно количеству ядер? из-за ожидания ответа от сервера?
    мимо не JAVA кодер
    • 0
      Зависимость между количеством ядер и сессий не прямая. Oracle и OS распределяют сами. Я, конечно, не могу сказать точно как, сужу по результату. Давал и 150 сессий — работало, правда без особого улучшения по сравнению с 25, но и не умирало. Тут ведь и обращения к дискам и swap памяти. Самое простое поиграть с параметром на вашей машине, благо, это очень просто…
  • +1
    Для большинства подобных случаев хватит dbms_parallel_execute, который работает под любой версией Oralce. Он достаточно прост и понятен, умеет прерывать и возобновлять процесс.

    Если есть координирующий процесс и размер порций работы одинаковый и не слишком большой, то использование dbms_app_info кажется излишним. Главный поток может просто смотреть, сколько выполнено и сколько осталось.
    • 0
      dbms_parallel_execute работает начиная с 11g Release 2. Том Кайт предлагал параллелизм в ранних версиях тоже делать вручную. Вот ссылка. Мой метод — это один из возможных. Кроме того, речь идет об pipelined функциях, это другая реализация параллелизма, не только деление на порции, но и обработка каждой строки. По поводу DBMS INFO, это только для красоты, можно включать при отладке, чтобы посматривать, как идут процессы.
  • 0
    хинт /* parralels */ это вообще не из этой оперы. Он выполняет параллелизацию запроса, наример сканы, сортировки и группировки, инсерт, балк лоад и т.п.
  • 0
    Вы неправы, это как раз имеет отношение к pipelined функциям. Посмотрите У Барлесона
  • 0
    Любопытно, есть несколько вопросов.
    1. Как вы пришли к решению создавать ХП, а не отдавать параллелизм на сторону приложения?
    2. К примеру, если в одном потоке транзакция аварийно откатывается, данные автоматически становятся несогласованными? Как вы обрабатываете такие случаи? Или бизнес-логика в вашем примере не критична и допускает просто повторный запуск процедуры?
    • 0
      1. Я думаю, что Oracle лучше разберется с параллельностью сессий, он это делал хорошо еще когда Явы не было. И кроме того, когда с данными работают максимально близко(внутри самой базы), я считаю это здоровей всего.
      2. Тут решение за Вами. В моем случае, все равно, то, что отработало, второй раз не попадет в селект, но если важно, можно придумать как реагировать на падение одного или нескольких процессов. Лучше всего отлавливать exception в Jave…
    • 0
      Кроме того, сервер базы — это обычно сильный компьютер с несколькими ядрами, а точно есть зависимость между параллельностью процессов и многоядерностью.

Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

Самое читаемое Разработка