Как я могу сообщить о ходе выполнения запроса PLINQ?

2

Я хотел бы сообщить о прогрессе от продолжительного запроса PLINQ.

Я не могу найти какой-либо собственный метод LINQ, который позволяет мне сделать это (как было реализовано для отмены).

Я прочитал эту статью, которая показывает аккуратную функцию расширения для регулярного сериализованного запроса.

Я тестировал поведение, используя приведенный ниже код.

var progress = new BehaviorSubject<int>(0);
DateTime start = DateTime.Now;
progress.Subscribe(x => { Console.WriteLine(x); });
Enumerable.Range(1,1000000)
    //.WithProgressReporting(i => progress.OnNext(i)) //Beginning Progress
    .AsParallel()
    .AsOrdered()
    //.WithProgressReporting(i => progress.OnNext(i)) //Middle Progress reporting
    .Select(v => { Thread.Sleep(1); return v * v; })
    //.WithProgressReporting(i => progress.OnNext(i)) //End Progress Reporting
    .ToList();
Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");

Редактировать:
Сообщение о прогрессе из середины с использованием расширения IEnumerable<T> устраняет параллелизм.

Отчетность с конца не сообщает о каком-либо прогрессе, в то время как параллельные вычисления вычисляются, а затем быстро сообщает обо всем прогрессе в самом конце. Я предполагаю, что это прогресс компиляции результатов параллельных вычислений в список.

Первоначально я думал, что отчеты о прогрессе с самого начала заставляют LINQ работать беспараллельно. Поспав об этом и прочитав комментарии Питера Дунихо, я вижу, что на самом деле он работает параллельно, но я получаю столько отчетов о прогрессе, что их обработка приводит к значительному замедлению моего теста/приложения.

Существует ли способ, который является параллельным/поточно-безопасным, чтобы сообщать о прогрессе из PLINQ с шагом, позволяющим пользователю знать, что происходит прогресс, не оказывая существенного влияния на время выполнения метода?

  • 0
    Ваш вопрос не очень понятен. Почему WithProgressReporting() метод WithProgressReporting() вашим целям? Как правило, вы начинаете с IEnumerable<T> любом случае ... просто оберните исходный IEnumerable<T> вызовом WithProgressReporting() и вызовите AsParallel() для этого, как вы это делали в своих тестах. В конечном итоге пропускная способность будет одинаковой, независимо от того, сообщаете ли вы о прогрессе в источнике или результате. Вам нужно быть более конкретным: опубликовать минимальный воспроизводимый пример и точно объяснить, какой результат вы ожидаете, и что вы получаете вместо этого.
Теги:
linq
parallel-processing

2 ответа

1

Этот ответ может быть не таким элегантным, но он выполняет свою работу.

При использовании PLINQ ваша коллекция обрабатывается несколькими потоками, поэтому эти потоки используются для сообщения о результатах выполнения в нескольких (и не в порядке) отчетах о ходе выполнения, например, 0% 1% 5% 4% 3% и т.д.

Вместо этого мы можем использовать эти несколько потоков для обновления общей переменной, хранящей прогресс. В моем примере это локальная переменная completed. Затем мы порождаем другой поток, используя Task.Run() чтобы сообщать об этой переменной прогресса с интервалом 0,5 с.

Класс расширения:

static class Extensions
    public static ParallelQuery<T> WithProgressReporting<T>(this ParallelQuery<T> sequence, Action increment)
    {
        return sequence.Select(x =>
        {
            increment?.Invoke();
            return x;
        });
    }
}

Код:

static void Main(string[] args)
    {
        long completed = 0;
        Task.Run(() =>
        {
            while (completed < 100000)
            {
                Console.WriteLine((completed * 100 / 100000) + "%");
                Thread.Sleep(500);
            }
        });
        DateTime start = DateTime.Now;
        var output = Enumerable.Range(1, 100000)
            .AsParallel()
            .WithProgressReporting(()=>Interlocked.Increment(ref completed))
            .Select(v => { Thread.Sleep(1); return v * v; })
            .ToList();
        Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");
        Console.ReadKey();
    }
0

Это расширение может быть расположено либо в начале, либо в конце запроса LINQ. Если позиционируется в начале, немедленно начнет сообщать о прогрессе, но ошибочно сообщит 100%, прежде чем работа будет выполнена Если позиционируется в конце, будет точно сообщать о конце запроса, но будет откладывать отчет о ходе выполнения до завершения первого элемента источника.

public static ParallelQuery<TSource> WithProgressReporting<TSource>(this ParallelQuery<TSource> source,
    long itemsCount, IProgress<double> progress)
{
    int countShared = 0;
    return source.Select(item =>
    {
        int countLocal = Interlocked.Increment(ref countShared);
        progress.Report(countLocal / (double)itemsCount);
        return item;
    });
}

Пример использования:

var progress = new Progress<double>(); // Progress captures the System.Threading.SynchronizationContext at construction.
progress.ProgressChanged += (object sender, double e) =>
{
    Console.WriteLine($"Progress: {e:0%}");
};
var numbers = Enumerable.Range(1, 10);
var sum = numbers
.AsParallel()
.WithDegreeOfParallelism(3)
.Select(n => { Thread.Sleep(500); return n; }) // Pretend we are doing something heavy
.WithProgressReporting(10, progress) // <--- the extension method
.Sum();
Console.WriteLine($"Sum: {sum}");

Выход:

Изображение 174551

Есть некоторые скачки, потому что иногда рабочие потоки вытесняют друг друга.

Класс System.Progress<T> имеет приятную функцию, которая вызывает событие ProgressChanged в захваченном контексте (обычно в потоке пользовательского интерфейса), поэтому элементы управления пользовательским интерфейсом можно безопасно обновлять. С другой стороны, в консольном приложении событие вызывается в ThreadPool, который, вероятно, будет полностью использован параллельным запросом, поэтому событие будет запускаться с некоторой задержкой (ThreadPool порождает новые потоки каждые 500 мсек). По этой причине я ограничил параллелизм до 3 в этом примере, чтобы сохранить свободную ветку для отчетов о прогрессе (у меня есть четырехъядерный компьютер).

Ещё вопросы

Сообщество Overcoder
Наверх
Меню