Несовместимые моменты времени при передаче данных между двумя потоками

0

У меня есть часть кода, которую я использую для тестирования различных контейнеров (например, deque и круговой буфер) при передаче данных от производителя (поток 1) потребителю (поток 2). Данные представлены структурой с парой временных меток. Первая временная метка берется перед нажатием на продюсер, а вторая берется, когда данные выталкиваются потребителем. Контейнер защищен спин-блокировкой pthread.

Машина запускает redhat 5.5 с ядром 2.6.18 (старое!), Это 4-ядерная система с отключением hyperthreading. gcc 4.7 с флагом -std = С++ 11 использовался во всех тестах.

Производитель получает блокировку, маркирует данные и помещает их в очередь, разблокирует и спит в цикле занятости в течение 2 микросекунд (единственный надежный способ, с которым я нашел спать ровно 2 микрона в этой системе).

Потребительские блокировки, всплывающие данные, отметки времени и генерируют некоторую статистику (средняя задержка и стандартное отклонение). Статистика печатается каждые 5 секунд (M - среднее значение, M2 - std dev) и сброс. Я использовал gettimeofday() для получения временных меток, а это означает, что средний номер задержки можно рассматривать как процент задержек, превышающих 1 микросекунду.

В большинстве случаев вывод выглядит следующим образом:

    CNT=2500000 M=0.00935 M2=0.910238
    CNT=2500000 M=0.0204112 M2=1.57601
    CNT=2500000 M=0.0045016 M2=0.372065

но иногда (вероятно, 1 испытание из 20) вот так:

    CNT=2500000 M=0.523413 M2=4.83898
    CNT=2500000 M=0.558525 M2=4.98872
    CNT=2500000 M=0.581157 M2=5.05889

(обратите внимание, что среднее число намного хуже, чем в первом случае, и оно никогда не восстанавливается по мере запуска программы).

Я был бы признателен за мысли о том, почему это может произойти. Благодарю.

#include <iostream>
#include <string.h>
#include <stdexcept>
#include <sys/time.h>
#include <deque>
#include <thread>
#include <cstdint>
#include <cmath>
#include <unistd.h>
#include <xmmintrin.h> // _mm_pause()

int64_t timestamp() {
    struct timeval tv;
    gettimeofday(&tv, 0);
    return 1000000L * tv.tv_sec + tv.tv_usec;
}

//running mean and a second moment
struct StatsM2 {
    StatsM2() {}
    double m = 0;
    double m2 = 0;
    long count = 0;
    inline void update(long x, long c) {
        count = c;
        double delta = x - m;
        m += delta / count;
        m2 += delta * (x - m);
    }
    inline void reset() {
        m = m2 = 0;
        count = 0;
    }
    inline double getM2() { // running second moment
        return (count > 1) ? m2 / (count - 1) : 0.;
    }
    inline double getDeviation() {
        return std::sqrt(getM2() );
    }
    inline double getM() { // running mean
        return m;
    }
};

// pause for usec microseconds using busy loop
int64_t busyloop_microsec_sleep(unsigned long usec) {
    int64_t t, tend;
    tend = t = timestamp();
    tend += usec;
    while (t < tend) {
        t = timestamp();
    }
    return t;
}

struct Data {
    Data() : time_produced(timestamp() ) {}
    int64_t time_produced;
    int64_t time_consumed;
};

int64_t sleep_interval = 2;
StatsM2 statsm2;
std::deque<Data> queue;
bool producer_running = true;
bool consumer_running = true;
pthread_spinlock_t spin;

void producer() {
    producer_running = true;
    while(producer_running) {
        pthread_spin_lock(&spin);
        queue.push_back(Data() );
        pthread_spin_unlock(&spin);
        busyloop_microsec_sleep(sleep_interval);
    }
}

void consumer() {
    int64_t count = 0;
    int64_t print_at = 1000000/sleep_interval * 5;
    Data data;
    consumer_running = true;
    while (consumer_running) {
        pthread_spin_lock(&spin);
        if (queue.empty() ) {
            pthread_spin_unlock(&spin);
            // _mm_pause();
            continue;
        }
        data = queue.front();
        queue.pop_front();
        pthread_spin_unlock(&spin);
        ++count;
        data.time_consumed = timestamp();
        statsm2.update(data.time_consumed - data.time_produced, count);
        if (count >= print_at) {
            std::cerr << "CNT=" << count << " M=" << statsm2.getM() << " M2=" << statsm2.getDeviation() << "\n";
            statsm2.reset();
            count = 0;
        }
    }
}

