

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用安装有 Delta Lake 的集群
<a name="Deltausing-cluster"></a>

**Topics**
+ [

# 将 Delta Lake 集群与 Flink 结合使用
](Deltacluster-flink.md)
+ [

# 将 Delta Lake 集群与 Trino 结合使用
](Deltacluster-trino.md)
+ [

# 将 Delta Lake 集群与 Spark 结合使用
](Deltausing-cluster-spark.md)
+ [

# 使用带有 Spark 和 Amazon Glue 的三角洲湖集群
](Deltacluster-spark-glue.md)

# 将 Delta Lake 集群与 Flink 结合使用
<a name="Deltacluster-flink"></a>

从 Amazon EMR 6.11 版本开始，您可以将 Delta Lake 与您的 Flink 集群结合使用。以下示例使用在 Amazon EMR Flink 集群上使用 Delta Lake。 Amazon CLI 

**注意**  
当你将 Delta Lake 与 Flink 集群配合使用时，亚马逊 EMR 支持 Flink DataStream API。

## 创建 Delta Lake 集群
<a name="Deltacreate-a-delta-cluster"></a>

1. 创建文件 `delta_configurations.json` 并输入以下内容：

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}}]
   ```

1. 使用以下配置创建集群。在该 URL 中，将 `example Amazon S3 bucket path` 和 `subnet ID` 替换为您自己的值。

   ```
   aws emr create-cluster 
   --release-label emr-6.11.0   
   --applications Name=Flink  
   --configurations file://delta_configurations.json   
   --region us-east-1  --name My_Spark_Delta_Cluster  
   --log-uri  s3://amzn-s3-demo-bucket/  
   --instance-type m5.xlarge  
   --instance-count 3   
   --service-role EMR_DefaultRole_V2  
   --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

## 初始化 Flink yarn 会话
<a name="Deltainit-flink-yarn"></a>

要初始化 Flink yarn 会话，请运行以下命令：

```
flink-yarn-session -d
```

## 使用 Delta Lake 创建 Flink 作业
<a name="Deltabuild-flink-with-delta-lake"></a>

以下示例展示如何使用 sbt 或 Maven 在 Delta Lake 中构建 Flink 作业。

------
#### [ sbt ]

[sbt](https://www.scala-sbt.org/1.x/docs/index.html) 是 Scala 的构建工具，当您处理小型项目时，只需很少甚至不需要配置即可使用。

```
libraryDependencies ++= Seq(
  "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided",
  "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided",
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
  "org.apache.flink" % "flink-table-common" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
```

------
#### [ Maven ]

[Maven](https://maven.apache.org) 是 Apache Software Foundation 推出的开源构建自动化工具。使用 Maven，您可以在 Amazon EMR 上使用 Delta Lake 构建、发布和部署 Flink 作业。

```
<project>
<properties>
    <scala.main.version>2.12</scala.main.version>
    <delta-connectors-version>0.6.0</delta-connectors-version>
    <flink-version>1.16.1</flink-version>
    <hadoop-version>3.1.0</hadoop-version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-flink</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-standalone_$scala-main-version</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>$hadoop-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```

------

## 通过 Flink Datastream API 写入 Delta 表
<a name="Deltawrite-delta-table-with-flink-datastream-api"></a>

使用以下示例创建要写入 DeltaSink 到带有 a 的表中 `deltaTablePath:`

```
public static DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    Configuration configuration = new Configuration();
    DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration,
                    rowType)
            .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## 通过 Flink Datastream API 从 Delta 表中读取
<a name="Deltaread-delta-table-with-flink-datastream-api"></a>

使用以下示例创建要从表中读 DeltaSource 取的有界值，其中带有 `deltaTablePath:`

```
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
        StreamExecutionEnvironment env,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration)
            .build();

    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
```

## 使用对 Delta Lake 独立版的多集群支持创建接收器
<a name="Deltasink-creation-with-multi-cluster"></a>

使用以下示例创建 DeltaSink [支持`deltaTablePath`和多集群](https://docs.delta.io/latest/delta-standalone.html#multi-cluster-setup)的待写入表：

```
public DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1");
        
    DeltaSink<RowData> deltaSink = DeltaSink
        .forRowData(
            new Path(deltaTablePath),
            configuration,
            rowType)
        .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## 运行 Flink 作业
<a name="Deltarun-flink-job"></a>

使用下列命令以运行您的作业：

```
flink run FlinkJob.jar
```

# 将 Delta Lake 集群与 Trino 结合使用
<a name="Deltacluster-trino"></a>

从 Amazon EMR 6.9.0 及更高版本开始，您可以将 Delta Lake 与您的 Trino 集群结合使用。

在本教程中，我们将使用在 Amazon EMR Trino 集群上使用 Delta Lake。 Amazon CLI 

## 
<a name="Deltacluster-trino-create"></a>

**创建 Delta Lake 集群**

1. 创建文件 `delta_configurations.json`，然后为您选择的目录设置值。例如，假设您想将 Hive 元存储作为目录使用，则您的文件应包含以下内容：

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}},  
       {"Classification":"trino-connector-delta",  
       "Properties":{"hive.metastore.uri":"thrift://localhost:9083"}}]
   ```

   如果您想使用 Amazon Glue 目录作为商店，您的文件应包含以下内容：

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}},  
       {"Classification":"trino-connector-delta",  
       "Properties":{"hive.metastore":"glue"}}]
   ```

