Scala'da belirli bir şema ile DataFrame
üzerinde oluşturmak istiyorum. JSON read kullanmayı denedim (yani boş dosyayı okumayı) ama bunun en iyi uygulama olduğunu sanmıyorum.
Aşağıdaki şemaya sahip bir veri çerçevesi istediğinizi varsayalım:
root
|-- k: string (nullable = true)
|-- v: integer (nullable = false)
Basitçe bir veri çerçevesi için şema tanımlar ve boş RDD[Satır]
kullanırsınız:
import org.apache.spark.sql.types.{
StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row
val schema = StructType(
StructField("k", StringType, true) ::
StructField("v", IntegerType, false) :: Nil)
// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
spark.createDataFrame(sc.emptyRDD[Row], schema)
PySpark eşdeğeri neredeyse aynıdır:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("k", StringType(), True), StructField("v", IntegerType(), False)
])
# or df = sc.parallelize([]).toDF(schema)
# Spark < 2.0
# sqlContext.createDataFrame([], schema)
df = spark.createDataFrame([], schema)
Örtük kodlayıcıları (yalnızca Scala) Tuple
gibi Product
türleriyle kullanma:
import spark.implicits._
Seq.empty[(String, Int)].toDF("k", "v")
veya vaka sınıfı:
case class KV(k: String, v: Int)
Seq.empty[KV].toDF
veya
spark.emptyDataset[KV].toDF
Spark 2.0.0'dan itibaren aşağıdakileri yapabilirsiniz.
Bir Person
case sınıfı tanımlayalım:
scala> case class Person(id: Int, name: String)
defined class Person
SparkSession implicit Encoders
öğesini içe aktarın:
scala> import spark.implicits._
import spark.implicits._
Ve boş bir Dataset[Person]
oluşturmak için SparkSession kullanın:
scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]
Ayrıca bir Şema "DSL" de kullanabilirsiniz (bkz. org.apache.spark.sql.ColumnName içinde Veri Çerçeveleri için destek fonksiyonları).
scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)
scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)
scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType
scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> emptyDF.printSchema
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
import scala.reflect.runtime.{universe => ru}
def createEmptyDataFrame[T: ru.TypeTag] =
hiveContext.createDataFrame(sc.emptyRDD[Row],
ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
)
case class RawData(id: String, firstname: String, lastname: String, age: Int)
val sourceDF = createEmptyDataFrame[RawData]