int main(void) {
    if (pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE) < 0)
        exit(2);
    std::thread consumer_thread(consumer);
    std::thread producer_thread(producer);
    sleep(40);
    consumer_running = false;
    producer_running = false;
    consumer_thread.join();
    producer_thread.join();
    return 0;
}
  • 0
    Хороший комнатный обогреватель.
  • 0
    Попробуйте записать полный набор данных, которые составляют статистику, чтобы отследить это. Вполне возможно, что вы получите их из-за мусора в данных. Несмотря на то, что очередь не является поточно-ориентированной, на первый взгляд она не должна быть volatile - поскольку замок должен создавать барьеры. Тем не менее, возможно, компилятор все еще перемещает переменную - трудно сказать, не глядя на разборку. Отметьте dequeue volatile чтобы увидеть, поможет ли это.
Показать ещё 2 комментария
Теги:
queue
pthreads
spinlock

2 ответа

1

РЕДАКТИРОВАТЬ:
Я считаю, что 5 ниже - единственное, что может объяснить задержку в 1/2 секунды. Когда на одном ядре каждый будет работать в течение длительного времени и только затем переключиться на другой.
Остальные вещи в списке слишком малы, чтобы вызвать задержку в 1/2 секунды.
Вы можете использовать pthread_setaffinity_np для привязки ваших потоков к определенным ядрам. Вы можете попробовать разные комбинации и посмотреть, как меняется производительность.

РЕДАКТИРОВАТЬ № 2:
Больше вещей, о которых вам следует позаботиться: (кто сказал, что тестирование было простым...)
1. Убедитесь, что потребитель уже работает, когда производитель начинает производить. Не слишком важно в вашем случае, так как производитель действительно не работает в жесткой петле.
2. Это очень важно: вы делите по счету каждый раз, что не подходит для вашей статистики. Это означает, что первое измерение в каждом окне окна статистики намного больше, чем последнее. Чтобы измерить медиану, вы должны собрать все значения. Измерение среднего и min/max, не собирая все числа, должно дать вам достаточно хорошую картину задержки.


На самом деле это не удивительно.
1. Время берется в Data(), но затем контейнер тратит время на вызов malloc.
2. Вы используете 64 бит или 32? В 32-битном gettimeofday - системный вызов, а в 64-битном - VDSO, который не попадает в ядро ... вы можете захотеть время gettimeofday и записать дисперсию. Или зарегистрируйтесь самостоятельно, используя rdtsc.
Лучше всего было бы использовать циклы вместо микронов, потому что микроны действительно слишком велики для этого сценария... только округление до микронов очень сильно искажает вас при работе с такими небольшими масштабами вещей
3. Гарантированы ли вы, что вы не можете быть пресечены между производителем и потребителем? Наверное, нет. Но это не должно происходить очень часто на ящике, посвященном тестированию...
4. Является ли это 4 ядра на одном разъеме или 2? если это 2 гнездовая коробка, вы хотите иметь 2 потока в одном и том же сокете, или вы платите (по крайней мере) двойную для передачи данных.
5. Убедитесь, что нити не работают на одном сердечнике.
6. Если данные, которые вы передаете, и дополнительные данные (контейнерный узел) разделяют линии кэша (скорее всего) с другим узлом Data +, производитель будет задержан потребителем, когда он будет записываться в использованную временную метку. Это называется ложным совместным использованием. Вы можете устранить это путем добавления/выравнивания до 64 байтов и использования интрузивного контейнера.

0

gettimeofday не является хорошим способом профилировать накладные расходы. Настенные часы и ваш компьютер многопроцессорны. Даже если вы думаете, что у вас больше ничего не работает, у планировщика ОС всегда есть другие действия, чтобы поддерживать работу системы. Чтобы профилировать накладные расходы процесса, вы должны как минимум повысить приоритет процесса, который вы профилируете. Также используйте таймер с высоким разрешением или тики процессора, чтобы выполнить измерение времени.

Ещё вопросы

Сообщество Overcoder
Наверх
Меню