Use ExecuteStatement
with an Amazon SDK
The following code example shows how to use ExecuteStatement
.
Action examples are code excerpts from larger programs and must be run in context. You can see this action in context in the following code example:
- Java
-
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository
. Executes a SQL statement to create a database table.
/** * Creates an asynchronous task to execute a SQL statement for creating a new table. * * @param clusterId the identifier of the Amazon Redshift cluster * @param databaseName the name of the database to create the table in * @param userName the username to use for the database connection * @return a {@link CompletableFuture} that completes with the result of the SQL statement execution * @throws RuntimeException if there is an error creating the table */ public CompletableFuture<ExecuteStatementResponse> createTableAsync(String clusterId, String databaseName, String userName) { ExecuteStatementRequest createTableRequest = ExecuteStatementRequest.builder() .clusterIdentifier(clusterId) .dbUser(userName) .database(databaseName) .sql("CREATE TABLE Movies (" + "id INT PRIMARY KEY, " + "title VARCHAR(100), " + "year INT)") .build(); return getAsyncDataClient().executeStatement(createTableRequest) .whenComplete((response, exception) -> { if (exception != null) { throw new RuntimeException("Error creating table: " + exception.getMessage(), exception); } else { logger.info("Table created: Movies"); } }); }
Executes a SQL statement to insert data into a database table.
/** * Asynchronously pops a table from a JSON file. * * @param clusterId the ID of the cluster * @param databaseName the name of the database * @param userName the username * @param fileName the name of the JSON file * @param number the number of records to process * @return a CompletableFuture that completes with the number of records added to the Movies table */ public CompletableFuture<Integer> popTableAsync(String clusterId, String databaseName, String userName, String fileName, int number) { return CompletableFuture.supplyAsync(() -> { try { JsonParser parser = new JsonFactory().createParser(new File(fileName)); JsonNode rootNode = new ObjectMapper().readTree(parser); Iterator<JsonNode> iter = rootNode.iterator(); return iter; } catch (IOException e) { throw new RuntimeException("Failed to read or parse JSON file: " + e.getMessage(), e); } }).thenCompose(iter -> processNodesAsync(clusterId, databaseName, userName, iter, number)) .whenComplete((result, exception) -> { if (exception != null) { logger.info("Error {} ", exception.getMessage()); } else { logger.info("{} records were added to the Movies table." , result); } }); } private CompletableFuture<Integer> processNodesAsync(String clusterId, String databaseName, String userName, Iterator<JsonNode> iter, int number) { return CompletableFuture.supplyAsync(() -> { int t = 0; try { while (iter.hasNext()) { if (t == number) break; JsonNode currentNode = iter.next(); int year = currentNode.get("year").asInt(); String title = currentNode.get("title").asText(); // Use SqlParameter to avoid SQL injection. List<SqlParameter> parameterList = new ArrayList<>(); String sqlStatement = "INSERT INTO Movies VALUES( :id , :title, :year);"; SqlParameter idParam = SqlParameter.builder() .name("id") .value(String.valueOf(t)) .build(); SqlParameter titleParam = SqlParameter.builder() .name("title") .value(title) .build(); SqlParameter yearParam = SqlParameter.builder() .name("year") .value(String.valueOf(year)) .build(); parameterList.add(idParam); parameterList.add(titleParam); parameterList.add(yearParam); ExecuteStatementRequest insertStatementRequest = ExecuteStatementRequest.builder() .clusterIdentifier(clusterId) .sql(sqlStatement) .database(databaseName) .dbUser(userName) .parameters(parameterList) .build(); getAsyncDataClient().executeStatement(insertStatementRequest); logger.info("Inserted: " + title + " (" + year + ")"); t++; } } catch (RedshiftDataException e) { throw new RuntimeException("Error inserting data: " + e.getMessage(), e); } return t; }); }
Executes a SQL statement to query a database table.
/** * Asynchronously queries movies by a given year from a Redshift database. * * @param database the name of the database to query * @param dbUser the user to connect to the database with * @param year the year to filter the movies by * @param clusterId the identifier of the Redshift cluster to connect to * @return a {@link CompletableFuture} containing the response ID of the executed SQL statement */ public CompletableFuture<String> queryMoviesByYearAsync(String database, String dbUser, int year, String clusterId) { String sqlStatement = "SELECT * FROM Movies WHERE year = :year"; SqlParameter yearParam = SqlParameter.builder() .name("year") .value(String.valueOf(year)) .build(); ExecuteStatementRequest statementRequest = ExecuteStatementRequest.builder() .clusterIdentifier(clusterId) .database(database) .dbUser(dbUser) .parameters(yearParam) .sql(sqlStatement) .build(); return CompletableFuture.supplyAsync(() -> { try { ExecuteStatementResponse response = getAsyncDataClient().executeStatement(statementRequest).join(); // Use join() to wait for the result return response.id(); } catch (RedshiftDataException e) { throw new RuntimeException("Error executing statement: " + e.getMessage(), e); } }).exceptionally(exception -> { logger.info("Error: {}", exception.getMessage()); return ""; }); }
-
For API details, see ExecuteStatement in Amazon SDK for Java 2.x API Reference.
-
For a complete list of Amazon SDK developer guides and code examples, see Using this service with an Amazon SDK. This topic also includes information about getting started and details about previous SDK versions.