использовать SQL в DStream.transform() вместо Spark Streaming?

Есть несколько примеров использования SQL поверх Spark Streaming в foreachRDD(). Но если я хочу использовать SQL в tranform():

case class AlertMsg(host:String, count:Int, sum:Double)
val lines = ssc.socketTextStream("localhost", 8888)
lines.transform( rdd => {
  if (rdd.count > 0) {
    val t = sqc.jsonRDD(rdd)
    t.registerTempTable("logstash")
    val sqlreport = sqc.sql("SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY host ORDER BY host_c DESC LIMIT 100")
    sqlreport.map(r => AlertMsg(r(0).toString,r(1).toString.toInt,r(2).toString.toDouble))
  } else {
    rdd
  }
}).print()

Я получил такую ​​​​ошибку:

[ошибка] /Users/raochenlin/Downloads/spark-1.2.0-bin-hadoop2.4/logstash/src/main/scala/LogStash.scala:52: нет параметров типа для преобразования метода: (transformFunc: org.apache. spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[U])(неявное свидетельство$5: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U ] существуют, чтобы его можно было применять к аргументам (org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String ‹: java.io.Serializable ]) [ошибка] --- потому что --- [ошибка] тип выражения аргумента не совместим с типом формального параметра; [ошибка] найдено: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg со строкой ‹: java.io.Serializable] [ошибка] требуется: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[?U] [ошибка] lines.transform( rdd => { [ошибка] ^ [ошибка] найдена одна ошибка [ошибка ] (компилировать:компилировать) Ошибка компиляции

Кажется, только если я использую sqlreport.map(r => r.toString), может быть правильное использование?


person chenryn    schedule 15.02.2015    source источник


Ответы (1)


dstream.transform взять функцию transformFunc: (RDD[T]) ⇒ RDD[U] В этом случае if должен привести к одному и тому же типу при обеих оценках условия, что не так:

if (count == 0) => RDD[String]
if (count > 0) => RDD[AlertMsg]

В этом случае удалите оптимизацию if rdd.count ..., чтобы у вас был уникальный путь преобразования.

person maasg    schedule 15.02.2015