Ошибка вызова updateStateByKey в Spark Streaming

1

У меня этот общий метод в Scala

def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S],
Optional[S]])   : JavaPairDStream[K, S] = { ... }

Когда я называю это на Java, оба они не скомпилируются:

1

JavaPairDStream<String, Integer> stateDstream =
pairs.<Integer>updateStateByKey(...);

2

JavaPairDStream<String, Integer> stateDstream =
pairs.updateStateByKey(...);

Как правильно вызвать метод?

Сообщения об ошибках:

The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>,
int) in the type JavaPairDStream<String,Integer> is not applicable for
the arguments
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>,
HashPartitioner, JavaPairRDD<String,Integer>)

Отредактировано: весь вызов функции (Java 8):

final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    (values, state) -> {
      Integer newSum = state.or(0);
      for (Integer value : values) {
        newSum += value;
      }
      return Optional.of(newSum);
    };



JavaPairDStream<String, Integer> stateDstream = pairs.updateStateByKey(
    updateFunction
    ,
    new HashPartitioner(context.defaultParallelism()), initialRDD);

Отредактировано: выяснилось, что генерики не являются проблемой, но параметры не соответствуют сигнатуре метода.

  • 0
    И весь код (для вызовов функций в Java)! Тем не менее, первый путь, без сомнения, неверен
  • 0
    Обновлено сообщение об ошибке
Показать ещё 4 комментария
Теги:
apache-spark

1 ответ

1
Лучший ответ

Проблема в том, что вы передаете initialRDD, а метод updateStateByKey не имеет этого параметра.

Ближайшая подпись:

updateStateByKey[S](updateFunc: Function2[List[V], Optional[S], Optional[S]], 
  partitioner: Partitioner): JavaPairDStream[K, S] 

Ещё вопросы

Сообщество Overcoder
Наверх
Меню