Spark Streaming: объединение пакетов Dstream в единую выходную папку

Я использую Spark Streaming для получения твитов из твиттера, создавая StreamingContext как:
val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1))

и создать твиттер-поток как:
val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)),filters)

затем сохраните его как текстовый файл
tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")

и проблема в том, что твиты сохраняются в виде папок в зависимости от времени пакета, но мне нужны все данные каждого пакета в одной папке.

Есть ли обходной путь для этого?

Спасибо


person Hussain Shaik    schedule 14.05.2015    source источник


Ответы (1)


Мы можем сделать это, используя новый API сохранения DataFrame Spark SQL, который позволяет добавлять к существующему выводу. По умолчанию saveAsTextFile не сможет сохранить в каталог с существующими данными (см. https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes). https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations рассказывается, как настроить контекст Spark SQL для использования с Spark Streaming.

Если вы скопируете часть руководства с помощью SQLContextSingleton, результирующий код будет выглядеть примерно так:

data.foreachRDD{rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  // Convert your data to a DataFrame, depends on the structure of your data
  val df = ....
  df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}

(Обратите внимание, что в приведенном выше примере для сохранения результата использовался JSON, но вы также можете использовать другие форматы вывода).

person Holden    schedule 14.05.2015
comment
Могу ли я сохранить DF как текстовый файл? Как я видел, тип по умолчанию — паркет. какой должен быть источник? - person Hussain Shaik; 19.05.2015
comment
@Holden, @HussainShaik У меня был тот же вопрос, и я использовал ваше решение, но продолжаю получать сообщение об ошибке - не найдено: путь к значению [error] df.save("com.databricks.spark.csv",SaveMode.Append,Map("path" ->path.toString)). Есть ли способ исправить это? - person serendipity; 28.07.2015
comment
требует, чтобы путь был определен как место, где вы хотите его сохранить. - person Holden; 29.07.2015
comment
@ Холден, спасибо .. Я понял. Если можно задать еще вопрос... Я пытаюсь собрать твиты в этом файле. Теперь файл создается правильно, но я не вижу в нем сохраненных твитов. Вот мой фрагмент кода: - person serendipity; 29.07.2015
comment
@serendipity: ваш фрагмент кода не опубликован. Вероятно, вам было бы лучше создать еще один вопрос, а не использовать комментарии к этому. - person Holden; 29.07.2015
comment
@Holden Я сделал это уже вчера ... должен был вставить сюда ссылку на этот вопрос. Вот: ссылка. Спасибо за вашу помощь - person serendipity; 30.07.2015