Я новичок на всей сцене Scala, но до сих пор любил езду! Тем не менее, я застрял в проблеме и еще не смог понять причину... Я сейчас работаю с Kafka и пытаюсь читать данные из темы и передавать ее где-то в другом месте.
Проблема заключается в том, что println во внутреннем понимании выводит строки внизу, как и ожидалось, но все остальные prinln вне этого внутреннего значения пропускаются, и функция не возвращает ничего вообще (даже не может выдавать getClass в тест!)... Что может быть причиной? У меня действительно закончились идеи...
Связанный код:
def tryBatchRead(maxMessages: Int = 100, skipMessageOnError: Boolean = true): List[String] = {
var numMessages = 0L
var list = List[String]()
val iter = if (maxMessages >= 0) stream.slice(0, maxMessages) else stream
for (messageAndTopic <- iter) {
for (m <- messageAndTopic) {
println(m.offset.toString + " --- " + new String(m.message))
list = list ++ List(new String(m.message))
println("DEBUG " + list)
numMessages += 1
}
println("test1")
}
println("test2")
println("FINISH" + list)
connector.shutdown()
println("test3")
list
}
Выход:
6 --- {"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}})
7 --- test 2
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2)
8 --- {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2, {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}})
Спасибо за помощь!
Я думаю, что это нормальное поведение, так как вы делаете над потоком, который может быть в теории бесконечного размера (так что он никогда не кончится или не сможет висеть, если он ждет результатов над IO....). IMHO Я скорее напишу для (m <- messageAndTopic.take(maxMessages).toList) вместо for (m <- messageAndTopic)
Я не совсем уверен, но это очень вероятно, что вы блокируете чтение последнего сообщения, ожидающего следующего следующего (потоки кафки в основном бесконечны). Настройте тайм-аут для потребителя kafka, поэтому он откажется, если в течение некоторого времени нет сообщения. Для этого есть свойство consumer.timeout.ms
(например, его значение составляет 3000
мс), что приведет к появлению исключения ConsumerTimeoutException после достижения предела ожидания.
Кстати, я бы переписал ваш код как:
def tryBatchRead(maxMessages: Int = 100): List[String] = {
// '.take' works fine if collection has less elements than max
val batchStream = stream.take(maxMessages)
// TODO: add try/catch section, according to the above comments
// in scala we usually write a single joined for, instead of multiple nested ones
val batch = for {
messageAndTopic <- batchStream.take(maxMessages)
msg <- messageAndTopic // are you sure you can iterate message and topic? 0_o
} yield {
println(m.offset.toString + " --- " + new String(m.message))
msg
}
println("Number of messages: " + batch.length)
// shutdown has to be done outside, it bad idea to implicitly tear down streams in reading function
batch
}
for {...} yield ...