受管控表代码示例 - Amazon Lake Formation
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

受管控表代码示例

以下 PySpark 和 Java 代码示例演示如何启动和提交事务,以及在出现异常时如何停止和回滚事务。

注意

对于写入 Apache Parquet 数据,Amazon Glue ETL 仅支持通过针对动态帧优化的自定义 Parquet 编写器类型写入受管控表。使用 parquet 格式写入受管控表时,应在表参数中添加键 useGlueParquetWriter 并将其值设置为 true。

PySpark

此示例显示了 Amazon Glue 任务的 ETL 脚本。它将数据从非受管控表复制到受管控表。运行之前,需要先创建该受管控表,例如使用 Amazon Athena 按相关说明创建。

import sys

from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# ETL job: copy data from an ungoverned catalog table ("retail"."inventory")
# into a governed table ("retail"."inventory-governed"), wrapping the write
# in a Lake Formation transaction so it either fully commits or rolls back.

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "retail"
tbl = "inventory"

# Begin a read/write transaction (read_only=False).
tx_id = glueContext.start_transaction(False)

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0")
datasource0.show()

# (Removed unused variable `dest_path`; the destination is resolved
# from the Data Catalog table "inventory-governed".)
try:
    # Write into the governed table within the open transaction.
    glueContext.write_dynamic_frame.from_catalog(
        frame=datasource0,
        database="retail",
        table_name="inventory-governed",
        additional_options={"transactionId": tx_id})
    glueContext.commit_transaction(tx_id)
except Exception:
    # Roll back on any failure, then re-raise so the job is marked failed.
    glueContext.cancel_transaction(tx_id)
    raise

job.commit()

下一个示例演示了在Amazon Glue流式 ETL 任务中使用事务。

import sys

from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# Streaming ETL job: read CloudTrail events from a Kinesis-backed catalog
# table and write each micro-batch into a governed table, with each batch
# wrapped in its own Lake Formation transaction.

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database="demo",
    table_name="kinesis_cloudtrail_demo",
    transformation_ctx="DataSource0",
    additional_options={
        "startingPosition": "TRIM_HORIZON",
        "inferSchema": "true",
    })


def processBatch(data_frame, batchId):
    """Map one micro-batch to the target schema and write it transactionally."""
    if data_frame.count() > 0:
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        dynamic_frame = ApplyMapping.apply(
            frame=dynamic_frame,
            mappings=[
                ("eventversion", "string", "eventversion", "string"),
                ("eventtime", "string", "eventtime", "string"),
                ("eventsource", "string", "eventsource", "string"),
                ("eventname", "string", "eventname", "string"),
                ("awsregion", "string", "awsregion", "string"),
                ("sourceipaddress", "string", "sourceipaddress", "string"),
                ("useragent", "string", "useragent", "string"),
                ("errorcode", "string", "errorcode", "string"),
                ("errormessage", "string", "errormessage", "string"),
                ("requestid", "string", "requestid", "string"),
                ("eventid", "string", "eventid", "string"),
                ("eventtype", "string", "eventtype", "string"),
                ("apiversion", "string", "apiversion", "string"),
                ("readonly", "boolean", "readonly", "string"),
                ("recipientaccountid", "string", "recipientaccountid", "string"),
                ("sharedeventid", "string", "sharedeventid", "string"),
                ("vpcendpointid", "string", "vpcendpointid", "string"),
            ],
            transformation_ctx="ApplyMapping")
        table_name = "cloudtrail_demo_sample"
        # Begin a read/write transaction (read_only=False).
        # NOTE: was `begin_transaction`; changed to `start_transaction` to
        # match the GlueContext API used in the batch example above.
        txId = glueContext.start_transaction(False)
        try:
            glueContext.write_dynamic_frame.from_catalog(
                frame=dynamic_frame,
                database="demo",
                table_name=table_name,
                additional_options={"transactionId": txId})
            glueContext.commit_transaction(txId)
        except Exception:
            # Roll back the batch's transaction, then re-raise the error.
            glueContext.cancel_transaction(txId)
            raise


glueContext.forEachBatch(
    frame=data_frame_DataSource0,
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": "s3://my_checkpoint_bucket/cloudtrail_demo_sample/",
    })
job.commit()
Java

以下两个 Java 示例在事务中对受管控表使用 GetTableObjects 和 UpdateTableObjects API 操作。有关这些 API 操作的信息,请参阅 API 文档。

第一个示例将有关新分区的信息添加到现有的受管控表中。

导入
import com.amazonaws.services.lakeformation.AWSLakeFormation; import com.amazonaws.services.lakeformation.AWSLakeFormationClientBuilder; import com.amazonaws.services.lakeformation.model.CancelTransactionRequest; import com.amazonaws.services.lakeformation.model.AddObjectInput; import com.amazonaws.services.lakeformation.model.StartTransactionRequest; import com.amazonaws.services.lakeformation.model.StartTransactionResult; import com.amazonaws.services.lakeformation.model.CommitTransactionRequest; import com.amazonaws.services.lakeformation.model.UpdateTableObjectsRequest; import com.amazonaws.services.lakeformation.model.WriteOperation;
代码
AWSLakeFormation lake = AWSLakeFormationClientBuilder.standard().build(); // Begin transaction BeginTransactionResult startTransactionResult = lake.startTransaction(new StartTransactionRequest()); String transactionId = startTransactionResult.getTransactionId(); // Construct a write operation WriteOperation write = new WriteOperation() .withAddObject(new AddObjectInput() .withPartitionValues("par0", "par1") .withUri("s3://bucket/prefix") .withETag("eTag") .withSize(100L)); // Save a partition table object try { lake.updateTableObjects(new UpdateTableObjectsRequest() .withDatabaseName(databaseName) .withTableName(tableName) .withTransactionId(transactionId) .withWriteOperations(write)); // Commit transaction lake.commitTransaction(new CommitTransactionRequest().withTransactionId(transactionId)); } catch (Exception e) { // Abort transaction lake.cancelTransaction(new CancelTransactionRequest().withTransactionId(transactionId)); }

下一个示例检索与受管控表关联的所有 Amazon S3 对象并对其进行处理。

导入
import com.amazonaws.services.lakeformation.AWSLakeFormation; import com.amazonaws.services.lakeformation.AWSLakeFormationClientBuilder; import com.amazonaws.services.lakeformation.model.StartTransactionRequest; import com.amazonaws.services.lakeformation.model.StartTransactionResult; import com.amazonaws.services.lakeformation.model.CommitTransactionRequest; import com.amazonaws.services.lakeformation.model.GetTableObjectsRequest; import com.amazonaws.services.lakeformation.model.GetTableObjectsResult; import com.amazonaws.services.lakeformation.model.PartitionObjects; import com.amazonaws.services.lakeformation.model.TableObject;
代码
AWSLakeFormation lake = AWSLakeFormationClientBuilder.standard().build(); // Start read only transaction StartTransactionResult startTransactionResult = lake.startTransaction(new StartTransactionRequest().withReadOnly(true)); String transactionId = startTransactionResult.getTransactionId(); // Read all table objects from a table GetTableObjectsResult getTableObjectsResult = lake.getTableObjects( new GetTableObjectsRequest() .withTransactionId(transactionId) .withDatabaseName(databaseName) .withTableName(tableName)); for (PartitionObjects partitionObjects: getTableObjectsResult.getObjects()) { for (TableObject tableObject: partitionObjects.getObjects()) { // do something with the data } } // Commit transaction lake.commitTransaction(new CommitTransactionRequest().withTransactionId(transactionId));