Импорт нескольких CSV в DataFrame с разными заголовками в Spark

1

У меня есть несколько Csv, которые все переменные изменяются следующим образом:

cloudiness.csv

    +---+---+----------+-------------------+
    |_c0| ID|cloudiness|           datetime|
    +---+---+----------+-------------------+
    |  0|  3|       1.0|2013-11-08 00:00:00|
    |  1|303|       2.0|2013-11-08 00:00:00|
    |  2|306|       3.0|2013-11-08 00:00:00|

temperature.csv

    +---+---+-----------+-------------------+
    |_c0| ID|temperature|           datetime|
    +---+---+-----------+-------------------+
    |  0|  3|        3.0|2013-11-08 00:00:00|
    |  1|303|        4.0|2013-11-08 00:00:00|
    |  2|306|        5.0|2013-11-08 00:00:00|

.. и так далее, (7 o 8 o этих файлов).

Я должен объединить их в один только DataFrame, используя Spark (R, Python или Scala) следующим образом:

    +---+---+-----------+----------+-------------------+
    |_c0| ID|temperature|cloudiness|           datetime|
    +---+---+-----------+----------+-------------------+
    |  0|  3|        3.0|       1.0|2013-11-08 00:00:00|
    |  1|303|        4.0|       2.0|2013-11-08 00:00:00|
    |  2|306|        5.0|       3.0|2013-11-08 00:00:00|

Я попробовал spark.read, но это занимает слишком много времени, файлы по 3 ГБ каждый. Каков наилучший способ сделать это?

  • 2
    Я бы сказал, что в pyspark используйте spark.read.load('csv') а затем выполните операцию соединения с использованием pyspark. Это стандартный способ сделать это.
  • 0
    Объединение занимает очень много времени, по крайней мере 20 минут, проблема с моим кластером, так?
Показать ещё 2 комментария
Теги:
csv
dataframe
apache-spark

2 ответа

0

не могли бы вы обратиться ниже пример кода:

import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map

val emp = Seq((1,"John"),(2,"David"))
val deps = Seq((1,"Admin",1),(2,"HR",2))

val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID","Name","EmpID")


val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID))

val combinedDF = depsDF
  .withColumn("empNames",lookup(lookupMap)($"EmpID"))
0

Стандартный способ заключается в объединении кадров данных.

когда вы читаете файлы csv, используя нижеприведенный фрагмент

val read_csv1 = sc.textFile("Путь HDFS для чтения файла")

RDD будет создан, и вы сможете присоединиться к другим CSV. Если вы отметите, что проблема с эффективностью работы. позволь мне дать тебе другой путь.

  • 0
    Хорошо спасибо. Какой другой путь?

Ещё вопросы

Сообщество Overcoder
Наверх
Меню