Низкая производительность обработки пары СДР с сильно искаженными данными

1

У меня есть пара RDD с миллионами пар ключ-значение, где каждое значение представляет собой список, который может содержать один элемент или миллиарды элементов. Это приводит к низкой производительности, поскольку большие группы будут блокировать узлы кластера в течение нескольких часов, тогда как группы, которые занимают несколько секунд, не могут обрабатываться параллельно, поскольку весь кластер уже занят.

Есть ли способ улучшить это?

РЕДАКТИРОВАТЬ:

Операция, которая вызывает у меня проблемы, представляет собой flatMap где flatMap весь список для данного ключа. Ключ не тронут, и операция сравнивает каждый элемент в списке с остальной частью списка, что занимает огромное количество времени, но, к сожалению, это нужно сделать. Это означает, что список WHOLE должен находиться в одном и том же узле одновременно. В результате RDD будет содержать подсписку в зависимости от значения, вычисленного в flatMap.

Я не могу использовать широковещательные переменные в этом случае, так как между разными парами ключ-значение не будут использоваться общие данные. Что касается разделителя, то, согласно книге O'Reilly Learning Spark, такая операция не принесет пользы от разделителя, так как не задействована никакая перетасовка (хотя я не уверен, что это правда). Может ли секвенсор помочь в этой ситуации?

ВТОРОЙ РЕДАКТИРОВАНИЕ:

Это пример моего кода:

public class MyFunction implements FlatMapFunction
    <Tuple2<String, Iterable<Bean>>, ComparedPerson>  {


public Iterable<ProcessedBean> call(Tuple2<Key, Iterable<Bean>> input) throws Exception {
    List<ProcessedBean> output = new ArrayList<ProcessedBean>();
    List<Bean> listToProcess = CollectionsUtil.makeList(input._2());

    // In some cases size == 2, in others size > 100.000
    for (int i = 0; i < listToProcess.size() - 1; i++) {
        for (int j = i + 1; j < listToProcess.size(); j++) {
            ProcessedBean processed = processData(listToProcess.get(i), listToProcess.get(j));

            if (processed != null) {
                output.add(processed);
            }
        }
    }

    return output;
}

Двойной для будет цикл n(n-1)/2 раза, но этого нельзя избежать.

  • 0
    Вы можете попробовать отобразить (k, longValueSeq) => Seq [(k, singleV]), где дальнейшая обработка будет равномерной.
  • 0
    не могли бы вы добавить код к вопросу? После прочтения вашей правки кажется, что могут быть и другие варианты для достижения того же самого.
Показать ещё 6 комментариев
Теги:
apache-spark
rdd

3 ответа

1
Лучший ответ

Если "processData" стоит дорого, возможно, вы можете распараллелить этот шаг и получить там некоторый выигрыш.

В псевдокоде это будет что-то вроде:

def processData(bean1:Bean, bean2:Bean):Option[ProcessedData] = { ... }

val rdd:RDD[(Key, List[Bean])] = ...

val pairs:RDD[(Bean, Bean)] = rdd.flatMap((key, beans) => {
    val output = mutable.List[ProcessedBean]()
    val len = beans.length
    for (var i=0; i < len - 1; i++) {
        for (var j=i+1; j < len; j++) {
            output.add((beans(i), beans(j)))
        }
    }
    output
}).repartition(someNumber)

val result:RDD[ProcessedBean] = pairs
    .map(beans => processData(beans._1, beans._2))
    .filter(_.isDefined)
    .map(_.get)

Шаг FlatMap по-прежнему будет ограничен вашим самым большим списком, и при переделке вы будете тасовать, но перемещение этапа processData за пределы этого шага N ^ 2 может привести к некоторому параллелизму.

  • 0
    Это может сработать, поскольку шаг processData действительно дорог. Однако, если я создам новый RDD со всеми парами, размер этого RDD может оказаться слишком большим ...
4

Порядок обработки ключей не влияет на общее время вычислений. Единственная проблема из-за отклонения (некоторые значения небольшие, другие большие). Я могу себе представить, что в конце обработки: одна большая задача все еще работает, пока все остальные узлы уже завершены.

Если это то, что вы видите, вы можете попытаться увеличить количество разделов. Это уменьшит размер задач, поэтому более высокая задача в конце будет менее вероятной.

Широковещательные переменные и разделители не помогут в производительности. Я думаю, вы должны сосредоточиться на том, чтобы сделать шаг сравнения "все к цели" максимально эффективным. (Или еще лучше, избегайте этого. Я не думаю, что квадратичные алгоритмы действительно устойчивы в больших данных.)

  • 1
    То, что вы только что описали, является именно нашей главной проблемой. Вы рекомендуете использовать repartition или coalesce ?
  • 0
    Неважно: repartition просто вызывает coalesce :) (см. Код ). Лучше всего, если вы сможете избежать repartition / coalesce и создать RDD с большим количеством разделов с самого начала.
1

Скос как это часто зависит от домена. Вы можете создать свои данные о ценности как RDD и присоединиться к нему. Или вы можете попробовать использовать широковещательные переменные. Или вы можете написать пользовательский разделитель, который может помочь разделить данные по-разному.

Но, в конечном счете, это будет зависеть от вычислений и особенностей данных.

  • 0
    Спасибо за ваш ответ, но второй подход не будет работать в моем случае. Что касается первого, я не уверен, что вы имеете в виду, и я не уверен, как использовать разделитель в этом случае. Пожалуйста, смотрите мое редактирование.

Ещё вопросы

Сообщество Overcoder
Наверх
Меню