Как координировать потоки в моем случае использования

1

У меня есть следующий сценарий:

  1. Thread A отправит ряд задач как в Thread B, так и в C. (точное количество задач неизвестно)

  2. Для каждой из задач поток A будет отправлять его в B и C одновременно (асинхронно), а затем, если любой из B или C завершит задачу успешно или оба сработали, A продолжит отправку следующей задачи. Идея здесь заключается в том, чтобы избежать блокировки как можно больше. т.е. для той же задачи, когда B закончил ее, пока C все еще обрабатывает, A может немедленно отправить следующую задачу и не нужно ждать, пока C получит результат.

  3. он ожидал, что более медленный в B и C может пропустить некоторые задачи, пока задача выполняется другой. Например, B может завершить задачу t1 t2 t3 t4, а C только закончить t1 t4 из-за этого, когда C принял t2 и t3, по-прежнему обрабатывает t1.

есть ли подходящая для этого конструкция синхронизации потоков? Я проверяю java.util.concurrent.Phaser, но кажется, что это не соответствует моей потребности. Любые комментарии приветствуются, спасибо заранее.

  • 2
    Кстати, почему и B, и C должны начать обрабатывать одну и ту же задачу? Они обрабатывают это как-то по-другому? Если нет, то какой в этом смысл?
  • 1
    Требуется ли, чтобы задачи были связаны с потоками? Вид академического упражнения?
Показать ещё 2 комментария
Теги:
multithreading

3 ответа

2

Это было бы проще, если бы вы использовали Future или actors вместо Threads как строительный блок. Выполнение этого непосредственно над потоками может привести к возникновению многих проблем, поскольку вам необходимо позаботиться о деталях, например о очередности входящих сообщений. Другая проблема заключается в том, что я не понимаю, что то, что вы хотите решить, зеркалирует то, о чем вы просите, - где значение в выполнении одной и той же задачи дважды, на 2 разных потоках? Это просто неправильно.

Здесь наивная неблокирующая реализация, чтобы получить представление о том, что задействовано, НО НЕ СДЕЛАЙТЕ ЭТО в реальном коде (действительно рассматривайте абстракции более высокого уровня):

val queue = new AtomicReference(Queue.empty[Runnable])

def worker() = new Thread(new Runnable {
  @tailrec
  def run() = {
    val currentQueue = queue.get
    if (currentQueue.nonEmpty) {
      val (task, updatedQueue) = currentQueue.dequeue
      try {
        task.run()
      } catch {
        case NonFatal(ex) =>
          ex.printStackTrace()
      }

      // if this fails, then another worker succeeded
      queue.compareAndSet(currentQueue, updatedQueue)
      // process next task in queue
      if (updatedQueue.nonEmpty) run()
    }
  }
})

@tailrec
def submitTask(task: Runnable): Unit =  {
  val currentQueue = queue.get
  val newQueue = currentQueue.enqueue(task)

  if (!queue.compareAndSet(currentQueue, newQueue))
    submitTask(task)
  else if (currentQueue.isEmpty) {
    // because of the CAS above, only 2 workers will be
    // active at the same time
    worker().start()
    worker().start()
  }
}
1

Я бы использовал вариант балансировки рабочих нагрузок через узлы с помощью Akka 2, за исключением того, что вместо одного рабочего элемента на одного работника он назначает текущий рабочий элемент каждому работнику, который запрашивает работу до тех пор, пока рабочий элемент не будет завершен.

Это может показаться излишним, но этот шаблон очень хорошо масштабируется и легко настраивается. У меня есть два проекта в "Production", используя их, и есть другие. Этот подход является "тянущим" подходом.

Подход "подталкивания" с использованием участников потребует от действующих лиц, которые обрабатывают последнее сообщение только в своем почтовом ящике. Затем главный актер передавал все сообщения всем действующим субъектам.

0

Подумайте об использовании какого-либо Executor, например Executors.newFixedThreadPool(). Это позволяет:

  • Отправка задачи, которая будет обработана в будущем;
  • Темы, которые работают над задачами, не будут пытаться выполнить одну и ту же задачу;
  • Future интерфейс, возвращаемый для каждой представленной задачи, позволяет проверить, успешно ли выполнена задача, все еще выполняется или не выполняется;
  • Механизм блокировки, описанный в 2), может быть достигнут с учетом того факта, что Future.get() блокируется, пока задача не завершится/не завершится;

Я думаю, что эти функции охватывают требования поведения, которые вы написали.

Ещё вопросы

Сообщество Overcoder
Наверх
Меню