Why is this JSON giving null values when I deserialize it and try to extract values?

I’m using Spark Structured Streaming to consume messages from a few Kafka topics. Each message is a JSON string structured like this, and I want to extract ‘created_at’, ‘text’ and ‘tag’:

{"data":
  {"created_at":"***",
   "id":"***",
   "text":"***"},
 "matching_rules":
   [{"id":"***",
     "tag":"***"}]
}

I wrote the following schema:

val DFschema = StructType(Array(
      StructField("data", StructType(Array(
        StructField("created_at", TimestampType),
        StructField("text", StringType)))),
      StructField("matching_rules", StructType(Array(
        StructField("tag", StringType)
      )))
    ))

When I use the schema with from_json() I can successfully extract ‘created_at’ and ‘text’ as columns with non-null values using getField(), but when I try to do the same for ‘tag’ its column is populated with nulls:

val kafkaDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("failOnDataLoss", "false")
      .option("subscribe", topics)
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .select(col("key"), from_json($"value", DFschema).alias("structdata"))
      .select($"key",
        $"structdata.data".getField("created_at").alias("created_at"),
        $"structdata.data".getField("text").alias("text"),
        $"structdata.matching_rules".getField("tag").alias("topic")
      )
      .withColumn("hour", date_format(col("created_at"), "HH"))
      .withColumn("date", date_format(col("created_at"), "yyyy-MM-dd"))

Looking at the JSON, I see that ‘id’ and ‘tag’ are wrapped in square brackets, which leads me to suspect I’ve left out a datatype in the schema, but I’m not experienced enough to know which one. I’d appreciate the help.

Solution:

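The square brackets mean that matching_rules is a JSON array of objects, not a single object. Your schema declares it as a plain struct, so from_json cannot match it and returns null for that field. For arrays, you have to wrap your StructType in ArrayType, as below: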

val DFschema = StructType(Array(
  StructField("data", StructType(Array(
    StructField("created_at", TimestampType),
    StructField("id", StringType),
    StructField("text", StringType)
  ))),
  // matching_rules is a JSON array of objects, hence ArrayType(StructType(...))
  StructField("matching_rules", ArrayType(StructType(Array(
    StructField("id", StringType),
    StructField("tag", StringType)
  ))))
))
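With the corrected schema, getField("tag") on matching_rules gives an array of tags rather than a single string, so you usually still have to pick an element. Here is a minimal batch sketch of that step, assuming a local SparkSession and a made-up sample record; the names MatchingRulesExample, parse and sampleJson are hypothetical and not from the question:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

object MatchingRulesExample {
  // Schema with matching_rules declared as an array of structs.
  val schema: StructType = StructType(Array(
    StructField("data", StructType(Array(
      StructField("created_at", TimestampType),
      StructField("id", StringType),
      StructField("text", StringType)
    ))),
    StructField("matching_rules", ArrayType(StructType(Array(
      StructField("id", StringType),
      StructField("tag", StringType)
    ))))
  ))

  // Made-up sample record, for illustration only.
  val sampleJson: String =
    """{"data":{"created_at":"2021-01-01T12:00:00.000Z","id":"1","text":"hello"},"matching_rules":[{"id":"10","tag":"spark"}]}"""

  // Parses JSON strings and flattens the fields the question asks for.
  def parse(spark: SparkSession, jsonLines: Seq[String]): DataFrame = {
    import spark.implicits._
    jsonLines.toDF("value")
      .select(from_json(col("value"), schema).alias("structdata"))
      .select(
        col("structdata.data.created_at").alias("created_at"),
        col("structdata.data.text").alias("text"),
        // matching_rules is an array, so pick an element before reading its field.
        col("structdata.matching_rules").getItem(0).getField("tag").alias("topic")
      )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("matching-rules-example")
      .getOrCreate()
    parse(spark, Seq(sampleJson)).show(truncate = false)
    spark.stop()
  }
}
```

getItem(0) keeps only the first rule. If a message can match several rules and you want one row per rule, explode(col("structdata.matching_rules")) is the usual alternative.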

Alternatively, you can pass the schema as a DDL string (assuming the JSON is in a column named content). Note that this variant reads created_at as a STRING rather than a timestamp:

val parsed = ds.withColumn("content",
  expr("from_json(content, 'STRUCT<data:STRUCT<created_at:STRING,id:STRING,text:STRING>,matching_rules:ARRAY<STRUCT<id:STRING,tag:STRING>>>')")
)

You can also ask Spark to infer the schema from a sample JSON record with schema_of_json, which returns it as a DDL-style string.
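As a rough sketch of the schema_of_json approach, the snippet below infers a schema from one made-up sample record and prints it. The names InferSchemaExample, inferredSchema and sampleJson are hypothetical, and the exact formatting of the returned string differs between Spark versions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, schema_of_json}

object InferSchemaExample {
  // Made-up sample record, for illustration only.
  val sampleJson: String =
    """{"data":{"created_at":"2021-01-01T12:00:00.000Z","id":"1","text":"hello"},"matching_rules":[{"id":"10","tag":"spark"}]}"""

  // Returns the schema Spark infers for sampleJson as a DDL-style string.
  def inferredSchema(spark: SparkSession): String =
    spark.range(1)
      .select(schema_of_json(lit(sampleJson)).alias("ddl"))
      .head()
      .getString(0)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("infer-schema-example")
      .getOrCreate()
    // matching_rules should show up as an array of structs.
    println(inferredSchema(spark))
    spark.stop()
  }
}
```

The inferred types come from a single sample, so check them before relying on them. In particular, created_at may be inferred as a string, in which case an explicit schema with TimestampType or a later cast is still needed.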

Good luck!
