Задача выглядит так. У нас есть куча потоков, которые запускают некоторый метод. Его нормально запускать его одновременно, но только если некоторые условия заполнены. Если нет - поток должен ждать.
Вот пример того, о чем я говорю.
У нас есть трудоемкий метод doStuff, который принимает экземпляр Key в качестве параметра. Он работает нормально в многопоточном окружении, только если предоставленные ключи не равны. Он не работает, если одновременно обрабатываются два равных ключа. Нам нужно написать код, который останавливает потоки с равными ключами от вызова этого метода одновременно. Я написал три реализации: через ConcurrentHashMap с этими ключами, через AtomicIntegerArray ключевых индексов и через простой синхронизированный блок, который анализирует набор ключей под процесс.
public class KeyProblem {
static class Key {
private int index;
Key() {
this.index = (int) (Math.random() * 10) % 10;
}
public int getIndex() {
return index;
}
@Override
public String toString() {
return "Key{" +
"index=" + index +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Key key = (Key) o;
if (index != key.index) return false;
return true;
}
@Override
public int hashCode() {
return index;
}
}
private static ConcurrentHashMap<Key, Object> keysProcessedRightNowForCheck = new ConcurrentHashMap<Key, Object>();
public static void doStuff(Key key) {
Object sentinel = keysProcessedRightNowForCheck.putIfAbsent(key, new Object());
if (sentinel != null) {
System.out.println("ERROR! Equal keys! " + key + " " + Thread.currentThread().getName());
}
try {
System.out.println(String.format(" started by %s with %s ", Thread.currentThread().getName(), key));
Thread.sleep(500);
System.out.println(String.format(" finished by %s with %s ", Thread.currentThread().getName(), key));
} catch (InterruptedException e) {
}
keysProcessedRightNowForCheck.remove(key);
}
//first version: via ConcurrentHashMap
private static ConcurrentHashMap map = new ConcurrentHashMap();
private static Object waiter = new Object();
public static void viaConcurrentHashMap(Key key) throws InterruptedException {
while (map.putIfAbsent(key, new Object()) != null) {
synchronized (waiter) {
System.out.println("wait with key " + key);
waiter.wait();
System.out.println("done waiting with key " + key);
}
}
System.out.println("started stuff with " + key);
doStuff(key);
map.remove(key);
synchronized (waiter) {
System.out.println("notified after stuff with " + key);
waiter.notifyAll();
System.out.println("done waiting with key " + key);
}
}
//second version: via AtomicIntegerArray for a fixed number of keys
private static AtomicIntegerArray keyProcessed = new AtomicIntegerArray(10);
public static void viaAtomicIntegerArray(Key key) throws InterruptedException {
while (!keyProcessed.compareAndSet(key.getIndex(), 0, 1)) {
synchronized (waiter) {
System.out.println("wait with key " + key);
waiter.wait();
}
}
doStuff(key);
keyProcessed.decrementAndGet(key.getIndex());
synchronized (waiter) {
System.out.println("notified after stuff with " + key);
waiter.notifyAll();
}
}
//third version: via a simple lock
private static Object lock = new Object();
private static Set<Key> keys = new HashSet<Key>();
public static void viaSimpleSynchronized(Key key) throws InterruptedException {
synchronized (lock) {
while (keys.contains(key)) {
lock.wait();
}
keys.add(key);
}
doStuff(key);
synchronized (lock) {
keys.remove(key);
lock.notifyAll();
}
}
private static CyclicBarrier barrier;
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
final int MAX = 100;
List<Key> keys = new ArrayList<Key>() {{
for (int i = 0; i < MAX; i++) add(new Key());
}};
barrier = new CyclicBarrier(MAX + 1);
long start = System.currentTimeMillis();
for (final Key key : keys) {
Thread t = new Thread() {
public void run() {
try {
// viaConcurrentHashMap(key);
viaSimpleSynchronized(key);
// viaAtomicIntegerArray(key);
barrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
};
t.start();
}
barrier.await();
System.out.println("system time [ms] " + (System.currentTimeMillis() - start));
//7 for array
}
}
Для 100 потоков время работы составляет около 7 с, для третьей версии - немного медленнее, как ожидалось.
Наконец, вопросы:
1) Является ли мой код правильным и потокобезопасным?
2) Можете ли вы предложить лучшую реализацию?
3) Есть ли какие-то классы в java.util.concurrent, которые решают эту задачу обобщенным образом? Я имею в виду какой-то барьер, который пропускает поток только в том случае, если выполняется какое-то условие.
Если вы можете позволить себе хранить все ключи в памяти одновременно и как одиночные (по крайней мере, то, как работает ваш пример), то кажется, что очень простое решение было бы следующим:
synchronized (key) {}
блока. Ваш метод viaSimpleSynchronized
является единственным, который не нарушен. Остальные два страдают от одной и той же ошибки: вы предполагаете, что выполнение действия и вхождение в synchronized
блок впоследствии тесно связаны друг с другом, в то время как на самом деле они являются отдельными, несвязанными действиями.
Например:
while (map.putIfAbsent(key, new Object()) != null) {
// in-between right at this point the other thread can do both, remove
// the key AND execute its synchronized(waiter) { waiter.notifyAll(); }
synchronized (waiter) {
System.out.println("wait with key " + key);
waiter.wait(); // therefore this can enter a wait lasting forever
System.out.println("done waiting with key " + key);
}
}
Вы не заметили такую ошибку с вашими испытаниями с большим количеством потоков, выполняющих множество операций, так как каждый неконтактный поток освобождает все ожидающие потоки при вызове функции notifyAll()
скрывающей ошибку, когда потоки ожидают уведомления, которое уже произошло.
Итак, здесь у нас есть код, который может неожиданно нарушаться, когда мало активности.
Как правило, когда вы выполняете проверку вне synchronized
блока, даже если он использует поточно-безопасную конструкцию, такую как ConcurrentHashMap
, вы должны повторно проверить условие в synchronized
блоке.
Единственным исключением являются условия, которые, как известно, никогда не возвращаются (например, флаг "готовый", который будет идти один раз с false
на true
но никогда не возвращается к false
).