public <S> void execute(InputLoader<S> inputLoader, InputConverter<S> converter) {
long count = 0;
+ Connection readConnection = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ Connection writeConnection = null;
+ PreparedStatement writeStatement = null;
try {
- Connection readConnection = db.getDataSource().getConnection();
- Statement stmt = null;
- ResultSet rs = null;
- Connection writeConnection = db.getDataSource().getConnection();
- PreparedStatement writeStatement = null;
- try {
- readConnection.setAutoCommit(false);
- writeConnection.setAutoCommit(false);
- writeStatement = writeConnection.prepareStatement(converter.updateSql());
- stmt = readConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ writeConnection = db.getDataSource().getConnection();
+ writeConnection.setAutoCommit(false);
+ writeStatement = writeConnection.prepareStatement(converter.updateSql());
+
+ readConnection = db.getDataSource().getConnection();
+ readConnection.setAutoCommit(false);
+
+ stmt = readConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ stmt.setFetchSize(GROUP_SIZE);
+ if (db.getDialect().getId().equals(MySql.ID)) {
+ stmt.setFetchSize(Integer.MIN_VALUE);
+ } else {
stmt.setFetchSize(GROUP_SIZE);
- if (db.getDialect().getId().equals(MySql.ID)) {
- stmt.setFetchSize(Integer.MIN_VALUE);
- } else {
- stmt.setFetchSize(GROUP_SIZE);
- }
- rs = stmt.executeQuery(inputLoader.selectSql());
-
- int cursor = 0;
- while (rs.next()) {
- if (converter.convert(inputLoader.load(rs), writeStatement)) {
- writeStatement.addBatch();
- cursor++;
- count++;
- }
-
- if (cursor == GROUP_SIZE) {
- writeStatement.executeBatch();
- writeConnection.commit();
- cursor = 0;
- }
+ }
+ rs = stmt.executeQuery(inputLoader.selectSql());
+
+ int cursor = 0;
+ while (rs.next()) {
+ if (converter.convert(inputLoader.load(rs), writeStatement)) {
+ writeStatement.addBatch();
+ cursor++;
+ count++;
}
- if (cursor > 0) {
+
+ if (cursor == GROUP_SIZE) {
writeStatement.executeBatch();
writeConnection.commit();
+ cursor = 0;
}
- } finally {
- DbUtils.closeQuietly(writeStatement);
- DbUtils.closeQuietly(writeConnection);
- DbUtils.closeQuietly(readConnection, stmt, rs);
-
- LOGGER.info("{} rows have been updated", count);
}
+ if (cursor > 0) {
+ writeStatement.executeBatch();
+ writeConnection.commit();
+ }
+
} catch (SQLException e) {
LOGGER.error(FAILURE_MESSAGE, e);
SqlUtil.log(LOGGER, e);
} catch (Exception e) {
LOGGER.error(FAILURE_MESSAGE, e);
throw MessageException.of(FAILURE_MESSAGE);
+
+ } finally {
+ DbUtils.closeQuietly(writeStatement);
+ DbUtils.closeQuietly(writeConnection);
+ DbUtils.closeQuietly(readConnection, stmt, rs);
+
+ LOGGER.info("{} rows have been updated", count);
}
}