Расширение java ThreadLocal для сброса значений во всех потоках

1

После просмотра this question, я думаю, что хочу обернуть ThreadLocal, чтобы добавить поведение reset.

Я хочу иметь что-то похожее на ThreadLocal, с помощью метода, который я могу вызывать из любого потока, чтобы вернуть все значения обратно к одному и тому же значению. Пока у меня есть это:

public class ThreadLocalFlag {

    private ThreadLocal<Boolean> flag;
    private List<Boolean> allValues = new ArrayList<Boolean>();

    public ThreadLocalFlag() {
        flag = new ThreadLocal<Boolean>() {
            @Override protected Boolean initialValue() {
                Boolean value = false;
                allValues.add(value);
                return value;
            }
        };
    }

    public boolean get() {
        return flag.get();
    }

    public void set(Boolean value) {
        flag.set(value);
    }

    public void setAll(Boolean value) {
        for (Boolean tlValue : allValues) {
            tlValue = value;
        }
    }
}

Я обеспокоен тем, что автобоксирование примитива может означать, что копии, которые я сохранил в списке, не будут ссылаться на те же переменные, на которые ссылается ThreadLocal, если я попытаюсь их установить. Я еще не тестировал этот код, и с чем-то сложным, как это, я ищу совет экспертов, прежде чем продолжить этот путь.

Кто-то спросит: "Зачем вы это делаете?". Я работаю в среде, где есть другие потоки, которые обращаются к моему коду, и у меня нет ссылок на них. Периодически я хочу обновить значение в переменной ThreadLocal, которую они используют, поэтому для выполнения этого обновления требуется, чтобы поток, который использует переменную, обновил. Мне просто нужен способ уведомлять все эти потоки, что их переменная ThreadLocal устарела.


Мне льстит, что в последнее время возникает новая критика в отношении этого трехлетнего вопроса, хотя я чувствую, что его тон немного меньше, чем профессионал. Решение, которое я предоставил, в это время работало без инцидентов. Тем не менее, должны быть лучшие пути для достижения цели, которая вызвала этот вопрос, и я предлагаю критикам дать ответ, который явно лучше. С этой целью я постараюсь более четко понять проблему, которую пытаюсь решить.

Как я упоминал ранее, я использовал фреймворк, в котором несколько потоков используют мой код вне моего контроля. Эта структура была QuickFIX/J, и я реализовал Application interface. Этот интерфейс определяет крючки для обработки сообщений FIX, и в моем использовании инфраструктура была настроена на многопоточность, так что каждое соединение FIX с приложением можно было обрабатывать одновременно.

Однако в инфраструктуре QuickFIX/J используется только один экземпляр моей реализации этого интерфейса для всех потоков. Я не контролирую, как начинаются потоки, и каждый обслуживает другое соединение с различными конфигурационными данными и другим состоянием. Естественно, что некоторые из этих состояний, к которым часто обращаются, но редко обновляются, живут в различных ThreadLocal, которые загружают свое начальное значение после того, как среда начала поток.

В других организациях у нас был библиотечный код, позволяющий нам регистрироваться для обратных вызовов для уведомления деталей конфигурации, которые изменяются во время выполнения. Я хотел зарегистрироваться для этого обратного вызова, и когда я его получил, я хотел, чтобы все потоки знали, что пришло время перезагрузить значения этих ThreadLocal s, поскольку они, возможно, изменились. Этот обратный вызов происходит из потока, который я не контролирую, точно так же, как потоки QuickFIX/J.

В моем решении ниже используется ThreadLocalFlag (завернутый ThreadLocal<AtomicBoolean>) только для того, чтобы сигнализировать другим потокам, что настало время обновить их значения. Обратный вызов вызывает setAll(true), а потоки QuickFIX/J вызывают set(false), когда они начинают свое обновление. Я проигнорировал проблемы concurrency ArrayList, потому что только время добавления списка во время запуска, и мой пример использования был меньше, чем размер по умолчанию в списке.

Я предполагаю, что одна и та же задача может быть выполнена с использованием других методов взаимодействия между потоками, но для того, что она делает, это казалось более практичным. Я приветствую другие решения.

Теги:
thread-local

3 ответа

-1
Лучший ответ

Я разочарован качеством ответов, полученных по этому вопросу; Я нашел свое решение.

Сегодня я написал свой тестовый пример и нашел единственную проблему с кодом в моем вопросе: Boolean. Boolean не изменяет, поэтому мой список ссылок не приносит мне никакой пользы. Я просмотрел этот вопрос и изменил свой код на использование AtomicBoolean, и теперь все работает так, как ожидалось.

public class ThreadLocalFlag {

    private ThreadLocal<AtomicBoolean> flag;
    private List<AtomicBoolean> allValues = new ArrayList<AtomicBoolean>();

