Я хотел бы сообщить о прогрессе от продолжительного запроса PLINQ.
Я не могу найти какой-либо собственный метод LINQ, который позволяет мне сделать это (как было реализовано для отмены).
Я прочитал эту статью, которая показывает аккуратную функцию расширения для регулярного сериализованного запроса.
Я тестировал поведение, используя приведенный ниже код.
var progress = new BehaviorSubject<int>(0);
DateTime start = DateTime.Now;
progress.Subscribe(x => { Console.WriteLine(x); });
Enumerable.Range(1,1000000)
//.WithProgressReporting(i => progress.OnNext(i)) //Beginning Progress
.AsParallel()
.AsOrdered()
//.WithProgressReporting(i => progress.OnNext(i)) //Middle Progress reporting
.Select(v => { Thread.Sleep(1); return v * v; })
//.WithProgressReporting(i => progress.OnNext(i)) //End Progress Reporting
.ToList();
Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");
Редактировать:
Сообщение о прогрессе из середины с использованием расширения IEnumerable<T>
устраняет параллелизм.
Отчетность с конца не сообщает о каком-либо прогрессе, в то время как параллельные вычисления вычисляются, а затем быстро сообщает обо всем прогрессе в самом конце. Я предполагаю, что это прогресс компиляции результатов параллельных вычислений в список.
Первоначально я думал, что отчеты о прогрессе с самого начала заставляют LINQ работать беспараллельно. Поспав об этом и прочитав комментарии Питера Дунихо, я вижу, что на самом деле он работает параллельно, но я получаю столько отчетов о прогрессе, что их обработка приводит к значительному замедлению моего теста/приложения.
Существует ли способ, который является параллельным/поточно-безопасным, чтобы сообщать о прогрессе из PLINQ с шагом, позволяющим пользователю знать, что происходит прогресс, не оказывая существенного влияния на время выполнения метода?
Этот ответ может быть не таким элегантным, но он выполняет свою работу.
При использовании PLINQ ваша коллекция обрабатывается несколькими потоками, поэтому эти потоки используются для сообщения о результатах выполнения в нескольких (и не в порядке) отчетах о ходе выполнения, например, 0% 1% 5% 4% 3% и т.д.
Вместо этого мы можем использовать эти несколько потоков для обновления общей переменной, хранящей прогресс. В моем примере это локальная переменная completed
. Затем мы порождаем другой поток, используя Task.Run()
чтобы сообщать об этой переменной прогресса с интервалом 0,5 с.
Класс расширения:
static class Extensions
public static ParallelQuery<T> WithProgressReporting<T>(this ParallelQuery<T> sequence, Action increment)
{
return sequence.Select(x =>
{
increment?.Invoke();
return x;
});
}
}
Код:
static void Main(string[] args)
{
long completed = 0;
Task.Run(() =>
{
while (completed < 100000)
{
Console.WriteLine((completed * 100 / 100000) + "%");
Thread.Sleep(500);
}
});
DateTime start = DateTime.Now;
var output = Enumerable.Range(1, 100000)
.AsParallel()
.WithProgressReporting(()=>Interlocked.Increment(ref completed))
.Select(v => { Thread.Sleep(1); return v * v; })
.ToList();
Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");
Console.ReadKey();
}
Это расширение может быть расположено либо в начале, либо в конце запроса LINQ. Если позиционируется в начале, немедленно начнет сообщать о прогрессе, но ошибочно сообщит 100%, прежде чем работа будет выполнена Если позиционируется в конце, будет точно сообщать о конце запроса, но будет откладывать отчет о ходе выполнения до завершения первого элемента источника.
public static ParallelQuery<TSource> WithProgressReporting<TSource>(this ParallelQuery<TSource> source,
long itemsCount, IProgress<double> progress)
{
int countShared = 0;
return source.Select(item =>
{
int countLocal = Interlocked.Increment(ref countShared);
progress.Report(countLocal / (double)itemsCount);
return item;
});
}
Пример использования:
var progress = new Progress<double>(); // Progress captures the System.Threading.SynchronizationContext at construction.
progress.ProgressChanged += (object sender, double e) =>
{
Console.WriteLine($"Progress: {e:0%}");
};
var numbers = Enumerable.Range(1, 10);
var sum = numbers
.AsParallel()
.WithDegreeOfParallelism(3)
.Select(n => { Thread.Sleep(500); return n; }) // Pretend we are doing something heavy
.WithProgressReporting(10, progress) // <--- the extension method
.Sum();
Console.WriteLine($"Sum: {sum}");
Выход:
Есть некоторые скачки, потому что иногда рабочие потоки вытесняют друг друга.
Класс System.Progress<T>
имеет приятную функцию, которая вызывает событие ProgressChanged
в захваченном контексте (обычно в потоке пользовательского интерфейса), поэтому элементы управления пользовательским интерфейсом можно безопасно обновлять. С другой стороны, в консольном приложении событие вызывается в ThreadPool, который, вероятно, будет полностью использован параллельным запросом, поэтому событие будет запускаться с некоторой задержкой (ThreadPool порождает новые потоки каждые 500 мсек). По этой причине я ограничил параллелизм до 3 в этом примере, чтобы сохранить свободную ветку для отчетов о прогрессе (у меня есть четырехъядерный компьютер).
WithProgressReporting()
методWithProgressReporting()
вашим целям? Как правило, вы начинаете сIEnumerable<T>
любом случае ... просто оберните исходныйIEnumerable<T>
вызовомWithProgressReporting()
и вызовитеAsParallel()
для этого, как вы это делали в своих тестах. В конечном итоге пропускная способность будет одинаковой, независимо от того, сообщаете ли вы о прогрессе в источнике или результате. Вам нужно быть более конкретным: опубликовать минимальный воспроизводимый пример и точно объяснить, какой результат вы ожидаете, и что вы получаете вместо этого.