spark scala: преобразование набора данных OR или набора данных в строку, разделенную запятой

2

Ниже приведен код искровой scala, который будет печатать один столбец DataSet [Row]:

import org.apache.spark.sql.{Dataset, Row, SparkSession}
val spark: SparkSession = SparkSession.builder()
        .appName("Spark DataValidation")
        .config("SPARK_MAJOR_VERSION", "2").enableHiveSupport()
        .getOrCreate()

val kafkaPath:String="hdfs:///landing/APPLICATION/*"
val targetPath:String="hdfs://datacompare/3"
val pk:String = "APPLICATION_ID" 
val pkValues = spark
        .read
        .json(kafkaPath)
        .select("message.data.*")
        .select(pk)
        .distinct() 
pkValues.show()

Вывод кода:

+--------------+
|APPLICATION_ID|
+--------------+
|           388|
|           447|
|           346|
|           861|
|           361|
|           557|
|           482|
|           518|
|           432|
|           422|
|           533|
|           733|
|           472|
|           457|
|           387|
|           394|
|           786|
|           458|
+--------------+

Вопрос:

Как преобразовать этот файл данных в переменную String, разделенную запятой?

Ожидаемый результат:

val   data:String= "388,447,346,861,361,557,482,518,432,422,533,733,472,457,387,394,786,458"

Пожалуйста, предложите, как преобразовать DataFrame [Row] или Dataset в одну строку.

Теги:
apache-spark
spark-dataframe

2 ответа

3
Лучший ответ

Я не думаю, что хорошая идея, так как dataFrame является распределенным объектом и может быть бесполезным. Collect приведет все данные к драйверу, поэтому вы должны выполнить эту операцию осторожно.

Вот что вы можете сделать с помощью dataFrame (два варианта):

df.select("APPLICATION_ID").rdd.map(r => r(0)).collect.mkString(",")
df.select("APPLICATION_ID").collect.mkString(",")

Результат с тестовым dataFrame всего тремя строками:

String = 388,447,346

Изменение: с помощью DataSet вы можете сделать прямо:

ds.collect.mkString(",")
  • 1
    Большое спасибо за быстрый ответ. ds.collect.mkString (",") сработал и нашел [] в конечном значении, поэтому замените метод, чтобы удалить его
  • 2
    не нужно переходить на RDD API даже с DataFrame (поскольку он, кажется, работает с spark 2), вы можете просто пропустить .rdd или даже сделать df.select("APPLICATION_ID").as[String].collect.mkString(",")
0

Использовать collect_list:

import org.apache.spark.sql.functions._
val data = pkValues.select(collect_list(col(pk))) // collect to one row
    .as[Array[Long]] // set encoder, so you will have strongly-typed Dataset
    .take(1)(0) // get the first row - result will be Array[Long]
    .mkString(",") // and join all values

Тем не менее, это довольно плохая идея для выполнения сбора или взятия всех строк. Вместо этого вы можете сохранить pkValues где-нибудь с .write? Или сделать это аргументом для другой функции, чтобы сохранить распределенные вычисления

Изменение: только что заметил, что @SCouto отправил другой ответ сразу после меня. Сбор также будет правильным, с функцией collect_list у вас будет одно преимущество - вы можете легко сгруппировать, если хотите, и, т.е. Групповые ключи для четных и нечетных. Это зависит от того, какое решение вы предпочитаете, проще с коллекцией или одной линией дольше, но более мощным

  • 0
    поскольку ему нужны разные значения, вы также можете использовать collect_set а затем удалить distinct . Вместо take(1)(0) вы можете просто использовать first
  • 0
    У него уже есть четкие ценности, теперь он хочет собирать :)

Ещё вопросы

Сообщество Overcoder
Наверх
Меню