У меня этот общий метод в Scala
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S],
Optional[S]]) : JavaPairDStream[K, S] = { ... }
Когда я называю это на Java, оба они не скомпилируются:
JavaPairDStream<String, Integer> stateDstream =
pairs.<Integer>updateStateByKey(...);
JavaPairDStream<String, Integer> stateDstream =
pairs.updateStateByKey(...);
Как правильно вызвать метод?
Сообщения об ошибках:
The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>,
int) in the type JavaPairDStream<String,Integer> is not applicable for
the arguments
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>,
HashPartitioner, JavaPairRDD<String,Integer>)
Отредактировано: весь вызов функции (Java 8):
final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
Integer newSum = state.or(0);
for (Integer value : values) {
newSum += value;
}
return Optional.of(newSum);
};
JavaPairDStream<String, Integer> stateDstream = pairs.updateStateByKey(
updateFunction
,
new HashPartitioner(context.defaultParallelism()), initialRDD);
Отредактировано: выяснилось, что генерики не являются проблемой, но параметры не соответствуют сигнатуре метода.
Проблема в том, что вы передаете initialRDD
, а метод updateStateByKey
не имеет этого параметра.
Ближайшая подпись:
updateStateByKey[S](updateFunc: Function2[List[V], Optional[S], Optional[S]],
partitioner: Partitioner): JavaPairDStream[K, S]