数据湖交易代码示例 - Amazon Lake Formation
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

数据湖交易代码示例

以下 PySpark 和 Java 代码示例演示了如何启动和提交事务,以及如何在发生异常时停止和回滚事务。

PySpark

此示例展示了一个用于 Amazon Glue 任务的脚本。它将数据从非受管表复制到受管表中。您需要按照创建受管表一节中的说明创建受管表,或者使用 Amazon Athena 创建。

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Source (unmanaged) table in the Data Catalog.
db = "retail"
tbl = "inventory"

# Start a read/write transaction (False => not read-only).
tx_id = glueContext.start_transaction(False)

# Read the unmanaged source table.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
datasource0.show()

try:
    # Write into the governed (managed) table inside the transaction.
    # Consistency fix: use the `db` variable instead of repeating "retail".
    glueContext.write_dynamic_frame.from_catalog(
        frame=datasource0,
        database=db,
        table_name="inventory-governed",
        additional_options={"transactionId": tx_id},
    )
    glueContext.commit_transaction(tx_id)
except Exception:
    # Roll back on any failure, then re-raise so the job is marked failed.
    glueContext.cancel_transaction(tx_id)
    raise

job.commit()

下面的示例演示了如何在 Amazon Glue 流式处理 ETL 作业中使用事务。

import sys

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Source: a Kinesis stream of CloudTrail events registered in the Data Catalog.
data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database="demo",
    table_name="kinesis_cloudtrail_demo",
    transformation_ctx="DataSource0",
    additional_options={"startingPosition": "TRIM_HORIZON", "inferSchema": "true"},
)


def processBatch(data_frame, batchId):
    """Write one micro-batch to the governed table inside a transaction.

    Empty batches are skipped; on any write failure the transaction is
    cancelled and the error re-raised so the streaming job surfaces it.
    """
    if data_frame.count() > 0:
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        # Project the CloudTrail fields onto the target table's schema.
        dynamic_frame = ApplyMapping.apply(
            frame=dynamic_frame,
            mappings=[
                ("eventversion", "string", "eventversion", "string"),
                ("eventtime", "string", "eventtime", "string"),
                ("eventsource", "string", "eventsource", "string"),
                ("eventname", "string", "eventname", "string"),
                ("awsregion", "string", "awsregion", "string"),
                ("sourceipaddress", "string", "sourceipaddress", "string"),
                ("useragent", "string", "useragent", "string"),
                ("errorcode", "string", "errorcode", "string"),
                ("errormessage", "string", "errormessage", "string"),
                ("requestid", "string", "requestid", "string"),
                ("eventid", "string", "eventid", "string"),
                ("eventtype", "string", "eventtype", "string"),
                ("apiversion", "string", "apiversion", "string"),
                ("readonly", "boolean", "readonly", "string"),
                ("recipientaccountid", "string", "recipientaccountid", "string"),
                ("sharedeventid", "string", "sharedeventid", "string"),
                ("vpcendpointid", "string", "vpcendpointid", "string"),
            ],
            transformation_ctx="ApplyMapping",
        )
        table_name = "cloudtrail_demo_sample"
        # Fix: GlueContext exposes start_transaction (as used in the batch
        # example above); the original called a non-existent begin_transaction.
        txId = glueContext.start_transaction(False)
        try:
            glueContext.write_dynamic_frame.from_catalog(
                frame=dynamic_frame,
                database="demo",
                table_name=table_name,
                additional_options={"transactionId": txId},
            )
            glueContext.commit_transaction(txId)
        except Exception:
            # Roll back this batch's transaction and surface the error.
            glueContext.cancel_transaction(txId)
            raise


# Drive processBatch over 100-second micro-batch windows.
glueContext.forEachBatch(
    frame=data_frame_DataSource0,
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": "s3://my_checkpoint_bucket/cloudtrail_demo_sample/",
    },
)
job.commit()
Java

以下两个 Java 示例使用 GetTableObjects 和 UpdateTableObjects API 操作来处理事务中受管表的对象。有关这些 API 操作的信息,请参阅 API 文档。

第一个示例将有关新分区的信息添加到现有的受管理表中。

导入
import com.amazonaws.services.lakeformation.AWSLakeFormation; import com.amazonaws.services.lakeformation.AWSLakeFormationClientBuilder; import com.amazonaws.services.lakeformation.model.CancelTransactionRequest; import com.amazonaws.services.lakeformation.model.AddObjectInput; import com.amazonaws.services.lakeformation.model.StartTransactionRequest; import com.amazonaws.services.lakeformation.model.StartTransactionResult; import com.amazonaws.services.lakeformation.model.CommitTransactionRequest; import com.amazonaws.services.lakeformation.model.UpdateTableObjectsRequest; import com.amazonaws.services.lakeformation.model.WriteOperation;
代码
AWSLakeFormation lake = AWSLakeFormationClientBuilder.standard().build(); // Begin transaction BeginTransactionResult startTransactionResult = lake.startTransaction(new StartTransactionRequest()); String transactionId = startTransactionResult.getTransactionId(); // Construct a write operation WriteOperation write = new WriteOperation() .withAddObject(new AddObjectInput() .withPartitionValues("par0", "par1") .withUri("s3://bucket/prefix") .withETag("eTag") .withSize(100L)); // Save a partition table object try { lake.updateTableObjects(new UpdateTableObjectsRequest() .withDatabaseName(databaseName) .withTableName(tableName) .withTransactionId(transactionId) .withWriteOperations(write)); // Commit transaction lake.commitTransaction(new CommitTransactionRequest().withTransactionId(transactionId)); } catch (Exception e) { // Abort transaction lake.cancelTransaction(new CancelTransactionRequest().withTransactionId(transactionId)); }

下一个示例将检索与受管理表关联的所有 Amazon S3 对象并对其进行处理。

导入
import com.amazonaws.services.lakeformation.AWSLakeFormation; import com.amazonaws.services.lakeformation.AWSLakeFormationClientBuilder; import com.amazonaws.services.lakeformation.model.StartTransactionRequest; import com.amazonaws.services.lakeformation.model.StartTransactionResult; import com.amazonaws.services.lakeformation.model.CommitTransactionRequest; import com.amazonaws.services.lakeformation.model.GetTableObjectsRequest; import com.amazonaws.services.lakeformation.model.GetTableObjectsResult; import com.amazonaws.services.lakeformation.model.PartitionObjects; import com.amazonaws.services.lakeformation.model.TableObject;
代码
AWSLakeFormation lake = AWSLakeFormationClientBuilder.standard().build(); // Start read only transaction StartTransactionResult startTransactionResult = lake.startTransaction(new StartTransactionRequest().withReadOnly(true)); String transactionId = startTransactionResult.getTransactionId(); // Read all table objects from a table GetTableObjectsResult getTableObjectsResult = lake.getTableObjects( new GetTableObjectsRequest() .withTransactionId(transactionId) .withDatabaseName(databaseName) .withTableName(tableName)); for (PartitionObjects partitionObjects: getTableObjectsResult.getObjects()) { for (TableObject tableObject: partitionObjects.getObjects()) { // do something with the data } } // Commit transaction lake.commitTransaction(new CommitTransactionRequest().withTransactionId(transactionId));