Using Spark 2.11, I have the following Dataset (read from a Cassandra table):
    +-----------+------------------------------------------------------+
    |id         |attributes                                            |
    +-----------+------------------------------------------------------+
    |YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|
    +-----------+------------------------------------------------------+
This is the printSchema():
    root
     |-- id: string (nullable = true)
     |-- attributes: string (nullable = true)
The attributes column holds an array of JSON objects, stored as a string. I'm trying to explode it into a Dataset but keep failing. I tried to define the schema as follows:
    StructType type = new StructType()
        .add("id", new IntegerType(), false)
        .add("name", new StringType(), false)
        .add("score", new FloatType(), false)
        .add("snippets", new IntegerType(), false);
    ArrayType schema = new ArrayType(type, false);
And provided it to from_json as follows:
df = df.withColumn("val", functions.from_json(df.col("attributes"), schema));
This fails with MatchError:
    Exception in thread "main" scala.MatchError: org.apache.spark.sql.types.IntegerType@… (of class org.apache.spark.sql.types.IntegerType)
What’s the correct way to do that?
Answer
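The MatchError occurs because Spark pattern-matches on the singleton type objects (IntegerType, StringType, and so on), and instances created with new do not match them. You can specify the schema with the singletons instead: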
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types._

    // Schema of the JSON array held in the "attributes" string column
    val schema = ArrayType(
      StructType(Array(
        StructField("id", IntegerType, false),
        StructField("name", StringType, false),
        StructField("score", FloatType, false),
        StructField("snippets", IntegerType, false)
      )),
      false
    )

    val df1 = df.withColumn("val", from_json(col("attributes"), schema))

    df1.show(false)
    //+-----------+------------------------------------------------------+------------------------+
    //|id         |attributes                                            |val                     |
    //+-----------+------------------------------------------------------+------------------------+
    //|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|[[1, function, 10.0, 1]]|
    //+-----------+------------------------------------------------------+------------------------+
Or for Java:
    import static org.apache.spark.sql.types.DataTypes.*;

    import java.util.Arrays;
    import org.apache.spark.sql.types.ArrayType;

    // createArrayType returns an ArrayType, not a StructType
    ArrayType schema = createArrayType(createStructType(Arrays.asList(
        createStructField("id", IntegerType, false),
        createStructField("name", StringType, false),
        createStructField("score", FloatType, false),
        createStructField("snippets", IntegerType, false)
    )), false);
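Since the question asks about exploding the array, here is a minimal Java sketch of the remaining step: parse the string with from_json, then call explode to get one row per array element. It assumes Spark 2.2 or later (where from_json accepts an array schema) and a local SparkSession. The class name ExplodeAttributes, the helper explodeAttributes, and the column aliases attr and attr_id are made up for illustration.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.types.DataTypes.*;

import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.StructType;

public class ExplodeAttributes {

    // Schema of the JSON array stored as a string in the "attributes" column.
    static final ArrayType ATTRIBUTES_SCHEMA = createArrayType(createStructType(Arrays.asList(
            createStructField("id", IntegerType, false),
            createStructField("name", StringType, false),
            createStructField("score", FloatType, false),
            createStructField("snippets", IntegerType, false)
    )), false);

    // Hypothetical helper: parses "attributes" and returns one row per array element.
    static Dataset<Row> explodeAttributes(Dataset<Row> df) {
        return df
                .withColumn("val", from_json(col("attributes"), ATTRIBUTES_SCHEMA))
                .withColumn("attr", explode(col("val")))
                .select(
                        col("id"),
                        col("attr.id").alias("attr_id"), // avoid clashing with the outer id
                        col("attr.name"),
                        col("attr.score"),
                        col("attr.snippets"));
    }

    public static void main(String[] args) {
        // Local session for demonstration only; the original data comes from Cassandra.
        SparkSession spark = SparkSession.builder()
                .appName("explode-attributes")
                .master("local[*]")
                .getOrCreate();

        StructType inputSchema = createStructType(Arrays.asList(
                createStructField("id", StringType, true),
                createStructField("attributes", StringType, true)));

        // Single sample row copied from the question.
        Dataset<Row> df = spark.createDataFrame(
                Collections.singletonList(RowFactory.create(
                        "YH8B135U123",
                        "[{\"id\":1,\"name\":\"function\",\"score\":10.0,\"snippets\":1}]")),
                inputSchema);

        explodeAttributes(df).show(false);
        spark.stop();
    }
}
```

Note that explode drops rows whose array is null or empty; if those rows must be kept, explode_outer (also Spark 2.2+) is the usual alternative.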