Ниже приведен код искровой scala, который будет печатать один столбец DataSet [Row]:
import org.apache.spark.sql.{Dataset, Row, SparkSession}
val spark: SparkSession = SparkSession.builder()
.appName("Spark DataValidation")
.config("SPARK_MAJOR_VERSION", "2").enableHiveSupport()
.getOrCreate()
val kafkaPath:String="hdfs:///landing/APPLICATION/*"
val targetPath:String="hdfs://datacompare/3"
val pk:String = "APPLICATION_ID"
val pkValues = spark
.read
.json(kafkaPath)
.select("message.data.*")
.select(pk)
.distinct()
pkValues.show()
Вывод кода:
+--------------+
|APPLICATION_ID|
+--------------+
| 388|
| 447|
| 346|
| 861|
| 361|
| 557|
| 482|
| 518|
| 432|
| 422|
| 533|
| 733|
| 472|
| 457|
| 387|
| 394|
| 786|
| 458|
+--------------+
Вопрос:
Как преобразовать этот файл данных в переменную String, разделенную запятой?
Ожидаемый результат:
val data:String= "388,447,346,861,361,557,482,518,432,422,533,733,472,457,387,394,786,458"
Пожалуйста, предложите, как преобразовать DataFrame [Row] или Dataset в одну строку.
Я не думаю, что хорошая идея, так как dataFrame является распределенным объектом и может быть бесполезным. Collect
приведет все данные к драйверу, поэтому вы должны выполнить эту операцию осторожно.
Вот что вы можете сделать с помощью dataFrame (два варианта):
df.select("APPLICATION_ID").rdd.map(r => r(0)).collect.mkString(",")
df.select("APPLICATION_ID").collect.mkString(",")
Результат с тестовым dataFrame всего тремя строками:
String = 388,447,346
Изменение: с помощью DataSet вы можете сделать прямо:
ds.collect.mkString(",")
Использовать collect_list:
import org.apache.spark.sql.functions._
val data = pkValues.select(collect_list(col(pk))) // collect to one row
.as[Array[Long]] // set encoder, so you will have strongly-typed Dataset
.take(1)(0) // get the first row - result will be Array[Long]
.mkString(",") // and join all values
Тем не менее, это довольно плохая идея для выполнения сбора или взятия всех строк. Вместо этого вы можете сохранить pkValues где-нибудь с .write
? Или сделать это аргументом для другой функции, чтобы сохранить распределенные вычисления
Изменение: только что заметил, что @SCouto отправил другой ответ сразу после меня. Сбор также будет правильным, с функцией collect_list у вас будет одно преимущество - вы можете легко сгруппировать, если хотите, и, т.е. Групповые ключи для четных и нечетных. Это зависит от того, какое решение вы предпочитаете, проще с коллекцией или одной линией дольше, но более мощным
collect_set
а затем удалить distinct
. Вместо take(1)(0)
вы можете просто использовать first
.rdd
или даже сделатьdf.select("APPLICATION_ID").as[String].collect.mkString(",")