Pyspark - отдельные записи на основе 2 столбцов в датафрейме

1

У меня есть 2 кадра данных, например df1 и df2.

Данные df1 поступают из базы данных, а df2 - это новые данные, которые я получаю от своего клиента. Мне нужно обработать новые данные и выполнить UPSERTs зависимости от того, UPSERTs ли новая запись или существующая запись.

Пример вывода данных:

df1= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 01","NJ"),("xxx2","81A01","TERR NAME 01","NJ"),("xxx3","81A01","TERR NAME 01","NJ"),("xxx4","81A01","TERR NAME 01","CA"),("xx5","81A01","TERR NAME 01","ME")], ["zip_code","territory_code","territory_name","state"])
df2= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 55","NY"),("xxx2","81A01","TERR NAME 55","NY"),("x103","81A01","TERR NAME 01","NJ")], ["zip_code","territory_code","territory_name","state"])

df1.show()
+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
|    xxx1|         81A01|  TERR NAME 01|   NJ|
|    xxx2|         81A01|  TERR NAME 01|   NJ|
|    xxx3|         81A01|  TERR NAME 01|   NJ|
|    xxx4|         81A01|  TERR NAME 01|   CA|
|    xxx5|         81A01|  TERR NAME 01|   ME|
+---------------------------------------------

# Print out information about this data
df2.show()
+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
|    xxx1|         81A01|  TERR NAME 55|   NY|
|    xxx2|         81A01|  TERR NAME 55|   NY|
|    x103|         81A01|  TERR NAME 01|   NJ|
+---------------------------------------------

Ожидаемые результаты: Мне нужно сравнить df2 с фреймом df1. Создайте 2 новых набора данных, основанных на приведенном выше сравнении, то есть обновляемые записи и записи, которые необходимо добавить/добавить в базу данных.

если zip_code & territory_code одинаковы, то это UPDATE, иначе это ВСТАВИТЬ в базу данных.

Например: Новый вывод dataframe для INSERT:

 +--------+--------------+--------------+-----+
 |zip_code|territory_code|territory_name|state|
 +--------+--------------+--------------+-----+
 |    x103|         81A01|  TERR NAME 01|   NJ|
 +---------------------------------------------

Новый фрейм данных для UPDATE:

+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
|    xxx1|         81A01|  TERR NAME 55|   NY|
|    xxx2|         81A01|  TERR NAME 55|   NY|
+---------------------------------------------

Может кто-нибудь, пожалуйста, помогите мне? И я использую AWS Glue.

ОБНОВЛЕНИЕ: РЕШЕНИЕ (Использование объединения и вычитания)

df3 = df1.join(df2, (df1.zip_code == df2.zip_code_new) & (df1.territory_code == df2.territory_code_new))
df5=df3.drop("zip_code", "territory_code", "territory_name", "state")
df5.show()

+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|        x103|             81A01|      TERR NAME 01|       NJ|
+------------+------------------+------------------+---------+

df4=df2.subtract(df5)
df4.show()

+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|    xxx1    |         81A01    |  TERR NAME 55    |   NY    |
|    xxx2    |         81A01    |  TERR NAME 55    |   NY    |
+------------------------------------------------------------+

Для обновления базы данных RDS я использую pymysql/Mysqldb:

db = MySQLdb.connect("xxxx.rds.amazonaws.com", "username", "password", "databasename")
cursor = db.cursor()

#cursor.execute("REPLACE INTO table SELECT * FROM table_stg")
insertQry = "INSERT INTO table VALUES('xxx1','81A01','TERR NAME 55','NY') ON DUPLICATE KEY UPDATE territory_name='TERR NAME 55', state='NY'"
n=cursor.execute(insertQry)
db.commit()
cursor.fetchall()
db.close()

Спасибо

Теги:
pyspark
amazon-rds
pyspark-sql
aws-glue

2 ответа

1
Лучший ответ

Вот эскиз решения:

  1. проецируйте оба фрейма на свой уникальный ключ (zip_code и территорию)

  2. Используйте искровой фрейм api для вычисления пересечения и разности между обоими кадрами. См. Эту ссылку: Как получить разницу между двумя DataFrames?

  3. выполнить обновление для пересечения ключей

  4. сделать вставку для разницы (в пределах нового фрейма данных, а не в пределах существующих данных)

В scala это выглядело бы примерно так - и это должно быть очень похоже на python:

import org.apache.spark.sql.SparkSession

case class ZipTerr(zip_code: String, territory_code: String, 
    territory_name: String, state:String)

