Amazon Glue examples using SDK for Ruby

The following code examples show you how to perform actions and implement common scenarios by using the Amazon SDK for Ruby with Amazon Glue.

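Actions are code excerpts from larger programs and must be run in context. Actions show you how to call individual service functions. You can also see actions in context in their related scenarios and cross-service examples.

Scenarios are code examples that show you how to accomplish a specific task by calling multiple functions within the same service.

Each example includes a link to GitHub, where you can find instructions on how to set up and run the code in context.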

Actions

The following code example shows how to use CreateCrawler.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Creates a new crawler with the specified configuration.
  #
  # @param name [String] The name of the crawler.
  # @param role_arn [String] The ARN of the IAM role to be used by the crawler.
  # @param db_name [String] The name of the database where the crawler stores its metadata.
  # @param db_prefix [String] The prefix to be added to the names of tables that the crawler creates.
  # @param s3_target [String] The S3 path that the crawler will crawl.
  # @return [void]
  def create_crawler(name, role_arn, db_name, db_prefix, s3_target)
    @glue_client.create_crawler(
      name: name,
      role: role_arn,
      database_name: db_name,
      table_prefix: db_prefix, # Pass the documented prefix; the original excerpt accepted it but never used it.
      targets: {
        s3_targets: [
          { path: s3_target }
        ]
      }
    )
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not create crawler: \n#{e.message}")
    raise
  end
end

  • For API details, see CreateCrawler in Amazon SDK for Ruby API Reference.
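
Because `GlueWrapper` receives its client through the constructor, the request it builds can be inspected without calling the service. The following is a minimal sketch under stated assumptions, not part of the original example: `RecordingClient` is a hypothetical stand-in that only records the parameters it receives, and `MiniGlueWrapper` is a trimmed copy of the wrapper method with error handling and the prefix parameter left out. With the real SDK, a client created with `stub_responses: true` could play a similar role.

```ruby
require "logger"

# Hypothetical stand-in for Aws::Glue::Client. It records requests instead of sending them.
class RecordingClient
  attr_reader :requests

  def initialize
    @requests = []
  end

  def create_crawler(params)
    @requests << params
    nil # The return value is not used in this sketch.
  end
end

# Trimmed copy of the wrapper shown above (error handling omitted for brevity).
class MiniGlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  def create_crawler(name, role_arn, db_name, s3_target)
    @glue_client.create_crawler(
      name: name,
      role: role_arn,
      database_name: db_name,
      targets: { s3_targets: [{ path: s3_target }] }
    )
  end
end

# All names and ARNs below are made-up placeholder values.
client = RecordingClient.new
wrapper = MiniGlueWrapper.new(client, Logger.new($stdout))
wrapper.create_crawler(
  "demo-crawler",
  "arn:aws:iam::123456789012:role/demo-role",
  "demo-db",
  "s3://demo-bucket/data"
)

request = client.requests.first
puts "Crawler target path: #{request[:targets][:s3_targets].first[:path]}"
```

Passing the client in, rather than creating it inside the class, is what makes this kind of inspection possible.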

The following code example shows how to use CreateJob.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Creates a new job with the specified configuration.
  #
  # @param name [String] The name of the job.
  # @param description [String] The description of the job.
  # @param role_arn [String] The ARN of the IAM role to be used by the job.
  # @param script_location [String] The location of the ETL script for the job.
  # @return [Aws::Glue::Types::CreateJobResponse] The response, which contains the name of the created job.
  def create_job(name, description, role_arn, script_location)
    @glue_client.create_job(
      name: name,
      description: description,
      role: role_arn,
      command: {
        name: "glueetl",
        script_location: script_location,
        python_version: "3"
      },
      glue_version: "3.0"
    )
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not create job #{name}: \n#{e.message}")
    raise
  end
end

  • For API details, see CreateJob in Amazon SDK for Ruby API Reference.

The following code example shows how to use DeleteCrawler.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Deletes a crawler with the specified name.
  #
  # @param name [String] The name of the crawler to delete.
  # @return [void]
  def delete_crawler(name)
    @glue_client.delete_crawler(name: name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete crawler #{name}: \n#{e.message}")
    raise
  end
end

  • For API details, see DeleteCrawler in Amazon SDK for Ruby API Reference.

The following code example shows how to use DeleteDatabase.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Removes a specified database from a Data Catalog.
  # Errors are logged but not re-raised.
  #
  # @param database_name [String] The name of the database to delete.
  # @return [void]
  def delete_database(database_name)
    @glue_client.delete_database(name: database_name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete database: \n#{e.message}")
  end
end

  • For API details, see DeleteDatabase in Amazon SDK for Ruby API Reference.

The following code example shows how to use DeleteJob.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Deletes a job with the specified name.
  # Errors are logged but not re-raised.
  #
  # @param job_name [String] The name of the job to delete.
  # @return [void]
  def delete_job(job_name)
    @glue_client.delete_job(job_name: job_name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete job: \n#{e.message}")
  end
end

  • For API details, see DeleteJob in Amazon SDK for Ruby API Reference.

The following code example shows how to use DeleteTable.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Deletes a table with the specified name.
  # Errors are logged but not re-raised.
  #
  # @param database_name [String] The name of the catalog database in which the table resides.
  # @param table_name [String] The name of the table to be deleted.
  # @return [void]
  def delete_table(database_name, table_name)
    @glue_client.delete_table(database_name: database_name, name: table_name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete table: \n#{e.message}")
  end
end

  • For API details, see DeleteTable in Amazon SDK for Ruby API Reference.

The following code example shows how to use GetCrawler.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Retrieves information about a specific crawler.
  #
  # @param name [String] The name of the crawler to retrieve information about.
  # @return [Aws::Glue::Types::GetCrawlerResponse, false] The response that contains the crawler, or false if the crawler doesn't exist.
  def get_crawler(name)
    @glue_client.get_crawler(name: name)
  rescue Aws::Glue::Errors::EntityNotFoundException
    @logger.info("Crawler #{name} doesn't exist.")
    false
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get crawler #{name}: \n#{e.message}")
    raise
  end
end

  • For API details, see GetCrawler in Amazon SDK for Ruby API Reference.

The following code example shows how to use GetDatabase.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Retrieves information about a specific database.
  #
  # @param name [String] The name of the database to retrieve information about.
  # @return [Aws::Glue::Types::Database] The database object. An error is raised if it can't be retrieved.
  def get_database(name)
    response = @glue_client.get_database(name: name)
    response.database
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get database #{name}: \n#{e.message}")
    raise
  end
end

  • For API details, see GetDatabase in Amazon SDK for Ruby API Reference.

The following code example shows how to use GetJobRun.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Retrieves data for a specific job run.
  # Errors are logged but not re-raised.
  #
  # @param job_name [String] The name of the job that the run belongs to.
  # @param run_id [String] The ID of the job run to retrieve.
  # @return [Aws::Glue::Types::GetJobRunResponse]
  def get_job_run(job_name, run_id)
    @glue_client.get_job_run(job_name: job_name, run_id: run_id)
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get job run: \n#{e.message}")
  end
end

  • For API details, see GetJobRun in Amazon SDK for Ruby API Reference.
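
GetJobRun is handy for tracking one particular run by its ID until it finishes. The following is a minimal sketch of such a polling loop, assuming the response exposes the state as `job_run.job_run_state`. `FakeJobRunClient` and the two `Struct` types are hypothetical stand-ins so that the sketch runs without credentials, and the list of terminal states is the one used by the scenario code in this document, which may not be exhaustive.

```ruby
# Terminal states, taken from the scenario code in this document.
TERMINAL_JOB_STATES = %w[SUCCEEDED STOPPED FAILED TIMEOUT].freeze

# Hypothetical stand-ins that mimic the shape response.job_run.job_run_state.
FakeJobRun = Struct.new(:id, :job_run_state)
FakeJobRunResponse = Struct.new(:job_run)

# Hypothetical client that replays a fixed sequence of states.
class FakeJobRunClient
  def initialize(states)
    @states = states.dup
  end

  # Returns the next simulated state for the requested run.
  def get_job_run(job_name:, run_id:)
    FakeJobRunResponse.new(FakeJobRun.new(run_id, @states.shift))
  end
end

# Polls one job run until it reaches a terminal state.
# Returns the terminal state, or nil if max_attempts is used up first.
def wait_for_job_run(client, job_name, run_id, delay: 10, max_attempts: 60)
  max_attempts.times do
    response = client.get_job_run(job_name: job_name, run_id: run_id)
    state = response.job_run.job_run_state
    return state if TERMINAL_JOB_STATES.include?(state)

    sleep(delay)
  end
  nil
end

client = FakeJobRunClient.new(%w[STARTING RUNNING RUNNING SUCCEEDED])
final_state = wait_for_job_run(client, "demo-job", "jr_demo", delay: 0)
puts "Final state: #{final_state}"
```

Bounding the loop with `max_attempts` keeps a stuck run from blocking the caller indefinitely; the default values here are arbitrary.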

The following code example shows how to use GetJobRuns.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Retrieves a list of job runs for the specified job.
  # Errors are logged but not re-raised.
  #
  # @param job_name [String] The name of the job to retrieve job runs for.
  # @return [Array<Aws::Glue::Types::JobRun>]
  def get_job_runs(job_name)
    response = @glue_client.get_job_runs(job_name: job_name)
    response.job_runs
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get job runs: \n#{e.message}")
  end
end

  • For API details, see GetJobRuns in Amazon SDK for Ruby API Reference.
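
The wrapper method above returns only the runs from a single response. GetJobRuns is a paginated operation, so a job with many runs may need several calls that follow `next_token`. The following is a hedged sketch of that loop. `FakePagedClient` and `Page` are hypothetical stand-ins that return two canned pages, and the run IDs are made up.

```ruby
# Hypothetical page type that mimics the job_runs and next_token response fields.
Page = Struct.new(:job_runs, :next_token)

# Hypothetical client that serves two canned pages keyed by the incoming token.
class FakePagedClient
  PAGES = {
    nil => Page.new(%w[run-1 run-2], "token-a"),
    "token-a" => Page.new(%w[run-3], nil)
  }.freeze

  def get_job_runs(job_name:, next_token: nil)
    PAGES.fetch(next_token)
  end
end

# Collects job runs from every page by following next_token until none is returned.
def all_job_runs(client, job_name)
  runs = []
  token = nil
  loop do
    params = { job_name: job_name }
    params[:next_token] = token if token
    page = client.get_job_runs(**params)
    runs.concat(page.job_runs)
    token = page.next_token
    break if token.nil? || token.empty?
  end
  runs
end

collected = all_job_runs(FakePagedClient.new, "demo-job")
puts "Collected #{collected.length} job runs."
```

The token is added to the request only when one exists, so the first call carries just the job name.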

The following code example shows how to use GetTables.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Retrieves a list of tables in the specified database.
  #
  # @param db_name [String] The name of the database to retrieve tables from.
  # @return [Array<Aws::Glue::Types::Table>]
  def get_tables(db_name)
    response = @glue_client.get_tables(database_name: db_name)
    response.table_list
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get tables #{db_name}: \n#{e.message}")
    raise
  end
end

  • For API details, see GetTables in Amazon SDK for Ruby API Reference.

The following code example shows how to use ListJobs.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Retrieves a list of jobs in AWS Glue.
  #
  # @return [Aws::Glue::Types::ListJobsResponse]
  def list_jobs
    @glue_client.list_jobs
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not list jobs: \n#{e.message}")
    raise
  end
end

  • For API details, see ListJobs in Amazon SDK for Ruby API Reference.

The following code example shows how to use StartCrawler.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Starts a crawler with the specified name.
  #
  # @param name [String] The name of the crawler to start.
  # @return [void]
  def start_crawler(name)
    @glue_client.start_crawler(name: name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not start crawler #{name}: \n#{e.message}")
    raise
  end
end

  • For API details, see StartCrawler in Amazon SDK for Ruby API Reference.

The following code example shows how to use StartJobRun.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

require "aws-sdk-glue"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Starts a job run for the specified job.
  #
  # @param name [String] The name of the job to start the run for.
  # @param input_database [String] The name of the input database for the job.
  # @param input_table [String] The name of the input table for the job.
  # @param output_bucket_name [String] The name of the output S3 bucket for the job.
  # @return [String] The ID of the started job run.
  def start_job_run(name, input_database, input_table, output_bucket_name)
    response = @glue_client.start_job_run(
      job_name: name,
      arguments: {
        '--input_database': input_database,
        '--input_table': input_table,
        '--output_bucket_url': "s3://#{output_bucket_name}/"
      }
    )
    response.job_run_id
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not start job run #{name}: \n#{e.message}")
    raise
  end
end

  • For API details, see StartJobRun in Amazon SDK for Ruby API Reference.
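
The `arguments` hash in `start_job_run` uses Ruby's quoted-symbol syntax, which is easy to misread. The following small sketch builds the same hash with made-up values and shows that the keys are Symbols that keep their leading dashes, while the ETL script in this document looks the options up without the `--` prefix. The helper name `job_arguments` is hypothetical.

```ruby
# Builds the arguments hash in the same shape as the start_job_run call above.
def job_arguments(input_database, input_table, output_bucket_name)
  {
    '--input_database': input_database,
    '--input_table': input_table,
    '--output_bucket_url': "s3://#{output_bucket_name}/"
  }
end

# Placeholder values used only for illustration.
args = job_arguments("demo-db", "demo_table", "demo-bucket")

# The quoted-key syntax produces Symbol keys that keep the leading dashes.
args.each { |key, value| puts "#{key} (#{key.class}) => #{value}" }

# Names that the ETL script looks up, obtained by dropping the "--" prefix.
script_option_names = args.keys.map { |key| key.to_s.delete_prefix("--") }
puts "Script option names: #{script_option_names.join(', ')}"
```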

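Scenarios

The following code example shows how to:

  • Create a crawler that crawls a public Amazon S3 bucket and generates a database of CSV-formatted metadata.

  • List information about databases and tables in your Amazon Glue Data Catalog.

  • Create a job to extract CSV data from the S3 bucket, transform the data, and load JSON-formatted output into another S3 bucket.

  • List information about job runs, view transformed data, and clean up resources.

For more information, see Tutorial: Getting started with Amazon Glue Studio.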

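SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Amazon Code Examples Repository.

Create a class that wraps the Amazon Glue functions that are used in the scenario.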

require "aws-sdk-glue"
require "aws-sdk-s3"

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
class GlueWrapper
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Retrieves information about a specific crawler.
  #
  # @param name [String] The name of the crawler to retrieve information about.
  # @return [Aws::Glue::Types::GetCrawlerResponse, false] The response that contains the crawler, or false if the crawler doesn't exist.
  def get_crawler(name)
    @glue_client.get_crawler(name: name)
  rescue Aws::Glue::Errors::EntityNotFoundException
    @logger.info("Crawler #{name} doesn't exist.")
    false
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get crawler #{name}: \n#{e.message}")
    raise
  end

  # Creates a new crawler with the specified configuration.
  #
  # @param name [String] The name of the crawler.
  # @param role_arn [String] The ARN of the IAM role to be used by the crawler.
  # @param db_name [String] The name of the database where the crawler stores its metadata.
  # @param db_prefix [String] The prefix to be added to the names of tables that the crawler creates.
  # @param s3_target [String] The S3 path that the crawler will crawl.
  # @return [void]
  def create_crawler(name, role_arn, db_name, db_prefix, s3_target)
    @glue_client.create_crawler(
      name: name,
      role: role_arn,
      database_name: db_name,
      table_prefix: db_prefix, # Pass the documented prefix; the original excerpt accepted it but never used it.
      targets: {
        s3_targets: [
          { path: s3_target }
        ]
      }
    )
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not create crawler: \n#{e.message}")
    raise
  end

  # Starts a crawler with the specified name.
  #
  # @param name [String] The name of the crawler to start.
  # @return [void]
  def start_crawler(name)
    @glue_client.start_crawler(name: name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not start crawler #{name}: \n#{e.message}")
    raise
  end

  # Deletes a crawler with the specified name.
  #
  # @param name [String] The name of the crawler to delete.
  # @return [void]
  def delete_crawler(name)
    @glue_client.delete_crawler(name: name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete crawler #{name}: \n#{e.message}")
    raise
  end

  # Retrieves information about a specific database.
  #
  # @param name [String] The name of the database to retrieve information about.
  # @return [Aws::Glue::Types::Database] The database object. An error is raised if it can't be retrieved.
  def get_database(name)
    response = @glue_client.get_database(name: name)
    response.database
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get database #{name}: \n#{e.message}")
    raise
  end

  # Retrieves a list of tables in the specified database.
  #
  # @param db_name [String] The name of the database to retrieve tables from.
  # @return [Array<Aws::Glue::Types::Table>]
  def get_tables(db_name)
    response = @glue_client.get_tables(database_name: db_name)
    response.table_list
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get tables #{db_name}: \n#{e.message}")
    raise
  end

  # Creates a new job with the specified configuration.
  #
  # @param name [String] The name of the job.
  # @param description [String] The description of the job.
  # @param role_arn [String] The ARN of the IAM role to be used by the job.
  # @param script_location [String] The location of the ETL script for the job.
  # @return [Aws::Glue::Types::CreateJobResponse] The response, which contains the name of the created job.
  def create_job(name, description, role_arn, script_location)
    @glue_client.create_job(
      name: name,
      description: description,
      role: role_arn,
      command: {
        name: "glueetl",
        script_location: script_location,
        python_version: "3"
      },
      glue_version: "3.0"
    )
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not create job #{name}: \n#{e.message}")
    raise
  end

  # Starts a job run for the specified job.
  #
  # @param name [String] The name of the job to start the run for.
  # @param input_database [String] The name of the input database for the job.
  # @param input_table [String] The name of the input table for the job.
  # @param output_bucket_name [String] The name of the output S3 bucket for the job.
  # @return [String] The ID of the started job run.
  def start_job_run(name, input_database, input_table, output_bucket_name)
    response = @glue_client.start_job_run(
      job_name: name,
      arguments: {
        '--input_database': input_database,
        '--input_table': input_table,
        '--output_bucket_url': "s3://#{output_bucket_name}/"
      }
    )
    response.job_run_id
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not start job run #{name}: \n#{e.message}")
    raise
  end

  # Retrieves a list of jobs in AWS Glue.
  #
  # @return [Aws::Glue::Types::ListJobsResponse]
  def list_jobs
    @glue_client.list_jobs
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not list jobs: \n#{e.message}")
    raise
  end

  # Retrieves a list of job runs for the specified job.
  # Errors are logged but not re-raised.
  #
  # @param job_name [String] The name of the job to retrieve job runs for.
  # @return [Array<Aws::Glue::Types::JobRun>]
  def get_job_runs(job_name)
    response = @glue_client.get_job_runs(job_name: job_name)
    response.job_runs
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get job runs: \n#{e.message}")
  end

  # Retrieves data for a specific job run.
  # Errors are logged but not re-raised.
  #
  # @param job_name [String] The name of the job that the run belongs to.
  # @param run_id [String] The ID of the job run to retrieve.
  # @return [Aws::Glue::Types::GetJobRunResponse]
  def get_job_run(job_name, run_id)
    @glue_client.get_job_run(job_name: job_name, run_id: run_id)
  rescue Aws::Glue::Errors::GlueException => e
    @logger.error("Glue could not get job run: \n#{e.message}")
  end

  # Deletes a job with the specified name.
  # Errors are logged but not re-raised.
  #
  # @param job_name [String] The name of the job to delete.
  # @return [void]
  def delete_job(job_name)
    @glue_client.delete_job(job_name: job_name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete job: \n#{e.message}")
  end

  # Deletes a table with the specified name.
  # Errors are logged but not re-raised.
  #
  # @param database_name [String] The name of the catalog database in which the table resides.
  # @param table_name [String] The name of the table to be deleted.
  # @return [void]
  def delete_table(database_name, table_name)
    @glue_client.delete_table(database_name: database_name, name: table_name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete table: \n#{e.message}")
  end

  # Removes a specified database from a Data Catalog.
  # Errors are logged but not re-raised.
  #
  # @param database_name [String] The name of the database to delete.
  # @return [void]
  def delete_database(database_name)
    @glue_client.delete_database(name: database_name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete database: \n#{e.message}")
  end

  # Uploads a job script file to an S3 bucket.
  #
  # @param file_path [String] The local path of the job script file.
  # @param bucket_resource [Aws::S3::Bucket] The S3 bucket resource to upload the file to.
  # @return [void]
  def upload_job_script(file_path, bucket_resource)
    File.open(file_path) do |file|
      bucket_resource.client.put_object({
        body: file,
        bucket: bucket_resource.name,
        key: file_path
      })
    end
  rescue Aws::S3::Errors::S3UploadFailedError => e
    @logger.error("S3 could not upload job script: \n#{e.message}")
    raise
  end
end

Create a class that runs the scenario.

require "aws-sdk-glue"
require "aws-sdk-iam"
require "aws-sdk-s3"
require "json"
require "logger"
require "yaml"
require "colorize" # Assumption: String#yellow and String#green come from the colorize gem.

# NOTE: new_step, custom_wait, banner, confirm_begin, billing, and security are helper
# methods that this excerpt calls but does not define. See the complete example on GitHub.

class GlueCrawlerJobScenario
  def initialize(glue_client, glue_service_role, glue_bucket, logger)
    @glue_client = glue_client
    @glue_service_role = glue_service_role
    @glue_bucket = glue_bucket
    @logger = logger
  end

  def run(crawler_name, db_name, db_prefix, data_source, job_script, job_name)
    wrapper = GlueWrapper.new(@glue_client, @logger)

    new_step(1, "Create a crawler")
    puts "Checking for crawler #{crawler_name}."
    crawler = wrapper.get_crawler(crawler_name)
    if crawler == false
      puts "Creating crawler #{crawler_name}."
      wrapper.create_crawler(crawler_name, @glue_service_role.arn, db_name, db_prefix, data_source)
      puts "Successfully created #{crawler_name}:"
      crawler = wrapper.get_crawler(crawler_name)
      puts JSON.pretty_generate(crawler).yellow
    end
    print "\nDone!\n".green

    new_step(2, "Run a crawler to output a database.")
    puts "Location of input data analyzed by crawler: #{data_source}"
    puts "Outputs: a Data Catalog database in CSV format containing metadata on input."
    wrapper.start_crawler(crawler_name)
    puts "Starting crawler... (this typically takes a few minutes)"
    crawler_state = nil
    while crawler_state != "READY"
      custom_wait(15)
      crawler = wrapper.get_crawler(crawler_name)
      crawler_state = crawler[0]["state"]
      print "Status check: #{crawler_state}.".yellow
    end
    print "\nDone!\n".green

    new_step(3, "Query the database.")
    database = wrapper.get_database(db_name)
    puts "The crawler created database #{db_name}:"
    print "#{database}".yellow
    puts "\nThe database contains these tables:"
    tables = wrapper.get_tables(db_name)
    tables.each_with_index do |table, index|
      print "\t#{index + 1}. #{table['name']}".yellow
    end
    print "\nDone!\n".green

    new_step(4, "Create a job definition that runs an ETL script.")
    puts "Uploading Python ETL script to S3..."
    wrapper.upload_job_script(job_script, @glue_bucket)
    puts "Creating job definition #{job_name}:\n"
    response = wrapper.create_job(job_name, "Getting started example job.", @glue_service_role.arn, "s3://#{@glue_bucket.name}/#{job_script}")
    puts JSON.pretty_generate(response).yellow
    print "\nDone!\n".green

    new_step(5, "Start a new job")
    job_run_status = nil
    job_run_id = wrapper.start_job_run(
      job_name,
      db_name,
      tables[0]["name"],
      @glue_bucket.name
    )
    puts "Job #{job_name} started. Let's wait for it to run."
    until ["SUCCEEDED", "STOPPED", "FAILED", "TIMEOUT"].include?(job_run_status)
      custom_wait(10)
      job_run = wrapper.get_job_runs(job_name)
      job_run_status = job_run[0]["job_run_state"]
      print "Status check: #{job_name}/#{job_run_id} - #{job_run_status}.".yellow
    end
    print "\nDone!\n".green

    new_step(6, "View results from a successful job run.")
    if job_run_status == "SUCCEEDED"
      puts "Data from your job run is stored in your S3 bucket '#{@glue_bucket.name}'. Files include:"
      begin
        # Print the key name of each object in the bucket.
        @glue_bucket.objects.each do |object_summary|
          if object_summary.key.include?("run-")
            print "#{object_summary.key}".yellow
          end
        end

        # Print the first 256 bytes of a run file
        desired_sample_objects = 1
        @glue_bucket.objects.each do |object_summary|
          if object_summary.key.include?("run-")
            if desired_sample_objects > 0
              sample_object = @glue_bucket.object(object_summary.key)
              sample = sample_object.get(range: "bytes=0-255").body.read
              puts "\nSample run file contents:"
              print "#{sample}".yellow
              desired_sample_objects -= 1
            end
          end
        end
      rescue Aws::S3::Errors::ServiceError => e
        # Use the instance logger and Ruby string interpolation to report the failure.
        @logger.error("Couldn't get job run data. Here's why: #{e.code}: #{e.message}")
        raise
      end
    end
    print "\nDone!\n".green

    new_step(7, "Delete job definition and crawler.")
    wrapper.delete_job(job_name)
    puts "Job deleted: #{job_name}."
    wrapper.delete_crawler(crawler_name)
    puts "Crawler deleted: #{crawler_name}."
    wrapper.delete_table(db_name, tables[0]["name"])
    puts "Table deleted: #{tables[0]["name"]} in #{db_name}."
    wrapper.delete_database(db_name)
    puts "Database deleted: #{db_name}."
    print "\nDone!\n".green
  end
end

def main
  banner("../../helpers/banner.txt")
  puts "######################################################################################################".yellow
  puts "# #".yellow
  puts "# EXAMPLE CODE DEMO: #".yellow
  puts "# AWS Glue #".yellow
  puts "# #".yellow
  puts "######################################################################################################".yellow
  puts ""
  puts "You have launched a demo of AWS Glue using the AWS SDK for Ruby v3. Over the next 60 seconds, it will"
  puts "do the following:"
  puts " 1. Create a crawler."
  puts " 2. Run a crawler to output a database."
  puts " 3. Query the database."
  puts " 4. Create a job definition that runs an ETL script."
  puts " 5. Start a new job."
  puts " 6. View results from a successful job run."
  puts " 7. Delete job definition and crawler."
  puts ""

  confirm_begin
  billing
  security
  puts "\e[H\e[2J"

  # Set input file names
  job_script_filepath = "job_script.py"
  resource_names = YAML.load_file("resource_names.yaml")

  # Instantiate existing IAM role.
  iam = Aws::IAM::Resource.new(region: "us-east-1")
  iam_role_name = resource_names["glue_service_role"]
  iam_role = iam.role(iam_role_name)

  # Instantiate existing S3 bucket.
  s3 = Aws::S3::Resource.new(region: "us-east-1")
  s3_bucket_name = resource_names["glue_bucket"]
  s3_bucket = s3.bucket(s3_bucket_name)

  # Create the logger that the scenario and the wrapper share.
  logger = Logger.new($stdout)

  scenario = GlueCrawlerJobScenario.new(
    Aws::Glue::Client.new(region: "us-east-1"),
    iam_role,
    s3_bucket,
    logger
  )

  random_int = rand(10 ** 4)
  scenario.run(
    "doc-example-crawler-#{random_int}",
    "doc-example-database-#{random_int}",
    "doc-example-#{random_int}-",
    "s3://crawler-public-us-east-1/flight/2016/csv",
    job_script_filepath,
    "doc-example-job-#{random_int}"
  )

  puts "-" * 88
  puts "You have reached the end of this tour of AWS Glue."
  puts "To destroy CDK-created resources, run:\n cdk destroy"
  puts "-" * 88
end

# Run the demo only when this file is executed directly.
main if $PROGRAM_NAME == __FILE__

Create an ETL script that is used by Amazon Glue to extract, transform, and load data during job runs.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

"""
These custom arguments must be passed as Arguments to the StartJobRun request.
    --input_database    The name of a metadata database that is contained in your
                        AWS Glue Data Catalog and that contains tables that describe
                        the data to be processed.
    --input_table       The name of a table in the database that describes the data to
                        be processed.
    --output_bucket_url An S3 bucket that receives the transformed output data.
"""
args = getResolvedOptions(
    sys.argv, ["JOB_NAME", "input_database", "input_table", "output_bucket_url"]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 Flight Data.
S3FlightData_node1 = glueContext.create_dynamic_frame.from_catalog(
    database=args["input_database"],
    table_name=args["input_table"],
    transformation_ctx="S3FlightData_node1",
)

# This mapping performs two main functions:
# 1. It simplifies the output by removing most of the fields from the data.
# 2. It renames some fields. For example, `fl_date` is renamed to `flight_date`.
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3FlightData_node1,
    mappings=[
        ("year", "long", "year", "long"),
        ("month", "long", "month", "tinyint"),
        ("day_of_month", "long", "day", "tinyint"),
        ("fl_date", "string", "flight_date", "string"),
        ("carrier", "string", "carrier", "string"),
        ("fl_num", "long", "flight_num", "long"),
        ("origin_city_name", "string", "origin_city_name", "string"),
        ("origin_state_abr", "string", "origin_state_abr", "string"),
        ("dest_city_name", "string", "dest_city_name", "string"),
        ("dest_state_abr", "string", "dest_state_abr", "string"),
        ("dep_time", "long", "departure_time", "long"),
        ("wheels_off", "long", "wheels_off", "long"),
        ("wheels_on", "long", "wheels_on", "long"),
        ("arr_time", "long", "arrival_time", "long"),
        ("mon", "string", "mon", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node Revised Flight Data.
RevisedFlightData_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="s3",
    format="json",
    connection_options={"path": args["output_bucket_url"], "partitionKeys": []},
    transformation_ctx="RevisedFlightData_node3",
)

job.commit()
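
To make the effect of the mapping step in the ETL script concrete, the following plain-Ruby sketch applies a few of the same (source, source type, target, target type) tuples to a single record. This is only an illustration of the rename-and-drop behavior described in the script's comments, not how Glue implements `ApplyMapping`. Type conversion is ignored, the `apply_mapping` helper is hypothetical, and the sample record, including its `tail_num` field, is made up.

```ruby
# A subset of the mapping tuples from the ETL script above.
MAPPINGS = [
  ["year", "long", "year", "long"],
  ["day_of_month", "long", "day", "tinyint"],
  ["fl_date", "string", "flight_date", "string"],
  ["fl_num", "long", "flight_num", "long"]
].freeze

# Keeps only the mapped fields and renames them. Type conversion is ignored here.
def apply_mapping(record, mappings)
  mappings.each_with_object({}) do |(source, _source_type, target, _target_type), output|
    output[target] = record[source] if record.key?(source)
  end
end

# Made-up input record. "tail_num" has no mapping, so it is dropped.
record = {
  "year" => 2016,
  "day_of_month" => 3,
  "fl_date" => "2016-01-03",
  "fl_num" => 42,
  "tail_num" => "N12345"
}

mapped = apply_mapping(record, MAPPINGS)
mapped.each { |field, value| puts "#{field}: #{value}" }
```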