1. 使用以下配置创建集群，将 **example Amazon S3 bucket path** 和 **subnet ID** 替换为您自己的值。

   ```
   aws emr create-cluster 
       --release-label emr-6.9.0   
       --applications Name=Trino  
       --configurations file://delta_configurations.json   
       --region us-east-1  --name My_Spark_Delta_Cluster  
       --log-uri  s3://amzn-s3-demo-bucket/  
       --instance-type m5.xlarge  
       --instance-count 2   
       --service-role EMR_DefaultRole_V2  
       --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

## 初始化 Delta Lake 的 Trino 会话
<a name="Deltainitialize-trino"></a>

要初始化 Trino 会话，请运行以下命令。

```
trino-cli --catalog delta
```

## 写入 Delta Lake 表
<a name="Deltatrino-write-table"></a>

使用以下 SQL 命令创建并写入您的表：

```
SHOW SCHEMAS;

CREATE TABLE default.delta_table (id  int, data varchar, category varchar) WITH 
( location =  's3://amzn-s3-demo-bucket/<prefix>');

INSERT INTO default.delta_table VALUES  (1,'a','c1'), (2,'b','c2'), (3,'c','c3');
```

## 从 Delta Lake 表中读取
<a name="Deltatrino-read-table"></a>

使用以下 SQL 命令从您的表中读取：

```
SELECT * from default.delta_table;
```

# 将 Delta Lake 集群与 Spark 结合使用
<a name="Deltausing-cluster-spark"></a>

从 Amazon EMR 版本 6.9.0 开始，您可以将 Delta Lake 与 Spark 集群结合使用，无需引导操作。对于 Amazon EMR 6.8.0 及更早版本，您可以使用引导操作来预安装需要的依赖项。

以下示例使用在 Amazon EMR Spark 集群上使用 Delta Lake。 Amazon CLI 

要将 Amazon EMR 上的 Delta Lake 与配合使用 Amazon Command Line Interface，请先创建一个集群。有关如何使用指定 Delta Lake 分类的信息 Amazon Command Line Interface，请参阅[在创建集群 Amazon Command Line Interface 时使用](https://docs.amazonaws.cn/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-cli)[提供配置或在创建集群时使用 Java SDK](https://docs.amazonaws.cn/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-sdk) 提供配置。

1. 创建文件 `configurations.json` 并输入以下内容：

   ```
   [{"Classification":"delta-defaults",  "Properties":{"delta.enabled":"true"} }]
   ```

1. 使用以下配置创建集群，将示例 Amazon S3 **bucket path** 和 **subnet ID** 替换为您自己的值。

   ```
   aws emr create-cluster 
        --release-label  emr-6.9.0  
        --applications Name=Spark  
        --configurations file://delta_configurations.json   
        --region us-east-1  
        --name My_Spark_Delta_Cluster  
        --log-uri  s3://amzn-s3-demo-bucket/  
        --instance-type m5.xlarge  
        --instance-count 2   
        --service-role EMR_DefaultRole_V2  
        --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

   或者，您可以创建一个 Amazon EMR 集群和 Spark 应用程序，并在 Spark 作业中使用以下文件作为 JAR 依赖项：

   ```
   /usr/share/aws/delta/lib/delta-core.jar,
   /usr/share/aws/delta/lib/delta-storage.jar,    
   /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
   ```
**注意**  
如果您使用的是 Amazon EMR 7.0.0 或更高版本，请改用。`/usr/share/aws/delta/lib/delta-spark.jar` `/usr/share/aws/delta/lib/delta-core.jar`

   有关更多信息，请参阅[提交应用程序](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications)。

   要将 jar 依赖项包含在 Spark 任务中，您可以将以下配置属性添加到 Spark 应用程序中：

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

   有关 Spark 任务依赖项的更多信息，请参阅 [Dependency Management](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management)（依赖项管理）。

   如果您使用的是 Amazon EMR 7.0.0 或更高版本，请改为添加配置。`/usr/share/aws/delta/lib/delta-spark.jar`

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

## 初始化 Delta Lake 的 Spark 会话
<a name="Deltainitialize-spark-session"></a>

以下示例演示如何启动交互式 Spark Shell、使用 Spark 提交，或如何使用 Amazon EMR Notebooks 在 Amazon EMR 上使用 Delta Lake。

------
#### [ spark-shell ]

