使用 Firehose API 和适用于 PHP 的 Amazon SDK 版本 3 来创建传输流
Amazon Data Firehose 支持将实时数据发送到其他 Amazon 服务,包括 Amazon Kinesis Data Streams、Amazon S3、Amazon OpenSearch Service(OpenSearch Service)和 Amazon Redshift,或者发送到 Splunk。使用传输流创建一个数据创建器,以在您每次添加数据时将数据传送到配置的目的地。
以下示例演示如何:
-
使用 CreateDeliveryStream 创建传输流。
-
使用 DescribeDeliveryStream 获取有关单个传输流的详细信息。
-
使用 ListDeliveryStreams 列出传输流。
-
使用 PutRecord 将数据发送到传输流。
-
使用 DeleteDeliveryStream 删除传输流。
适用于 PHP 的 Amazon SDKGitHub 上提供了
凭证
运行示例代码之前,请配置您的 Amazon 凭证,如 通过适用于 PHP 的 Amazon SDK 版本 3 使用 Amazon 进行身份验证 中所述。然后导入 适用于 PHP 的 Amazon SDK,如 安装适用于 PHP 的 Amazon SDK 版本 3 中所述。
有关使用 Amazon Data Firehose 的更多信息,请参阅 Amazon Kinesis Data Firehose 开发人员指南。
使用 Kinesis 数据流来创建传输流
要建立将数据放入现有 Kinesis 数据流的传输流,请使用 CreateDeliveryStream 操作。
这使开发人员能够将现有 Kinesis 服务迁移到 Firehose。
导入
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_stream_name"; $stream_type = "KinesisStreamAsSource"; $kinesis_stream = "arn:aws:kinesis:us-east-2:0123456789:stream/my_stream_name"; $role = "arn:aws:iam::0123456789:policy/Role"; try { $result = $firehoseClient->createDeliveryStream([ 'DeliveryStreamName' => $name, 'DeliveryStreamType' => $stream_type, 'KinesisStreamSourceConfiguration' => [ 'KinesisStreamARN' => $kinesis_stream, 'RoleARN' => $role, ], ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
使用 Amazon S3 存储桶来创建传输流
要建立将数据放入现有 Amazon S3 存储桶的传输流,请使用 CreateDeliveryStream 操作。
提供目标参数,如目标参数中所述。然后,确保向 Firehose 授予访问 Amazon S3 存储桶的权限,如向 Kinesis Data Firehose 授予访问 Amazon S3 目标的权限中所述。
导入
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_S3_stream_name"; $stream_type = "DirectPut"; $s3bucket = 'arn:aws:s3:::bucket_name'; $s3Role = 'arn:aws:iam::0123456789:policy/Role'; try { $result = $firehoseClient->createDeliveryStream([ 'DeliveryStreamName' => $name, 'DeliveryStreamType' => $stream_type, 'S3DestinationConfiguration' => [ 'BucketARN' => $s3bucket, 'CloudWatchLoggingOptions' => [ 'Enabled' => false, ], 'RoleARN' => $s3Role ], ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
使用 OpenSearch Service 来创建传输流
要建立将数据放入 OpenSearch Service 集群的 Firehose 传输流,请使用 CreateDeliveryStream 操作。
提供目标参数,如目标参数中所述。确保向 Firehose 授予访问 OpenSearch Service 集群的权限,如向 Kinesis Data Firehose 授予访问 Amazon ES 目标的权限中所述。
导入
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_ES_stream_name"; $stream_type = "DirectPut"; $esDomainARN = 'arn:aws:es:us-east-2:0123456789:domain/Name'; $esRole = 'arn:aws:iam::0123456789:policy/Role'; $esIndex = 'root'; $esType = 'PHP_SDK'; $s3bucket = 'arn:aws:s3:::bucket_name'; $s3Role = 'arn:aws:iam::0123456789:policy/Role'; try { $result = $firehoseClient->createDeliveryStream([ 'DeliveryStreamName' => $name, 'DeliveryStreamType' => $stream_type, 'ElasticsearchDestinationConfiguration' => [ 'DomainARN' => $esDomainARN, 'IndexName' => $esIndex, 'RoleARN' => $esRole, 'S3Configuration' => [ 'BucketARN' => $s3bucket, 'CloudWatchLoggingOptions' => [ 'Enabled' => false, ], 'RoleARN' => $s3Role, ], 'TypeName' => $esType, ], ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
检索传输流
要获取有关现有 Firehose 传输流的详细信息,请使用 DescribeDeliveryStream 操作。
导入
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_stream_name"; try { $result = $firehoseClient->describeDeliveryStream([ 'DeliveryStreamName' => $name, ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
列出已连接到 Kinesis Data Streams 的现有传输流
要列出所有将数据发送到 Kinesis Data Streams 的现有 Firehose 传输流,请使用 ListDeliveryStreams 操作。
导入。
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); try { $result = $firehoseClient->listDeliveryStreams([ 'DeliveryStreamType' => 'KinesisStreamAsSource', ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
列出将数据发送到其他 Amazon 服务的现有传输流
要列出所有将数据发送到 Amazon S3、OpenSearch Service、Amazon Redshift,或者发送到 Splunk 的现有 Firehose 传输流,请使用 ListDeliveryStreams 操作。
导入
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); try { $result = $firehoseClient->listDeliveryStreams([ 'DeliveryStreamType' => 'DirectPut', ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
将数据发送到现有 Firehose 传输流
要通过 Firehose 传输流将数据发送到指定目的地,请在创建 Firehose 传输流之后使用 PutRecord 操作。
在将数据发送到 Firehose 传输流之前,请使用 DescribeDeliveryStream 来了解该传输流是否处于活动状态。
导入
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_stream_name"; $content = '{"ticker_symbol":"QXZ", "sector":"HEALTHCARE", "change":-0.05, "price":84.51}'; try { $result = $firehoseClient->putRecord([ 'DeliveryStreamName' => $name, 'Record' => [ 'Data' => $content, ], ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }
删除 Firehose 传输流
要删除 Firehose 传输流,请使用 DeleteDeliveryStreams 操作。这也将删除您已发送到该传输流的所有数据。
导入
require 'vendor/autoload.php'; use Aws\Exception\AwsException;
示例代码
$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_stream_name"; try { $result = $firehoseClient->deleteDeliveryStream([ 'DeliveryStreamName' => $name, ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }