Это продолжение этого вопроса.
Таким образом, RxJS вызывает мою асинхронную функцию со следующим событием, которое предшествовало завершению предыдущего, как только я жду в своем асинхронном методе.
Мне нужно сериализовать вызовы этой асинхронной функции.
Из этого ответа на аналогичный вопрос я понял, что мне нужно убрать мою асинхронную функцию от подписки и использовать concatMap.
Прямо сейчас мой код не компилируется со следующей ошибкой:
ошибка TS2339: свойство 'concatMap' не существует для типа 'Observable'.
Мой код (пытается настроить):
1/новый код подписки (не компилируется):
this.emitter = fromEventPattern(this.addHandler, this.removeHandler, (err, char) => [err, char]); <= unchanged
this.rxSubscription = this.rxSubscription = this.emitter.concatMap(value:any => this.handleUpdatedValuesComingFromSensor(value)).subscribe(); <= concatMap does not exist on type Observable<any>
2/асинхронная функция для вашей информации:
handleUpdatedValuesComingFromSensor = async (arr: any[]): Promise<void> => {
...
await someMethodAsync();
...
}
concatMap должен использоваться на другом типе источника, но я не могу понять это.
Заранее спасибо.
Как заявил Кос, в Rxjs v6 операторы работы с каналами стали нормой, отойдя от объединения всего вместе с .
, Я предполагаю, что, поскольку вы используете fromEventPattern
, вместо Observable.fromEventPattern
что вы используете rxjs v6+, в этом случае вам нужно обернуть concatMap()
внутри pipe()
.
this.rxSubscription = this.emitter.pipe(concatMap(value:any => this.handleUpdatedValuesComingFromSensor(value))).subscribe()
import { concatMap } from 'rxjs/operators'
затемthis.emitter.pipe( concatMap() ).subscribe()
. Это при условии, что у вас есть rxjs v.^6.0.0
. Опять же, пример использования pipe и concatMap с обещанием