1. 使用 SSH 连接到主节点。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[使用 SSH 连接到主节点](https://docs.amazonaws.cn/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 输入以下命令以启动 Spark shell。要使用 PySpark 外壳，请`spark-shell`替换为`pyspark`。

   ```
   spark-shell \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   如果您运行 Amazon EMR 6.15.0 或更高版本，则还必须使用以下配置，将基于 Lake Formation 的精细访问控制与 Delta Lake 结合使用。

   ```
   spark-shell \  
     --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \  
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
     --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ spark-submit ]

1. Connect to the primary node using SSH. 有关更多信息，请参阅《Amazon EMR 管理指南**》中的[使用 SSH 连接到主节点](https://docs.amazonaws.cn/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 输入以下命令以启动 Delta Lake 的 Spark 会话。

   ```
   spark-submit  
   —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" 
   —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   如果您运行 Amazon EMR 6.15.0 或更高版本，则还必须使用以下配置，将基于 Lake Formation 的精细访问控制与 Delta Lake 结合使用。

   ```
   spark-submit \  `
   --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension 
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
   --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ EMR Studio notebooks ]

要使用 Amazon EMR Studio Notebooks 初始化 Spark 会话，请使用 Amazon EMR Notebook 中的 **%%configure** 魔术命令配置 Spark 会话，如下例所示。有关更多信息，请参阅 *Amazon EMR 管理指南*中的[使用 EMR Notebooks 魔法命令](https://docs.amazonaws.cn/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics)。

```
%%configure -f
{
  "conf": {
    "spark.sql.extensions":  "io.delta.sql.DeltaSparkSessionExtension",
     "spark.sql.catalog.spark_catalog":  "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}
```

如果您运行 Amazon EMR 6.15.0 或更高版本，则还必须使用以下配置，将基于 Lake Formation 的精细访问控制与 Delta Lake 结合使用。

```
%%configure -f   
{
  "conf": {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.catalog.spark_catalog.lf.managed": "true"
  }
}
```

------

## 写入 Delta Lake 表
<a name="Deltawrite-to-table"></a>

以下示例说明如何创建 DataFrame 并将其写为 Delta Lake 数据集。此示例演示如何使用 Spark Shell 处理数据集，同时使用 SSH 作为默认 hadoop 用户连接到主节点。

**注意**  
要将代码示例粘贴到 Spark Shell 中，请在提示符处键入 :paste，粘贴示例，然后按 CTRL \$1 D。

------
#### [ PySpark ]

Spark 包含基于 Python 的 Shell `pyspark`，您可以用它来设计以 Python 编写的 Spark 程序的原型。就像使用 `spark-shell` 一样，在主节点上调用 `pyspark`。

```
## Create a DataFrame
data =  spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")],
["id", "creation_date",  "last_update_time"])

## Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string, creation_date string, 
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.writeTo("delta_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
 
// Create a DataFrame
val data = Seq(("100",  "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date",  "last_update_time")

// Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string,
creation_date string,
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.write.format("delta").mode("append").saveAsTable("delta_table")
```

------
#### [ SQL ]

```
-- Create a Delta  Lake table with the S3 location
CREATE TABLE delta_table(id string,
creation_date string, 
last_update_time string)
USING delta LOCATION
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table';

-- insert data into the table
INSERT INTO delta_table VALUES  ("100", "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01", "2015-01-01T13:51:40.519832Z");
```

------

## 从 Delta Lake 表中读取
<a name="Deltaread-from-table"></a>

------
#### [ PySpark ]

```
ddf = spark.table("delta_table")
ddf.show()
```

------
#### [ Scala ]

```
val ddf =  spark.table("delta_table")
ddf.show()
```

------
#### [ SQL ]

```
SELECT * FROM delta_table;
```

------

# 使用带有 Spark 和 Amazon Glue 的三角洲湖集群
<a name="Deltacluster-spark-glue"></a>

要使用 Glue Amazon Catalog 作为 Delta Lake 表格的元数据仓，请按照以下步骤创建一个集群。有关使用指定 Delta Lake 分类的信息 Amazon Command Line Interface，请参阅[在创建集群 Amazon Command Line Interface 时使用](https://docs.amazonaws.cn/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-cli)[提供配置或在创建集群时使用 Java SDK](https://docs.amazonaws.cn/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-sdk) 提供配置。

**创建 Delta Lake 集群**

1. 创建文件 `configurations.json` 并输入以下内容：

   ```
   [{"Classification":"delta-defaults",  
   "Properties":{"delta.enabled":"true"}},
   {"Classification":"spark-hive-site",
   "Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]
   ```

1. 使用以下配置创建集群，将 **example Amazon S3 bucket path** 和 **subnet ID** 替换为您自己的值。

   ```
   aws emr create-cluster 
       --release-label  emr-6.9.0  
       --applications Name=Spark  
       --configurations file://delta_configurations.json 
       --region us-east-1  
       --name My_Spark_Delta_Cluster  
       --log-uri  s3://amzn-s3-demo-bucket/  
       --instance-type m5.xlarge  
       --instance-count 2   
       --service-role EMR_DefaultRole_V2  
       --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```