使用 Kinesis Data Streams API 和 适用于 PHP 的 Amazon SDK 版本 3 来创建数据流
Amazon Kinesis Data Streams 允许发送实时数据。使用 Kinesis Data Streams 来创建数据创建器,该创建器在每次添加数据时将数据发送到配置的目的地。
有关更多信息,请参阅 Amazon Kinesis 开发人员指南中的创建和管理流。
以下示例演示如何:
-
使用 CreateAlias 创建数据流。
-
使用 DescribeStream 获取有关单个数据流的详细信息。
-
使用 ListStreams 列出现有数据流。
-
使用 PutRecord 将数据发送到现有数据流。
-
使用 DeleteStream 删除数据流。
适用于 PHP 的 Amazon SDKGitHub 上提供了
凭证
运行示例代码之前,请配置您的 Amazon 凭证,如 通过适用于 PHP 的 Amazon SDK 版本 3 使用 Amazon 进行身份验证 中所述。然后导入 适用于 PHP 的 Amazon SDK,如 安装适用于 PHP 的 Amazon SDK 版本 3 中所述。
有关使用 Amazon Kinesis 开发人员指南的更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南。
使用 Kinesis 数据流来创建数据流
建立 Kinesis 数据流,您可以使用以下代码示例来在其中发送要由 Kinesis 处理的信息。要了解更多信息,请参阅 Amazon Kinesis 开发人员指南中的创建和更新数据流。
要创建 Kinesis 数据流,请使用 CreateStream 操作。
导入。
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$kinesisClient = new Aws\Kinesis\KinesisClient([ 'profile' => 'default', 'version' => '2013-12-02', 'region' => 'us-east-2' ]); $shardCount = 2; $name = "my_stream_name"; try { $result = $kinesisClient->createStream([ 'ShardCount' => $shardCount, 'StreamName' => $name, ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
检索数据流
获取有关使用以下代码示例的现有数据流的详细信息。默认情况下,这将返回有关连接到指定 Kinesis 数据流的前 10 个分片的信息。在将数据写入 Kinesis 数据流之前,请记住检查响应中的 StreamStatus。
要检索有关指定 Kinesis 数据流的详细信息,请使用 DescribeStream 操作。
导入。
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$kinesisClient = new Aws\Kinesis\KinesisClient([ 'profile' => 'default', 'version' => '2013-12-02', 'region' => 'us-east-2' ]); $name = "my_stream_name"; try { $result = $kinesisClient->describeStream([ 'StreamName' => $name, ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
列出已连接到 Kinesis 的现有数据流
列出所选 Amazon 区域中您 Amazon Web Services 账户 的前 10 个数据流。使用返回的 `HasMoreStreams 来确定是否有更多与您的账户关联的流。
要列出 Kinesis 数据流,请使用 ListStreams 操作。
导入。
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$kinesisClient = new Aws\Kinesis\KinesisClient([ 'profile' => 'default', 'version' => '2013-12-02', 'region' => 'us-east-2' ]); try { $result = $kinesisClient->listStreams(); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
将数据发送到现有数据流
在创建数据流后,请使用以下示例来发送数据。在向其发送数据之前,请使用 DescribeStream 检查数据 StreamStatus 是否处于活动状态。
要将单条数据记录写入 Kinesis 数据流,请使用 PutRecord 操作。要将最多 500 条记录写入 Kinesis 数据流,请使用 PutRecords 操作。
导入。
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$kinesisClient = new Aws\Kinesis\KinesisClient([ 'profile' => 'default', 'version' => '2013-12-02', 'region' => 'us-east-1' ]); $name = "my_stream_name"; $content = '{"ticker_symbol":"QXZ", "sector":"HEALTHCARE", "change":-0.05, "price":84.51}'; $groupID = "input to a hash function that maps the partition key (and associated data) to a specific shard"; try { $result = $kinesisClient->PutRecord([ 'Data' => $content, 'StreamName' => $name, 'PartitionKey' => $groupID ]); print("<p>ShardID = " . $result["ShardId"] . "</p>"); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
删除数据流
此示例演示如何删除数据流。删除数据流也会删除您发送到数据流的所有数据。活动 Kinesis 数据流切换到 DELETING 状态,直到完成流删除操作。在处于 DELETING 状态时,流将继续处理数据。
要删除 Kinesis 数据流,请使用 DeleteStream 操作。
导入。
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$kinesisClient = new Aws\Kinesis\KinesisClient([ 'profile' => 'default', 'version' => '2013-12-02', 'region' => 'us-east-2' ]); $name = "my_stream_name"; try { $result = $kinesisClient->deleteStream([ 'StreamName' => $name, ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }