


# Python and ElastiCache
<a name="ElastiCache-Getting-Started-Tutorials-Python"></a>

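In this tutorial, you use the AWS SDK for Python (Boto3) to write simple programs that perform the following ElastiCache operations:
+ Create ElastiCache for Redis OSS clusters (cluster mode enabled and cluster mode disabled)
+ Check whether users or user groups exist, and create them if they don't. (This feature is available for Valkey 7.2 and later, and for Redis OSS 6.0 through 7.1.)
+ Connect to ElastiCache
+ Perform operations such as setting and getting strings, reading from and writing to streams, and publishing to and subscribing from Pub/Sub channels.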

As you work through this tutorial, you can refer to the AWS SDK for Python (Boto) documentation. The following section is specific to ElastiCache: [ElastiCache low-level client](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/elasticache.html)

## Tutorial prerequisites
<a name="ElastiCache-Getting-Started-Tutorials-Prerquisites"></a>
+ Set up AWS access keys to use the AWS SDKs. For more information, see [Setting up ElastiCache](set-up.md).
+ Install Python 3.0 or later. For more information, see [https://www.python.org/downloads](https://www.python.org/downloads). For instructions on installing and configuring Boto3, see [Quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html) in the Boto 3 documentation.

**Topics**
+ [Tutorial prerequisites](#ElastiCache-Getting-Started-Tutorials-Prerquisites)
+ [Tutorial: Creating ElastiCache clusters and users](#ElastiCache-Getting-Started-Tutorials-Create-Cluster-and-Users)
+ [Tutorial: Connecting to ElastiCache](#ElastiCache-Getting-Started-Tutorials-Connecting)
+ [Usage examples](#ElastiCache-Getting-Started-Tutorials-Usage)

## Tutorial: Creating ElastiCache clusters and users
<a name="ElastiCache-Getting-Started-Tutorials-Create-Cluster-and-Users"></a>

The following examples use the boto3 SDK for ElastiCache management operations (cluster or user creation) and redis-py/redis-py-cluster for data handling.

**Topics**
+ [Creating a cluster mode disabled cluster](#ElastiCache-Getting-Started-Tutorials-Create-Cluster)
+ [Creating a cluster mode disabled cluster with TLS and RBAC](#ElastiCache-Getting-Started-Tutorials-RBAC)
+ [Creating a cluster mode enabled cluster](#ElastiCache-Getting-Started-Tutorials-Cluster-Enabled)
+ [Creating a cluster mode enabled cluster with TLS and RBAC](#ElastiCache-Getting-Started-Tutorials-Cluster-RBAC)
+ [Checking whether users and user groups exist, and creating them otherwise](#ElastiCache-Getting-Started-Tutorials-Users)

### Creating a cluster mode disabled cluster
<a name="ElastiCache-Getting-Started-Tutorials-Create-Cluster"></a>

Copy the following program and paste it into a file named *CreateClusterModeDisabledCluster.py*.

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_disabled(CacheNodeType='cache.t3.small',EngineVersion='7.2',NumCacheClusters=2,ReplicationGroupDescription='Sample cluster',ReplicationGroupId=None):
    """Creates an ElastiCache Cluster with cluster mode disabled

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
    If not specified, cluster will be created with 1 primary and 1 replica.
    Because AutomaticFailoverEnabled is set to True below, at least 2 nodes are needed.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster
    :return: dictionary with the API results

    """
    if not ReplicationGroupId:
        return 'ReplicationGroupId parameter is required'

    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        NumCacheClusters=NumCacheClusters,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
        SnapshotRetentionLimit=30,
    )
    return response


if __name__ == '__main__':
    
    # Creates an ElastiCache cluster mode disabled cluster with Valkey 8.0, one primary and two replicas.
    # The default node type (cache.t3.small) is used; uncomment CacheNodeType to use cache.m6g.large nodes instead.
    elasticacheResponse = create_cluster_mode_disabled(
        #CacheNodeType='cache.m6g.large', 
        EngineVersion='8.0',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104053'
        )
    
    logging.info(elasticacheResponse)
```

To run the program, enter the following command:

 `python CreateClusterModeDisabledCluster.py`

For more information, see [Managing clusters in ElastiCache](Clusters.md).
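Creating a replication group is asynchronous, so the cluster is usually not ready when `create_replication_group` returns. The following is a minimal sketch of a polling helper, assuming `client` is a boto3 ElastiCache client like the one created above. The helper name `wait_until_available` and its `delay`, `max_attempts`, and `sleep` parameters are hypothetical and only illustrate the idea. Boto3 also offers a built-in waiter (`client.get_waiter('replication_group_available')`) that may be a better fit in practice.

```python
import time

def wait_until_available(client, replication_group_id, delay=30, max_attempts=40, sleep=time.sleep):
    """Poll describe_replication_groups until the replication group is 'available'.

    `client` is assumed to be a boto3 ElastiCache client.
    Returns True once the status is 'available', or False if the attempts run out.
    """
    for _ in range(max_attempts):
        response = client.describe_replication_groups(ReplicationGroupId=replication_group_id)
        status = response['ReplicationGroups'][0]['Status']
        if status == 'available':
            return True
        sleep(delay)  # wait before asking again
    return False

# Hypothetical usage, after the cluster creation call:
#   import boto3
#   ready = wait_until_available(boto3.client('elasticache'), 'valkey202104053')
```

The `sleep` parameter exists only so that the waiting behavior can be replaced in tests.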

### Creating a cluster mode disabled cluster with TLS and RBAC
<a name="ElastiCache-Getting-Started-Tutorials-RBAC"></a>

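To help ensure security, you can use Transport Layer Security (TLS) and role-based access control (RBAC) when you create a cluster mode disabled cluster. With Valkey or Redis OSS AUTH, every client whose token is authenticated has full access to the replication group. RBAC, by contrast, lets you control cluster access through user groups, which are designed as a way to manage access to replication groups. For more information, see [Role-Based Access Control (RBAC)](Clusters.RBAC.md).

Copy the following program and paste it into a file named *ClusterModeDisabledWithRBAC.py*.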

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_disabled_rbac(CacheNodeType='cache.t3.small',EngineVersion='7.2',NumCacheClusters=2,ReplicationGroupDescription='Sample cluster',ReplicationGroupId=None, UserGroupIds=None, SecurityGroupIds=None,CacheSubnetGroupName=None):
    """Creates an ElastiCache Cluster with cluster mode disabled and RBAC

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
    If not specified, cluster will be created with 1 primary and 1 replica.
    Because AutomaticFailoverEnabled is set to True below, at least 2 nodes are needed.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :param UserGroupIds: Mandatory list with the ID of the user group to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, default will be used
    :param CacheSubnetGroupName: subnet group where the cluster will be placed. If not defined, default will be used.
    :return: dictionary with the API results

    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds,(list)):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params={'AutomaticFailoverEnabled': True, 
            'CacheNodeType': CacheNodeType, 
            'Engine': 'valkey', 
            'EngineVersion': EngineVersion, 
            'NumCacheClusters': NumCacheClusters, 
            'ReplicationGroupDescription': ReplicationGroupDescription, 
            'ReplicationGroupId': ReplicationGroupId, 
            'SnapshotRetentionLimit': 30, 
            'TransitEncryptionEnabled': True, 
            'UserGroupIds':UserGroupIds
        }

    # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit.
    if isinstance(SecurityGroupIds,(list)):
        params.update({'SecurityGroupIds':SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName':CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response

if __name__ == '__main__':

    # Creates an ElastiCache Cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 8.0, one primary and two replicas.
    # Assigns the existent user group "mygroup" for RBAC authentication
   
    response=create_cluster_mode_disabled_rbac(
        CacheNodeType='cache.m6g.large',
        EngineVersion='8.0',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104',
        UserGroupIds=[
            'mygroup'
        ],
        SecurityGroupIds=[
            'sg-7cc73803'
        ],
        CacheSubnetGroupName='default'
    )

    logging.info(response)
```

To run the program, enter the following command:

 `python ClusterModeDisabledWithRBAC.py`

For more information, see [Managing clusters in ElastiCache](Clusters.md).
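The RBAC program above adds `SecurityGroupIds` and `CacheSubnetGroupName` to the request only when they are provided, so that ElastiCache falls back to its defaults otherwise. The same pattern can be sketched in isolation as follows. The helper name `build_replication_group_params` is hypothetical and is not part of boto3; the resulting dictionary is what would be passed as `client.create_replication_group(**params)`.

```python
def build_replication_group_params(required, **optional):
    """Merge the required parameters with only those optional ones that were provided."""
    params = dict(required)
    # Leave out anything set to None so the service default applies.
    params.update({name: value for name, value in optional.items() if value is not None})
    return params

params = build_replication_group_params(
    {
        'ReplicationGroupId': 'valkey202104',
        'UserGroupIds': ['mygroup'],
        'TransitEncryptionEnabled': True,
    },
    SecurityGroupIds=['sg-7cc73803'],
    CacheSubnetGroupName=None,  # not provided, so it is left out of the request
)
print(sorted(params))
```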

### Creating a cluster mode enabled cluster
<a name="ElastiCache-Getting-Started-Tutorials-Cluster-Enabled"></a>

Copy the following program and paste it into a file named *ClusterModeEnabled.py*.

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_enabled(CacheNodeType='cache.t3.small',EngineVersion='7.2',NumNodeGroups=1,ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled',ReplicationGroupId=None):
    """Creates an ElastiCache Cluster with cluster mode enabled

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
    If not specified, cluster will be created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster
    :return: dictionary with the API results

    """
    if not ReplicationGroupId:
        return 'ReplicationGroupId parameter is required'
    
    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
    #   NumNodeGroups sets the number of shards; each shard gets 1 primary node (implicit) plus ReplicasPerNodeGroup replicas
        NumNodeGroups=NumNodeGroups,
        ReplicasPerNodeGroup=ReplicasPerNodeGroup,
    #   The parameter group must match the engine version; this one is for Valkey 7.2 with cluster mode enabled
        CacheParameterGroupName='default.valkey7.2.cluster.on'
    )

    return response


# Creates a cluster mode enabled cluster
response = create_cluster_mode_enabled(
    CacheNodeType='cache.m6g.large',
    EngineVersion='7.2',
    ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
    ReplicationGroupId='valkey20210',
#   Creates a cluster mode enabled cluster with 2 shards (NumNodeGroups), each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup)
    NumNodeGroups=2,
    ReplicasPerNodeGroup=1,
)

logging.info(response)
```

To run the program, enter the following command:

 `python ClusterModeEnabled.py`

For more information, see [Managing clusters in ElastiCache](Clusters.md).
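In a cluster mode enabled cluster, each shard (node group) has one implicit primary node plus `ReplicasPerNodeGroup` replicas. As a quick worked example of how the two parameters in the program above translate into a node count (the helper name `total_nodes` is only illustrative):

```python
def total_nodes(num_node_groups, replicas_per_node_group):
    """Each shard has 1 primary plus its replicas."""
    return num_node_groups * (1 + replicas_per_node_group)

# NumNodeGroups=2 and ReplicasPerNodeGroup=1, as in the call above:
print(total_nodes(2, 1))  # 2 shards x (1 primary + 1 replica) = 4 nodes
```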

### Creating a cluster mode enabled cluster with TLS and RBAC
<a name="ElastiCache-Getting-Started-Tutorials-Cluster-RBAC"></a>

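To help ensure security, you can use Transport Layer Security (TLS) and role-based access control (RBAC) when you create a cluster mode enabled cluster. With Valkey or Redis OSS AUTH, every client whose token is authenticated has full access to the replication group. RBAC, by contrast, lets you control cluster access through user groups, which are designed as a way to manage access to replication groups. For more information, see [Role-Based Access Control (RBAC)](Clusters.RBAC.md).

Copy the following program and paste it into a file named *ClusterModeEnabledWithRBAC.py*.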

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_enabled(CacheNodeType='cache.t3.small',EngineVersion='7.2',NumNodeGroups=1,ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled',ReplicationGroupId=None,UserGroupIds=None, SecurityGroupIds=None,CacheSubnetGroupName=None,CacheParameterGroupName='default.valkey7.2.cluster.on'):
    """Creates an ElastiCache Cluster with cluster mode enabled and RBAC

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
    If not specified, cluster will be created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :param UserGroupIds: Mandatory list with the ID of the user group to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, default will be used
    :param CacheSubnetGroupName: subnet group where the cluster will be placed. If not defined, default will be used.
    :param CacheParameterGroupName: Parameter group to be used. Must be compatible with the engine version and cluster mode enabled.
    :return: dictionary with the API results

    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds,(list)):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params={'AutomaticFailoverEnabled': True, 
            'CacheNodeType': CacheNodeType, 
            'Engine': 'valkey', 
            'EngineVersion': EngineVersion, 
            'ReplicationGroupDescription': ReplicationGroupDescription, 
            'ReplicationGroupId': ReplicationGroupId, 
            'SnapshotRetentionLimit': 30, 
            'TransitEncryptionEnabled': True, 
            'UserGroupIds':UserGroupIds,
            'NumNodeGroups': NumNodeGroups,
            'ReplicasPerNodeGroup': ReplicasPerNodeGroup,
            'CacheParameterGroupName': CacheParameterGroupName
        }

    # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit.
    if isinstance(SecurityGroupIds,(list)):
        params.update({'SecurityGroupIds':SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName':CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response

if __name__ == '__main__':
    # Creates a cluster mode enabled cluster
    response = create_cluster_mode_enabled(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
        ReplicationGroupId='valkey2021',
    #   Creates a cluster mode enabled cluster with 2 shards (NumNodeGroups), each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup)
        NumNodeGroups=2,
        ReplicasPerNodeGroup=1,
        UserGroupIds=[
            'mygroup'
        ],
        SecurityGroupIds=[
            'sg-7cc73803'
        ],
        CacheSubnetGroupName='default'

    )
    
    logging.info(response)
```

To run the program, enter the following command:

 `python ClusterModeEnabledWithRBAC.py`

For more information, see [Managing clusters in ElastiCache](Clusters.md).

### Checking whether users and user groups exist, and creating them otherwise
<a name="ElastiCache-Getting-Started-Tutorials-Users"></a>

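With RBAC, you create users and assign them specific permissions by using an access string. You assign the users to user groups aligned with a specific role (administrators, human resources), and those user groups are then deployed to one or more ElastiCache for Redis OSS replication groups. By doing this, you can establish security boundaries between clients that use the same Valkey or Redis OSS replication group and prevent clients from accessing each other's data. For more information, see [Role-Based Access Control (RBAC)](Clusters.RBAC.md).

Copy the following program and paste it into a file named *UserAndUserGroups.py*. Update the mechanism for providing credentials. Credentials in this example are shown as replaceable and are assigned an undeclared item. Avoid hard-coding credentials.

This example uses an access string that contains the permissions for the user. For more information about access strings, see [Specifying permissions using an access string](Clusters.RBAC.md#Access-string).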

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def check_user_exists(UserId):
    """Checks if UserId exists

    Returns True if UserId exists, otherwise False
    :param UserId: ElastiCache User ID
    :return: True|False
    """
    try:
        response = client.describe_users(
            UserId=UserId,
        )
        if response['Users'][0]['UserId'].lower() == UserId.lower():
            return True
    except client.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'UserNotFound':
            logging.info(e.response['Error'])
            return False
        else:
            raise

def check_group_exists(UserGroupId):
    """Checks if UserGroupId exists

    Returns True if the user group ID exists, otherwise False
    :param UserGroupId: ElastiCache user group ID
    :return: True|False
    """

    try:
        response = client.describe_user_groups(
            UserGroupId=UserGroupId
        )
        if response['UserGroups'][0]['UserGroupId'].lower() == UserGroupId.lower():
            return True
    # Only service errors carry a 'response' attribute, so catch ClientError rather than Exception
    except client.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'UserGroupNotFound':
            logging.info(e.response['Error'])
            return False
        else:
            raise

def create_user(UserId=None,UserName=None,Password=None,AccessString=None):
    """Creates a new user

    Returns the ARN for the newly created user or the error message
    :param UserId: ElastiCache user ID. User IDs must be unique
    :param UserName: ElastiCache user name. ElastiCache allows multiple users with the same name as long as the associated user ID is unique.
    :param Password: Password for user. Must have at least 16 chars.
    :param AccessString: Access string with the permissions for the user. 
    :return: user ARN
    """
    try:
        response = client.create_user(
            UserId=UserId,
            UserName=UserName,
            Engine='Redis',
            Passwords=[Password],
            AccessString=AccessString,
            NoPasswordRequired=False
        )
        return response['ARN']
    except client.exceptions.ClientError as e:
        logging.info(e.response['Error'])
        return e.response['Error']

def create_group(UserGroupId=None, UserIds=None):
    """Creates a new group.
    A default user is required (mandatory) and should be specified in the UserIds list

    Returns the ARN for the newly created group or the error message
    :param UserIds: List with user IDs to be associated with the new group. A default user is required
    :param UserGroupId: The ID (name) for the group
    :return: Group ARN
    """
    try:
        response = client.create_user_group(
            UserGroupId=UserGroupId,
            Engine='Redis',
            UserIds=UserIds
        )
        return response['ARN']
    except client.exceptions.ClientError as e:
        logging.info(e.response['Error'])
        return e.response['Error']


if __name__ == '__main__':
    
    groupName='mygroup2'
    userName = 'myuser2'
    userId=groupName+'-'+userName

    # Creates a new user if the user ID does not exist.
    # EXAMPLE is a placeholder: replace it with a password (at least 16 characters) loaded from a secure source.
    for tmpUserId,tmpUserName in [ (userId,userName), (groupName+'-default','default')]:
        if not check_user_exists(tmpUserId):
            response=create_user(UserId=tmpUserId, UserName=tmpUserName,Password=EXAMPLE,AccessString='on ~* +@all')
            logging.info(response)
    # Creates the user group with both user IDs if the group does not exist.
    if not check_group_exists(groupName):
        UserIds = [ userId , groupName+'-default']
        response=create_group(UserGroupId=groupName,UserIds=UserIds)
        logging.info(response)
```

To run the program, enter the following command:

 `python UserAndUserGroups.py`
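The access string `on ~* +@all` used in the program above enables the user (`on`), grants access to all keys (`~*`), and allows all command categories (`+@all`). The following sketch shows one way to assemble more restrictive access strings from their parts. The helper name `build_access_string` and the `app:*` key pattern are hypothetical and used only for illustration.

```python
def build_access_string(enabled=True, key_patterns=('*',), rules=('+@all',)):
    """Join the user state, key patterns, and command rules into an access string."""
    parts = ['on' if enabled else 'off']
    parts += ['~' + pattern for pattern in key_patterns]
    parts += list(rules)
    return ' '.join(parts)

# Full access, as used in the program above
print(build_access_string())  # on ~* +@all

# Read-only access restricted to keys that start with "app:"
print(build_access_string(key_patterns=['app:*'], rules=['-@all', '+@read']))  # on ~app:* -@all +@read
```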

## Tutorial: Connecting to ElastiCache
<a name="ElastiCache-Getting-Started-Tutorials-Connecting"></a>

The following examples use a Valkey or Redis OSS client to connect to ElastiCache.

**Topics**
+ [Connecting to a cluster mode disabled cluster](#ElastiCache-Getting-Started-Tutorials-Connecting-cluster-mode-disabled)
+ [Connecting to a cluster mode enabled cluster](#ElastiCache-Getting-Started-Tutorials-Connecting-cluster-mode-enabled)

### Connecting to a cluster mode disabled cluster
<a name="ElastiCache-Getting-Started-Tutorials-Connecting-cluster-mode-disabled"></a>

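Copy the following program and paste it into a file named *ConnectClusterModeDisabled.py*. Update the mechanism for providing credentials. Credentials in this example are shown as replaceable and are assigned an undeclared item. Avoid hard-coding credentials.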

```
from redis import Redis
import logging

logging.basicConfig(level=logging.INFO)
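# example and EXAMPLE are placeholders for the RBAC user name and password. Replace them with values loaded from a secure source.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True, ssl=True, username=example, password=EXAMPLE)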

if redis.ping():
    logging.info("Connected to Redis")
```

To run the program, enter the following command:

 `python ConnectClusterModeDisabled.py`

### Connecting to a cluster mode enabled cluster
<a name="ElastiCache-Getting-Started-Tutorials-Connecting-cluster-mode-enabled"></a>

Copy the following program and paste it into a file named *ConnectClusterModeEnabled.py*.

```
from rediscluster import RedisCluster
import logging

logging.basicConfig(level=logging.INFO)
redis = RedisCluster(startup_nodes=[{"host": "xxx.yyy.clustercfg.zzz1.cache.amazonaws.com","port": "6379"}], decode_responses=True,skip_full_coverage_check=True)

if redis.ping():
    logging.info("Connected to Redis")
```

To run the program, enter the following command:

 `python ConnectClusterModeEnabled.py`
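The host names in the two connection programs above are placeholders. One way to look up the real endpoint is to read it from the response of the boto3 `describe_replication_groups` call: a cluster mode disabled cluster exposes a primary endpoint on its single node group, while a cluster mode enabled cluster exposes a configuration endpoint. The following is a minimal sketch that works on such a response dictionary. The helper name `get_endpoint` is hypothetical, and the sample responses are trimmed to the fields the helper reads.

```python
def get_endpoint(response):
    """Return (address, port) of the endpoint that clients should connect to."""
    group = response['ReplicationGroups'][0]
    if group.get('ClusterEnabled'):
        # Cluster mode enabled: connect through the configuration endpoint
        endpoint = group['ConfigurationEndpoint']
    else:
        # Cluster mode disabled: connect to the primary endpoint of the only shard
        endpoint = group['NodeGroups'][0]['PrimaryEndpoint']
    return endpoint['Address'], endpoint['Port']

# Trimmed sample responses using the placeholder host names from the programs above
disabled = {'ReplicationGroups': [{
    'ClusterEnabled': False,
    'NodeGroups': [{'PrimaryEndpoint': {'Address': 'primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', 'Port': 6379}}],
}]}
enabled = {'ReplicationGroups': [{
    'ClusterEnabled': True,
    'ConfigurationEndpoint': {'Address': 'xxx.yyy.clustercfg.zzz1.cache.amazonaws.com', 'Port': 6379},
}]}

print(get_endpoint(disabled))
print(get_endpoint(enabled))
```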

## Usage examples
<a name="ElastiCache-Getting-Started-Tutorials-Usage"></a>

The following examples use the redis-py client to work with data in ElastiCache.

**Topics**
+ [Setting and getting strings](#ElastiCache-Getting-Started-Tutorials-set-strings)
+ [Setting and getting a hash with multiple items](#ElastiCache-Getting-Started-Tutorials-set-hash)
+ [Publishing (writing) to and subscribing (reading) from a Pub/Sub channel](#ElastiCache-Getting-Started-Tutorials-pub-sub)
+ [Writing to and reading from a stream](#ElastiCache-Getting-Started-Tutorials-read-write-stream)

### Setting and getting strings
<a name="ElastiCache-Getting-Started-Tutorials-set-strings"></a>

Copy the following program and paste it into a file named *SetAndGetStrings.py*.

```
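from redis import Redis
import time
import logging
logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s')

# Replace the host with your cluster's endpoint. Add ssl=True, username, and password if the cluster uses TLS and RBAC.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True)

keyName='mykey'
currTime=time.ctime(time.time())

# Set the key 'mykey' with the current date and time as value.
# The key will expire and be removed from the cache in 60 seconds.
redis.set(keyName, currTime, ex=60)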

# Sleep just for better illustration of TTL (expiration) value
time.sleep(5)

# Retrieve the key value and current TTL
keyValue=redis.get(keyName)
keyTTL=redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValue, keyTTL))
```

To run the program, enter the following command:

 `python SetAndGetStrings.py`

### Setting and getting a hash with multiple items
<a name="ElastiCache-Getting-Started-Tutorials-set-hash"></a>

Copy the following program and paste it into a file named *SetAndGetHash.py*.

```
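from redis import Redis
import logging
import time

logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s')

# Replace the host with your cluster's endpoint. Add ssl=True, username, and password if the cluster uses TLS and RBAC.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True)

keyName='mykey'
keyValues={'datetime': time.ctime(time.time()), 'epochtime': time.time()}

# Set the hash 'mykey' with the current date and time in human readable format (datetime field) and epoch number (epochtime field).
redis.hset(keyName, mapping=keyValues)

# Set the key to expire and be removed from the cache in 60 seconds.
redis.expire(keyName, 60)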

# Sleep just for better illustration of TTL (expiration) value
time.sleep(5)

# Retrieves all the fields and current TTL
keyValues=redis.hgetall(keyName)
keyTTL=redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValues, keyTTL))
```

To run the program, enter the following command:

 `python SetAndGetHash.py`
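Hash fields are stored as strings, so with `decode_responses=True` the `epochtime` field written as a float in the program above comes back from `hgetall` as a string. The following sketch shows one way to convert the fields back to typed values. The helper name `parse_hash` is hypothetical, and the `stored` dictionary is made-up sample data shaped like an `hgetall` result rather than real output.

```python
def parse_hash(raw):
    """Convert the string fields returned by HGETALL back to typed values."""
    return {
        'datetime': raw['datetime'],           # already a human-readable string
        'epochtime': float(raw['epochtime']),  # stored as text, restored to a float
    }

# Sample data in the shape hgetall returns when decode_responses=True
stored = {'datetime': 'Mon Apr  5 12:00:00 2021', 'epochtime': '1617624000.0'}
parsed = parse_hash(stored)
print(type(parsed['epochtime']).__name__, parsed['epochtime'])
```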

### Publishing (writing) to and subscribing (reading) from a Pub/Sub channel
<a name="ElastiCache-Getting-Started-Tutorials-pub-sub"></a>

Copy the following program and paste it into a file named *PubAndSub.py*.

```
from redis import Redis
import logging
import time

def handlerFunction(message):
    """Prints message got from PubSub channel to the log output

    Return None
    :param message: message to log
    """
    logging.info(message)

logging.basicConfig(level=logging.INFO)
redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True)


# Creates the subscriber connection on "mychannel"
subscriber = redis.pubsub()
subscriber.subscribe(**{'mychannel': handlerFunction})

# Creates a new thread to watch for messages while the main process continues with its routines
thread = subscriber.run_in_thread(sleep_time=0.01)

# Creates publisher connection on "mychannel"
redis.publish('mychannel', 'My message')

# Publishes several messages. Subscriber thread will read and print on log.
while True:
    redis.publish('mychannel',time.ctime(time.time()))
    time.sleep(1)
```

To run the program, enter the following command:

 `python PubAndSub.py`
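The handler in the program above logs the whole message object. redis-py delivers each Pub/Sub message as a dictionary with the keys `type`, `pattern`, `channel`, and `data`, so a handler can pick out just the payload. The following is a minimal sketch that runs without a connection by using sample dictionaries. The helper name `extract_payload` is hypothetical. The type check matters mainly if the same function is also applied to messages read with `get_message()`, which can include subscription confirmations.

```python
def extract_payload(message):
    """Return the published payload, or None for subscribe/unsubscribe confirmations."""
    if message.get('type') in ('message', 'pmessage'):
        return message['data']
    return None

# Sample dictionaries in the shape redis-py uses for Pub/Sub messages
published = {'type': 'message', 'pattern': None, 'channel': 'mychannel', 'data': 'My message'}
confirmation = {'type': 'subscribe', 'pattern': None, 'channel': 'mychannel', 'data': 1}

print(extract_payload(published))
print(extract_payload(confirmation))
```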

### Writing to and reading from a stream
<a name="ElastiCache-Getting-Started-Tutorials-read-write-stream"></a>

Copy the following program and paste it into a file named *ReadWriteStream.py*.

```
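from redis import Redis
import redis.exceptions as exceptions
import logging
import time
import threading

logging.basicConfig(level=logging.INFO)

# Replace the host with your cluster's endpoint. Add ssl=True, username, and password if the cluster uses TLS and RBAC.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True)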

def writeMessage(streamName):
    """Starts a loop writing the current time and thread name to 'streamName'

    :param streamName: Stream (key) name to write messages.
    """
    fieldsDict={'writerId':threading.current_thread().name,'myvalue':None}
    while True:
        fieldsDict['myvalue'] = time.ctime(time.time())
        redis.xadd(streamName,fieldsDict)
        time.sleep(1)

def readMessage(groupName=None,streamName=None):
    """Starts a loop reading from 'streamName'
    Multiple threads will read from the same stream consumer group. Consumer group is used to coordinate data distribution.
    Once a thread acknowledges the message, it won't be provided again. If message wasn't acknowledged, it can be served to another thread.

    :param groupName: stream group where multiple threads will read.
    :param streamName: Stream (key) name where messages will be read.
    """

    readerID=threading.current_thread().name
    while True:
        # Check if the stream has any message
        if redis.xlen(streamName)>0:
            # Ask the consumer group for one message that has not been delivered to any consumer yet ('>')
            streamData=redis.xreadgroup(groupName,readerID,{streamName:'>'},count=1)
            if len(streamData) > 0:
                msgId,message = streamData[0][1][0]
                logging.info("{}: Got {} from ID {}".format(readerID,message,msgId))
                # Do some processing here. If the message has been processed successfully, acknowledge it and (optional) delete the message.
                redis.xack(streamName,groupName,msgId)
                logging.info("Stream message ID {} read and processed successfully by {}".format(msgId,readerID))
                redis.xdel(streamName,msgId)

        time.sleep(0.5)

# Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read.
try:
    redis.xgroup_create('mystream','myworkergroup',mkstream=True)
except exceptions.ResponseError as e:
    logging.info("Consumer group already exists. Will continue despite the error: {}".format(e))
except:
    raise

# Starts 5 writer threads.
for writer_no in range(5):
    writerThread = threading.Thread(target=writeMessage, name='writer-'+str(writer_no), args=('mystream',),daemon=True)
    writerThread.start()

# Starts 10 reader threads
for reader_no in range(10):
    readerThread = threading.Thread(target=readMessage, name='reader-'+str(reader_no), args=('myworkergroup','mystream',),daemon=True)
    readerThread.daemon = True
    readerThread.start()

# Keep the code running for 30 seconds
time.sleep(30)
```

To run the program, enter the following command:

 `python ReadWriteStream.py`
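The reader in the program above unpacks the reply with `streamData[0][1][0]`, which works because it asks for one message from one stream. In general, redis-py returns a list of `[stream name, [(message ID, fields), ...]]` entries from `xreadgroup`. The following sketch walks that structure for any number of streams and messages. The helper name `flatten_stream_reply` is hypothetical, and `sample` is made-up data in that shape rather than real output.

```python
def flatten_stream_reply(reply):
    """Yield (stream, message_id, fields) tuples from an XREADGROUP-style reply."""
    for stream_name, messages in reply:
        for message_id, fields in messages:
            yield stream_name, message_id, fields

# Sample reply for one stream holding one message
sample = [['mystream', [('1617624000000-0', {'writerId': 'writer-0', 'myvalue': 'Mon Apr  5 12:00:00 2021'})]]]

for stream, message_id, fields in flatten_stream_reply(sample):
    print(stream, message_id, fields['writerId'])
```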