KotlinでDataflow書きたくて、触ってみた その2
tl;dr
Apacheログから、その日のパス毎のアクセス数をリアルタイム分析(5分ごとに更新)するやつ をkotlinでかきました
(/users/1/ : 800view, /about.html : 2000000view的な)
unboundedとbounded
そもそもMapReduceはバッチ処理の手法です
バッチ処理というのはたとえば先月分のデータとか、データサイズが決まっているものに対して行う処理です
それにくらべて、今もこうやっているうちに、常に増え続けるデータなどはデータサイズが可変長です
Mapはデータがくるたびにできるけど、Reduceは集約処理なので、どのタイミングで、どのくらいのデータに対して適用するかは、データサイズが増え続けるんだったら不可能です。
そのためにBeamモデルでは、ウィンドウ処理という概念を用いて、
データサイズを擬似的に決めて、MapReduceを行います
(イメージはフーリエ変換の窓関数)
そこでBeamモデルでは、データサイズが固定なデータを制限ありデータ(bounded)、
データサイズが可変長なデータを制限なしデータ(unbounded)というふうに用語を定義しています。
ApacheログをPub/Subに
Pub/Subはいわゆるキューシステムで、とあるアプリケーションのアクセスログ(Apache)をPub/Subにどんどんいれてます
アクセスされるたびに、fluentdを使ってjsonにパースしたものを転送しています
<source> @type tail path /var/log/httpd/access_log tag apache.access_log pos_file /var/log/fluent/access_log.pos format /^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] \[(?<systime>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")$/ time_format %d/%b/%Y:%H:%M:%S %z </source> <match apache.access_log> @type gcloud_pubsub project <PROJECT_NAME> key <KEY-FILE PATH> topic projects/<PROJECT_NAME>/topics/<TOPIC_NAME> autocreate_topic false max_messages 1000 max_total_size 9800000 max_message_size 4000000 buffer_type file buffer_path /var/log/fluent/pubsub/ flush_interval 1s try_flush_interval 0.2 format json </match>
Dataflow
やりたいこと
pathのカウントをしましょう、1日(00:00 ~ 23:59)を粒度にします
だから、キーは(yyyy-MM-dd 00:00:00を表すUNIX TIME)#(pathのvalue)
とかにしておけばいいと思います。#はセパレータです。
Map処理
なので、アクセスログ一行ずつに、
というMap処理を行います
BeamモデルではParDo処理というのがMapに相当します
DoFnのサブクラスを実装します
data class ApacheLog ( val systime: String?, val path: String? ) class MapProc : DoFn<String, KV<String, Int>>() { @ProcessElement fun processElement(c: DoFn<String, KV<String, Int>>.ProcessContext) { val elem = c.element() val log = Gson().fromJson(elem, ApacheLog::class.java) val systime = log?.systime ?: return val path = log?.path ?: return val timestamp = parseDateTime(systime).getMillis() val startDay = parseDateTime(systime).withTimeAtStartOfDay().getMillis() val outputKey = startDay.toString() + "#" + path c.outputWithTimestamp(KV.of(outputKey, 1), Instant(timestamp)) } override fun getAllowedTimestampSkew(): Duration { return Duration.millis(Long.MAX_VALUE); } } fun parseDateTime(systime: String): DateTime { return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") .parseDateTime(systime) }
これで、PubSubからのデータ
c.outputWithTimestamp(Object, Instant)
は第二引数にorg.joda.time.Instant型の数値を渡すと、
その数値がそのデータのイベント時間として扱われます
なので、Apacheログのsystimeがイベント時間として適用されます
Window処理はイベント時間をもとにデータを時分割するためです
Reduce処理
BeamではCombineがReduceに相当します
CombineFnのサブクラスを実装します
class ReduceProc : CombineFn<Int, ReduceProc.Accum, Int>() { class Accum : Serializable { var count: Int = 0 } override fun createAccumulator(): Accum { return Accum() } override fun addInput(accum: Accum, value: Int): Accum { accum.count = value return accum } override fun mergeAccumulators(accums: Iterable<Accum>): Accum { val merged = createAccumulator() for (accum in accums) { merged.count += accum.count } return merged } override fun extractOutput(accum: Accum): Int { return accum.count } override fun getAccumulatorCoder(registry: CoderRegistry, inputCoder: Coder<Int>): Coder<Accum> { return SerializableCoder.of(Accum::class.java) } }
パイプラインの設計
それでは、実装したMapReduce関数をパイプラインに肉付けしていきたいのですが、
Pub/Subからのアクセスログデータは、常に増え続けるunboundedデータなので、Window処理を適用する必要があります
fun main(args: Array<String>) { val credentials = GoogleCredentials .fromStream(FileInputStream("<GCP認証のjsonファイルパス>")) .createScoped(StorageScopes.all()); val options = JavaDataflowUtils.getOptionInstance() options.setGcpCredential(credentials) options.setProject("<GCPのプロジェクト名>") options.setJobName("pubsub") options.setStreaming(true) val subscriptionPath = buildString { append("projects/") append("<GCPのプロジェクト名>").append("/subscriptions/") append("<Pub/Subのサブスクリプション名>") } runner(options, subscriptionPath) } fun runner(options: DataflowPipelineOptions, input: String) { val p:Pipeline = Pipeline.create(options) p.apply("read pubsub", PubsubIO.readStrings().fromSubscription(input)) .apply("Map", ParDo.of(MapProc())) .apply("Window", Window.into<KV<String, Int>>(FixedWindows.of(Duration.standardDays(1))) .triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(5)))) .withAllowedLateness(Duration.standardHours(1)) .accumulatingFiredPanes()) .apply("Reduce", Combine.perKey<String, Int, Int>(ReduceProc())) p.run() }
Pipelineクラスのオブジェクトにapplyメソッドで関数を適用していきます
このWindowは、1日(00:00~23:59)の固定ウィンドウですが、Reduceが1日ずっと待ち続けるのはちょっとおかしいので、
早期トリガーとして5分ごとにウィンドウ内のデータをはきだすようにしています。
そんな感じで、Window + MapReduceを実現しています。
つかれた、また今度
超参考 :