Странный вопрос относительно понимания

1

Я новичок на всей сцене Scala, но до сих пор любил езду! Тем не менее, я застрял в проблеме и еще не смог понять причину... Я сейчас работаю с Kafka и пытаюсь читать данные из темы и передавать ее где-то в другом месте.

Проблема заключается в том, что println во внутреннем понимании выводит строки внизу, как и ожидалось, но все остальные prinln вне этого внутреннего значения пропускаются, и функция не возвращает ничего вообще (даже не может выдавать getClass в тест!)... Что может быть причиной? У меня действительно закончились идеи...

Связанный код:

def tryBatchRead(maxMessages: Int = 100, skipMessageOnError: Boolean = true): List[String] = {
  var numMessages = 0L

  var list = List[String]()

  val iter = if (maxMessages >= 0) stream.slice(0, maxMessages) else stream

  for (messageAndTopic <- iter) {
    for (m <- messageAndTopic) {
      println(m.offset.toString + " --- " + new String(m.message))
      list = list ++ List(new String(m.message))
      println("DEBUG " + list)
      numMessages += 1
    }
    println("test1")
  }

  println("test2")
  println("FINISH" + list)
  connector.shutdown()
  println("test3")
  list
}

Выход:

6 --- {"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}})
7 --- test 2
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2)
8 --- {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2, {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}})

Спасибо за помощь!

  • 1
    То, что у вас есть в вашем коде, является простым циклом for, for-compiction - синтаксический сахар вокруг стиля монадического программирования, то есть for {...} yield ...
  • 0
    Спасибо за замечание. ;-)
Теги:
apache-kafka
for-comprehension

2 ответа

0

Я думаю, что это нормальное поведение, так как вы делаете над потоком, который может быть в теории бесконечного размера (так что он никогда не кончится или не сможет висеть, если он ждет результатов над IO....). IMHO Я скорее напишу для (m <- messageAndTopic.take(maxMessages).toList) вместо for (m <- messageAndTopic)

0

Я не совсем уверен, но это очень вероятно, что вы блокируете чтение последнего сообщения, ожидающего следующего следующего (потоки кафки в основном бесконечны). Настройте тайм-аут для потребителя kafka, поэтому он откажется, если в течение некоторого времени нет сообщения. Для этого есть свойство consumer.timeout.ms (например, его значение составляет 3000 мс), что приведет к появлению исключения ConsumerTimeoutException после достижения предела ожидания.

Кстати, я бы переписал ваш код как:

def tryBatchRead(maxMessages: Int = 100): List[String] = {
  // '.take' works fine if collection has less elements than max
  val batchStream = stream.take(maxMessages)  

  // TODO: add try/catch section, according to the above comments
  // in scala we usually write a single joined for, instead of multiple nested ones
  val batch = for {
    messageAndTopic <- batchStream.take(maxMessages)
    msg <- messageAndTopic // are you sure you can iterate message and topic? 0_o
  } yield {
    println(m.offset.toString + " --- " + new String(m.message))
    msg
  }

  println("Number of messages: " + batch.length)

  // shutdown has to be done outside, it bad idea to implicitly tear down streams in reading function
  batch 
}
  • 0
    Здравствуйте! Спасибо за помощь! Я объединил ваш пример кода с предложением @ chekkal и парой исправлений, но все же не смог заставить его работать. = SI пытался поддерживать его на самом низком уровне, чтобы, например, не смешивать функции Akka, но не может достичь этой цели таким образом. Он продолжает блокировать ...
  • 0
    Кроме того , код , который я написал был сильно вдохновлен собственным ConsoleConsumer код Кафки ссылка - в том числе consumer.shutdown вещи ...
Показать ещё 7 комментариев

Ещё вопросы

Сообщество Overcoder
Наверх
Меню