使用批量 PartiQL 语句和 Amazon SDK 查询 DynamoDB 表
以下代码示例显示如何使用批量 PartiQL 语句查询 DynamoDB 表。
- Go
-
- SDK for Go V2
-
提示:要了解如何设置和运行此示例,请参阅 GitHub。
创建一个结构,作为可以运行 PartiQL 语句的方法的接收器。
// PartiQLRunner encapsulates the Amazon DynamoDB service actions used in the // PartiQL examples. It contains a DynamoDB service client that is used to act on the // specified table. type PartiQLRunner struct { DynamoDbClient *dynamodb.Client TableName string } // AddMovieBatch runs a batch of PartiQL INSERT statements to add multiple movies to the // DynamoDB table. func (runner PartiQLRunner) AddMovieBatch(movies []Movie) error { statementRequests := make([]types.BatchStatementRequest, len(movies)) for index, movie := range movies { params, err := attributevalue.MarshalList([]interface{}{movie.Title, movie.Year, movie.Info}) if err != nil { panic(err) } statementRequests[index] = types.BatchStatementRequest{ Statement: aws.String(fmt.Sprintf( "INSERT INTO \"%v\" VALUE {'title': ?, 'year': ?, 'info': ?}", runner.TableName)), Parameters: params, } } _, err := runner.DynamoDbClient.BatchExecuteStatement(context.TODO(), &dynamodb.BatchExecuteStatementInput{ Statements: statementRequests, }) if err != nil { log.Printf("Couldn't insert a batch of items with PartiQL. Here's why: %v\n", err) } return err } // GetMovieBatch runs a batch of PartiQL SELECT statements to get multiple movies from // the DynamoDB table by title and year. func (runner PartiQLRunner) GetMovieBatch(movies []Movie) ([]Movie, error) { statementRequests := make([]types.BatchStatementRequest, len(movies)) for index, movie := range movies { params, err := attributevalue.MarshalList([]interface{}{movie.Title, movie.Year}) if err != nil { panic(err) } statementRequests[index] = types.BatchStatementRequest{ Statement: aws.String( fmt.Sprintf("SELECT * FROM \"%v\" WHERE title=? AND year=?", runner.TableName)), Parameters: params, } } output, err := runner.DynamoDbClient.BatchExecuteStatement(context.TODO(), &dynamodb.BatchExecuteStatementInput{ Statements: statementRequests, }) var outMovies []Movie if err != nil { log.Printf("Couldn't get a batch of items with PartiQL. 
Here's why: %v\n", err) } else { for _, response := range output.Responses { var movie Movie err = attributevalue.UnmarshalMap(response.Item, &movie) if err != nil { log.Printf("Couldn't unmarshal response. Here's why: %v\n", err) } else { outMovies = append(outMovies, movie) } } } return outMovies, err } // GetAllMovies runs a PartiQL SELECT statement to get all movies from the DynamoDB table. // The results are projected to return only the title and rating of each movie. func (runner PartiQLRunner) GetAllMovies() ([]map[string]interface{}, error) { var output []map[string]interface{} response, err := runner.DynamoDbClient.ExecuteStatement(context.TODO(), &dynamodb.ExecuteStatementInput{ Statement: aws.String( fmt.Sprintf("SELECT title, info.rating FROM \"%v\"", runner.TableName)), }) if err != nil { log.Printf("Couldn't get movies. Here's why: %v\n", err) } else { err = attributevalue.UnmarshalListOfMaps(response.Items, &output) if err != nil { log.Printf("Couldn't unmarshal response. Here's why: %v\n", err) } } return output, err } // UpdateMovieBatch runs a batch of PartiQL UPDATE statements to update the rating of // multiple movies that already exist in the DynamoDB table. func (runner PartiQLRunner) UpdateMovieBatch(movies []Movie, ratings []float64) error { statementRequests := make([]types.BatchStatementRequest, len(movies)) for index, movie := range movies { params, err := attributevalue.MarshalList([]interface{}{ratings[index], movie.Title, movie.Year}) if err != nil { panic(err) } statementRequests[index] = types.BatchStatementRequest{ Statement: aws.String( fmt.Sprintf("UPDATE \"%v\" SET info.rating=? WHERE title=? AND year=?", runner.TableName)), Parameters: params, } } _, err := runner.DynamoDbClient.BatchExecuteStatement(context.TODO(), &dynamodb.BatchExecuteStatementInput{ Statements: statementRequests, }) if err != nil { log.Printf("Couldn't update the batch of movies. 
Here's why: %v\n", err) } return err } // DeleteMovieBatch runs a batch of PartiQL DELETE statements to remove multiple movies // from the DynamoDB table. func (runner PartiQLRunner) DeleteMovieBatch(movies []Movie) error { statementRequests := make([]types.BatchStatementRequest, len(movies)) for index, movie := range movies { params, err := attributevalue.MarshalList([]interface{}{movie.Title, movie.Year}) if err != nil { panic(err) } statementRequests[index] = types.BatchStatementRequest{ Statement: aws.String( fmt.Sprintf("DELETE FROM \"%v\" WHERE title=? AND year=?", runner.TableName)), Parameters: params, } } _, err := runner.DynamoDbClient.BatchExecuteStatement(context.TODO(), &dynamodb.BatchExecuteStatementInput{ Statements: statementRequests, }) if err != nil { log.Printf("Couldn't delete the batch of movies. Here's why: %v\n", err) } return err }
运行创建表并运行批量 PartiQL 查询的场景。
// RunPartiQLBatchScenario shows you how to use the AWS SDK for Go // to run batches of PartiQL statements to query a table that stores data about movies. // // * Use batches of PartiQL statements to add, get, update, and delete data for // individual movies. // // This example creates an Amazon DynamoDB service client from the specified sdkConfig so that // you can replace it with a mocked or stubbed config for unit testing. // // This example creates and deletes a DynamoDB table to use during the scenario. func RunPartiQLBatchScenario(sdkConfig aws.Config, tableName string) { defer func() { if r := recover(); r != nil { fmt.Printf("Something went wrong with the demo.") } }() log.Println(strings.Repeat("-", 88)) log.Println("Welcome to the Amazon DynamoDB PartiQL batch demo.") log.Println(strings.Repeat("-", 88)) tableBasics := actions.TableBasics{ DynamoDbClient: dynamodb.NewFromConfig(sdkConfig), TableName: tableName, } runner := actions.PartiQLRunner{ DynamoDbClient: dynamodb.NewFromConfig(sdkConfig), TableName: tableName, } exists, err := tableBasics.TableExists() if err != nil { panic(err) } if !exists { log.Printf("Creating table %v...\n", tableName) _, err = tableBasics.CreateMovieTable() if err != nil { panic(err) } else { log.Printf("Created table %v.\n", tableName) } } else { log.Printf("Table %v already exists.\n", tableName) } log.Println(strings.Repeat("-", 88)) currentYear, _, _ := time.Now().Date() customMovies := []actions.Movie{{ Title: "House PartiQL", Year: currentYear - 5, Info: map[string]interface{}{ "plot": "Wacky high jinks result from querying a mysterious database.", "rating": 8.5}}, { Title: "House PartiQL 2", Year: currentYear - 3, Info: map[string]interface{}{ "plot": "Moderate high jinks result from querying another mysterious database.", "rating": 6.5}}, { Title: "House PartiQL 3", Year: currentYear - 1, Info: map[string]interface{}{ "plot": "Tepid high jinks result from querying yet another mysterious database.", "rating": 2.5}, }, 
} log.Printf("Inserting a batch of movies into table '%v'.\n", tableName) err = runner.AddMovieBatch(customMovies) if err == nil { log.Printf("Added %v movies to the table.\n", len(customMovies)) } log.Println(strings.Repeat("-", 88)) log.Println("Getting data for a batch of movies.") movies, err := runner.GetMovieBatch(customMovies) if err == nil { for _, movie := range movies { log.Println(movie) } } log.Println(strings.Repeat("-", 88)) newRatings := []float64{7.7, 4.4, 1.1} log.Println("Updating a batch of movies with new ratings.") err = runner.UpdateMovieBatch(customMovies, newRatings) if err == nil { log.Printf("Updated %v movies with new ratings.\n", len(customMovies)) } log.Println(strings.Repeat("-", 88)) log.Println("Getting projected data from the table to verify our update.") projections, err := runner.GetAllMovies() if err == nil { for _, projection := range projections { log.Println(projection) } } log.Println(strings.Repeat("-", 88)) log.Println("Deleting a batch of movies.") err = runner.DeleteMovieBatch(customMovies) if err == nil { log.Printf("Deleted %v movies.\n", len(customMovies)) } err = tableBasics.DeleteTable() if err == nil { log.Printf("Deleted table %v.\n", tableBasics.TableName) } log.Println(strings.Repeat("-", 88)) log.Println("Thanks for watching!") log.Println(strings.Repeat("-", 88)) }
-
有关 API 详细信息,请参阅《Amazon SDK for Go API 参考》中的 BatchExecuteStatement。
-
- Java
-
- SDK for Java 2.x
-
提示 要了解如何设置和运行此示例,请参阅 GitHub
。 public class ScenarioPartiQLBatch { public static void main(String [] args) throws IOException { String tableName = "MoviesPartiQBatch"; ProfileCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create(); Region region = Region.US_EAST_1; DynamoDbClient ddb = DynamoDbClient.builder() .credentialsProvider(credentialsProvider) .region(region) .build(); System.out.println("******* Creating an Amazon DynamoDB table named "+tableName +" with a key named year and a sort key named title."); createTable(ddb, tableName); System.out.println("******* Adding multiple records into the "+ tableName +" table using a batch command."); putRecordBatch(ddb); System.out.println("******* Updating multiple records using a batch command."); updateTableItemBatch(ddb); System.out.println("******* Deleting multiple records using a batch command."); deleteItemBatch(ddb); System.out.println("******* Deleting the Amazon DynamoDB table."); deleteDynamoDBTable(ddb, tableName); ddb.close(); } public static void createTable(DynamoDbClient ddb, String tableName) { DynamoDbWaiter dbWaiter = ddb.waiter(); ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<>(); // Define attributes. attributeDefinitions.add(AttributeDefinition.builder() .attributeName("year") .attributeType("N") .build()); attributeDefinitions.add(AttributeDefinition.builder() .attributeName("title") .attributeType("S") .build()); ArrayList<KeySchemaElement> tableKey = new ArrayList<>(); KeySchemaElement key = KeySchemaElement.builder() .attributeName("year") .keyType(KeyType.HASH) .build(); KeySchemaElement key2 = KeySchemaElement.builder() .attributeName("title") .keyType(KeyType.RANGE) // Sort .build(); // Add KeySchemaElement objects to the list. 
tableKey.add(key); tableKey.add(key2); CreateTableRequest request = CreateTableRequest.builder() .keySchema(tableKey) .provisionedThroughput(ProvisionedThroughput.builder() .readCapacityUnits(new Long(10)) .writeCapacityUnits(new Long(10)) .build()) .attributeDefinitions(attributeDefinitions) .tableName(tableName) .build(); try { CreateTableResponse response = ddb.createTable(request); DescribeTableRequest tableRequest = DescribeTableRequest.builder() .tableName(tableName) .build(); // Wait until the Amazon DynamoDB table is created. WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest); waiterResponse.matched().response().ifPresent(System.out::println); String newTable = response.tableDescription().tableName(); System.out.println("The " +newTable + " was successfully created."); } catch (DynamoDbException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void putRecordBatch(DynamoDbClient ddb) { String sqlStatement = "INSERT INTO MoviesPartiQBatch VALUE {'year':?, 'title' : ?, 'info' : ?}"; try { // Create three movies to add to the Amazon DynamoDB table. // Set data for Movie 1. List<AttributeValue> parameters = new ArrayList<>(); AttributeValue att1 = AttributeValue.builder() .n(String.valueOf("2022")) .build(); AttributeValue att2 = AttributeValue.builder() .s("My Movie 1") .build(); AttributeValue att3 = AttributeValue.builder() .s("No Information") .build(); parameters.add(att1); parameters.add(att2); parameters.add(att3); BatchStatementRequest statementRequestMovie1 = BatchStatementRequest.builder() .statement(sqlStatement) .parameters(parameters) .build(); // Set data for Movie 2. 
List<AttributeValue> parametersMovie2 = new ArrayList<>(); AttributeValue attMovie2 = AttributeValue.builder() .n(String.valueOf("2022")) .build(); AttributeValue attMovie2A = AttributeValue.builder() .s("My Movie 2") .build(); AttributeValue attMovie2B = AttributeValue.builder() .s("No Information") .build(); parametersMovie2.add(attMovie2); parametersMovie2.add(attMovie2A); parametersMovie2.add(attMovie2B); BatchStatementRequest statementRequestMovie2 = BatchStatementRequest.builder() .statement(sqlStatement) .parameters(parametersMovie2) .build(); // Set data for Movie 3. List<AttributeValue> parametersMovie3 = new ArrayList<>(); AttributeValue attMovie3 = AttributeValue.builder() .n(String.valueOf("2022")) .build(); AttributeValue attMovie3A = AttributeValue.builder() .s("My Movie 3") .build(); AttributeValue attMovie3B = AttributeValue.builder() .s("No Information") .build(); parametersMovie3.add(attMovie3); parametersMovie3.add(attMovie3A); parametersMovie3.add(attMovie3B); BatchStatementRequest statementRequestMovie3 = BatchStatementRequest.builder() .statement(sqlStatement) .parameters(parametersMovie3) .build(); // Add all three movies to the list. List<BatchStatementRequest> myBatchStatementList = new ArrayList<>(); myBatchStatementList.add(statementRequestMovie1); myBatchStatementList.add(statementRequestMovie2); myBatchStatementList.add(statementRequestMovie3); BatchExecuteStatementRequest batchRequest = BatchExecuteStatementRequest.builder() .statements(myBatchStatementList) .build(); BatchExecuteStatementResponse response = ddb.batchExecuteStatement(batchRequest); System.out.println("ExecuteStatement successful: "+ response.toString()); System.out.println("Added new movies using a batch command."); } catch (DynamoDbException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void updateTableItemBatch(DynamoDbClient ddb){ String sqlStatement = "UPDATE MoviesPartiQBatch SET info = 'directors\":[\"Merian C. Cooper\",\"Ernest B. 
Schoedsack' where year=? and title=?"; List<AttributeValue> parametersRec1 = new ArrayList<>(); // Lets update three records AttributeValue att1 = AttributeValue.builder() .n(String.valueOf("2022")) .build(); AttributeValue att2 = AttributeValue.builder() .s("My Movie 1") .build(); parametersRec1.add(att1); parametersRec1.add(att2); BatchStatementRequest statementRequestRec1 = BatchStatementRequest.builder() .statement(sqlStatement) .parameters(parametersRec1) .build(); // Update record 2. List<AttributeValue> parametersRec2 = new ArrayList<>(); AttributeValue attRec2 = AttributeValue.builder() .n(String.valueOf("2022")) .build(); AttributeValue attRec2a = AttributeValue.builder() .s("My Movie 2") .build(); parametersRec2.add(attRec2); parametersRec2.add(attRec2a); BatchStatementRequest statementRequestRec2 = BatchStatementRequest.builder() .statement(sqlStatement) .parameters(parametersRec2) .build(); // Update record 3. List<AttributeValue> parametersRec3 = new ArrayList<>(); AttributeValue attRec3 = AttributeValue.builder() .n(String.valueOf("2022")) .build(); AttributeValue attRec3a = AttributeValue.builder() .s("My Movie 3") .build(); parametersRec3.add(attRec3); parametersRec3.add(attRec3a); BatchStatementRequest statementRequestRec3 = BatchStatementRequest.builder() .statement(sqlStatement) .parameters(parametersRec3) .build(); // Add all three movies to the list. 
List<BatchStatementRequest> myBatchStatementList = new ArrayList<>(); myBatchStatementList.add(statementRequestRec1); myBatchStatementList.add(statementRequestRec2); myBatchStatementList.add(statementRequestRec3); BatchExecuteStatementRequest batchRequest = BatchExecuteStatementRequest.builder() .statements(myBatchStatementList) .build(); try { BatchExecuteStatementResponse response = ddb.batchExecuteStatement(batchRequest); System.out.println("ExecuteStatement successful: "+ response.toString()); System.out.println("Updated three movies using a batch command."); } catch (DynamoDbException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println("Item was updated!"); } public static void deleteItemBatch(DynamoDbClient ddb){ String sqlStatement = "DELETE FROM MoviesPartiQBatch WHERE year = ? and title=?"; List<AttributeValue> parametersRec1 = new ArrayList<>(); // Specify three records to delete. AttributeValue att1 = AttributeValue.builder() .n(String.valueOf("2022")) .build(); AttributeValue att2 = AttributeValue.builder() .s("My Movie 1") .build(); parametersRec1.add(att1); parametersRec1.add(att2); BatchStatementRequest statementRequestRec1 = BatchStatementRequest.builder() .statement(sqlStatement) .parameters(parametersRec1) .build(); // Specify record 2. List<AttributeValue> parametersRec2 = new ArrayList<>(); AttributeValue attRec2 = AttributeValue.builder() .n(String.valueOf("2022")) .build(); AttributeValue attRec2a = AttributeValue.builder() .s("My Movie 2") .build(); parametersRec2.add(attRec2); parametersRec2.add(attRec2a); BatchStatementRequest statementRequestRec2 = BatchStatementRequest.builder() .statement(sqlStatement) .parameters(parametersRec2) .build(); // Specify record 3. 
List<AttributeValue> parametersRec3 = new ArrayList<>(); AttributeValue attRec3 = AttributeValue.builder() .n(String.valueOf("2022")) .build(); AttributeValue attRec3a = AttributeValue.builder() .s("My Movie 3") .build(); parametersRec3.add(attRec3); parametersRec3.add(attRec3a); BatchStatementRequest statementRequestRec3 = BatchStatementRequest.builder() .statement(sqlStatement) .parameters(parametersRec3) .build(); // Add all three movies to the list. List<BatchStatementRequest> myBatchStatementList = new ArrayList<>(); myBatchStatementList.add(statementRequestRec1); myBatchStatementList.add(statementRequestRec2); myBatchStatementList.add(statementRequestRec3); BatchExecuteStatementRequest batchRequest = BatchExecuteStatementRequest.builder() .statements(myBatchStatementList) .build(); try { ddb.batchExecuteStatement(batchRequest); System.out.println("Deleted three movies using a batch command."); } catch (DynamoDbException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void deleteDynamoDBTable(DynamoDbClient ddb, String tableName) { DeleteTableRequest request = DeleteTableRequest.builder() .tableName(tableName) .build(); try { ddb.deleteTable(request); } catch (DynamoDbException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println(tableName +" was successfully deleted!"); } private static ExecuteStatementResponse executeStatementRequest(DynamoDbClient ddb, String statement, List<AttributeValue> parameters ) { ExecuteStatementRequest request = ExecuteStatementRequest.builder() .statement(statement) .parameters(parameters) .build(); return ddb.executeStatement(request); } }
-
有关 API 详细信息,请参阅《Amazon SDK for Java 2.x API 参考》中的 BatchExecuteStatement。
-
- JavaScript
-
- SDK for JavaScript V3
-
提示:要了解如何设置和运行此示例,请参阅 GitHub。
创建客户端。
// ES6 module that builds the shared low-level Amazon DynamoDB service client.
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";

// AWS Region the client sends requests to, for example "us-east-1".
const REGION = "eu-west-1";

// Amazon DynamoDB service client object bound to REGION.
const ddbClient = new DynamoDBClient({ region: REGION });

export { REGION, ddbClient };
创建文档客户端。
// Create a service client module using ES6 syntax.
import { DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";
import { ddbClient } from "./ddbClient.js";

// The AWS Region is configured on the imported ddbClient, so it is not set again here.

// Options applied when converting JavaScript values to DynamoDB attribute values.
const marshallOptions = {
  // Whether to automatically convert empty strings, blobs, and sets to `null`.
  convertEmptyValues: false, // false, by default.
  // Whether to remove undefined values while marshalling.
  removeUndefinedValues: false, // false, by default.
  // Whether to convert typeof object to map attribute.
  convertClassInstanceToMap: false, // false, by default.
};

// Options applied when converting DynamoDB attribute values to JavaScript values.
const unmarshallOptions = {
  // Whether to return numbers as a string instead of converting them to native JavaScript numbers.
  wrapNumbers: false, // false, by default.
};

const translateConfig = { marshallOptions, unmarshallOptions };

// Create the DynamoDB document client on top of the shared low-level client.
const ddbDocClient = DynamoDBDocumentClient.from(ddbClient, translateConfig);

export { ddbDocClient };
批量查询项目。
*/ import fs from "fs"; // A practical functional library used to split the data into segments. import * as R from "ramda"; import { ddbClient } from "../libs/ddbClient.js"; import { ddbDocClient } from "../libs/ddbDocClient.js"; import { BatchWriteCommand } from "@aws-sdk/lib-dynamodb"; import { CreateTableCommand, BatchExecuteStatementCommand, } from "@aws-sdk/client-dynamodb"; if (process.argv.length < 6) { console.log( "Usage: node partiQL_basics.js <tableName> <movieTitle1> <movieYear1> <movieTitle1> <movieYear1> <producer1> <producer2> \n" + "Example: node partiQL_basics.js Movies_batch 2006 'The Departed' 2013 '2 Guns' 'New View Films' 'Old Thyme Films'" ); } const tableName = process.argv[2]; const movieYear1 = parseInt(process.argv[3]); const movieTitle1 = process.argv[4]; const movieYear2 = parseInt(process.argv[5]); const movieTitle2 = process.argv[6]; const producer1 = process.argv[7]; const producer2 = process.argv[8]; // Helper function to delay running the code while the AWS service calls wait for responses. function wait(ms) { var start = Date.now(); var end = start; while (end < start + ms) { end = Date.now(); } } // Set the parameters. export const run = async ( tableName, movieYear1, movieTitle1, movieYear2, movieTitle2, producer1, producer2 ) => { try { console.log("Creating table ..."); // Set the parameters. const params = { AttributeDefinitions: [ { AttributeName: "title", AttributeType: "S", }, { AttributeName: "year", AttributeType: "N", }, ], KeySchema: [ { AttributeName: "title", KeyType: "HASH", }, { AttributeName: "year", KeyType: "RANGE", }, ], ProvisionedThroughput: { ReadCapacityUnits: 5, WriteCapacityUnits: 5, }, TableName: tableName, }; const data = await ddbClient.send(new CreateTableCommand(params)); console.log("Waiting for table to be created..."); wait(10000); console.log( "Table created. 
Table name is ", data.TableDescription.TableName ); try { // Before you run this example, download 'movies.json' from https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStarted.Js.02.html, // and put it in the same folder as the example. // Get the movie data parse to convert into a JSON object. const allMovies = JSON.parse(fs.readFileSync("moviedata.json", "utf8")); // Split the table into segments of 25. const dataSegments = R.splitEvery(25, allMovies); // Loop batch write operation 10 times to upload 250 items. console.log("Writing movies in batch to table..."); for (let i = 0; i < 10; i++) { const segment = dataSegments[i]; for (let j = 0; j < 25; j++) { const params = { RequestItems: { [tableName]: [ { // Destination Amazon DynamoDB table name. PutRequest: { Item: { year: segment[j].year, title: segment[j].title, info: segment[j].info, }, }, }, ], }, }; const data = ddbDocClient.send(new BatchWriteCommand(params)); } } wait(10000); console.log("Success, movies written to table."); try { console.log("Getting movie...."); const params = { Statements: [ { Statement: "SELECT * FROM " + tableName + " where title=? and year=?", Parameters: [{ S: movieTitle1 }, { N: movieYear1 }], }, { Statement: "SELECT * FROM " + tableName + " where title=? and year=?", Parameters: [{ S: movieTitle2 }, { N: movieYear2 }], }, ], }; const data = await ddbDocClient.send( new BatchExecuteStatementCommand(params) ); console.log("Success. The query return the following data.", data); for (let i = 0; i < data.Responses.length; i++) { console.log(data.Responses[i].Item.year); console.log(data.Responses[i].Item.title); } try { const params = { Statements: [ { Statement: "DELETE FROM " + tableName + " where title=? and year=?", Parameters: [{ S: movieTitle1 }, { N: movieYear1 }], }, { Statement: "DELETE FROM " + tableName + " where title=? 
and year=?", Parameters: [{ S: movieTitle2 }, { N: movieYear2 }], }, ], }; const data = await ddbDocClient.send( new BatchExecuteStatementCommand(params) ); console.log("Success. Items deleted by batch."); try { const params = { Statements: [ { Statement: "INSERT INTO " + tableName + " value {'title':?, 'year':?}", Parameters: [{ S: movieTitle1 }, { N: movieYear1 }], }, { Statement: "INSERT INTO " + tableName + " value {'title':?, 'year':?}", Parameters: [{ S: movieTitle2 }, { N: movieYear2 }], }, ], }; const data = await ddbDocClient.send( new BatchExecuteStatementCommand(params) ); console.log("Success. Items added by batch."); try { const params = { Statements: [ { Statement: "UPDATE " + tableName + " SET Producer=? where title=? and year=?", Parameters: [ { S: producer1 }, { S: movieTitle1 }, { N: movieYear1 }, ], }, { Statement: "UPDATE " + tableName + " SET Producer=? where title=? and year=?", Parameters: [ { S: producer2 }, { S: movieTitle2 }, { N: movieYear2 }, ], }, ], }; console.log("Updating movies..."); const data = await ddbDocClient.send( new BatchExecuteStatementCommand(params) ); console.log("Success. Items updated by batch."); return "Run successfully"; // For unit tests. } catch (err) { console.log("Error updating items by batch. ", err); } } catch (err) { console.log("Error adding items to table by batch. ", err); } } catch (err) { console.log("Error deleting movies by batch. ", err); } } catch (err) { console.log("Error getting movies by batch. ", err); } } catch (err) { console.log("Error adding movies by batch. ", err); } } catch (err) { console.log("Error creating table. ", err); } }; run( tableName, movieYear1, movieTitle1, movieYear2, movieTitle2, producer1, producer2 );
-
有关 API 详细信息,请参阅《Amazon SDK for JavaScript API 参考》中的 BatchExecuteStatement。
-
- Kotlin
-
- SDK for Kotlin
-
注意 这是适用于预览版中功能的预发行文档。本文档随时可能更改。
提示 要了解如何设置和运行此示例,请参阅 GitHub
// Partition key value (attribute "year", a number) shared by the three demo movies.
private const val MOVIE_YEAR = "2022"

// Sort key values (attribute "title") of the three demo movies.
private val movieTitles = listOf("My Movie 1", "My Movie 2", "My Movie 3")

/**
 * Runs the scenario: creates the table, inserts three movies with a batch of PartiQL
 * statements, updates them, deletes them, and finally deletes the table.
 */
suspend fun main() {
    val tableName = "MoviesPartiQBatch"

    // use {} closes the client once the batch steps are done, even if one of them throws.
    DynamoDbClient { region = "us-east-1" }.use { ddb ->
        println("Creating an Amazon DynamoDB table named $tableName with a key named year and a sort key named title.")
        createTablePartiQLBatch(ddb, tableName, "year")
        putRecordBatch(ddb)
        updateTableItemBatchBatch(ddb)
        deleteItemsBatch(ddb)
    }
    deleteTablePartiQLBatch(tableName)
}

/**
 * Creates a table whose hash key is the numeric attribute [key] and whose range key is
 * the string attribute "title", then suspends until the table exists.
 */
suspend fun createTablePartiQLBatch(ddb: DynamoDbClient, tableNameVal: String, key: String) {
    val attDef = AttributeDefinition {
        attributeName = key
        attributeType = ScalarAttributeType.N
    }

    val attDef1 = AttributeDefinition {
        attributeName = "title"
        attributeType = ScalarAttributeType.S
    }

    val keySchemaVal = KeySchemaElement {
        attributeName = key
        keyType = KeyType.Hash
    }

    val keySchemaVal1 = KeySchemaElement {
        attributeName = "title"
        keyType = KeyType.Range
    }

    val provisionedVal = ProvisionedThroughput {
        readCapacityUnits = 10
        writeCapacityUnits = 10
    }

    val request = CreateTableRequest {
        attributeDefinitions = listOf(attDef, attDef1)
        keySchema = listOf(keySchemaVal, keySchemaVal1)
        provisionedThroughput = provisionedVal
        tableName = tableNameVal
    }

    val response = ddb.createTable(request)
    ddb.waitUntilTableExists { // suspend call
        tableName = tableNameVal
    }
    println("The table was successfully created ${response.tableDescription?.tableArn}")
}

/**
 * Builds a batch request that runs [sqlStatement] once for each demo movie. Every
 * statement receives the parameters (year, title) followed by [extraParameters].
 */
private fun batchRequestFor(
    sqlStatement: String,
    extraParameters: List<AttributeValue> = emptyList()
): BatchExecuteStatementRequest {
    val statementList = movieTitles.map { title ->
        BatchStatementRequest {
            statement = sqlStatement
            parameters = listOf(AttributeValue.N(MOVIE_YEAR), AttributeValue.S(title)) + extraParameters
        }
    }
    return BatchExecuteStatementRequest {
        statements = statementList
    }
}

/** Inserts the three demo movies with one batch of PartiQL INSERT statements. */
suspend fun putRecordBatch(ddb: DynamoDbClient) {
    val sqlStatement = "INSERT INTO MoviesPartiQBatch VALUE {'year':?, 'title' : ?, 'info' : ?}"

    // Create three movies to add to the Amazon DynamoDB table.
    val batchRequest = batchRequestFor(sqlStatement, listOf(AttributeValue.S("No Information")))
    val response = ddb.batchExecuteStatement(batchRequest)
    println("ExecuteStatement successful: $response")
    println("Added new movies using a batch command.")
}

/** Overwrites the "info" attribute of the three demo movies with one batch of UPDATE statements. */
suspend fun updateTableItemBatchBatch(ddb: DynamoDbClient) {
    val sqlStatement = "UPDATE MoviesPartiQBatch SET info = 'directors\":[\"Merian C. Cooper\",\"Ernest B. Schoedsack' where year=? and title=?"

    val batchRequest = batchRequestFor(sqlStatement)
    val response = ddb.batchExecuteStatement(batchRequest)
    println("ExecuteStatement successful: $response")
    println("Updated three movies using a batch command.")
    println("Items were updated!")
}

/** Deletes the three demo movies with one batch of PartiQL DELETE statements. */
suspend fun deleteItemsBatch(ddb: DynamoDbClient) {
    // Specify three records to delete.
    val sqlStatement = "DELETE FROM MoviesPartiQBatch WHERE year = ? and title=?"

    val batchRequest = batchRequestFor(sqlStatement)
    ddb.batchExecuteStatement(batchRequest)
    println("Deleted three movies using a batch command.")
}

/** Deletes the table named [tableNameVal] using a short-lived client of its own. */
suspend fun deleteTablePartiQLBatch(tableNameVal: String) {
    val request = DeleteTableRequest {
        tableName = tableNameVal
    }

    DynamoDbClient { region = "us-east-1" }.use { ddb ->
        ddb.deleteTable(request)
        println("$tableNameVal was deleted")
    }
}
-
有关 API 详细信息,请参阅《适用于 Kotlin 的 Amazon SDK API 参考》中的 BatchExecuteStatement。
-
- Python
-
- 适用于 Python (Boto3) 的 SDK
-
提示:要了解如何设置和运行此示例,请参阅 GitHub。
创建一个可以运行批量 PartiQL 语句的类。
from datetime import datetime
from decimal import Decimal
import logging
from pprint import pprint

import boto3
from botocore.exceptions import ClientError

from scaffold import Scaffold

logger = logging.getLogger(__name__)


class PartiQLBatchWrapper:
    """
    Encapsulates a DynamoDB resource to run batches of PartiQL statements.
    """
    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource

    def run_partiql(self, statements, param_list):
        """
        Runs a batch of PartiQL statements. A Boto3 resource is used even though
        `batch_execute_statement` is called on the underlying `client` object because
        the resource transforms input and output from plain old Python objects (POPOs)
        to the DynamoDB format. If you create the client directly, you must do these
        transforms yourself.

        :param statements: The batch of PartiQL statements.
        :param param_list: The batch of PartiQL parameters that are associated with
                           each statement. This list must be in the same order as the
                           statements and must have the same length.
        :return: The responses returned from running the statements, if any.
        :raises ValueError: If `statements` and `param_list` differ in length.
        :raises ClientError: If the service rejects the batch request as a whole.
        """
        statements = list(statements)
        param_list = list(param_list)
        # zip() would silently drop the unmatched tail, so a mismatch is reported
        # instead of quietly sending a shorter batch.
        if len(statements) != len(param_list):
            raise ValueError(
                f"Got {len(statements)} statements but {len(param_list)} parameter "
                f"lists; each statement needs exactly one parameter list.")

        try:
            output = self.dyn_resource.meta.client.batch_execute_statement(
                Statements=[{
                    'Statement': statement, 'Parameters': params
                } for statement, params in zip(statements, param_list)])
        except ClientError as err:
            if err.response['Error']['Code'] == 'ResourceNotFoundException':
                logger.error(
                    "Couldn't execute batch of PartiQL statements because the table "
                    "does not exist.")
            else:
                logger.error(
                    "Couldn't execute batch of PartiQL statements. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            # A successful call does not mean every statement succeeded: failures of
            # individual statements are reported in the matching response entry,
            # which is returned in the same order as the request statements.
            for statement, response in zip(statements, output.get('Responses', [])):
                error = response.get('Error')
                if error:
                    logger.warning(
                        "PartiQL statement '%s' failed. Here's why: %s: %s",
                        statement, error.get('Code'), error.get('Message'))
            return output
运行创建表并批量运行 PartiQL 查询的场景。
def run_scenario(scaffold, wrapper, table_name):
    """
    Runs the batch PartiQL demo: creates a table, inserts a batch of movies, reads
    them back, updates their ratings, verifies the update, deletes the movies, and
    finally deletes the table. Progress is printed to stdout.

    :param scaffold: An object that provides `create_table(table_name)` and
                     `delete_table()` to manage the demo table.
    :param wrapper: An object that provides `run_partiql(statements, param_list)`
                    and a `dyn_resource` attribute whose `meta.client` is used to
                    run a single `execute_statement` call.
    :param table_name: The name of the table to create and use for the demo.
    """
    logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

    print('-'*88)
    print("Welcome to the Amazon DynamoDB PartiQL batch statement demo.")
    print('-'*88)

    print(f"Creating table '{table_name}' for the demo...")
    scaffold.create_table(table_name)
    print('-'*88)

    movie_data = [{
        'title': "House PartiQL",
        'year': datetime.now().year - 5,
        'info': {
            'plot': "Wacky high jinks result from querying a mysterious database.",
            'rating': Decimal('8.5')}}, {
        'title': "House PartiQL 2",
        'year': datetime.now().year - 3,
        'info': {
            'plot': "Moderate high jinks result from querying another mysterious database.",
            'rating': Decimal('6.5')}}, {
        'title': "House PartiQL 3",
        'year': datetime.now().year - 1,
        'info': {
            'plot': "Tepid high jinks result from querying yet another mysterious database.",
            'rating': Decimal('2.5')}}]

    print(f"Inserting a batch of movies into table '{table_name}'.")
    statements = [
        f"INSERT INTO \"{table_name}\" "
        f"VALUE {{'title': ?, 'year': ?, 'info': ?}}"] * len(movie_data)
    # List the values explicitly so they always line up with the placeholders,
    # instead of depending on the key order of each movie dict.
    params = [
        [movie['title'], movie['year'], movie['info']] for movie in movie_data]
    wrapper.run_partiql(statements, params)
    print("Success!")
    print('-'*88)

    print("Getting data for a batch of movies.")
    statements = [
        f"SELECT * FROM \"{table_name}\" WHERE title=? AND year=?"] * len(movie_data)
    params = [[movie['title'], movie['year']] for movie in movie_data]
    output = wrapper.run_partiql(statements, params)
    for item in output['Responses']:
        movie = item.get('Item')
        if movie is None:
            # A response entry without an 'Item' cannot be displayed (presumably the
            # statement failed or matched nothing); report it and keep going.
            print(f"\nNo item was returned for a statement: "
                  f"{item.get('Error', 'no matching item')}")
            continue
        print(f"\n{movie['title']}, {movie['year']}")
        pprint(movie)
    print('-'*88)

    ratings = [Decimal('7.7'), Decimal('5.5'), Decimal('1.3')]
    print("Updating a batch of movies with new ratings.")
    statements = [
        f"UPDATE \"{table_name}\" SET info.rating=? "
        f"WHERE title=? AND year=?"] * len(movie_data)
    params = [
        [rating, movie['title'], movie['year']]
        for rating, movie in zip(ratings, movie_data)]
    wrapper.run_partiql(statements, params)
    print("Success!")
    print('-'*88)

    print("Getting projected data from the table to verify our update.")
    output = wrapper.dyn_resource.meta.client.execute_statement(
        Statement=f'SELECT title, info.rating FROM "{table_name}"')
    pprint(output['Items'])
    print('-'*88)

    print("Deleting a batch of movies from the table.")
    statements = [
        f"DELETE FROM \"{table_name}\" WHERE title=? AND year=?"] * len(movie_data)
    params = [[movie['title'], movie['year']] for movie in movie_data]
    wrapper.run_partiql(statements, params)
    print("Success!")
    print('-'*88)

    print(f"Deleting table '{table_name}'...")
    scaffold.delete_table()
    print('-'*88)

    print("\nThanks for watching!")
    print('-'*88)


if __name__ == '__main__':
    try:
        dyn_res = boto3.resource('dynamodb')
        scaffold = Scaffold(dyn_res)
        movies = PartiQLBatchWrapper(dyn_res)
        run_scenario(scaffold, movies, 'doc-example-table-partiql-movies')
    except Exception as e:
        print(f"Something went wrong with the demo! Here's what: {e}")
-
有关 API 详细信息,请参阅《适用于 Python (Boto3) 的 Amazon SDK API 参考》中的 BatchExecuteStatement。
-
有关 Amazon 软件开发工具包开发人员指南和代码示例的完整列表,请参阅 结合使用 DynamoDB 与 Amazon SDK。本主题还包括有关入门的信息以及有关先前的软件开发工具包版本的详细信息。