I have a complex nested JSON data file like the one below, and I am trying to consume the data and convert it into

the case class below

case class DeviceData (id: Int, device: String)

where id = 0 and

device = "{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"

But I am stuck at the very first step, consuming the data and turning it into a simple data frame: I get a _corrupt_record error. Please tell me what mistake I am making. I am using Spark version 2.4.5.

export1.json

0,"{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"
1,"{""device_id"": 1, ""device_type"": ""sensor-igauge"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""213.161.254.1"", ""cca3"": ""NOR"", ""cn"": ""Norway"", ""temp"": 30, ""signal"": 18, ""battery_level"": 6, ""c02_level"": 1413, ""timestamp"" :1475600498 }"
2,"{""device_id"": 2, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""88.36.5.1"", ""cca3"": ""ITA"", ""cn"": ""Italy"", ""temp"": 18, ""signal"": 25, ""battery_level"": 5, ""c02_level"": 1372, ""timestamp"" :1475600500 }"

And my Spark code is as below:

package sparkWCExample.spWCExample

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._     // Spark types to define a schema
import org.apache.spark.sql.functions._ // Spark helper functions, including to_timestamp

case class DeviceData (id: Int, device: String)

object DatasetExample {

  def main(args: Array[String]) {
    println("Start now")
    val conf = new SparkConf().setAppName("Spark Scala WordCount Example").setMaster("local[1]")
    val spark = SparkSession.builder().config(conf).appName("CsvExample").master("local").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    import spark.implicits._

    val readJSONDF = spark.read.json(sc.wholeTextFiles("C:\\Sankha\\Study\\data\\complex-nested-json\\export1.json").values).toDF()
    readJSONDF.show()
  }
}

Instead of the expected data frame I get this output:

+--------------------+
|     _corrupt_record|
+--------------------+
|0,"{""device_id""...|
+--------------------+
Spark-shell 13 March 2020, 21:28

1 answer

Best Answer

sc.wholeTextFiles creates a PairRDD where the key is the file name and the value is the content of the whole file. More details can be found here. Passing those values to spark.read.json therefore hands it the entire CSV text of export1.json, which is not valid JSON, so every record ends up in _corrupt_record.
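
As a quick illustration (just a sketch, reusing the path from the question), you can inspect what wholeTextFiles actually returns:

// Sketch: each element is (file name, entire file content).
// The content here is the CSV text of export1.json, not JSON,
// which is why spark.read.json flags it as _corrupt_record.
val pairs = sc.wholeTextFiles("C:\\Sankha\\Study\\data\\complex-nested-json\\export1.json")
pairs.collect().foreach { case (fileName, content) =>
  println(s"key  : $fileName")
  println(s"value: ${content.take(80)}...")
}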

You probably want to use spark.read.text instead and then split each line afterwards:

val df = spark.read.text("export1.json")   // one row per input line, in a single "value" column
  .map(row => {
    val s = row.getAs[String](0)
    val index = s.indexOf(',')              // split only on the first comma
    DeviceData(s.substring(0, index).toInt, s.substring(index + 1))
  })
df.show

This prints:

+---+--------------------+
| id|              device|
+---+--------------------+
|  0|"{""device_id"": ...|
|  1|"{""device_id"": ...|
|  2|"{""device_id"": ...|
+---+--------------------+
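
Note that the device column still carries the CSV quoting from the file (a leading " and doubled "" inside the JSON). If you want those unescaped, one option (a sketch, assuming the file follows standard CSV quoting where "" is an escaped quote; the column names here are assumptions, not part of the original answer) is to read the file with the CSV reader instead:

// Sketch: let the CSV reader handle the quoting, so "" becomes a plain ".
val csvDF = spark.read
  .option("quote", "\"")
  .option("escape", "\"")               // "" inside a quoted field is an escaped "
  .csv("export1.json")
  .toDF("id", "device")                 // assumed column names
  .withColumn("id", $"id".cast("int"))
  .as[DeviceData]
csvDF.show(false)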
werner 13 March 2020, 22:40