使用 Amazon SDK 通过 DAX 加快 DynamoDB 读取速度 - Amazon DynamoDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Amazon SDK 通过 DAX 加快 DynamoDB 读取速度

以下代码示例显示了如何:

  • 同时使用 DAX 和 SDK 客户端创建数据并将其写入表。

  • 同时使用 DAX 和 SDK 客户端获取、查询和扫描表,并比较其性能。

有关更多信息,请参阅使用 DynamoDB Accelerator ke'hu'd客户端进行开发

Python
适用于 Python (Boto3) 的 SDK
注意

在 GitHub 上查看更多内容。在 Amazon 代码示例存储库 中查找完整示例,了解如何进行设置和运行。

使用 DAX 或 Boto3 客户端创建一个表。

import boto3 def create_dax_table(dyn_resource=None): """ Creates a DynamoDB table. :param dyn_resource: Either a Boto3 or DAX resource. :return: The newly created table. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table_name = 'TryDaxTable' params = { 'TableName': table_name, 'KeySchema': [ {'AttributeName': 'partition_key', 'KeyType': 'HASH'}, {'AttributeName': 'sort_key', 'KeyType': 'RANGE'} ], 'AttributeDefinitions': [ {'AttributeName': 'partition_key', 'AttributeType': 'N'}, {'AttributeName': 'sort_key', 'AttributeType': 'N'} ], 'ProvisionedThroughput': { 'ReadCapacityUnits': 10, 'WriteCapacityUnits': 10 } } table = dyn_resource.create_table(**params) print(f"Creating {table_name}...") table.wait_until_exists() return table if __name__ == '__main__': dax_table = create_dax_table() print(f"Created table.")

将测试数据写入表中。

import boto3 def write_data_to_dax_table(key_count, item_size, dyn_resource=None): """ Writes test data to the demonstration table. :param key_count: The number of partition and sort keys to use to populate the table. The total number of items is key_count * key_count. :param item_size: The size of non-key data for each test item. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') some_data = 'X' * item_size for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.put_item(Item={ 'partition_key': partition_key, 'sort_key': sort_key, 'some_data': some_data }) print(f"Put item ({partition_key}, {sort_key}) succeeded.") if __name__ == '__main__': write_key_count = 10 write_item_size = 1000 print(f"Writing {write_key_count*write_key_count} items to the table. " f"Each item is {write_item_size} characters.") write_data_to_dax_table(write_key_count, write_item_size)

获取项目以查看 DAX 客户端和 Boto3 客户端的多次迭代,并报告每个客户端花费的时间。

import argparse import sys import time import amazondax import boto3 def get_item_test(key_count, iterations, dyn_resource=None): """ Gets items from the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param key_count: The number of items to get from the table in each iteration. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') start = time.perf_counter() for _ in range(iterations): for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.get_item(Key={ 'partition_key': partition_key, 'sort_key': sort_key }) print('.', end='') sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == '__main__': # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( 'endpoint_url', nargs='?', help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.") args = parser.parse_args() test_key_count = 10 test_iterations = 50 if args.endpoint_url: print(f"Getting each item from the table {test_iterations} times, " f"using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = get_item_test( test_key_count, test_iterations, dyn_resource=dax) else: print(f"Getting each item from the table {test_iterations} times, " f"using the Boto3 client.") test_start, test_end = get_item_test( test_key_count, test_iterations) print(f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/ test_iterations}.")

在表中查询 DAX 客户端和 Boto3 客户端的多次迭代,并报告每个客户端花费的时间。

import argparse import time import sys import amazondax import boto3 from boto3.dynamodb.conditions import Key def query_test(partition_key, sort_keys, iterations, dyn_resource=None): """ Queries the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param partition_key: The partition key value to use in the query. The query returns items that have partition keys equal to this value. :param sort_keys: The range of sort key values for the query. The query returns items that have sort key values between these two values. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') key_condition_expression = \ Key('partition_key').eq(partition_key) & \ Key('sort_key').between(*sort_keys) start = time.perf_counter() for _ in range(iterations): table.query(KeyConditionExpression=key_condition_expression) print('.', end='') sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == '__main__': # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( 'endpoint_url', nargs='?', help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.") args = parser.parse_args() test_partition_key = 5 test_sort_keys = (2, 9) test_iterations = 100 if args.endpoint_url: print(f"Querying the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations, dyn_resource=dax) else: print(f"Querying the table {test_iterations} times, using the Boto3 client.") test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations) print(f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}.")

扫描表以查看 DAX 客户端和 Boto3 客户端的多次迭代,并报告每个客户端花费的时间。

import argparse import time import sys import amazondax import boto3 def scan_test(iterations, dyn_resource=None): """ Scans the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') start = time.perf_counter() for _ in range(iterations): table.scan() print('.', end='') sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == '__main__': # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( 'endpoint_url', nargs='?', help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.") args = parser.parse_args() test_iterations = 100 if args.endpoint_url: print(f"Scanning the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = scan_test(test_iterations, dyn_resource=dax) else: print(f"Scanning the table {test_iterations} times, using the Boto3 client.") test_start, test_end = scan_test(test_iterations) print(f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}.")

删除 表。

import boto3 def delete_dax_table(dyn_resource=None): """ Deletes the demonstration table. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') table.delete() print(f"Deleting {table.name}...") table.wait_until_not_exists() if __name__ == '__main__': delete_dax_table() print("Table deleted!")

有关 Amazon 软件开发工具包开发人员指南和代码示例的完整列表,请参阅 结合使用 DynamoDB 与 Amazon SDK。本主题还包括有关入门的信息以及有关先前的软件开发工具包版本的详细信息。