У меня есть следующий сценарий:
Thread A отправит ряд задач как в Thread B, так и в C. (точное количество задач неизвестно)
Для каждой из задач поток A будет отправлять его в B и C одновременно (асинхронно), а затем, если любой из B или C завершит задачу успешно или оба сработали, A продолжит отправку следующей задачи. Идея здесь заключается в том, чтобы избежать блокировки как можно больше. т.е. для той же задачи, когда B закончил ее, пока C все еще обрабатывает, A может немедленно отправить следующую задачу и не нужно ждать, пока C получит результат.
он ожидал, что более медленный в B и C может пропустить некоторые задачи, пока задача выполняется другой. Например, B может завершить задачу t1 t2 t3 t4, а C только закончить t1 t4 из-за этого, когда C принял t2 и t3, по-прежнему обрабатывает t1.
есть ли подходящая для этого конструкция синхронизации потоков? Я проверяю java.util.concurrent.Phaser
, но кажется, что это не соответствует моей потребности. Любые комментарии приветствуются, спасибо заранее.
Это было бы проще, если бы вы использовали Future
или actors вместо Threads как строительный блок. Выполнение этого непосредственно над потоками может привести к возникновению многих проблем, поскольку вам необходимо позаботиться о деталях, например о очередности входящих сообщений. Другая проблема заключается в том, что я не понимаю, что то, что вы хотите решить, зеркалирует то, о чем вы просите, - где значение в выполнении одной и той же задачи дважды, на 2 разных потоках? Это просто неправильно.
Здесь наивная неблокирующая реализация, чтобы получить представление о том, что задействовано, НО НЕ СДЕЛАЙТЕ ЭТО в реальном коде (действительно рассматривайте абстракции более высокого уровня):
val queue = new AtomicReference(Queue.empty[Runnable])
def worker() = new Thread(new Runnable {
@tailrec
def run() = {
val currentQueue = queue.get
if (currentQueue.nonEmpty) {
val (task, updatedQueue) = currentQueue.dequeue
try {
task.run()
} catch {
case NonFatal(ex) =>
ex.printStackTrace()
}
// if this fails, then another worker succeeded
queue.compareAndSet(currentQueue, updatedQueue)
// process next task in queue
if (updatedQueue.nonEmpty) run()
}
}
})
@tailrec
def submitTask(task: Runnable): Unit = {
val currentQueue = queue.get
val newQueue = currentQueue.enqueue(task)
if (!queue.compareAndSet(currentQueue, newQueue))
submitTask(task)
else if (currentQueue.isEmpty) {
// because of the CAS above, only 2 workers will be
// active at the same time
worker().start()
worker().start()
}
}
Я бы использовал вариант балансировки рабочих нагрузок через узлы с помощью Akka 2, за исключением того, что вместо одного рабочего элемента на одного работника он назначает текущий рабочий элемент каждому работнику, который запрашивает работу до тех пор, пока рабочий элемент не будет завершен.
Это может показаться излишним, но этот шаблон очень хорошо масштабируется и легко настраивается. У меня есть два проекта в "Production", используя их, и есть другие. Этот подход является "тянущим" подходом.
Подход "подталкивания" с использованием участников потребует от действующих лиц, которые обрабатывают последнее сообщение только в своем почтовом ящике. Затем главный актер передавал все сообщения всем действующим субъектам.
Подумайте об использовании какого-либо Executor
, например Executors.newFixedThreadPool()
. Это позволяет:
Future
интерфейс, возвращаемый для каждой представленной задачи, позволяет проверить, успешно ли выполнена задача, все еще выполняется или не выполняется;Future.get()
блокируется, пока задача не завершится/не завершится;Я думаю, что эти функции охватывают требования поведения, которые вы написали.