Лучший способ реализовать условный барьер потока

1

Задача выглядит так. У нас есть куча потоков, которые запускают некоторый метод. Его нормально запускать его одновременно, но только если некоторые условия заполнены. Если нет - поток должен ждать.

Вот пример того, о чем я говорю.

У нас есть трудоемкий метод doStuff, который принимает экземпляр Key в качестве параметра. Он работает нормально в многопоточном окружении, только если предоставленные ключи не равны. Он не работает, если одновременно обрабатываются два равных ключа. Нам нужно написать код, который останавливает потоки с равными ключами от вызова этого метода одновременно. Я написал три реализации: через ConcurrentHashMap с этими ключами, через AtomicIntegerArray ключевых индексов и через простой синхронизированный блок, который анализирует набор ключей под процесс.

public class KeyProblem {

    static class Key {

        private int index;

        Key() {
            this.index = (int) (Math.random() * 10) % 10;
        }

        public int getIndex() {
            return index;
        }

        @Override
        public String toString() {
            return "Key{" +
                    "index=" + index +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;

            Key key = (Key) o;

            if (index != key.index) return false;

            return true;
        }

        @Override
        public int hashCode() {
            return index;
        }
    }

    private static ConcurrentHashMap<Key, Object> keysProcessedRightNowForCheck = new ConcurrentHashMap<Key, Object>();

    public static void doStuff(Key key) {
        Object sentinel = keysProcessedRightNowForCheck.putIfAbsent(key, new Object());
        if (sentinel != null) {
            System.out.println("ERROR! Equal keys! " + key + " " + Thread.currentThread().getName());
        }
        try {
            System.out.println(String.format("   started by %s with %s ", Thread.currentThread().getName(), key));
            Thread.sleep(500);
            System.out.println(String.format("   finished by %s with %s ", Thread.currentThread().getName(), key));
        } catch (InterruptedException e) {
        }
        keysProcessedRightNowForCheck.remove(key);
    }

    //first version: via ConcurrentHashMap

    private static ConcurrentHashMap map = new ConcurrentHashMap();
    private static Object waiter = new Object();

    public static void viaConcurrentHashMap(Key key) throws InterruptedException {
        while (map.putIfAbsent(key, new Object()) != null) {
            synchronized (waiter) {
                System.out.println("wait with key " + key);
                waiter.wait();
                System.out.println("done waiting with key " + key);
            }
        }
        System.out.println("started stuff with " + key);
        doStuff(key);
        map.remove(key);
        synchronized (waiter) {
            System.out.println("notified after stuff with " + key);
            waiter.notifyAll();
            System.out.println("done waiting with key " + key);
        }
    }

    //second version: via AtomicIntegerArray for a fixed number of keys

    private static AtomicIntegerArray keyProcessed = new AtomicIntegerArray(10);

    public static void viaAtomicIntegerArray(Key key) throws InterruptedException {

        while (!keyProcessed.compareAndSet(key.getIndex(), 0, 1)) {
            synchronized (waiter) {
                System.out.println("wait with key " + key);
                waiter.wait();
            }
        }

        doStuff(key);
        keyProcessed.decrementAndGet(key.getIndex());

        synchronized (waiter) {
            System.out.println("notified after stuff with " + key);
            waiter.notifyAll();
        }

    }

    //third version: via a simple lock

    private static Object lock = new Object();
    private static Set<Key> keys = new HashSet<Key>();

    public static void viaSimpleSynchronized(Key key) throws InterruptedException {
        synchronized (lock) {
            while (keys.contains(key)) {
                lock.wait();
            }
            keys.add(key);
        }

        doStuff(key);
        synchronized (lock) {
            keys.remove(key);
            lock.notifyAll();
        }
    }

    private static CyclicBarrier barrier;

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {

        final int MAX = 100;

        List<Key> keys = new ArrayList<Key>() {{
            for (int i = 0; i < MAX; i++) add(new Key());
        }};

        barrier = new CyclicBarrier(MAX + 1);

        long start = System.currentTimeMillis();

        for (final Key key : keys) {
            Thread t = new Thread() {
                public void run() {
                    try {
//                        viaConcurrentHashMap(key);
                        viaSimpleSynchronized(key);
//                        viaAtomicIntegerArray(key);
                        barrier.await();
                    } catch (InterruptedException e) {
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                    }

                }
            };
            t.start();
        }

        barrier.await();
        System.out.println("system time [ms] " + (System.currentTimeMillis() - start));
        //7 for array
    }
}

Для 100 потоков время работы составляет около 7 с, для третьей версии - немного медленнее, как ожидалось.

Наконец, вопросы:

1) Является ли мой код правильным и потокобезопасным?

2) Можете ли вы предложить лучшую реализацию?

3) Есть ли какие-то классы в java.util.concurrent, которые решают эту задачу обобщенным образом? Я имею в виду какой-то барьер, который пропускает поток только в том случае, если выполняется какое-то условие.

  • 0
    Сначала не используйте rawtypes . Вы имеете в виду, что вы не можете работать с двумя одинаковыми ключами одновременно ? То есть, как только обработка с ключом заканчивается, он может начать снова с предыдущего ключа?
  • 0
    да, два потока не могут обрабатывать один и тот же ключ одновременно. после завершения какой-либо обработки может начаться любая другая обработка. когда некоторые обработки не завершены - могут обрабатываться только ДРУГИЕ ключи.
Теги:
concurrency
concurrenthashmap
java.util.concurrent

2 ответа

2
Лучший ответ

Если вы можете позволить себе хранить все ключи в памяти одновременно и как одиночные (по крайней мере, то, как работает ваш пример), то кажется, что очень простое решение было бы следующим:

  1. выберите соответствующий ключ;
  2. выполнить логику внутри synchronized (key) {} блока.
0

Ваш метод viaSimpleSynchronized является единственным, который не нарушен. Остальные два страдают от одной и той же ошибки: вы предполагаете, что выполнение действия и вхождение в synchronized блок впоследствии тесно связаны друг с другом, в то время как на самом деле они являются отдельными, несвязанными действиями.

Например:

while (map.putIfAbsent(key, new Object()) != null) {
    // in-between right at this point the other thread can do both, remove
    // the key AND execute its synchronized(waiter) { waiter.notifyAll(); }
    synchronized (waiter) {
        System.out.println("wait with key " + key);
        waiter.wait(); // therefore this can enter a wait lasting forever
        System.out.println("done waiting with key " + key);
    }
}

Вы не заметили такую ошибку с вашими испытаниями с большим количеством потоков, выполняющих множество операций, так как каждый неконтактный поток освобождает все ожидающие потоки при вызове функции notifyAll() скрывающей ошибку, когда потоки ожидают уведомления, которое уже произошло.

Итак, здесь у нас есть код, который может неожиданно нарушаться, когда мало активности.

Как правило, когда вы выполняете проверку вне synchronized блока, даже если он использует поточно-безопасную конструкцию, такую как ConcurrentHashMap, вы должны повторно проверить условие в synchronized блоке.

Единственным исключением являются условия, которые, как известно, никогда не возвращаются (например, флаг "готовый", который будет идти один раз с false на true но никогда не возвращается к false).

Ещё вопросы

Сообщество Overcoder
Наверх
Меню