У меня есть несколько Csv, которые все переменные изменяются следующим образом:
cloudiness.csv
+---+---+----------+-------------------+
|_c0| ID|cloudiness| datetime|
+---+---+----------+-------------------+
| 0| 3| 1.0|2013-11-08 00:00:00|
| 1|303| 2.0|2013-11-08 00:00:00|
| 2|306| 3.0|2013-11-08 00:00:00|
temperature.csv
+---+---+-----------+-------------------+
|_c0| ID|temperature| datetime|
+---+---+-----------+-------------------+
| 0| 3| 3.0|2013-11-08 00:00:00|
| 1|303| 4.0|2013-11-08 00:00:00|
| 2|306| 5.0|2013-11-08 00:00:00|
.. и так далее, (7 o 8 o этих файлов).
Я должен объединить их в один только DataFrame, используя Spark (R, Python или Scala) следующим образом:
+---+---+-----------+----------+-------------------+
|_c0| ID|temperature|cloudiness| datetime|
+---+---+-----------+----------+-------------------+
| 0| 3| 3.0| 1.0|2013-11-08 00:00:00|
| 1|303| 4.0| 2.0|2013-11-08 00:00:00|
| 2|306| 5.0| 3.0|2013-11-08 00:00:00|
Я попробовал spark.read, но это занимает слишком много времени, файлы по 3 ГБ каждый. Каков наилучший способ сделать это?
не могли бы вы обратиться ниже пример кода:
import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map
val emp = Seq((1,"John"),(2,"David"))
val deps = Seq((1,"Admin",1),(2,"HR",2))
val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID","Name","EmpID")
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID))
val combinedDF = depsDF
.withColumn("empNames",lookup(lookupMap)($"EmpID"))
Стандартный способ заключается в объединении кадров данных.
когда вы читаете файлы csv, используя нижеприведенный фрагмент
val read_csv1 = sc.textFile("Путь HDFS для чтения файла")
RDD будет создан, и вы сможете присоединиться к другим CSV. Если вы отметите, что проблема с эффективностью работы. позволь мне дать тебе другой путь.
spark.read.load('csv')
а затем выполните операцию соединения с использованием pyspark. Это стандартный способ сделать это.