从 2025 年 11 月 1 日起,Amazon Redshift 将不再支持创建新的 Python UDF。如果您想要使用 Python UDF,请在该日期之前创建 UDF。现有的 Python UDF 将继续正常运行。有关更多信息,请参阅博客文章
支持的数据类型
Spark 连接器支持 Amazon Redshift 中的以下数据类型。有关 Amazon Redshift 中支持的数据类型的完整列表,请参阅数据类型。如果某个数据类型不在下表中,则 Spark 连接器不支持该数据类型。
| 数据类型 | 别名 |
|---|---|
| SMALLINT | INT2 |
| INTEGER | INT、INT4 |
| BIGINT | INT8 |
| DECIMAL | NUMERIC |
| REAL | FLOAT4 |
| DOUBLE PRECISION | FLOAT8、FLOAT |
| BOOLEAN | BOOL |
| CHAR | CHARACTER、NCHAR、BPCHAR |
| VARCHAR | CHARACTER VARYING、NVARCHAR、TEXT |
| DATE | |
| TIMESTAMP | Timestamp without time zone |
| TIMESTAMPTZ | Timestamp with time zone |
| SUPER | |
| TIME | TIME WITHOUT TIME ZONE |
| TIMETZ | Time with time zone |
| VARBYTE | VARBINARY,BINARY VARYING |
复杂的数据类型
您可以使用 spark 连接器在 Redshift SUPER 数据类型列中读写 Spark 复杂数据类型,如 ArrayType、MapType 和 StructType。如果您在读取操作期间提供架构,则该列中的数据将在 Spark 中转换为相应的复杂类型,包括任何嵌套类型。此外,如果启用 autopushdown,嵌套属性、映射值和数组索引的投影将下推到 Redshift,这样,当只访问一部分数据时,就不再需要卸载整个嵌套数据结构。
从连接器写入 DataFrame 时,任何类型为 MapType(使用 StringType)、StructType 或 ArrayType 的列都会写入 Redshift SUPER 数据类型列。在写入这些嵌套数据结构时,tempformat 参数必须为类型 CSV、CSV GZIP 或 PARQUET。使用 AVRO 将导致异常。写入一个键类型不是 StringType 的 MapType 数据结构也会导致异常。
StructType
以下示例演示如何使用包含结构的 SUPER 数据类型创建表
create table contains_super (a super);
然后,您可以使用连接器,使用下面示例中的类似架构,从表中的 SUPER 列 a 查询 StringType 字段 hello。
import org.apache.spark.sql.types._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil) val helloDF = sqlContext.read .format("io.github.spark_redshift_community.spark.redshift") .option("url", jdbcURL ) .option("tempdir", tempS3Dir) .option("dbtable", "contains_super") .schema(schema) .load().selectExpr("a.hello")
以下示例演示如何向列 a 写入结构。
import org.apache.spark.sql.types._ import org.apache.spark.sql._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil) val data = sc.parallelize(Seq(Row(Row("world")))) val mydf = sqlContext.createDataFrame(data, schema) mydf.write.format("io.github.spark_redshift_community.spark.redshift"). option("url", jdbcUrl). option("dbtable", tableName). option("tempdir", tempS3Dir). option("tempformat", "CSV"). mode(SaveMode.Append).save
MapType
如果您更喜欢使用 MapType 来表示数据,那么您可以在架构中使用 MapType 数据结构,并检索映射中与键对应的值。请注意,MapType 数据结构中的所有键都必须是 String 类型,并且所有值都必须是相同的类型,例如 int。
以下示例演示如何获取列 a 中键 hello 的值。
import org.apache.spark.sql.types._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", MapType(StringType, IntegerType))::Nil) val helloDF = sqlContext.read .format("io.github.spark_redshift_community.spark.redshift") .option("url", jdbcURL ) .option("tempdir", tempS3Dir) .option("dbtable", "contains_super") .schema(schema) .load().selectExpr("a['hello']")
ArrayType
如果该列包含数组而不是结构,则可以使用连接器查询数组中的第一个元素。
import org.apache.spark.sql.types._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", ArrayType(IntegerType)):: Nil) val helloDF = sqlContext.read .format("io.github.spark_redshift_community.spark.redshift") .option("url", jdbcURL ) .option("tempdir", tempS3Dir) .option("dbtable", "contains_super") .schema(schema) .load().selectExpr("a[0]")
限制
通过 spark 连接器使用复杂数据类型有以下限制:
-
所有嵌套的结构字段名称和映射键必须为小写。如果查询带有大写字母的复杂字段名称,可以尝试省略架构,并使用
from_jsonspark 函数在本地转换返回的字符串来作为解决方法。 -
在读取或写入操作中使用的任何映射字段都必须只有
StringType键。 -
只有
CSV、CSV GZIP和PARQUET是支持将复杂类型写入 Redshift 的临时格式值。尝试使用AVRO会引发异常。