使用 Firehose API 和适用于 PHP 的 Amazon SDK 版本 3 来创建传输流 - 适用于 PHP 的 Amazon SDK
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Firehose API 和适用于 PHP 的 Amazon SDK 版本 3 来创建传输流

Amazon Data Firehose 支持将实时数据发送到其他 Amazon 服务,包括 Amazon Kinesis Data Streams、Amazon S3、Amazon OpenSearch Service(OpenSearch Service)和 Amazon Redshift,或者发送到 Splunk。使用传输流创建一个数据创建器,以在您每次添加数据时将数据传送到配置的目的地。

以下示例演示如何:

适用于 PHP 的 Amazon SDKGitHub 上提供了的所有示例代码。

凭证

运行示例代码之前,请配置您的 Amazon 凭证,如 通过适用于 PHP 的 Amazon SDK 版本 3 使用 Amazon 进行身份验证 中所述。然后导入 适用于 PHP 的 Amazon SDK,如 安装适用于 PHP 的 Amazon SDK 版本 3 中所述。

有关使用 Amazon Data Firehose 的更多信息,请参阅 Amazon Kinesis Data Firehose 开发人员指南

使用 Kinesis 数据流来创建传输流

要建立将数据放入现有 Kinesis 数据流的传输流,请使用 CreateDeliveryStream 操作。

这使开发人员能够将现有 Kinesis 服务迁移到 Firehose。

导入

require 'vendor/autoload.php'; use Aws\Exception\AwsException;

示例代码

$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_stream_name"; $stream_type = "KinesisStreamAsSource"; $kinesis_stream = "arn:aws:kinesis:us-east-2:0123456789:stream/my_stream_name"; $role = "arn:aws:iam::0123456789:policy/Role"; try { $result = $firehoseClient->createDeliveryStream([ 'DeliveryStreamName' => $name, 'DeliveryStreamType' => $stream_type, 'KinesisStreamSourceConfiguration' => [ 'KinesisStreamARN' => $kinesis_stream, 'RoleARN' => $role, ], ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }

使用 Amazon S3 存储桶来创建传输流

要建立将数据放入现有 Amazon S3 存储桶的传输流,请使用 CreateDeliveryStream 操作。

提供目标参数,如目标参数中所述。然后,确保向 Firehose 授予访问 Amazon S3 存储桶的权限,如向 Kinesis Data Firehose 授予访问 Amazon S3 目标的权限中所述。

导入

require 'vendor/autoload.php'; use Aws\Exception\AwsException;

示例代码

$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_S3_stream_name"; $stream_type = "DirectPut"; $s3bucket = 'arn:aws:s3:::bucket_name'; $s3Role = 'arn:aws:iam::0123456789:policy/Role'; try { $result = $firehoseClient->createDeliveryStream([ 'DeliveryStreamName' => $name, 'DeliveryStreamType' => $stream_type, 'S3DestinationConfiguration' => [ 'BucketARN' => $s3bucket, 'CloudWatchLoggingOptions' => [ 'Enabled' => false, ], 'RoleARN' => $s3Role ], ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }

使用 OpenSearch Service 来创建传输流

要建立将数据放入 OpenSearch Service 集群的 Firehose 传输流,请使用 CreateDeliveryStream 操作。

提供目标参数,如目标参数中所述。确保向 Firehose 授予访问 OpenSearch Service 集群的权限,如向 Kinesis Data Firehose 授予访问 Amazon ES 目标的权限中所述。

导入

require 'vendor/autoload.php'; use Aws\Exception\AwsException;

示例代码

$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_ES_stream_name"; $stream_type = "DirectPut"; $esDomainARN = 'arn:aws:es:us-east-2:0123456789:domain/Name'; $esRole = 'arn:aws:iam::0123456789:policy/Role'; $esIndex = 'root'; $esType = 'PHP_SDK'; $s3bucket = 'arn:aws:s3:::bucket_name'; $s3Role = 'arn:aws:iam::0123456789:policy/Role'; try { $result = $firehoseClient->createDeliveryStream([ 'DeliveryStreamName' => $name, 'DeliveryStreamType' => $stream_type, 'ElasticsearchDestinationConfiguration' => [ 'DomainARN' => $esDomainARN, 'IndexName' => $esIndex, 'RoleARN' => $esRole, 'S3Configuration' => [ 'BucketARN' => $s3bucket, 'CloudWatchLoggingOptions' => [ 'Enabled' => false, ], 'RoleARN' => $s3Role, ], 'TypeName' => $esType, ], ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }

检索传输流

要获取有关现有 Firehose 传输流的详细信息,请使用 DescribeDeliveryStream 操作。

导入

require 'vendor/autoload.php'; use Aws\Exception\AwsException;

示例代码

$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_stream_name"; try { $result = $firehoseClient->describeDeliveryStream([ 'DeliveryStreamName' => $name, ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }

列出已连接到 Kinesis Data Streams 的现有传输流

要列出所有将数据发送到 Kinesis Data Streams 的现有 Firehose 传输流,请使用 ListDeliveryStreams 操作。

导入

require 'vendor/autoload.php'; use Aws\Exception\AwsException;

示例代码

$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); try { $result = $firehoseClient->listDeliveryStreams([ 'DeliveryStreamType' => 'KinesisStreamAsSource', ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }

列出将数据发送到其他 Amazon 服务的现有传输流

要列出所有将数据发送到 Amazon S3、OpenSearch Service、Amazon Redshift,或者发送到 Splunk 的现有 Firehose 传输流,请使用 ListDeliveryStreams 操作。

导入

require 'vendor/autoload.php'; use Aws\Exception\AwsException;

示例代码

$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); try { $result = $firehoseClient->listDeliveryStreams([ 'DeliveryStreamType' => 'DirectPut', ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }

将数据发送到现有 Firehose 传输流

要通过 Firehose 传输流将数据发送到指定目的地,请在创建 Firehose 传输流之后使用 PutRecord 操作。

在将数据发送到 Firehose 传输流之前,请使用 DescribeDeliveryStream 来了解该传输流是否处于活动状态。

导入

require 'vendor/autoload.php'; use Aws\Exception\AwsException;

示例代码

$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_stream_name"; $content = '{"ticker_symbol":"QXZ", "sector":"HEALTHCARE", "change":-0.05, "price":84.51}'; try { $result = $firehoseClient->putRecord([ 'DeliveryStreamName' => $name, 'Record' => [ 'Data' => $content, ], ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }

删除 Firehose 传输流

要删除 Firehose 传输流,请使用 DeleteDeliveryStreams 操作。这也将删除您已发送到该传输流的所有数据。

导入

require 'vendor/autoload.php'; use Aws\Exception\AwsException;

示例代码

$firehoseClient = new Aws\Firehose\FirehoseClient([ 'profile' => 'default', 'version' => '2015-08-04', 'region' => 'us-east-2' ]); $name = "my_stream_name"; try { $result = $firehoseClient->deleteDeliveryStream([ 'DeliveryStreamName' => $name, ]); var_dump($result); } catch (AwsException $e) { // output error message if fails echo $e->getMessage(); echo "\n"; }