    public ThreadLocalFlag() {
        flag = new ThreadLocal<AtomicBoolean>() {
            @Override protected AtomicBoolean initialValue() {
                AtomicBoolean value = new AtomicBoolean();
                allValues.add(value);
                return value;
            }
        };
    }

    public boolean get() {
        return flag.get().get();
    }

    public void set(boolean value) {
        flag.get().set(value);
    }

    public void setAll(boolean value) {
        for (AtomicBoolean tlValue : allValues) {
            tlValue.set(value);
        }
    }
}

Тестовый пример:

public class ThreadLocalFlagTest {

    private static ThreadLocalFlag flag = new ThreadLocalFlag();
    private static boolean runThread = true;

    @AfterClass
    public static void tearDownOnce() throws Exception {
        runThread = false;
        flag = null;
    }

    /**
     * @throws Exception if there is any issue with the test
     */
    @Test
    public void testSetAll() throws Exception {
        startThread("ThreadLocalFlagTest-1", false);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-2", true);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-3", false);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-4", true);
        try {
            Thread.sleep(8000L); //watch the alternating values
        } catch (InterruptedException e) {
            //ignore
        }
        flag.setAll(true);
        try {
            Thread.sleep(8000L); //watch the true values
        } catch (InterruptedException e) {
            //ignore
        }
        flag.setAll(false);
        try {
            Thread.sleep(8000L); //watch the false values
        } catch (InterruptedException e) {
            //ignore
        }
    }

    private void startThread(String name, boolean value) {
        Thread t = new Thread(new RunnableCode(value));
        t.setName(name);
        t.start();
    }

    class RunnableCode implements Runnable {

        private boolean initialValue;

        RunnableCode(boolean value) {
            initialValue = value;
        }

        @Override
        public void run() {
            flag.set(initialValue);
            while (runThread) {
                System.out.println(Thread.currentThread().getName() + ": " + flag.get());
                try {
                    Thread.sleep(4000L);
                } catch (InterruptedException e) {
                    //ignore
                }
            }
        }
    }
}
  • 1
    это не полное решение, так как List не является потокобезопасным, если вы думаете, что ваш код либо настолько тривиален, что то, что вы делаете, это глупо излишне, либо вы просто не понимаете, что означает недетерминированный , и вы будете вскоре, когда это вводит странные состояния из-за того, что вещи перезаписываются недетерминированным образом. У вас так называемый контрольный пример в корне ошибочный. Любой, кто придет в будущем, прочитав это, должен воспринимать этот ответ как урок, что не следует делать !
  • 0
    @JarrodRoberson и что с того, что, просто оберните список с блоком synchronized () в этом списке, и он будет работать как очаровательный, все решение будет.
8

Взаимодействие с объектами в потоке ThreadLocal через потоки

Я скажу, что это плохая идея. ThreadLocal является специальным классом, который предлагает преимущества скорости и эффективности потоков при правильном использовании. Попытка общаться по потокам с помощью ThreadLocal ставит перед собой цель использовать класс в первую очередь.

Если вам нужен доступ к объекту по нескольким потокам, для этого используются инструменты, в частности потокобезопасные коллекции в java.util.collect.concurrent ConcurrentHashMap, который можно использовать для репликации ThreadLocal с помощью Thread объектов в качестве ключей, например:

ConcurrentHashMap<Thread, AtomicBoolean> map = new ConcurrentHashMap<>();

// pass map to threads, let them do work, using Thread.currentThread() as the key

// Update all known thread flags
for(AtomicBoolean b : map.values()) {
    b.set(true);
}

Более четкий, более сжатый и избегает разведения ThreadLocal в какой-то ужасный монстр Франкенштейна того, кем он когда-то был.

Уведомление потоков о том, что их данные устарели

Мне просто нужен способ оповестить все эти потоки о том, что их переменная ThreadLocal устарела.

Если ваша цель - просто уведомить другие потоки, что что-то изменилось, вам вообще не нужен ThreadLocal. Просто используйте одиночный AtomicBoolean и поделитесь им со всеми вашими задачами, как и ваш ThreadLocal<AtomicBoolean>. Поскольку имя подразумевает обновления для AtomicBoolean, это атомарные и видимые поперечные потоки. Еще лучше было бы использовать реальную помощь синхронизации, например CyclicBarrier или Phaser, но для простых случаев использования нет вреда при использовании AtomicBoolean.

Создание обновляемого "ThreadLocal"

