001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.dbutils; 018 019import java.sql.Connection; 020import java.sql.PreparedStatement; 021import java.sql.ResultSet; 022import java.sql.SQLException; 023import java.util.concurrent.Callable; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.Future; 026 027import javax.sql.DataSource; 028 029/** 030 * Executes SQL queries with pluggable strategies for handling 031 * {@code ResultSet}s. This class is thread safe. 032 * 033 * @see ResultSetHandler 034 * @since 1.4 035 */ 036public class AsyncQueryRunner extends AbstractQueryRunner { 037 038 private final ExecutorService executorService; 039 private final QueryRunner queryRunner; 040 041 /** 042 * Constructor for AsyncQueryRunner which uses a provided ExecutorService and underlying QueryRunner. 043 * 044 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently. 045 * @param queryRunner the {@code QueryRunner} instance to use for the queries. 046 * @since DbUtils 1.5 047 */ 048 public AsyncQueryRunner(final ExecutorService executorService, final QueryRunner queryRunner) { 049 this.executorService = executorService; 050 this.queryRunner = queryRunner; 051 } 052 053 /** 054 * Constructor for AsyncQueryRunner. 055 * 056 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently. 057 */ 058 public AsyncQueryRunner(final ExecutorService executorService) { 059 this(null, false, executorService); 060 } 061 062 /** 063 * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead. 064 * Constructor for AsyncQueryRunner that controls the use of {@code ParameterMetaData}. 065 * 066 * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) }; 067 * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it, 068 * and if it breaks, we'll remember not to use it again. 069 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently. 070 */ 071 @Deprecated 072 public AsyncQueryRunner(final boolean pmdKnownBroken, final ExecutorService executorService) { 073 this(null, pmdKnownBroken, executorService); 074 } 075 076 /** 077 * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead. 078 * Constructor for AsyncQueryRunner that takes a {@code DataSource}. 079 * 080 * Methods that do not take a {@code Connection} parameter will retrieve connections from this 081 * {@code DataSource}. 082 * 083 * @param ds The {@code DataSource} to retrieve connections from. 084 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently. 085 */ 086 @Deprecated 087 public AsyncQueryRunner(final DataSource ds, final ExecutorService executorService) { 088 this(ds, false, executorService); 089 } 090 091 /** 092 * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead. 093 * Constructor for AsyncQueryRunner that take a {@code DataSource} and controls the use of {@code ParameterMetaData}. 094 * Methods that do not take a {@code Connection} parameter will retrieve connections from this 095 * {@code DataSource}. 096 * 097 * @param ds The {@code DataSource} to retrieve connections from. 098 * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) }; 099 * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it, 100 * and if it breaks, we'll remember not to use it again. 101 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently. 102 */ 103 @Deprecated 104 public AsyncQueryRunner(final DataSource ds, final boolean pmdKnownBroken, final ExecutorService executorService) { 105 super(ds, pmdKnownBroken); 106 this.executorService = executorService; 107 this.queryRunner = new QueryRunner(ds, pmdKnownBroken); 108 } 109 110 /** 111 * @deprecated No longer used by this class. Will be removed in a future version. 112 * Class that encapsulates the continuation for batch calls. 113 */ 114 @Deprecated 115 protected class BatchCallableStatement implements Callable<int[]> { 116 private final String sql; 117 private final Object[][] params; 118 private final Connection conn; 119 private final boolean closeConn; 120 private final PreparedStatement ps; 121 122 /** 123 * Creates a new BatchCallableStatement instance. 124 * 125 * @param sql The SQL statement to execute. 126 * @param params An array of query replacement parameters. Each row in 127 * this array is one set of batch replacement values. 128 * @param conn The connection to use for the batch call. 129 * @param closeConn True if the connection should be closed, false otherwise. 130 * @param ps The {@link PreparedStatement} to be executed. 131 */ 132 public BatchCallableStatement(final String sql, final Object[][] params, final Connection conn, final boolean closeConn, final PreparedStatement ps) { 133 this.sql = sql; 134 this.params = params.clone(); 135 this.conn = conn; 136 this.closeConn = closeConn; 137 this.ps = ps; 138 } 139 140 /** 141 * The actual call to executeBatch. 142 * 143 * @return an array of update counts containing one element for each command in the batch. 144 * @throws SQLException if a database access error occurs or one of the commands sent to the database fails. 145 * @see PreparedStatement#executeBatch() 146 */ 147 @Override 148 public int[] call() throws SQLException { 149 int[] ret = null; 150 151 try { 152 ret = ps.executeBatch(); 153 } catch (final SQLException e) { 154 rethrow(e, sql, (Object[])params); 155 } finally { 156 close(ps); 157 if (closeConn) { 158 close(conn); 159 } 160 } 161 162 return ret; 163 } 164 } 165 166 /** 167 * Execute a batch of SQL INSERT, UPDATE, or DELETE queries. 168 * 169 * @param conn The {@code Connection} to use to run the query. The caller is 170 * responsible for closing this Connection. 171 * @param sql The SQL to execute. 172 * @param params An array of query replacement parameters. Each row in 173 * this array is one set of batch replacement values. 174 * @return A {@code Future} which returns the number of rows updated per statement. 175 * @throws SQLException if a database access error occurs 176 */ 177 public Future<int[]> batch(final Connection conn, final String sql, final Object[][] params) throws SQLException { 178 return executorService.submit(new Callable<int[]>() { 179 180 @Override 181 public int[] call() throws Exception { 182 return queryRunner.batch(conn, sql, params); 183 } 184 185 }); 186 } 187 188 /** 189 * Execute a batch of SQL INSERT, UPDATE, or DELETE queries. The 190 * {@code Connection} is retrieved from the {@code DataSource} 191 * set in the constructor. This {@code Connection} must be in 192 * auto-commit mode or the update will not be saved. 193 * 194 * @param sql The SQL to execute. 195 * @param params An array of query replacement parameters. Each row in 196 * this array is one set of batch replacement values. 197 * @return A {@code Future} which returns the number of rows updated per statement. 198 * @throws SQLException if a database access error occurs 199 */ 200 public Future<int[]> batch(final String sql, final Object[][] params) throws SQLException { 201 return executorService.submit(new Callable<int[]>() { 202 203 @Override 204 public int[] call() throws Exception { 205 return queryRunner.batch(sql, params); 206 } 207 208 }); 209 } 210 211 /** 212 * Class that encapsulates the continuation for query calls. 213 * @param <T> The type of the result from the call to handle. 214 */ 215 protected class QueryCallableStatement<T> implements Callable<T> { 216 private final String sql; 217 private final Object[] params; 218 private final Connection conn; 219 private final boolean closeConn; 220 private final PreparedStatement ps; 221 private final ResultSetHandler<T> rsh; 222 223 /** 224 * Creates a new {@code QueryCallableStatement} instance. 225 * 226 * @param conn The connection to use for the batch call. 227 * @param closeConn True if the connection should be closed, false otherwise. 228 * @param ps The {@link PreparedStatement} to be executed. 229 * @param rsh The handler that converts the results into an object. 230 * @param sql The SQL statement to execute. 231 * @param params An array of query replacement parameters. Each row in 232 * this array is one set of batch replacement values. 233 */ 234 public QueryCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps, 235 final ResultSetHandler<T> rsh, final String sql, final Object... params) { 236 this.sql = sql; 237 this.params = params; 238 this.conn = conn; 239 this.closeConn = closeConn; 240 this.ps = ps; 241 this.rsh = rsh; 242 } 243 244 /** 245 * The actual call to {@code handle()} method. 246 * 247 * @return an array of update counts containing one element for each command in the batch. 248 * @throws SQLException if a database access error occurs. 249 * @see ResultSetHandler#handle(ResultSet) 250 */ 251 @Override 252 public T call() throws SQLException { 253 ResultSet rs = null; 254 T ret = null; 255 256 try { 257 rs = wrap(ps.executeQuery()); 258 ret = rsh.handle(rs); 259 } catch (final SQLException e) { 260 rethrow(e, sql, params); 261 } finally { 262 try { 263 close(rs); 264 } finally { 265 close(ps); 266 if (closeConn) { 267 close(conn); 268 } 269 } 270 } 271 272 return ret; 273 } 274 275 } 276 277 /** 278 * Execute an SQL SELECT query with replacement parameters. The 279 * caller is responsible for closing the connection. 280 * @param <T> The type of object that the handler returns 281 * @param conn The connection to execute the query in. 282 * @param sql The query to execute. 283 * @param rsh The handler that converts the results into an object. 284 * @param params The replacement parameters. 285 * @return A {@code Future} which returns the result of the query call. 286 * @throws SQLException if a database access error occurs 287 */ 288 public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params) 289 throws SQLException { 290 return executorService.submit(new Callable<T>() { 291 292 @Override 293 public T call() throws Exception { 294 return queryRunner.query(conn, sql, rsh, params); 295 } 296 297 }); 298 } 299 300 /** 301 * Execute an SQL SELECT query without any replacement parameters. The 302 * caller is responsible for closing the connection. 303 * @param <T> The type of object that the handler returns 304 * @param conn The connection to execute the query in. 305 * @param sql The query to execute. 306 * @param rsh The handler that converts the results into an object. 307 * @return A {@code Future} which returns the result of the query call. 308 * @throws SQLException if a database access error occurs 309 */ 310 public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException { 311 return executorService.submit(new Callable<T>() { 312 313 @Override 314 public T call() throws Exception { 315 return queryRunner.query(conn, sql, rsh); 316 } 317 318 }); 319 } 320 321 /** 322 * Executes the given SELECT SQL query and returns a result object. 323 * The {@code Connection} is retrieved from the 324 * {@code DataSource} set in the constructor. 325 * @param <T> The type of object that the handler returns 326 * @param sql The SQL statement to execute. 327 * @param rsh The handler used to create the result object from 328 * the {@code ResultSet}. 329 * @param params Initialize the PreparedStatement's IN parameters with 330 * this array. 331 * @return A {@code Future} which returns the result of the query call. 332 * @throws SQLException if a database access error occurs 333 */ 334 public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException { 335 return executorService.submit(new Callable<T>() { 336 337 @Override 338 public T call() throws Exception { 339 return queryRunner.query(sql, rsh, params); 340 } 341 342 }); 343 } 344 345 /** 346 * Executes the given SELECT SQL without any replacement parameters. 347 * The {@code Connection} is retrieved from the 348 * {@code DataSource} set in the constructor. 349 * @param <T> The type of object that the handler returns 350 * @param sql The SQL statement to execute. 351 * @param rsh The handler used to create the result object from 352 * the {@code ResultSet}. 353 * 354 * @return A {@code Future} which returns the result of the query call. 355 * @throws SQLException if a database access error occurs 356 */ 357 public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh) throws SQLException { 358 return executorService.submit(new Callable<T>() { 359 360 @Override 361 public T call() throws Exception { 362 return queryRunner.query(sql, rsh); 363 } 364 365 }); 366 } 367 368 /** 369 * @deprecated No longer used by this class. Will be removed in a future version. 370 * Class that encapsulates the continuation for update calls. 371 */ 372 @Deprecated 373 protected class UpdateCallableStatement implements Callable<Integer> { 374 private final String sql; 375 private final Object[] params; 376 private final Connection conn; 377 private final boolean closeConn; 378 private final PreparedStatement ps; 379 380 /** 381 * 382 * 383 * @param conn The connection to use for the batch call. 384 * @param closeConn True if the connection should be closed, false otherwise. 385 * @param ps The {@link PreparedStatement} to be executed. 386 * @param sql The SQL statement to execute. 387 * @param params An array of query replacement parameters. Each row in 388 * this array is one set of batch replacement values. 389 */ 390 public UpdateCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps, final String sql, final Object... params) { 391 this.sql = sql; 392 this.params = params; 393 this.conn = conn; 394 this.closeConn = closeConn; 395 this.ps = ps; 396 } 397 398 /** 399 * The actual call to {@code executeUpdate()} method. 400 * 401 * @return either (1) the row count for SQL Data Manipulation Language (DML) statements or 402 * (2) 0 for SQL statements that return nothing 403 * @throws SQLException if a database access error occurs. 404 * @see PreparedStatement#executeUpdate() 405 */ 406 @Override 407 public Integer call() throws SQLException { 408 int rows = 0; 409 410 try { 411 rows = ps.executeUpdate(); 412 } catch (final SQLException e) { 413 rethrow(e, sql, params); 414 } finally { 415 close(ps); 416 if (closeConn) { 417 close(conn); 418 } 419 } 420 421 return Integer.valueOf(rows); 422 } 423 424 } 425 426 /** 427 * Execute an SQL INSERT, UPDATE, or DELETE query without replacement 428 * parameters. 429 * 430 * @param conn The connection to use to run the query. 431 * @param sql The SQL to execute. 432 * @return A {@code Future} which returns the number of rows updated. 433 * @throws SQLException if a database access error occurs 434 */ 435 public Future<Integer> update(final Connection conn, final String sql) throws SQLException { 436 return executorService.submit(new Callable<Integer>() { 437 438 @Override 439 public Integer call() throws Exception { 440 return Integer.valueOf(queryRunner.update(conn, sql)); 441 } 442 443 }); 444 } 445 446 /** 447 * Execute an SQL INSERT, UPDATE, or DELETE query with a single replacement 448 * parameter. 449 * 450 * @param conn The connection to use to run the query. 451 * @param sql The SQL to execute. 452 * @param param The replacement parameter. 453 * @return A {@code Future} which returns the number of rows updated. 454 * @throws SQLException if a database access error occurs 455 */ 456 public Future<Integer> update(final Connection conn, final String sql, final Object param) throws SQLException { 457 return executorService.submit(new Callable<Integer>() { 458 459 @Override 460 public Integer call() throws Exception { 461 return Integer.valueOf(queryRunner.update(conn, sql, param)); 462 } 463 464 }); 465 } 466 467 /** 468 * Execute an SQL INSERT, UPDATE, or DELETE query. 469 * 470 * @param conn The connection to use to run the query. 471 * @param sql The SQL to execute. 472 * @param params The query replacement parameters. 473 * @return A {@code Future} which returns the number of rows updated. 474 * @throws SQLException if a database access error occurs 475 */ 476 public Future<Integer> update(final Connection conn, final String sql, final Object... params) throws SQLException { 477 return executorService.submit(new Callable<Integer>() { 478 479 @Override 480 public Integer call() throws Exception { 481 return Integer.valueOf(queryRunner.update(conn, sql, params)); 482 } 483 484 }); 485 } 486 487 /** 488 * Executes the given INSERT, UPDATE, or DELETE SQL statement without 489 * any replacement parameters. The {@code Connection} is retrieved 490 * from the {@code DataSource} set in the constructor. This 491 * {@code Connection} must be in auto-commit mode or the update will 492 * not be saved. 493 * 494 * @param sql The SQL statement to execute. 495 * @throws SQLException if a database access error occurs 496 * @return A {@code Future} which returns the number of rows updated. 497 */ 498 public Future<Integer> update(final String sql) throws SQLException { 499 return executorService.submit(new Callable<Integer>() { 500 501 @Override 502 public Integer call() throws Exception { 503 return Integer.valueOf(queryRunner.update(sql)); 504 } 505 506 }); 507 } 508 509 /** 510 * Executes the given INSERT, UPDATE, or DELETE SQL statement with 511 * a single replacement parameter. The {@code Connection} is 512 * retrieved from the {@code DataSource} set in the constructor. 513 * This {@code Connection} must be in auto-commit mode or the 514 * update will not be saved. 515 * 516 * @param sql The SQL statement to execute. 517 * @param param The replacement parameter. 518 * @throws SQLException if a database access error occurs 519 * @return A {@code Future} which returns the number of rows updated. 520 */ 521 public Future<Integer> update(final String sql, final Object param) throws SQLException { 522 return executorService.submit(new Callable<Integer>() { 523 524 @Override 525 public Integer call() throws Exception { 526 return Integer.valueOf(queryRunner.update(sql, param)); 527 } 528 529 }); 530 } 531 532 /** 533 * Executes the given INSERT, UPDATE, or DELETE SQL statement. The 534 * {@code Connection} is retrieved from the {@code DataSource} 535 * set in the constructor. This {@code Connection} must be in 536 * auto-commit mode or the update will not be saved. 537 * 538 * @param sql The SQL statement to execute. 539 * @param params Initializes the PreparedStatement's IN (i.e. '?') 540 * parameters. 541 * @throws SQLException if a database access error occurs 542 * @return A {@code Future} which returns the number of rows updated. 543 */ 544 public Future<Integer> update(final String sql, final Object... params) throws SQLException { 545 return executorService.submit(new Callable<Integer>() { 546 547 @Override 548 public Integer call() throws Exception { 549 return Integer.valueOf(queryRunner.update(sql, params)); 550 } 551 552 }); 553 } 554 555 /** 556 * Executes {@link QueryRunner#insert(String, ResultSetHandler)} asynchronously. 557 * 558 * @param <T> Return type expected 559 * @param sql SQL insert statement to execute 560 * @param rsh {@link ResultSetHandler} for handling the results 561 * @return {@link Future} that executes a query runner insert 562 * @see QueryRunner#insert(String, ResultSetHandler) 563 * @throws SQLException if a database access error occurs 564 * @since 1.6 565 */ 566 public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh) throws SQLException { 567 return executorService.submit(new Callable<T>() { 568 569 @Override 570 public T call() throws Exception { 571 return queryRunner.insert(sql, rsh); 572 } 573 574 }); 575 } 576 577 /** 578 * Executes {@link QueryRunner#insert(String, ResultSetHandler, Object...)} asynchronously. 579 * 580 * @param <T> Return type expected 581 * @param sql SQL insert statement to execute 582 * @param rsh {@link ResultSetHandler} for handling the results 583 * @param params Parameter values for substitution in the SQL statement 584 * @return {@link Future} that executes a query runner insert 585 * @see QueryRunner#insert(String, ResultSetHandler, Object...) 586 * @throws SQLException if a database access error occurs 587 * @since 1.6 588 */ 589 public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException { 590 return executorService.submit(new Callable<T>() { 591 592 @Override 593 public T call() throws Exception { 594 return queryRunner.insert(sql, rsh, params); 595 } 596 }); 597 } 598 599 /** 600 * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler)} asynchronously. 601 * 602 * @param <T> Return type expected 603 * @param conn {@link Connection} to use to execute the SQL statement 604 * @param sql SQL insert statement to execute 605 * @param rsh {@link ResultSetHandler} for handling the results 606 * @return {@link Future} that executes a query runner insert 607 * @see QueryRunner#insert(Connection, String, ResultSetHandler) 608 * @throws SQLException if a database access error occurs 609 * @since 1.6 610 */ 611 public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException { 612 return executorService.submit(new Callable<T>() { 613 614 @Override 615 public T call() throws Exception { 616 return queryRunner.insert(conn, sql, rsh); 617 } 618 }); 619 } 620 621 /** 622 * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler, Object...)} asynchronously. 623 * 624 * @param <T> Return type expected 625 * @param conn {@link Connection} to use to execute the SQL statement 626 * @param sql SQL insert statement to execute 627 * @param rsh {@link ResultSetHandler} for handling the results 628 * @param params Parameter values for substitution in the SQL statement 629 * @return {@link Future} that executes a query runner insert 630 * @see QueryRunner#insert(Connection, String, ResultSetHandler, Object...) 631 * @throws SQLException if a database access error occurs 632 * @since 1.6 633 */ 634 public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException { 635 return executorService.submit(new Callable<T>() { 636 637 @Override 638 public T call() throws Exception { 639 return queryRunner.insert(conn, sql, rsh, params); 640 } 641 }); 642 } 643 644 /** 645 * {@link QueryRunner#insertBatch(String, ResultSetHandler, Object[][])} asynchronously. 646 * 647 * @param <T> Return type expected 648 * @param sql SQL insert statement to execute 649 * @param rsh {@link ResultSetHandler} for handling the results 650 * @param params An array of query replacement parameters. Each row in 651 * this array is one set of batch replacement values. 652 * @return {@link Future} that executes a query runner batch insert 653 * @see QueryRunner#insertBatch(String, ResultSetHandler, Object[][]) 654 * @throws SQLException if a database access error occurs 655 * @since 1.6 656 */ 657 public <T> Future<T> insertBatch(final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException { 658 return executorService.submit(new Callable<T>() { 659 660 @Override 661 public T call() throws Exception { 662 return queryRunner.insertBatch(sql, rsh, params); 663 } 664 }); 665 } 666 667 /** 668 * {@link QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])} asynchronously. 669 * 670 * @param <T> Return type expected 671 * @param conn {@link Connection} to use to execute the SQL statement 672 * @param sql SQL insert statement to execute 673 * @param rsh {@link ResultSetHandler} for handling the results 674 * @param params An array of query replacement parameters. Each row in 675 * this array is one set of batch replacement values. 676 * @return {@link Future} that executes a query runner batch insert 677 * @see QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][]) 678 * @throws SQLException if a database access error occurs 679 * @since 1.6 680 */ 681 public <T> Future<T> insertBatch(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException { 682 return executorService.submit(new Callable<T>() { 683 684 @Override 685 public T call() throws Exception { 686 return queryRunner.insertBatch(conn, sql, rsh, params); 687 } 688 }); 689 } 690 691}