将文件和 Python 库导入 Amazon Athena for Apache Spark - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将文件和 Python 库导入 Amazon Athena for Apache Spark

本文档提供有关如何将文件和 Python 库导入 Amazon Athena for Apache Spark 的示例。

注意事项和限制

  • Python 版本 - 目前,Athena for Spark 使用 Python 版本 3.9.16。请注意,Python 包对次要 Python 版本敏感。

  • Athena for Spark 架构 - Athena for Spark 在 ARM64 架构上使用 Amazon Linux 2。请注意,某些 Python 库不会为此架构分发二进制文件。

  • 二进制共享对象(SO)- 由于 SparkContext addPyFile 方法检测不到二进制共享对象,因此无法在 Athena for Spark 中用于添加取决于共享对象的 Python 包。

  • 弹性分布式数据集(RDD)- 不支持 RDD

  • Dataframe.foreach - 不支持 PySpark DataFrame.foreach 方法。

示例

这些示例使用以下约定。

  • 占位符 Amazon S3 位置 s3://DOC-EXAMPLE-BUCKET。将该项替换为您自己的 S3 存储桶位置。

  • 从 Unix Shell 执行的所有代码块均显示为 directory_name $。例如,目录 /tmp 中的命令 ls 及其输出显示如下:

    /tmp $ ls

    输出

    file1 file2

导入文本文件以用于计算

本节中的示例显示如何导入文本文件以用于 Athena for Spark 笔记本中的计算。

将文件写入本地临时目录后将其添加到笔记本中

以下示例显示如何将文件写入本地临时目录、将其添加到笔记本并对其进行测试。

import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

输出

Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

从 Amazon S3 导入文件

以下示例显示如何将文件从 Amazon S3 导入到笔记本中并对其进行测试。

