开始使用 Amazon Redshift 表、项目和查询 - Amazon Redshift
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异，请参阅《中国的 Amazon Web Services 服务入门》(PDF)。

开始使用 Amazon Redshift 表、项目和查询

以下代码示例展示了如何使用 Amazon Redshift 表、项目和查询。

Java
SDK for Java 2.x
注意

查看 GitHub，了解更多信息。查找完整示例，学习如何在 Amazon 代码示例存储库中进行设置和运行。

import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.redshift.RedshiftClient; import software.amazon.awssdk.services.redshift.model.Cluster; import software.amazon.awssdk.services.redshift.model.CreateClusterRequest; import software.amazon.awssdk.services.redshift.model.CreateClusterResponse; import software.amazon.awssdk.services.redshift.model.DeleteClusterRequest; import software.amazon.awssdk.services.redshift.model.DeleteClusterResponse; import software.amazon.awssdk.services.redshift.model.DescribeClustersRequest; import software.amazon.awssdk.services.redshift.model.DescribeClustersResponse; import software.amazon.awssdk.services.redshift.model.ModifyClusterRequest; import software.amazon.awssdk.services.redshift.model.ModifyClusterResponse; import software.amazon.awssdk.services.redshift.model.RedshiftException; import software.amazon.awssdk.services.redshiftdata.RedshiftDataClient; import software.amazon.awssdk.services.redshiftdata.model.DescribeStatementRequest; import software.amazon.awssdk.services.redshiftdata.model.DescribeStatementResponse; import software.amazon.awssdk.services.redshiftdata.model.ExecuteStatementRequest; import software.amazon.awssdk.services.redshiftdata.model.ExecuteStatementResponse; import software.amazon.awssdk.services.redshiftdata.model.Field; import software.amazon.awssdk.services.redshiftdata.model.GetStatementResultRequest; import software.amazon.awssdk.services.redshiftdata.model.GetStatementResultResponse; import software.amazon.awssdk.services.redshiftdata.model.ListDatabasesRequest; import software.amazon.awssdk.services.redshiftdata.model.RedshiftDataException; import software.amazon.awssdk.services.redshiftdata.model.SqlParameter; import 
software.amazon.awssdk.services.redshiftdata.paginators.ListDatabasesIterable; import com.fasterxml.jackson.core.JsonParser; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Scanner; import java.util.concurrent.TimeUnit; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html * * This Java example performs these tasks: * * 1. Prompts the user for a unique cluster ID or use the default value. * 2. Creates a Redshift cluster with the specified or default cluster Id value. * 3. Waits until the Redshift cluster is available for use. * 4. Lists all databases using a pagination API call. * 5. Creates a table named "Movies" with fields ID, title, and year. * 6. Inserts a specified number of records into the "Movies" table by reading the Movies JSON file. * 7. Prompts the user for a movie release year. * 8. Runs a SQL query to retrieve movies released in the specified year. * 9. Modifies the Redshift cluster. * 10. Prompts the user for confirmation to delete the Redshift cluster. * 11. If confirmed, deletes the specified Redshift cluster. 
*/ public class RedshiftScenario { public static final String DASHES = new String(new char[80]).replace("\0", "-"); public static void main(String[] args) throws Exception { final String usage = """ Usage: <jsonFilePath>\s Where: jsonFilePath - The path to the Movies JSON file (you can locate that file in ../../../resources/sample_files/movies.json) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String jsonFilePath = args[0]; String userName; String userPassword; String databaseName = "dev" ; Scanner scanner = new Scanner(System.in); Region region = Region.US_EAST_1; RedshiftClient redshiftClient = RedshiftClient.builder() .region(region) .build(); RedshiftDataClient redshiftDataClient = RedshiftDataClient.builder() .region(region) .build(); System.out.println(DASHES); System.out.println("Welcome to the Amazon Redshift SDK Getting Started scenario."); System.out.println(""" This Java program demonstrates how to interact with Amazon Redshift by using the AWS SDK for Java (v2).\s Amazon Redshift is a fully managed, petabyte-scale data warehouse service hosted in the cloud. The program's primary functionalities include cluster creation, verification of cluster readiness,\s list databases, table creation, data population within the table, and execution of SQL statements. Furthermore, it demonstrates the process of querying data from the Movie table.\s Upon completion of the program, all AWS resources are cleaned up. """); System.out.println("Lets get started..."); System.out.println("Please enter your user name (default is awsuser)"); String user = scanner.nextLine(); userName = user.isEmpty() ? "awsuser" : user; System.out.println(DASHES); System.out.println("Please enter your user password (default is AwsUser1000)"); String userpass = scanner.nextLine(); userPassword = userpass.isEmpty() ? 
"AwsUser1000" : userpass; System.out.println(DASHES); System.out.println(DASHES); System.out.println("A Redshift cluster refers to the collection of computing resources and storage that work together to process and analyze large volumes of data."); System.out.println("Enter a cluster id value (default is redshift-cluster-movies): "); String userClusterId = scanner.nextLine(); String clusterId = userClusterId.isEmpty() ? "redshift-cluster-movies" : userClusterId; createCluster(redshiftClient, clusterId, userName, userPassword); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Wait until "+clusterId +" is available."); System.out.print("Press Enter to continue..."); scanner.nextLine(); waitForClusterReady(redshiftClient, clusterId); System.out.println(DASHES); System.out.println(DASHES); String databaseInfo = """ When you created $clusteridD, the dev database is created by default and used in this scenario.\s To create a custom database, you need to have a CREATEDB privilege.\s For more information, see the documentation here: https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_DATABASE.html. 
""".replace("$clusteridD", clusterId); System.out.println(databaseInfo); System.out.print("Press Enter to continue..."); scanner.nextLine(); System.out.println(DASHES); System.out.println(DASHES); System.out.println("List databases in "+clusterId); System.out.print("Press Enter to continue..."); scanner.nextLine(); listAllDatabases(redshiftDataClient, clusterId, userName, databaseName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Now you will create a table named Movies."); System.out.print("Press Enter to continue..."); scanner.nextLine(); createTable(redshiftDataClient, clusterId, databaseName, userName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Populate the Movies table using the Movies.json file."); System.out.println("Specify the number of records you would like to add to the Movies Table."); System.out.println("Please enter a value between 50 and 200."); int numRecords; do { System.out.print("Enter a value: "); while (!scanner.hasNextInt()) { System.out.println("Invalid input. Please enter a value between 50 and 200."); System.out.print("Enter a year: "); scanner.next(); } numRecords = scanner.nextInt(); } while (numRecords < 50 || numRecords > 200); popTable(redshiftDataClient, clusterId, databaseName, userName, jsonFilePath, numRecords); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Query the Movies table by year. Enter a value between 2012-2014."); int movieYear; do { System.out.print("Enter a year: "); while (!scanner.hasNextInt()) { System.out.println("Invalid input. 
Please enter a valid year between 2012 and 2014."); System.out.print("Enter a year: "); scanner.next(); } movieYear = scanner.nextInt(); scanner.nextLine(); } while (movieYear < 2012 || movieYear > 2014); String id = queryMoviesByYear(redshiftDataClient, databaseName, userName, movieYear, clusterId); System.out.println("The identifier of the statement is " + id); checkStatement(redshiftDataClient, id); getResults(redshiftDataClient, id); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Now you will modify the Redshift cluster."); System.out.print("Press Enter to continue..."); scanner.nextLine(); modifyCluster(redshiftClient, clusterId); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Would you like to delete the Amazon Redshift cluster? (y/n)"); String delAns = scanner.nextLine().trim(); if (delAns.equalsIgnoreCase("y")) { System.out.println("You selected to delete " +clusterId); System.out.print("Press Enter to continue..."); scanner.nextLine(); deleteRedshiftCluster(redshiftClient, clusterId); } else { System.out.println("The "+clusterId +" was not deleted"); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("This concludes the Amazon Redshift SDK Getting Started scenario."); System.out.println(DASHES); } public static void listAllDatabases(RedshiftDataClient redshiftDataClient, String clusterId, String dbUser, String database) { try { ListDatabasesRequest databasesRequest = ListDatabasesRequest.builder() .clusterIdentifier(clusterId) .dbUser(dbUser) .database(database) .build(); ListDatabasesIterable listDatabasesIterable = redshiftDataClient.listDatabasesPaginator(databasesRequest); listDatabasesIterable.stream() .flatMap(r -> r.databases().stream()) .forEach(db -> System.out .println("The database name is : " + db)); } catch (RedshiftDataException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void deleteRedshiftCluster(RedshiftClient redshiftClient, 
String clusterId) { try { DeleteClusterRequest deleteClusterRequest = DeleteClusterRequest.builder() .clusterIdentifier(clusterId) .skipFinalClusterSnapshot(true) .build(); DeleteClusterResponse response = redshiftClient.deleteCluster(deleteClusterRequest); System.out.println("The status is " + response.cluster().clusterStatus()); } catch (RedshiftException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void popTable(RedshiftDataClient redshiftDataClient, String clusterId, String databaseName, String userName, String fileName, int number) throws IOException { JsonParser parser = new JsonFactory().createParser(new File(fileName)); com.fasterxml.jackson.databind.JsonNode rootNode = new ObjectMapper().readTree(parser); Iterator<JsonNode> iter = rootNode.iterator(); ObjectNode currentNode; int t = 0; while (iter.hasNext()) { if (t == number) break; currentNode = (ObjectNode) iter.next(); int year = currentNode.get("year").asInt(); String title = currentNode.get("title").asText(); // Use SqlParameter to avoid SQL injection. List<SqlParameter> parameterList = new ArrayList<>(); String sqlStatement = "INSERT INTO Movies VALUES( :id , :title, :year);"; // Create the parameters. 
SqlParameter idParam = SqlParameter.builder() .name("id") .value(String.valueOf(t)) .build(); SqlParameter titleParam= SqlParameter.builder() .name("title") .value(title) .build(); SqlParameter yearParam = SqlParameter.builder() .name("year") .value(String.valueOf(year)) .build(); parameterList.add(idParam); parameterList.add(titleParam); parameterList.add(yearParam); try { ExecuteStatementRequest insertStatementRequest = ExecuteStatementRequest.builder() .clusterIdentifier(clusterId) .sql(sqlStatement) .database(databaseName) .dbUser(userName) .parameters(parameterList) .build(); redshiftDataClient.executeStatement(insertStatementRequest); System.out.println("Inserted: " + title + " (" + year + ")"); t++; } catch (RedshiftDataException e) { System.err.println("Error inserting data: " + e.getMessage()); System.exit(1); } } System.out.println(t + " records were added to the Movies table. "); } public static void checkStatement(RedshiftDataClient redshiftDataClient, String sqlId) { try { DescribeStatementRequest statementRequest = DescribeStatementRequest.builder() .id(sqlId) .build(); String status; while (true) { DescribeStatementResponse response = redshiftDataClient.describeStatement(statementRequest); status = response.statusAsString(); System.out.println("..." + status); if (status.compareTo("FAILED") == 0 ) { System.out.println("The Query Failed. 
Ending program"); System.exit(1); } else if (status.compareTo("FINISHED") == 0) { break; } TimeUnit.SECONDS.sleep(1); } System.out.println("The statement is finished!"); } catch (RedshiftDataException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void modifyCluster(RedshiftClient redshiftClient, String clusterId) { try { ModifyClusterRequest modifyClusterRequest = ModifyClusterRequest.builder() .clusterIdentifier(clusterId) .preferredMaintenanceWindow("wed:07:30-wed:08:00") .build(); ModifyClusterResponse clusterResponse = redshiftClient.modifyCluster(modifyClusterRequest); System.out.println("The modified cluster was successfully modified and has " + clusterResponse.cluster().preferredMaintenanceWindow() + " as the maintenance window"); } catch (RedshiftException e) { System.err.println(e.getMessage()); System.exit(1); } } public static String queryMoviesByYear(RedshiftDataClient redshiftDataClient, String database, String dbUser, int year, String clusterId) { try { String sqlStatement = " SELECT * FROM Movies WHERE year = :year"; SqlParameter yearParam= SqlParameter.builder() .name("year") .value(String.valueOf(year)) .build(); ExecuteStatementRequest statementRequest = ExecuteStatementRequest.builder() .clusterIdentifier(clusterId) .database(database) .dbUser(dbUser) .parameters(yearParam) .sql(sqlStatement) .build(); ExecuteStatementResponse response = redshiftDataClient.executeStatement(statementRequest); return response.id(); } catch (RedshiftDataException e) { System.err.println(e.getMessage()); System.exit(1); } return ""; } public static void getResults(RedshiftDataClient redshiftDataClient, String statementId) { try { GetStatementResultRequest resultRequest = GetStatementResultRequest.builder() .id(statementId) .build(); // Extract and print the field values using streams. 
GetStatementResultResponse response = redshiftDataClient.getStatementResult(resultRequest); response.records().stream() .flatMap(List::stream) .map(Field::stringValue) .filter(value -> value != null) .forEach(value -> System.out.println("The Movie title field is " + value)); } catch (RedshiftDataException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void waitForClusterReady(RedshiftClient redshiftClient, String clusterId) { boolean clusterReady = false; String clusterReadyStr; System.out.println("Waiting for cluster to become available. This may take a few mins."); try { DescribeClustersRequest clustersRequest = DescribeClustersRequest.builder() .clusterIdentifier(clusterId) .build(); long startTime = System.currentTimeMillis(); // Loop until the cluster is ready. while (!clusterReady) { DescribeClustersResponse clusterResponse = redshiftClient.describeClusters(clustersRequest); List<Cluster> clusterList = clusterResponse.clusters(); for (Cluster cluster : clusterList) { clusterReadyStr = cluster.clusterStatus(); if (clusterReadyStr.contains("available")) clusterReady = true; else { long elapsedTimeMillis = System.currentTimeMillis() - startTime; long elapsedSeconds = elapsedTimeMillis / 1000; long minutes = elapsedSeconds / 60; long seconds = elapsedSeconds % 60; System.out.printf("Elapsed Time: %02d:%02d - Waiting for cluster... %n", minutes, seconds); TimeUnit.SECONDS.sleep(5); } } } long elapsedTimeMillis = System.currentTimeMillis() - startTime; long elapsedSeconds = elapsedTimeMillis / 1000; long minutes = elapsedSeconds / 60; long seconds = elapsedSeconds % 60; System.out.println(String.format("Cluster is available! 
Total Elapsed Time: %02d:%02d", minutes, seconds)); } catch (RedshiftException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void createTable(RedshiftDataClient redshiftDataClient, String clusterId, String databaseName, String userName) { try { ExecuteStatementRequest createTableRequest = ExecuteStatementRequest.builder() .clusterIdentifier(clusterId) .dbUser(userName) .database(databaseName) .sql("CREATE TABLE Movies (" + "id INT PRIMARY KEY, " + "title VARCHAR(100), " + "year INT)") .build(); redshiftDataClient.executeStatement(createTableRequest); System.out.println("Table created: Movies"); } catch (RedshiftDataException e) { System.err.println("Error creating table: " + e.getMessage()); System.exit(1); } } public static void createCluster(RedshiftClient redshiftClient, String clusterId, String masterUsername, String masterUserPassword) { try { CreateClusterRequest clusterRequest = CreateClusterRequest.builder() .clusterIdentifier(clusterId) .masterUsername(masterUsername) .masterUserPassword(masterUserPassword) .nodeType("ra3.4xlarge") .publiclyAccessible(true) .numberOfNodes(2) .build(); CreateClusterResponse clusterResponse = redshiftClient.createCluster(clusterRequest); System.out.println("Created cluster " + clusterResponse.cluster().clusterIdentifier()); } catch (RedshiftException e) { System.err.println(e.getMessage()); System.exit(1); } } }
Python
SDK for Python(Boto3)
注意

查看 GitHub，了解更多信息。查找完整示例，学习如何在 Amazon 代码示例存储库中进行设置和运行。

class RedshiftScenario:
    """Runs an interactive scenario that shows how to get started with Redshift."""

    def __init__(self, redshift_wrapper, redshift_data_wrapper):
        """
        :param redshift_wrapper: Object used for the cluster calls made here:
                                 create_cluster, describe_clusters, modify_cluster,
                                 and delete_cluster.
        :param redshift_data_wrapper: Object used for the data calls made here:
                                      list_databases, execute_statement,
                                      describe_statement, and get_statement_result.
        """
        self.redshift_wrapper = redshift_wrapper
        self.redshift_data_wrapper = redshift_data_wrapper

    def redhift_scenario(self, json_file_path):
        """
        Runs the interactive scenario end to end.

        :param json_file_path: Path to the movies JSON file. The scenario logs an
                               error and returns early if the file does not exist.
        """
        database_name = "dev"

        print(DASHES)
        print("Welcome to the Amazon Redshift SDK Getting Started example.")
        print(
            """
      This Python program demonstrates how to interact with Amazon Redshift
      using the AWS SDK for Python (Boto3).

      Amazon Redshift is a fully managed, petabyte-scale data warehouse service hosted in the cloud.

      The program's primary functionalities include cluster creation, verification of cluster readiness,
      listing databases, table creation, populating data within the table, and executing SQL statements.

      It also demonstrates querying data from the Movies table.

      Upon completion, all AWS resources are cleaned up.
    """
        )
        if not os.path.isfile(json_file_path):
            logging.error(f"The file {json_file_path} does not exist.")
            return

        print("Let's get started...")
        user_name = q.ask("Please enter your user name (default is awsuser):")
        user_name = user_name if user_name else "awsuser"

        print(DASHES)
        user_password = q.ask(
            "Please enter your user password (default is AwsUser1000):"
        )
        user_password = user_password if user_password else "AwsUser1000"

        print(DASHES)
        print(
            """A Redshift cluster refers to the collection of computing resources and storage that work
            together to process and analyze large volumes of data."""
        )
        cluster_id = q.ask(
            "Enter a cluster identifier value (default is redshift-cluster-movies): "
        )
        cluster_id = cluster_id if cluster_id else "redshift-cluster-movies"

        self.redshift_wrapper.create_cluster(
            cluster_id, "ra3.4xlarge", user_name, user_password, True, 2
        )

        print(DASHES)
        print(f"Wait until {cluster_id} is available. This may take a few minutes...")
        q.ask("Press Enter to continue...")

        self.wait_cluster_available(cluster_id)

        print(DASHES)
        print(
            f"""
       When you created {cluster_id}, the dev database is created by default and used in this scenario.

       To create a custom database, you need to have a CREATEDB privilege.
       For more information, see the documentation here:
       https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_DATABASE.html.
      """
        )
        q.ask("Press Enter to continue...")
        print(DASHES)

        print(DASHES)
        print(f"List databases in {cluster_id}")
        q.ask("Press Enter to continue...")
        databases = self.redshift_data_wrapper.list_databases(
            cluster_id, database_name, user_name
        )
        print(f"The cluster contains {len(databases)} database(s).")
        for database in databases:
            print(f" Database: {database}")
        print(DASHES)

        print(DASHES)
        print("Now you will create a table named Movies.")
        q.ask("Press Enter to continue...")
        self.create_table(cluster_id, database_name, user_name)

        print(DASHES)
        print("Populate the Movies table using the Movies.json file.")
        print(
            "Specify the number of records you would like to add to the Movies Table."
        )
        print("Please enter a value between 50 and 200.")

        while True:
            try:
                num_records = int(q.ask("Enter a value: ", q.is_int))
                if 50 <= num_records <= 200:
                    break
                else:
                    print("Invalid input. Please enter a value between 50 and 200.")
            except ValueError:
                print("Invalid input. Please enter a value between 50 and 200.")

        self.populate_table(
            cluster_id, database_name, user_name, json_file_path, num_records
        )

        print(DASHES)
        print("Query the Movies table by year. Enter a value between 2012-2014.")
        while True:
            movie_year = int(q.ask("Enter a year: ", q.is_int))
            if 2012 <= movie_year <= 2014:
                break
            else:
                print("Invalid input. Please enter a valid year between 2012 and 2014.")

        # Function to query database
        sql_id = self.query_movies_by_year(
            database_name, user_name, movie_year, cluster_id
        )
        print(f"The identifier of the statement is {sql_id}")

        print("Checking statement status...")
        self.wait_statement_finished(sql_id)
        result = self.redshift_data_wrapper.get_statement_result(sql_id)
        self.display_movies(result)
        print(DASHES)

        print(DASHES)
        print("Now you will modify the Redshift cluster.")
        q.ask("Press Enter to continue...")
        preferred_maintenance_window = "wed:07:30-wed:08:00"
        self.redshift_wrapper.modify_cluster(cluster_id, preferred_maintenance_window)
        print(DASHES)

        print(DASHES)
        delete = q.ask("Do you want to delete the cluster? (y/n) ", q.is_yesno)
        if delete:
            print(f"You selected to delete {cluster_id}")
            q.ask("Press Enter to continue...")
            self.redshift_wrapper.delete_cluster(cluster_id)
        else:
            print(f"Cluster {cluster_id} was not deleted")

        print(DASHES)
        print("This concludes the Amazon Redshift SDK Getting Started scenario.")
        print(DASHES)

    def create_table(self, cluster_id, database, username):
        """
        Submits the CREATE TABLE statement for the Movies table.

        :param cluster_id: The cluster identifier.
        :param database: The database name.
        :param username: The database user.
        """
        self.redshift_data_wrapper.execute_statement(
            cluster_identifier=cluster_id,
            database_name=database,
            user_name=username,
            sql="CREATE TABLE Movies (statement_id INT PRIMARY KEY, title VARCHAR(100), year INT)",
        )
        print("Table created: Movies")

    def populate_table(self, cluster_id, database, username, file_name, number):
        """
        Inserts up to `number` records from a JSON file into the Movies table,
        one parameterized INSERT statement per record.

        :param cluster_id: The cluster identifier.
        :param database: The database name.
        :param username: The database user.
        :param file_name: Path to the JSON file; each record is read for its
                          "title" and "year" keys.
        :param number: The maximum number of records to insert.
        """
        # Read as UTF-8 explicitly so the result does not depend on the platform default.
        with open(file_name, encoding="utf-8") as f:
            data = json.load(f)

        i = 0
        for record in data:
            if i == number:
                break

            statement_id = i
            title = record["title"]
            year = record["year"]
            i = i + 1
            parameters = [
                {"name": "statement_id", "value": str(statement_id)},
                {"name": "title", "value": title},
                {"name": "year", "value": str(year)},
            ]

            self.redshift_data_wrapper.execute_statement(
                cluster_identifier=cluster_id,
                database_name=database,
                user_name=username,
                sql="INSERT INTO Movies VALUES(:statement_id, :title, :year)",
                parameter_list=parameters,
            )

        print(f"{i} records inserted into Movies table")

    def wait_cluster_available(self, cluster_id):
        """
        Waits for a cluster to be available.

        :param cluster_id: The cluster identifier.
        :raises Exception: If the status is neither "available" nor "creating",
                           or the cluster is not available after 30 minutes.

        Note: The cluster_available waiter can also be used.
        It is not used in this case to allow an elapsed time message.
        """
        cluster_ready = False
        start_time = time.time()

        while not cluster_ready:
            time.sleep(30)
            cluster = self.redshift_wrapper.describe_clusters(cluster_id)
            status = cluster[0]["ClusterStatus"]
            if status == "available":
                cluster_ready = True
            elif status != "creating":
                raise Exception(
                    f"Cluster {cluster_id} creation failed with status {status}."
                )

            elapsed_seconds = int(round(time.time() - start_time))
            minutes = int(elapsed_seconds // 60)
            seconds = int(elapsed_seconds % 60)

            print(f"Elapsed Time: {minutes}:{seconds:02d} - status {status}...")

            if minutes > 30:
                raise Exception(
                    f"Cluster {cluster_id} is not available after 30 minutes."
                )

    def query_movies_by_year(self, database, username, year, cluster_id):
        """
        Submits a parameterized SELECT for movies released in the given year.

        :param database: The database name.
        :param username: The database user.
        :param year: The release year to filter on.
        :param cluster_id: The cluster identifier.
        :return: The "Id" value of the execute_statement response.
        """
        sql = "SELECT * FROM Movies WHERE year = :year"

        params = [{"name": "year", "value": str(year)}]

        response = self.redshift_data_wrapper.execute_statement(
            cluster_identifier=cluster_id,
            database_name=database,
            user_name=username,
            sql=sql,
            parameter_list=params,
        )

        return response["Id"]

    @staticmethod
    def display_movies(response):
        """
        Prints the title of every record in a statement result.

        :param response: A dict with "ColumnMetadata" (a list of dicts with a
                         "name" key) and "Records" (a list of rows, each a list
                         of field dicts).
        """
        metadata = response["ColumnMetadata"]
        records = response["Records"]

        title_column_index = None
        for i in range(len(metadata)):
            if metadata[i]["name"] == "title":
                title_column_index = i
                break

        if title_column_index is None:
            print("No title column found.")
            return

        print(f"Found {len(records)} movie(s).")
        for record in records:
            print(f" {record[title_column_index]['stringValue']}")

    def wait_statement_finished(self, sql_id):
        """
        Polls the statement status once per second until it is FINISHED.

        :param sql_id: The identifier of the statement to watch.
        :raises Exception: If the statement status is FAILED or ABORTED. An
                           aborted statement never becomes FINISHED, so polling
                           on would never end.
        """
        while True:
            time.sleep(1)
            response = self.redshift_data_wrapper.describe_statement(sql_id)
            status = response["Status"]
            print(f"Statement status is {status}.")

            if status == "FAILED":
                print(f"The query failed because {response.get('Error')}. Ending program")
                raise Exception("The Query Failed. Ending program")
            elif status == "ABORTED":
                print("The query was aborted. Ending program")
                raise Exception("The query was aborted. Ending program")
            elif status == "FINISHED":
                break

用于显示场景实施的主函数。

def main():
    """Builds the Redshift clients and wrappers, then runs the scenario with the sample movies file."""
    scenario = RedshiftScenario(
        RedshiftWrapper(boto3.client("redshift")),
        RedshiftDataWrapper(boto3.client("redshift-data")),
    )
    # The sample data path is resolved relative to this file's directory.
    movies_path = (
        f"{os.path.dirname(__file__)}/../../../resources/sample_files/movies.json"
    )
    scenario.redhift_scenario(movies_path)

场景中使用的包装器函数。

def create_cluster(
    self,
    cluster_identifier,
    node_type,
    master_username,
    master_user_password,
    publicly_accessible,
    number_of_nodes,
):
    """
    Creates a cluster.

    :param cluster_identifier: The name of the cluster.
    :param node_type: The type of node in the cluster.
    :param master_username: The master username.
    :param master_user_password: The master user password.
    :param publicly_accessible: Whether the cluster is publicly accessible.
    :param number_of_nodes: The number of nodes in the cluster.
    :return: The response returned by the client's create_cluster call.
    """
    request = {
        "ClusterIdentifier": cluster_identifier,
        "NodeType": node_type,
        "MasterUsername": master_username,
        "MasterUserPassword": master_user_password,
        "PubliclyAccessible": publicly_accessible,
        "NumberOfNodes": number_of_nodes,
    }
    try:
        return self.client.create_cluster(**request)
    except ClientError as err:
        logging.error(
            "Couldn't create a cluster. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise


def describe_clusters(self, cluster_identifier):
    """
    Describes clusters, following pagination until every page is read.

    :param cluster_identifier: The cluster identifier. When falsy, no
                               identifier filter is sent.
    :return: A list of clusters.
    """
    request = {"ClusterIdentifier": cluster_identifier} if cluster_identifier else {}
    try:
        pages = self.client.get_paginator("describe_clusters").paginate(**request)
        found = []
        for page in pages:
            found += page["Clusters"]
        return found
    except ClientError as err:
        logging.error(
            "Couldn't describe a cluster. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise


def execute_statement(
    self, cluster_identifier, database_name, user_name, sql, parameter_list=None
):
    """
    Executes a SQL statement.

    :param cluster_identifier: The cluster identifier.
    :param database_name: The database name.
    :param user_name: The user's name.
    :param sql: The SQL statement.
    :param parameter_list: The optional SQL statement parameters; sent only
                           when non-empty.
    :return: The response returned by the client's execute_statement call.
    """
    request = dict(
        ClusterIdentifier=cluster_identifier,
        Database=database_name,
        DbUser=user_name,
        Sql=sql,
    )
    if parameter_list:
        request["Parameters"] = parameter_list
    try:
        return self.client.execute_statement(**request)
    except ClientError as err:
        logging.error(
            "Couldn't execute statement. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise


def describe_statement(self, statement_id):
    """
    Describes a SQL statement.

    :param statement_id: The SQL statement identifier.
    :return: The response returned by the client's describe_statement call.
    """
    try:
        return self.client.describe_statement(Id=statement_id)
    except ClientError as err:
        logging.error(
            "Couldn't describe statement. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise


def get_statement_result(self, statement_id):
    """
    Gets the result of a SQL statement, merging all result pages.

    :param statement_id: The SQL statement identifier.
    :return: A dict with "Records" from every page and "ColumnMetadata"
             taken from the first page.
    """
    try:
        merged = {"Records": []}
        pages = self.client.get_paginator("get_statement_result").paginate(
            Id=statement_id
        )
        for page in pages:
            # Only the first page's column metadata is kept.
            if "ColumnMetadata" not in merged:
                merged["ColumnMetadata"] = page["ColumnMetadata"]
            merged["Records"].extend(page["Records"])
        return merged
    except ClientError as err:
        logging.error(
            "Couldn't get statement result. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise


def modify_cluster(self, cluster_identifier, preferred_maintenance_window):
    """
    Modifies a cluster's preferred maintenance window.

    :param cluster_identifier: The cluster identifier.
    :param preferred_maintenance_window: The preferred maintenance window.
    """
    request = {
        "ClusterIdentifier": cluster_identifier,
        "PreferredMaintenanceWindow": preferred_maintenance_window,
    }
    try:
        self.client.modify_cluster(**request)
    except ClientError as err:
        logging.error(
            "Couldn't modify a cluster. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise


def list_databases(self, cluster_identifier, database_name, database_user):
    """
    Lists databases in a cluster, following pagination.

    :param cluster_identifier: The cluster identifier.
    :param database_name: The database name.
    :param database_user: The database user.
    :return: The list of databases.
    """
    try:
        pages = self.client.get_paginator("list_databases").paginate(
            ClusterIdentifier=cluster_identifier,
            Database=database_name,
            DbUser=database_user,
        )
        names = []
        for page in pages:
            names += page["Databases"]
        return names
    except ClientError as err:
        logging.error(
            "Couldn't list databases. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise


def delete_cluster(self, cluster_identifier):
    """
    Deletes a cluster without taking a final snapshot.

    :param cluster_identifier: The cluster identifier.
    """
    request = {
        "ClusterIdentifier": cluster_identifier,
        "SkipFinalClusterSnapshot": True,
    }
    try:
        self.client.delete_cluster(**request)
    except ClientError as err:
        logging.error(
            "Couldn't delete a cluster. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise

有关 Amazon SDK 开发人员指南和代码示例的完整列表,请参阅 将此服务与 Amazon SDK 结合使用。本主题还包括有关入门的信息以及有关先前的 SDK 版本的详细信息。