case class Key(zip_code: String, territory_code: String)

val spark: SparkSession

val newData = spark.createDataFrame(List(
  ZipTerr("xxx1", "81A01", "TERR NAME 01", "NJ"),
  ZipTerr("xxx2", "81A01", "TERR NAME 01", "NJ"),
  ZipTerr("xxx3", "81A01", "TERR NAME 01", "NJ"),
  ZipTerr("xxx4", "81A01", "TERR NAME 01", "CA"),
  ZipTerr("xx5","81A01","TERR NAME 01","ME")
))

val oldData = spark.createDataFrame(List(
  ZipTerr("xxx1","81A01","TERR NAME 55","NY"),
  ZipTerr("xxx2","81A01","TERR NAME 55","NY"),
  ZipTerr("x103","81A01","TERR NAME 01","NJ")
))

val newKeys = newData.map(z => Key(z.getAs("zip_code"), z.getAs("territory_code")))
val oldKeys = oldData.map(z => Key(z.getAs("zip_code"), z.getAs("territory_code")))

val keysToInsert = newKeys.except(oldKeys)
val keysToUpdate = newKeys.intersect(oldKeys)

Помогает ли это?

Примечание: имя ваших переменных указывает на то, что вы работаете с клеевыми динамическими фреймами. Тем не менее вы назначаете для них sqlContext.createDataFrame функции sqlContext.createDataFrame.

  • 0
    Спасибо @ user152468, постараюсь это и держать вас в курсе. Есть ли способ извлечь всю запись из newData / oldData на основе ключей из keysToInsert / keyToUpdate путем объединения и удаления ненужных столбцов? Извиняюсь, если его основы, но я все еще учусь pyspark.
  • 1
    Привет, user152468, я смог получить желаемые результаты, основываясь на твоих предложениях, а также на других ссылках. Я обновил код и результаты в моем вопросе. Просто заметьте, подумайте, кроме того, что он заменен вычитанием, и в pyspark нет пересечений, насколько я гуглил. Благодарю.
Показать ещё 1 комментарий
1

Для ясности я воспроизводил здесь решение с фрагментами кода:

df1= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 01","NJ"),("xxx2","81A01","TERR NAME 01","NJ"),("xxx3","81A01","TERR NAME 01","NJ"),("xxx4","81A01","TERR NAME 01","CA"),("xx5","81A01","TERR NAME 01","ME")], ["zip_code","territory_code","territory_name","state"])
df2= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 55","NY"),("xxx2","81A01","TERR NAME 55","NY"),("x103","81A01","TERR NAME 01","NJ")], ["zip_code_new","territory_code_new","territory_name_new","state"])

df1.show()
+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
|    xxx1|         81A01|  TERR NAME 01|   NJ|
|    xxx2|         81A01|  TERR NAME 01|   NJ|
|    xxx3|         81A01|  TERR NAME 01|   NJ|
|    xxx4|         81A01|  TERR NAME 01|   CA|
|    xxx5|         81A01|  TERR NAME 01|   ME|
+---------------------------------------------

# Print out information about this data
df2.show()
+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|    xxx1    |         81A01    |  TERR NAME 55    |   NY    |
|    xxx2    |         81A01    |  TERR NAME 55    |   NY    |
|    x103    |         81A01    |  TERR NAME 01    |   NJ    |
+------------------------------------------------------------+

Получите новые записи, которые можно вставить в mysql, используя операцию " добавить "

df3 = df1.join(df2, (df1.zip_code == df2.zip_code_new) & (df1.territory_code == df2.territory_code_new))
df5=df3.drop("zip_code", "territory_code", "territory_name", "state")
df5.show()

+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|        x103|             81A01|      TERR NAME 01|       NJ|
+------------+------------------+------------------+---------+

Затем получите оставшиеся записи, которые необходимо обновить до базы данных mysql. Мы можем использовать arr = df1.collect(), за которым следует for r in arr: в случае чистых потребностей python, в противном случае мы можем использовать итераторы pandas, обрабатывающие каждую запись.

df4=df2.subtract(df5)
df4.show()

+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|    xxx1    |         81A01    |  TERR NAME 55    |   NY    |
|    xxx2    |         81A01    |  TERR NAME 55    |   NY    |
+------------------------------------------------------------+

Надеюсь, это поможет кому-то в этом нуждается. Сообщите мне, есть ли более эффективные способы для итерации в области данных в приведенном выше сценарии. Спасибо

Ещё вопросы

Сообщество Overcoder
Наверх
Меню