将文件从 Amazon S3 导入到笔记本中
  1. 创建一个名为 test.txt 的文件,其中有一行包含值 5

  2. 将文件添加到 Amazon S3 中的存储桶。此示例使用位置 s3://DOC-EXAMPLE-BUCKET

  3. 使用以下代码将文件导入到笔记本中并对其进行测试。

    from pyspark import SparkFiles sc.addFile('s3://DOC-EXAMPLE-BUCKET/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

    输出

    Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

添加 Python 文件

本节中的示例显示如何将 Python 文件和库添加到 Athena 中的 Spark 笔记本。

添加 Python 文件并注册 UDF

以下示例显示如何将 Python 文件从 Amazon S3 添加到笔记本并注册 UDF。

将 Python 文件添加到笔记本并注册 UDF
  1. 使用您自己的 Amazon S3 位置创建包含以下内容的 s3://DOC-EXAMPLE-BUCKET/file1.py 文件:

    def xyz(input): return 'xyz - udf ' + str(input);
  2. 在同一 S3 位置创建包含以下内容的 s3://DOC-EXAMPLE-BUCKET/file2.py 文件:

    from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
  3. 在 Athena for Spark 笔记本中,运行以下命令。

    sc.addPyFile('s3://DOC-EXAMPLE-BUCKET/file1.py') sc.addPyFile('s3://DOC-EXAMPLE-BUCKET/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)

    输出

    Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+

导入 Python .zip 文件

您可以使用 Python addPyFileimport 方法,将 Python .zip 文件导入笔记本。

注意

导入 Athena Spark 的 .zip 文件可能仅包含 Python 包。例如,不支持包含基于 C 的文件的包。

将 Python .zip 文件导入笔记本
  1. 在本地计算机上的桌面目录(例如 \tmp)中,创建一个名为 moduletest 的目录。

  2. moduletest 目录中,创建一个名为 hello.py 的文件,该文件包含以下内容:

    def hi(input): return 'hi ' + str(input);
  3. 在同一目录中,添加一个名为 __init__.py 的空文件。

    如果列出目录内容,则它们应类似于以下内容。

    /tmp $ ls moduletest __init__.py hello.py
  4. 使用 zip 命令将两个模块文件放入名为 moduletest.zip 的文件中。

    moduletest $ zip -r9 ../moduletest.zip *
  5. .zip 文件上传到 Amazon S3 中的存储桶。

  6. 使用以下代码将 Python .zip 文件导入笔记本。

    sc.addPyFile('s3://DOC-EXAMPLE-BUCKET/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()

    输出

    Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+

将 Python 库的两个版本作为单独的模块导入

以下代码示例显示如何将两个不同版本的 Python 库作为两个单独的模块从 Amazon S3 中的某个位置添加和导入。该代码会从 S3 添加每个库文件,将其导入,然后打印库版本以验证导入。

sc.addPyFile('s3://DOC-EXAMPLE-BUCKET/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://DOC-EXAMPLE-BUCKET/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)

输出

3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)

输出

3.17.6

从 PyPI 导入 Python .zip 文件

此示例使用 pip 命令从 Python 程序包索引(PyPI)下载 bpabel/piglatin 项目的 Python .zip 文件。

从 PyPI 导入 Python .zip 文件
  1. 在本地桌面上,使用以下命令创建名为 testpiglatin 的目录并创建虚拟环境。

    /tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .

    输出

    created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
  2. 创建名为 unpacked 的子目录以保存项目。

    testpiglatin $ mkdir unpacked
  3. 使用 pip 命令将该项目安装到 unpacked 目录。

    testpiglatin $ bin/pip install -t $PWD/unpacked piglatin

    输出

    Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
  4. 检查目录的内容。

    testpiglatin $ ls

    输出

    bin lib pyvenv.cfg unpacked
  5. 更改为 unpacked 目录并显示内容。

    testpiglatin $ cd unpacked unpacked $ ls

    输出

    piglatin piglatin-1.0.6.dist-info
  6. 使用 zip 命令将 piglatin 项目的内容放入名为 library.zip 的文件中。

    unpacked $ zip -r9 ../library.zip *

    输出

    adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
  7. (可选)使用以下命令在本地测试导入。

    1. 将 Python 路径设置为 library.zip 文件位置然后启动 Python。

      /home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3

      输出

      Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
    2. 导入库并运行测试命令。

      >>> import piglatin >>> piglatin.translate('hello')

      输出

      'ello-hay'
  8. 使用以下命令从 Amazon S3 添加 .zip 文件,将其导入 Athena 中的笔记本,然后对其进行测试。

    sc.addPyFile('s3://DOC-EXAMPLE-BUCKET/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()

    输出

    Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+

从 PyPI 导入具有依赖项的 Python .zip 文件

此示例从 PyPI 导入 md2gemini 程序包,该程序包将 markdown 中的文本转换为 Gemini 文本格式。该程序包具有以下依赖项

cjkwrap mistune wcwidth
导入具有依赖项的 Python .zip 文件
  1. 在本地计算机上,使用以下命令创建名为 testmd2gemini 的目录并创建虚拟环境。

    /tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
  2. 创建名为 unpacked 的子目录以保存项目。

    testmd2gemini $ mkdir unpacked
  3. 使用 pip 命令将该项目安装到 unpacked 目录。

    /testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini

    输出

    Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
  4. 更改为 unpacked 目录并检查内容。

    testmd2gemini $ cd unpacked unpacked $ ls -lah

    输出

    total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
  5. 使用 zip 命令将 md2gemini 项目的内容放入名为 md2gemini.zip 的文件中。

    unpacked $ zip -r9 ../md2gemini *

    输出

    adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
  6. (可选)使用以下命令测试库是否可以在您的本地计算机上运行。

    1. 将 Python 路径设置为 md2gemini.zip 文件位置然后启动 Python。

      /home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
    2. 导入库并运行测试。

      >>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))

      输出

      https://abc.def abc
  7. 使用以下命令从 Amazon S3 添加 .zip 文件,将其导入 Athena 中的笔记本,然后执行非 UDF 测试。

    # (non udf test) sc.addPyFile('s3://DOC-EXAMPLE-BUCKET/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))

    输出

    Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
  8. 使用以下命令执行 UDF 测试。

    # (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()

    输出

    Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+