步骤 5:使用 Apache Cassandra Spark Connector 写入和读取 Amazon Keyspaces 数据 - Amazon Keyspaces(Apache Cassandra 兼容)
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 5:使用 Apache Cassandra Spark Connector 写入和读取 Amazon Keyspaces 数据

在本步骤中,您首先需要使用 Spark Cassandra Connector 将示例文件中的数据加载到 DataFrame 中。接下来,您需要将数据从 DataFrame 写入您的 Amazon Keyspaces 表中。您也可以单独使用这一部分执行其他操作,比如将数据迁移到 Amazon Keyspaces 表中。最后,您需要使用 Spark Cassandra Connector 将表中的数据读入到一个 DataFrame 中。您也可以单独使用这一部分执行其他操作,比如从 Amazon Keyspaces 表中读取数据,以便使用 Apache Spark 执行数据分析。

  1. 启动 Spark Shell,如以下示例所示。请注意,此示例使用的是 Sigv4 身份验证。

    ./spark-shell --files application.conf --conf spark.cassandra.connection.config.profile.path=application.conf --packages software.aws.mcs:aws-sigv4-auth-cassandra-java-driver-plugin:4.0.5,com.datastax.spark:spark-cassandra-connector_2.12:3.1.0 --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
  2. 使用以下代码导入 Spark Cassandra Connector。

    import org.apache.spark.sql.cassandra._
  3. 要从 CSV 文件中读取数据并将其存储在 DataFrame 中,您可以使用以下示例代码。

    var df = spark.read.option("header","true").option("inferSchema","true").csv("keyspaces_sample_table.csv")

    您可以使用以下命令来显示结果。

    scala> df.show();

    输出应如下所示:

    +----------------+----+-----------+----+------------------+--------------------+-------------+ | award|year| category|rank| author| book_title| publisher| +----------------+----+-----------+----+------------------+--------------------+-------------+ |Kwesi Manu Prize|2020| Fiction| 1| Akua Mansa| Where did you go?|SomePublisher| |Kwesi Manu Prize|2020| Fiction| 2| John Stiles| Yesterday|Example Books| |Kwesi Manu Prize|2020| Fiction| 3| Nikki Wolf|Moving to the Cha...| AnyPublisher| | Wolf|2020|Non-Fiction| 1| Wang Xiulan| History of Ideas|Example Books| | Wolf|2020|Non-Fiction| 2|Ana Carolina Silva| Science Today|SomePublisher| | Wolf|2020|Non-Fiction| 3| Shirley Rodriguez|The Future of Sea...| AnyPublisher| | Richard Roe|2020| Fiction| 1| Alejandro Rosalez| Long Summer|SomePublisher| | Richard Roe|2020| Fiction| 2| Arnav Desai| The Key|Example Books| | Richard Roe|2020| Fiction| 3| Mateo Jackson| Inside the Whale| AnyPublisher| +----------------+----+-----------+----+------------------+--------------------+-------------+

    您可以确认 DataFrame 中的数据的架构,如以下示例所示。

    scala> df.printSchema

    输出应如下所示:

    root |-- award: string (nullable = true) |-- year: integer (nullable = true) |-- category: string (nullable = true) |-- rank: integer (nullable = true) |-- author: string (nullable = true) |-- book_title: string (nullable = true) |-- publisher: string (nullable = true)
  4. 使用以下命令将 DataFrame 中的数据写入 Amazon Keyspaces 表中。

    df.write.cassandraFormat("book_awards", "catalog").mode("APPEND").save()
  5. 要确认数据已保存,您可以将其读回到一个数据框,如以下示例所示。

    var newDf = spark.read.cassandraFormat("book_awards", "catalog").load()

    然后,你可以显示现在包含在数据框中的数据。

    scala> newDf.show()

    该命令应生成如下所示的输出。

    +--------------------+------------------+----------------+-----------+-------------+----+----+ | book_title| author| award| category| publisher|rank|year| +--------------------+------------------+----------------+-----------+-------------+----+----+ | Long Summer| Alejandro Rosalez| Richard Roe| Fiction|SomePublisher| 1|2020| | History of Ideas| Wang Xiulan| Wolf|Non-Fiction|Example Books| 1|2020| | Where did you go?| Akua Mansa|Kwesi Manu Prize| Fiction|SomePublisher| 1|2020| | Inside the Whale| Mateo Jackson| Richard Roe| Fiction| AnyPublisher| 3|2020| | Yesterday| John Stiles|Kwesi Manu Prize| Fiction|Example Books| 2|2020| |Moving to the Cha...| Nikki Wolf|Kwesi Manu Prize| Fiction| AnyPublisher| 3|2020| |The Future of Sea...| Shirley Rodriguez| Wolf|Non-Fiction| AnyPublisher| 3|2020| | Science Today|Ana Carolina Silva| Wolf|Non-Fiction|SomePublisher| 2|2020| | The Key| Arnav Desai| Richard Roe| Fiction|Example Books| 2|2020| +--------------------+------------------+----------------+-----------+-------------+----+----+