Параллельная обработка большого селекта в нескольких сессиях
Представьте: есть селект, который возвращает записи, каждую из которых нужно обработать, и то ли много записей, то ли обработка каждой записи занимает много времени, а процесс обработки одной записи не зависит от процессов других записей.
Классический пример для того, чтобы задействовать многопоточность или в случае баз данных выполнять обработку в нескольких сессиях. В Оракле для этого используется hint /*+ parallel() */ и pipelined functions. Это здорово, но если у вас Oracle standard edition(где parallel не работает) или вы хотите обработать не каждую запись по отдельности(из соображений, что лучше накопить работу, а потом в bulk, одним ударом, выполнить), а поделить весь вывод селекта на куски и каждый обработать отдельно?
Задача ставится так:
Написать Java stored procedure, которая получает следующие параметры:
- Текст селекта
- Имя процедуры, которая будет работать с порцией данных
- Колличество потоков(Thread)
- Данные, необходимые для подключения к базе
Сначала посмотрим, что можно сделать с pipelined функцией.
Java откроет по тексту селекта result set в default connection.
Первым делом надо выполнить
select count(*) from («Текст селекта»);
Создадим connection pool с размерностью, заданной в 3-м параметре.
Создадим отдельные сессии, присоединившись через jdbc connection.
Данные для этого возьмем из 4-го параметра, нам, по большому счету нужен только пароль, все остальное получим сами(может еще порт, если он отличен от 1521).
Будем получать данные из селекта в default connection и переписывать их в сессию из пула. Как только решим, что накопили достаточно, создадим thread, передадим ему эту connection как параметр и пусть работает, а мы продолжим со следующей сессией или, если все уже прочитано, подождем окончания всех потоков.
Напишем функцию обработки. Она получает все поля селекта как параметры.
Будет удобно, чтобы, например, первые два параметра были бы номер в порции и ее размерность. Это даст возможность в dbms info выводить процент выполнения в потоке.
По метадате селекта будем конструировать ее вызов в виде примерно так:
begin proc1(23,14000,'a1',3,'tratata',35,48); end;
Хранить будем только такую строку.
Вначале это был 2-х мерный массив (i,j), где i — это номер потока(в дальнейшем...). Потом я увидел, что при большом числе записей, затраты Oracle на поддержку большого массива становятся чрезмерными и решил пользоваться также временной таблицей(temporary table).
Я положил границу в 200,000 записей. Если селект count(*) возвращает меньше 200,000 Java в-runtime использует 2-х мерный String массив, если больше — пишет во временную таблицу parallel_calls_tmp с одним полем varchar2(4000).
Итак, в PL/SQL пакете создаем функциюFUNCTION run_pipe_parallel(pi_Select_Txt VARCHAR2, pi_Proc_Name VARCHAR2, pi_Parallel_Count VARCHAR2, Pi_Password VARCHAR2) RETURN VARCHAR2 AS LANGUAGE JAVA NAME 'com.samtrest.ParallelRunner.run_parallel(java.lang.String, java.lang.String,java.lang.String, java.lang.String) return java.lang.String';
Создаем таблицу
-- Create table
create global temporary table parallel_calls_tmp
(
call_str varchar2(4000)
)
on commit preserve rows;
На стороне Java есть функция
public static String run_parallel(String selectTxt,
String procedureName,
String threadCount,
String password) throws NumberFormatException, SQLException, ClassNotFoundException {
String rc = "OK";
ParallelRunner parallelRunner = new ParallelRunner(selectTxt,procedureName,Integer.parseInt(threadCount),password);
try {
parallelRunner.runProc();
} catch (SQLException e) {
e.printStackTrace();
rc = e.getMessage();
} catch (ClassNotFoundException e) {
e.printStackTrace();
rc = e.getMessage();
}
return rc;
}
Получение массива типов данных полей селекта
res = stm.executeQuery();
ResultSetMetaData meta = res.getMetaData();
columnCount = meta.getColumnCount();
int [] types = new int[columnCount];
for (int k = 0; k < columnCount; k++) {
types[k] = meta.getColumnType(k+1);
}
Так строим строку вызова:
while (res.next()){
callStr = "begin "+procedureName+"("+processSeq+","+(j+1)+","+chunkCount;
for (int k = 0; k < columnCount; k++) {
callStr = callStr+",";
String value = "";
if ( types[k] == java.sql.Types.VARCHAR || types[k] == 1){
value = res.getString(k+1);
if (value == null){
value = "null";
}else{
value = "'"+value+"'";
}
}else if (types[k] == java.sql.Types.NUMERIC){
BigDecimal number = res.getBigDecimal(k+1);
if (number == null){
value = "null";
}else{
value = number.toString();
}
}else if (types[k] == java.sql.Types.DATE || types[k] == java.sql.Types.TIMESTAMP){
Timestamp date = res.getTimestamp(k+1);
if (date == null){
value = "null";
}else{
value = "to_date('"+date.toString().substring(0,date.toString().indexOf('.'))+ "','yyyy-mm-dd hh24:mi:ss')";
}
}else{
System.out.println(""+types[k]);
}
callStr = callStr + value;
}
callStr = callStr + "); end;";
Накапливаем в массиве или таблице
if (rowCount > CHUNK_LIMIT){
insert.setString(1, callStr);
insert.executeUpdate();
}else{
chunks[i][j] = callStr;
}
А теперь весь класс, который нужно загрузить в базу.
create or replace and compile java source named "ParallelRunner" as
package com.samtrest;
import java.math.BigDecimal;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
public class ParallelRunner {
String selectTxt;
String procedureName;
String tableName = "";
String additional;
int threadCount,processSeq;
String host="127.0.0.1",instance,port="1521",userName,password;
Connection [] connPool;
Connection defaultConn;
ChunkRunner [] chunkRunners;
String [][] chunks;
int CHUNK_LIMIT = 200000;
public ParallelRunner(String selectTxt, String procedureName, int threadCount,String psw) throws SQLException, ClassNotFoundException {
super();
this.selectTxt = selectTxt;
this.procedureName = procedureName;
this.threadCount = threadCount;
this.port = "1521";
this.password=psw;
connPool = new Connection[threadCount];
chunkRunners = new ChunkRunner[threadCount];
}
public static String run_parallel(String selectTxt,
String procedureName,
String threadCount,
String psw) throws NumberFormatException, SQLException, ClassNotFoundException {
String rc = "OK";
ParallelRunner parallelRunner = new ParallelRunner(selectTxt,procedureName,Integer.parseInt(threadCount),psw);
try {
parallelRunner.runProc();
} catch (SQLException e) {
e.printStackTrace();
rc = e.getMessage();
} catch (ClassNotFoundException e) {
e.printStackTrace();
rc = e.getMessage();
}
return rc;
}
public void populateConnectionPool() throws SQLException, ClassNotFoundException{
int siteNumber = 0;
String siteStatus ="T";
ResultSet res;
Class.forName("oracle.jdbc.driver.OracleDriver");
defaultConn = DriverManager.getConnection("jdbc:default:connection:");
PreparedStatement stm = defaultConn.prepareStatement("SELECT SYS_CONTEXT('USERENV','SESSION_USER') from dual");
res = stm.executeQuery();
if (res.next()){
userName = res.getString(1);
}
res.close();
stm = defaultConn.prepareStatement("SELECT SYS_CONTEXT('USERENV','DB_NAME') from dual");
res = stm.executeQuery();
if (res.next()){
instance = res.getString(1);
}
res.close();
for (int i = 0; i < connPool.length; i++) {
connPool[i] = DriverManager.getConnection("jdbc:oracle:thin:@"+host+":"+port+":"+instance, userName, password);
//connPool[i] = DriverManager.getConnection("jdbc:oracle:thin:@10.28.28.101:1521:orc1", userName, password);
connPool[i].setAutoCommit(false);
stm = connPool[i].prepareStatement("begin dbms_application_info.set_module('Java Parallel; process:'||"+
processSeq+",'"+connPool.length+" threads'); end;");
stm.executeUpdate();
}
stm.close();
}
public void runProc() throws SQLException, ClassNotFoundException{
int rowCount = 0,columnCount, chunkCount,i,j;
String callStr="";
PreparedStatement stm;
Statement info;
PreparedStatement insert = null;
populateConnectionPool();
info = defaultConn.createStatement();
info.executeUpdate("begin dbms_application_info.set_module('Parallel process:'||"+
processSeq+",'"+threadCount+" threads'); end;");
System.out.println(selectTxt);
stm = defaultConn.prepareStatement("select count(*) from ("+selectTxt+")");
ResultSet res = stm.executeQuery();
if ( res.next())
rowCount = res.getInt(1);
res.close();
stm.close();
chunkCount = rowCount/threadCount;
if (chunkCount*threadCount < rowCount){
chunkCount++;
}
i = 0;
j = 0;
// System.out.println("Count of parallel threads: "+connPool.length);
// System.out.println("Count of processing rows: "+rowCount);
// System.out.println("Chunk length: "+chunkCount);
info.executeUpdate("begin dbms_application_info.set_action('"+connPool.length+","+rowCount+","+chunkCount+"'); end;");
stm = defaultConn.prepareStatement(selectTxt);
res = stm.executeQuery();
ResultSetMetaData meta = res.getMetaData();
columnCount = meta.getColumnCount();
int [] types = new int[columnCount];
for (int k = 0; k < columnCount; k++) {
types[k] = meta.getColumnType(k+1);
}
if (rowCount > CHUNK_LIMIT){
insert = connPool[i].prepareStatement("insert into parallel_calls_tmp values (?)");
}else{
chunks = new String[threadCount][chunkCount];
}
while (res.next()){
callStr = "begin "+procedureName+"("+processSeq+","+(j+1)+","+chunkCount;
for (int k = 0; k < columnCount; k++) {
callStr = callStr+",";
String value = "";
if ( types[k] == java.sql.Types.VARCHAR || types[k] == 1){
value = res.getString(k+1);
if (value == null){
value = "null";
}else{
value = "'"+value+"'";
}
}else if (types[k] == java.sql.Types.NUMERIC){
BigDecimal number = res.getBigDecimal(k+1);
if (number == null){
value = "null";
}else{
value = number.toString();
}
}else if (types[k] == java.sql.Types.DATE || types[k] == java.sql.Types.TIMESTAMP){
Timestamp date = res.getTimestamp(k+1);
if (date == null){
value = "null";
}else{
value = "to_date('"+date.toString().substring(0,date.toString().indexOf('.'))+ "','yyyy-mm-dd hh24:mi:ss')";
}
}else{
System.out.println(""+types[k]);
}
callStr = callStr + value;
}
callStr = callStr + "); end;";
if (i == 0){
if( j == 0 ){
System.out.println(callStr);
}
}
if (rowCount > CHUNK_LIMIT){
insert.setString(1, callStr);
insert.executeUpdate();
}else{
chunks[i][j] = callStr;
}
j++;
if (j == chunkCount){
connPool[i].commit();
if (rowCount > CHUNK_LIMIT){
chunkRunners[i] = new ChunkRunner(connPool[i],processSeq);
}else{
chunkRunners[i] = new ChunkRunner(connPool[i],processSeq,chunks[i]);
}
chunkRunners[i].start();
i++;
if (i < connPool.length ){
if (rowCount > CHUNK_LIMIT){
insert = connPool[i].prepareStatement("insert into parallel_calls_tmp values (?)");
}
j = 0;
}
}
info.executeUpdate("begin dbms_application_info.set_action('"+connPool.length+","+rowCount+","+chunkCount+" threads "+i+","+j+"'); end;");
}
res.close();
stm.close();
info.close();
connPool[i].commit();
if (j < chunkCount){
if (rowCount > CHUNK_LIMIT){
chunkRunners[i] = new ChunkRunner(connPool[i],processSeq);
}else{
chunkRunners[i] = new ChunkRunner(connPool[i],processSeq,chunks[i]);
}
chunkRunners[i].start();
}
for (int k = 0; k < chunkRunners.length; k++) {
if (chunkRunners[k] != null){
try {
chunkRunners[k].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
for (int k = 0; k < chunkRunners.length; k++) {
if (chunkRunners[k] != null){
if (!connPool[k].isClosed()){
connPool[k].close();
}
if (!"".equals(chunkRunners[k].errorMsg)){
throw(new SQLException(chunkRunners[k].errorMsg));
}
}
}
defaultConn.close();
}
public class ChunkRunner extends Thread {
Connection conn;
String errorMsg = "";
String [] chunk;
String internal;
int processSeq;
public ChunkRunner(Connection conn, int process) {
super();
this.conn = conn;
this.processSeq = process;
}
public ChunkRunner(Connection conn, int process,String []chunk) {
super();
this.conn = conn;
this.chunk = chunk;
this.processSeq = process;
}
public ChunkRunner(Connection conn, int process,String inter) {
super();
this.conn = conn;
this.processSeq = process;
this.internal = inter;
}
public void run(){
Statement stm = null;
PreparedStatement select = null;
String stmt="";
try {
stm = conn.createStatement();
if ("".equals(tableName)){
if( chunk == null){
select = conn.prepareStatement("select * from parallel_calls_tmp");
ResultSet res = select.executeQuery();
while (res.next()){
stmt = res.getString(1);
if (stmt != null){
if ( stmt != ""){
stm.executeUpdate(stmt);
}
}
}
}else{
for (int i = 0; i < chunk.length; i++) {
stmt = chunk[i];
if (stmt != null){
if (stmt != ""){
stm.executeUpdate(stmt);
}
}
}
}
stm.close();
}
} catch (SQLException e) {
System.out.println(stmt);
e.printStackTrace();
errorMsg = e.getMessage();
e.printStackTrace();
} finally {
try {
conn.commit();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
/
Пример функции-обработчика
PROCEDURE close_row_process(pi_CurrentInChunk NUMBER,
pi_ChunkLength NUMBER,
pi_activity_folder_seq fr_activity_folders.activity_folder_seq%TYPE,
pi_action_seq fr_folder_actions.action_seq%TYPE,
pi_demand_sum_min NUMBER,
pi_action_code fr_folder_actions.action_code%TYPE,
pi_demand_sum fr_folder_actions.demand_sum%TYPE,
pi_contract_acc_num fr_activity_folders.contract_acc_num%TYPE,
pi_debt_reduction_sum NUMBER,
pi_response_type NUMBER,
pi_section_num NUMBER,
pi_client_id fr_activity_folders.client_id%TYPE,
pi_client_type fr_activity_folders.client_type%TYPE);
Пример использования
dbms_application_info.set_module(module_name => 'close_folders_parallel,Process: ' ||
l_ProcessSeq,
action_name => '');
v_Step := 1;
l_SelectTxt := 'SELECT af.activity_folder_seq,' ||
' fa.action_seq,' ||
' CASE WHEN fa.action_code = 3' THEN' ||
' rs.folder_closing_sum' || ' ELSE' ||
' fac.demand_sum_min' ||
' END demand_sum_min,' || ' fa.action_code,' ||
' fa.demand_sum,' || ' af.contract_acc_num,' ||
' nvl(rs.debt_reduction_sum,0) debt_reduction_sum,' ||
' fab.response_type,rca.section_num,af.client_id,af.client_type' ||
' FROM fr_activity_folders af,' ||
' fr_folder_actions fa,' ||
' rm_contract_acc rca,' ||
' fr_actions_codes fac,' ||
' rm_section rs,' ||
' fr_action_bank_answers fab' ||
' WHERE af.current_action_seq = fa.action_seq' ||
' AND rca.contract_acc_num = af.contract_acc_num' ||
' AND fa.action_code = fac.action_code' ||
' AND rca.section_num = rs.section_num(+)' ||
' AND fa.action_seq= fab.action_seq(+)' ||
' AND af.folder_status = ' ||
fr_check_potential_pkg.c_FOLDER_OPEN_STATUS --||' AND rownum <= 1000'
;
l_ProcName := 'fr_support_pkg.close_row_process';
-- Get amount limits for future actions
------------------------------------------------------
-- populate temporary table with calc balance results in parallel
dbms_application_info.set_action(action_name => 'Parallel Java');
dbms_java.set_output(1000000);
SELECT t.parallel_num
INTO v_ParallelCount
FROM sr_task_codes t
WHERE t.task_seq = 203;
v_Msg := run_pipe_parallel(pi_Select_Txt => l_SelectTxt,
pi_Proc_Name => l_ProcName,
pi_Parallel_Count => v_ParallelCount,
Pi_Password => 'psw');
IF v_Msg <> 'OK' THEN
RAISE ErrException;
END IF;
------------------------------------------------------
В принципе, я сейчас подумал, что сработает не только с Oracle, а с любой базой…
Если кому интересно, могу рассказать, что я добавил для того, чтобы работать не с симуляцией pipelined функции, а с выполнением отдельных batches…
Могу сказать, что я в результате получил выигрыш во времени: 12 часов в одной сессии против часа с половиной в 25 сессиях. При этом все 16 процессоров сервера были нагружены под 100%.