Все сказанное, если вы действительно хотите реализовать глобально обновляемый ThreadLocal, ваша реализация излишне нарушена. Тот факт, что вы не столкнулись с проблемами с ним, - это просто совпадение, и в будущем рефакторинг может ввести жесткие диагностические ошибки или сбои. То, что он "работал без инцидентов", означает, что ваши тесты неполны.

  • Прежде всего, ArrayList не является потокобезопасным. Вы просто не можете использовать его (без внешней синхронизации), когда несколько потоков могут взаимодействовать с ним, даже если они будут делать это в разное время. То, что вы не видите никаких проблем сейчас, просто совпадение.
  • Сохранение объектов как List предотвращает удаление устаревших значений. Если вы вызываете ThreadLocal.set(), он будет добавляться в ваш список, не удаляя прежнее значение, что приводит к утечке памяти и потенциальным неожиданным побочным эффектам, если вы ожидаете, что эти объекты станут недоступными после прекращения потока, как это обычно бывает с ThreadLocal. Ваш случай использования избегает этой проблемы по совпадению, но по-прежнему нет необходимости использовать List.

Вот реализация IterableThreadLocal, которая надежно хранит и обновляет все существующие экземпляры значений ThreadLocal и работает для любого типа, который вы решите использовать:

import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;

import com.google.common.collect.MapMaker;

/**
 * Class extends ThreadLocal to enable user to iterate over all objects
 * held by the ThreadLocal instance.  Note that this is inherently not
 * thread-safe, and violates both the contract of ThreadLocal and much
 * of the benefit of using a ThreadLocal object.  This class incurs all
 * the overhead of a ConcurrentHashMap, perhaps you would prefer to
 * simply use a ConcurrentHashMap directly instead?
 * 
 * If you do really want to use this class, be wary of its iterator.
 * While it is as threadsafe as ConcurrentHashMap iterator, it cannot
 * guarantee that all existing objects in the ThreadLocal are available
 * to the iterator, and it cannot prevent you from doing dangerous
 * things with the returned values.  If the returned values are not
 * properly thread-safe, you will introduce issues.
 */
public class IterableThreadLocal<T> extends ThreadLocal<T>
                                    implements Iterable<T> {
    private final ConcurrentMap<Thread,T> map;

    public IterableThreadLocal() {
        map = new MapMaker().weakKeys().makeMap();
    }

    @Override
    public T get() {
        T val = super.get();
        map.putIfAbsent(Thread.currentThread(), val);
        return val;
    }

    @Override
    public void set(T value) {
        map.put(Thread.currentThread(), value);
        super.set(value);
    }

    /**
     * Note that this method fundamentally violates the contract of
     * ThreadLocal, and exposes all objects to the calling thread.
     * Use with extreme caution, and preferably only when you know
     * no other threads will be modifying / using their ThreadLocal
     * references anymore.
     */
    @Override
    public Iterator<T> iterator() {
        return map.values().iterator();
    }
}

Как вы можете надеяться, это немного больше, чем обертка вокруг ConcurrentHashMap и несет все те же накладные расходы, что и использование одного напрямую, но скрытое в реализации ThreadLocal, которое пользователи обычно ожидают быстрого и поточно-безопасный. Я реализовал его для демонстрационных целей, но я действительно не могу рекомендовать его использовать в любых настройках.

  • 0
    Я думаю, что вы упустили момент. Мне не нужен итератор, и я не хотел одновременного доступа к одним и тем же значениям между потоками, за исключением понятия действия сброса. Потоковая безопасность ArrayList не является проблемой, поскольку она используется только локально.
  • 2
    Извините, но я думаю, что вы упустили момент. Безопасность потоков ArrayList , к сожалению, является проблемой, поскольку несколько потоков могут пытаться добавить к ней (и, в частности, изменить ее размер) одновременно. Ваша реализация может по совпадению избежать этого в данный момент, но это очень опасный паттерн. При работе с несколькими потоками вы хотите свести к минимуму объем связи между потоками, обеспечивая безопасное для потока Iterable менее рискованно, чем разделять ArrayList между потоками. Поскольку все, что вы делаете в своем примере - это циклический просмотр списка, вам нужен только Iterable .
Показать ещё 7 комментариев
1

Это не будет хорошей идеей делать это, поскольку вся точка локального хранилища потоков - это нулевая локальность значения, которое она содержит, то есть вы можете быть уверены, что ни одна другая нить, кроме вашего собственного потока, не сможет коснуться Значение. Если другие потоки могут касаться локального значения вашего потока, это больше не будет "потоковым локальным", и это нарушит контракт модели памяти локального хранилища потоков.

Либо вы должны использовать что-то другое, чем ThreadLocal (например, a ConcurrentHashMap), чтобы сохранить это значение, или вам нужно найти способ запланировать обновление соответствующих потоков.

Вы можете использовать google guava map maker для создания статического окончательного ConcurrentWeakReferenceIdentityHashmap со следующим типом: Map<Thread, Map<String, Object>>, где вторая карта - это ConcurrentHashMap. Таким образом, вы бы приблизились к ThreadLocal, за исключением того, что вы можете выполнять итерацию по карте.

Ещё вопросы

Сообщество Overcoder
Наверх
Меню