Я пытался много раз продумать ответ на этот вопрос, изучая pandas. Я использую SAS для повседневной работы, и это отлично подходит для его поддержки из-за ядра. Однако SAS является ужасным как часть программного обеспечения по многим другим причинам.
В один прекрасный день я надеюсь заменить мое использование SAS на python и pandas, но в настоящее время мне не хватает встроенного рабочего процесса для больших наборов данных. Я не говорю о "больших данных", для которых требуется распределенная сеть, а файлы слишком велики, чтобы вписаться в память, но достаточно мала, чтобы поместиться на жесткий диск.
Моя первая мысль - использовать HDFStore
для хранения больших наборов данных на диске и вытащить только те части, которые мне нужны, в dataframes для анализа. Другие упомянули MongoDB как более легкую в использовании альтернативу. Мой вопрос таков:
Каковы наиболее эффективные рабочие процессы для выполнения следующих действий:
Примеры в реальном мире будут высоко оценены, особенно от тех, кто использует pandas для "больших данных".
Изменить - пример того, как я хотел бы, чтобы это работало:
Я пытаюсь найти оптимальный способ выполнения этих действий. Чтение ссылок о pandas и pytables кажется, что добавление нового столбца может быть проблемой.
Изменить - Отвечая на вопросы Джеффа конкретно:
if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'
. Результатом этих операций является новый столбец для каждой записи в моем наборе данных.Редко я когда-либо добавлял строки в набор данных. Я почти всегда создаю новые столбцы (переменные или функции в языке статистики/машинного обучения).
Я обычно использую десятки гигабайт данных именно таким образом например У меня есть таблицы на диске, которые я читаю через запросы, создаю данные и добавляю назад.
Стоит прочитать документы и в конце этого потока для нескольких предложений о том, как хранить данные.
Подробности, которые повлияют на то, как вы храните свои данные, например:
Дайте как можно больше деталей; и я могу помочь вам развить структуру.
Убедитесь, что установлен pandas не менее 0.10.1
.
Прочитайте итерирование файлов chunk-by-chunk и несколько табличных запросов.
Так как pytables оптимизирован для работы по строкам (именно это вы запрашиваете), мы создадим таблицу для каждой группы полей. Таким образом, легко выбрать небольшую группу полей (которая будет работать с большой таблицей, но это более эффективно сделать это таким образом... Я думаю, что я смогу исправить это ограничение в будущем... это более интуитивно понятный):
(Ниже приведен псевдокод.)
import numpy as np
import pandas as pd
# create a store
store = pd.HDFStore('mystore.h5')
# this is the key to your storage:
# this maps your fields to a specific group, and defines
# what you want to have as data_columns.
# you might want to create a nice class wrapping this
# (as you will want to have this map and its inversion)
group_map = dict(
A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
B = dict(fields = ['field_10',...... ], dc = ['field_10']),
.....
REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),
)
group_map_inverted = dict()
for g, v in group_map.items():
group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))
Чтение в файлах и создание хранилища (по существу, выполнение append_to_multiple
):
for f in files:
# read in the file, additional options hmay be necessary here
# the chunksize is not strictly necessary, you may be able to slurp each
# file into memory in which case just eliminate this part of the loop
# (you can also change chunksize if necessary)
for chunk in pd.read_table(f, chunksize=50000):
# we are going to append to each table by group
# we are not going to create indexes at this time
# but we *ARE* going to create (some) data_columns
# figure out the field groupings
for g, v in group_map.items():
# create the frame for this group
frame = chunk.reindex(columns = v['fields'], copy = False)
# append it
store.append(g, frame, index=False, data_columns = v['dc'])
Теперь у вас есть все таблицы в файле (на самом деле вы могли бы хранить их в отдельных файлах, если хотите, вам нужно было бы добавить имя файла в group_map, но, вероятно, это не обязательно).
Вот как вы получаете столбцы и создаете новые:
frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
# select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows
# do calculations on this frame
new_frame = cool_function_on_frame(frame)
# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)
Когда вы готовы к post_processing:
# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)
О data_columns вам фактически не нужно определять ЛЮБОЕ data_columns; они позволяют вам подбирать строки на основе столбца. Например. что-то вроде:
store.select(group, where = ['field_1000=foo', 'field_1001>0'])
Они могут быть наиболее интересны для вас на этапе подготовки окончательного отчета (по существу, столбец данных отделен от других столбцов, что может повлиять на эффективность, если вы определите много).
Вы также можете захотеть:
Сообщите мне, когда у вас есть вопросы!
Я думаю, что в приведенных выше ответах отсутствует простой подход, который я нашел очень полезным.
Когда у меня есть файл, который слишком велик для загрузки в память, я разбиваю файл на несколько меньших файлов (по строке или столбцам)
Пример. В случае 30-дневной стоимости торговых данных размером ~ 30 ГБ я разбиваю его на файл в день размером ~ 1 ГБ. Затем я обрабатываю каждый файл отдельно и суммирую результаты в конце
Одно из самых больших преимуществ - это возможность параллельной обработки файлов (либо нескольких потоков, либо процессов)
Другим преимуществом является то, что манипулирование файлами (например, добавление/удаление дат в примере) может выполняться с помощью обычных команд оболочки, что невозможно в более сложных/сложных форматах файлов
Этот подход не охватывает все сценарии, но очень полезен для многих из них.
Если ваши наборы данных находятся между 1 и 20 ГБ, вы должны получить рабочую станцию с 48 ГБ оперативной памяти. Затем Pandas может хранить весь набор данных в ОЗУ. Я знаю, что это не тот ответ, который вы ищете здесь, но делать научные вычисления на ноутбуке с 4 ГБ ОЗУ не является разумным.
Теперь, через два года после вопроса, существует эквивалент "вне ядра" pandas: dask. Это отлично! Несмотря на то, что он не поддерживает все функции pandas, вы можете получить очень далеко от него.
Я знаю, что это старый поток, но я думаю, что библиотека Blaze стоит проверить. Он создан для этих типов ситуаций.
Из документов:
Blaze расширяет возможности использования NumPy и Pandas для распределенных и внеочередных вычислений. Blaze предоставляет интерфейс, аналогичный интерфейсу NumPy ND-Array или Pandas DataFrame, но сопоставляет эти знакомые интерфейсы с множеством других вычислительных движков, таких как Postgres или Spark.
Изменить:. Кстати, это поддерживается ContinuumIO и Трэвисом Олифантом, автором NumPy.
Это относится к пимонго. Я также прототипировал использование sql-сервера, sqlite, HDF, ORM (SQLAlchemy) в python. Прежде всего pymongo - это база данных на основе документов, поэтому каждый человек будет документом (dict
атрибутов). Многие люди составляют коллекцию, и у вас может быть множество коллекций (людей, фондовый рынок, доход).
pd.dateframe → pymongo Примечание: я использую chunksize
в read_csv
, чтобы сохранить его с 5 до 10k записей (pymongo сбрасывает сокет, если он больше)
aCollection.insert((a[1].to_dict() for a in df.iterrows()))
запрос: gt = больше, чем...
pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))
.find()
возвращает итератор, поэтому я обычно использую ichunked
для измельчения в меньшие итераторы.
Как насчет объединения, так как я обычно получаю 10 источников данных, чтобы вставить их вместе:
aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))
то (в моем случае иногда мне приходится сначала aJoinDF
нажимать aJoinDF
перед его "слиянием".)
df = pandas.merge(df, aJoinDF, on=aKey, how='left')
Затем вы можете записать новую информацию в свою основную коллекцию с помощью метода обновления ниже. (логический сбор против физических источников данных).
collection.update({primarykey:foo},{key:change})
При меньших поисках просто денормализовать. Например, у вас есть код в документе, и вы просто добавляете текст кода поля и выполняете поиск dict
при создании документов.
Теперь у вас есть хороший набор данных, основанный на человеке, вы можете развязать свою логику в каждом случае и сделать больше атрибутов. Наконец, вы можете прочитать в pandas свои ключевые индикаторы с 3 по максимальную память и выполнить разведку/анализ/анализ данных. Это работает для меня для 3 миллионов записей с цифрами/большим текстом/категориями/кодами/float/...
Вы также можете использовать два метода, встроенные в MongoDB (MapReduce и агрегатную структуру). См. здесь дополнительную информацию об общей структуре, поскольку она кажется более простой, чем MapReduce, и выглядит удобной для быстрой работы агрегата. Заметьте, мне не нужно было определять свои поля или отношения, и я могу добавлять элементы в документ. В текущем состоянии быстро меняющегося numpy, pandas, набора инструментов python, MongoDB помогает мне просто работать:)
In [96]: test.insert((a[1].to_dict() for a in df.iterrows())) --------------- InvalidDocument: Cannot encode object: 0
. Есть идеи, что может быть не так? Мой dataframe состоит из всех типов int64 и очень прост.
Я заметил это немного позже, но я работаю с аналогичной проблемой (модели предоплаты ипотеки). Мое решение состояло в том, чтобы пропустить слой pandas HDFStore и использовать прямые pytables. Я сохраняю каждый столбец как отдельный массив HDF5 в своем конечном файле.
Мой основной рабочий процесс - сначала получить CSV файл из базы данных. Я gzip его, так что он не такой огромный. Затем я конвертирую это в файл HDF5, ориентированный на ряд, итерируя его в python, преобразовывая каждую строку в реальный тип данных и записывая ее в файл HDF5. Это занимает несколько десятков минут, но не использует память, поскольку работает только последовательно. Затем я "транспонирую" файл HDF5, ориентированный на строку, в файл HDF5, ориентированный на столбцы.
Транспортировка таблицы выглядит так:
def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
# Get a reference to the input data.
tb = h_in.getNode(table_path)
# Create the output group to hold the columns.
grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
for col_name in tb.colnames:
logger.debug("Processing %s", col_name)
# Get the data.
col_data = tb.col(col_name)
# Create the output array.
arr = h_out.createCArray(grp,
col_name,
tables.Atom.from_dtype(col_data.dtype),
col_data.shape)
# Store the data.
arr[:] = col_data
h_out.flush()
Чтение обратно в этом случае выглядит следующим образом:
def read_hdf5(hdf5_path, group_path="/data", columns=None):
"""Read a transposed data set from a HDF5 file."""
if isinstance(hdf5_path, tables.file.File):
hf = hdf5_path
else:
hf = tables.openFile(hdf5_path)
grp = hf.getNode(group_path)
if columns is None:
data = [(child.name, child[:]) for child in grp]
else:
data = [(child.name, child[:]) for child in grp if child.name in columns]
# Convert any float32 columns to float64 for processing.
for i in range(len(data)):
name, vec = data[i]
if vec.dtype == np.float32:
data[i] = (name, vec.astype(np.float64))
if not isinstance(hdf5_path, tables.file.File):
hf.close()
return pd.DataFrame.from_items(data)
Теперь я обычно запускаю это на машине с тонны памяти, поэтому я не могу быть достаточно осторожным с использованием моей памяти. Например, по умолчанию операция загрузки считывает весь набор данных.
Это обычно работает для меня, но это немного неуклюже, и я не могу использовать волшебную магию pytables.
Изменить: реальным преимуществом этого подхода по сравнению с параметрами pitsables по умолчанию является то, что я могу затем загрузить данные в R, используя h5r, который не может обрабатывать таблицы. Или, по крайней мере, я не смог заставить его загружать гетерогенные таблицы.
Один трюк, который я нашел полезным для использования "больших данных", - это уменьшить объем данных, уменьшив точность поплавка до 32-бит. Это не применимо во всех случаях, но во многих приложениях 64-битная точность переполнена, и экономия памяти 2x стоит того. Сделать очевидную точку еще более очевидной:
>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB
>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB
Еще одна вариация
Многие операции, выполняемые в pandas, также могут выполняться как db-запрос (sql, mongo)
Использование RDBMS или mongodb позволяет выполнять некоторые из агрегатов в DB Query (который оптимизирован для больших данных и эффективно использует кеш и индексы)
Позже вы можете выполнить пост-обработку с помощью pandas.
Преимущество этого метода заключается в том, что вы получаете оптимизацию БД для работы с большими данными, но при этом все еще определяете логику в декларативном синтаксисе высокого уровня - и не должны иметь дело с деталями решения, что делать в памяти и что сделать из ядра.
И хотя язык запросов и pandas отличаются друг от друга, обычно не сложно перевести часть логики из одной в другую.
Как уже отмечали другие, через несколько лет появился "неосновный" панд-эквивалент: dask. Хотя dask не является заменой панд и всей его функциональностью, он выделяется по нескольким причинам:
Dask - это гибкая библиотека параллельных вычислений для аналитических вычислений, оптимизированная для динамического планирования задач для интерактивных вычислительных нагрузок коллекций Big Data, таких как параллельные массивы, фреймы данных и списки, которые расширяют общие интерфейсы, такие как итераторы NumPy, Pandas или Python, чем память или распределенные среды и шкалы от ноутбуков до кластеров.
Даск подчеркивает следующие достоинства:
- Знакомый: Предоставляет параллельный массив NumPy и объекты Pandas DataFrame
- Гибкость: обеспечивает интерфейс планирования задач для получения дополнительных пользовательских рабочих нагрузок и интеграции с другими проектами.
- Родной: разрешает распределенные вычисления на Pure Python с доступом к стеку PyData.
- Быстро: работает с низкими накладными расходами, низкой задержкой и минимальной сериализацией, необходимой для быстрого численного алгоритма
- Масштабирование: работает устойчиво на кластерах с 1000 ядрами. Масштабирование: тривиально настраивать и запускать на ноутбуке в одном процессе
- Отзывчивый: разработанный с учетом интерактивных вычислений, он обеспечивает быструю обратную связь и диагностику, чтобы помочь людям
и добавить простой пример кода:
import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()
заменяет некоторый код pandas следующим образом:
import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()
и, что особенно примечательно, обеспечивает через интерфейс concurrent.futures общий для представления пользовательских задач:
from dask.distributed import Client
client = Client('scheduler:port')
futures = []
for fn in filenames:
future = client.submit(load, fn)
futures.append(future)
summary = client.submit(summarize, futures)
summary.result()
Рассмотрим Ruffus, если вы проделаете простой путь создания конвейера данных, который разбит на несколько меньших файлов.
Здесь стоит упомянуть и Рэя,
это распределенная структура вычислений, которая имеет свою собственную реализацию для панд распределенным способом.
Просто замените импорт pandas, и код должен работать так:
# import pandas as pd
import ray.dataframe as pd
#use pd as usual
можно прочитать здесь подробнее:
Недавно я столкнулся с подобной проблемой. Я нашел просто чтение данных в кусках и добавление его, так как я пишу его в кусках для того же самого csv. Моя проблема заключалась в добавлении столбца даты на основе информации в другой таблице, используя значение определенных столбцов следующим образом. Это может помочь тем, кого смущает dask и hdf5, но более знакомым с pandas, как и я.
def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k
rows at a time and outputs them, appending as needed, to a single csv.
Uses the column of the raster names to get the date.
"""
df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True,
chunksize=100000) #read csv file as 100k chunks
'''Do some stuff'''
count = 1 #for indexing item in time list
for chunk in df: #for each 100k rows
newtime = [] #empty list to append repeating times for different rows
toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
while count <= toiterate.max():
for i in toiterate:
if i ==count:
newtime.append(newyears[count])
count+=1
print "Finished", str(chunknum), "chunks"
chunk["time"] = newtime #create new column in dataframe based on time
outname = "CHIRPS_tanz_time2.csv"
#append each output to same csv, using no header
chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)