KotlinでDataflow書きたくて、触ってみた その2

tl;dr

Apacheログから、その日のパス毎のアクセス数をリアルタイム分析(5分ごとに更新)するやつ をkotlinでかきました
(/users/1/ : 800view, /about.html : 2000000view的な)

unboundedとbounded

そもそもMapReduceバッチ処理の手法です
バッチ処理というのはたとえば先月分のデータとか、データサイズが決まっているものに対して行う処理です
それにくらべて、今もこうやっているうちに、常に増え続けるデータなどはデータサイズが可変長です
Mapはデータがくるたびにできるけど、Reduceは集約処理なので、どのタイミングで、どのくらいのデータに対して適用するかは、データサイズが増え続けるんだったら不可能です。
そのためにBeamモデルでは、ウィンドウ処理という概念を用いて、
データサイズを擬似的に決めて、MapReduceを行います
(イメージはフーリエ変換の窓関数)

そこでBeamモデルでは、データサイズが固定なデータを制限ありデータ(bounded)、
データサイズが可変長なデータを制限なしデータ(unbounded)というふうに用語を定義しています。

ApacheログをPub/Subに

Pub/Subはいわゆるキューシステムで、とあるアプリケーションのアクセスログ(Apache)をPub/Subにどんどんいれてます
アクセスされるたびに、fluentdを使ってjsonにパースしたものを転送しています

f:id:i101330:20170923234932p:plain

<source>
  @type tail
  path /var/log/httpd/access_log
  tag apache.access_log
  pos_file /var/log/fluent/access_log.pos
  format /^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] \[(?<systime>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")$/
  time_format %d/%b/%Y:%H:%M:%S %z
</source>

<match apache.access_log>
  @type gcloud_pubsub
  project <PROJECT_NAME>
  key <KEY-FILE PATH> 
  topic projects/<PROJECT_NAME>/topics/<TOPIC_NAME>
  autocreate_topic false
  max_messages 1000
  max_total_size 9800000
  max_message_size 4000000
  buffer_type file
  buffer_path /var/log/fluent/pubsub/
  flush_interval 1s
  try_flush_interval 0.2
  format json
</match>

Dataflow

やりたいこと

pathのカウントをしましょう、1日(00:00 ~ 23:59)を粒度にします
だから、キーは(yyyy-MM-dd 00:00:00を表すUNIX TIME)#(pathのvalue)とかにしておけばいいと思います。#はセパレータです。

Map処理

なので、アクセスログ一行ずつに、

  1. systimeから、yyyy-MM-dd 00:00:00のUNIX TIMEを計算する
  2. pathと文字列結合してValueは1をもつKeyValue形式にする

というMap処理を行います

BeamモデルではParDo処理というのがMapに相当します
DoFnのサブクラスを実装します

data class ApacheLog (
    val systime: String?,
    val path: String?
)

class MapProc : DoFn<String, KV<String, Int>>() {
    @ProcessElement fun processElement(c: DoFn<String, KV<String, Int>>.ProcessContext) {
        val elem = c.element()
        val log = Gson().fromJson(elem, ApacheLog::class.java)
        val systime = log?.systime ?: return
        val path = log?.path ?: return
        
        val timestamp = parseDateTime(systime).getMillis()
        val startDay = parseDateTime(systime).withTimeAtStartOfDay().getMillis()
        val outputKey = startDay.toString() + "#" + path
        c.outputWithTimestamp(KV.of(outputKey, 1), Instant(timestamp))
    }

    override fun getAllowedTimestampSkew(): Duration {
        return Duration.millis(Long.MAX_VALUE);
    }
}

fun parseDateTime(systime: String): DateTime { 
    return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
        .parseDateTime(systime)
}

これで、PubSubからのデータを、<KV<String, Int>>に変換するMapクラスができあがりです
c.outputWithTimestamp(Object, Instant)は第二引数にorg.joda.time.Instant型の数値を渡すと、
その数値がそのデータのイベント時間として扱われます
なので、Apacheログのsystimeがイベント時間として適用されます
Window処理はイベント時間をもとにデータを時分割するためです

Reduce処理

BeamではCombineがReduceに相当します
CombineFnのサブクラスを実装します

class ReduceProc : CombineFn<Int, ReduceProc.Accum, Int>() {
    class Accum : Serializable {
        var count: Int = 0
    }

    override fun createAccumulator(): Accum {
        return Accum()
    }

    override fun addInput(accum: Accum, value: Int): Accum {
        accum.count = value
        return accum
    }

    override fun mergeAccumulators(accums: Iterable<Accum>): Accum {
        val merged = createAccumulator()
        for (accum in accums) {
            merged.count += accum.count
        }
        return merged
    }

    override fun extractOutput(accum: Accum): Int {
        return accum.count
    }
    override fun getAccumulatorCoder(registry: CoderRegistry, inputCoder: Coder<Int>): Coder<Accum> {
        return SerializableCoder.of(Accum::class.java)
    }
}

パイプラインの設計

それでは、実装したMapReduce関数をパイプラインに肉付けしていきたいのですが、
Pub/Subからのアクセスログデータは、常に増え続けるunboundedデータなので、Window処理を適用する必要があります

fun main(args: Array<String>) {

    val credentials = GoogleCredentials
        .fromStream(FileInputStream("<GCP認証のjsonファイルパス>"))
        .createScoped(StorageScopes.all());

    val options = JavaDataflowUtils.getOptionInstance()
    options.setGcpCredential(credentials)
    options.setProject("<GCPのプロジェクト名>")
    options.setJobName("pubsub")
    options.setStreaming(true)

    val subscriptionPath = buildString {
        append("projects/")
        append("<GCPのプロジェクト名>").append("/subscriptions/")
        append("<Pub/Subのサブスクリプション名>")
    }

    runner(options, subscriptionPath)
}

fun runner(options: DataflowPipelineOptions, input: String) {
    val p:Pipeline = Pipeline.create(options)

    p.apply("read pubsub", PubsubIO.readStrings().fromSubscription(input))
        .apply("Map", ParDo.of(MapProc()))
        .apply("Window", Window.into<KV<String, Int>>(FixedWindows.of(Duration.standardDays(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(5))))
            .withAllowedLateness(Duration.standardHours(1))
            .accumulatingFiredPanes())
        .apply("Reduce", Combine.perKey<String, Int, Int>(ReduceProc()))

    p.run()
}

Pipelineクラスのオブジェクトにapplyメソッドで関数を適用していきます
このWindowは、1日(00:00~23:59)の固定ウィンドウですが、Reduceが1日ずっと待ち続けるのはちょっとおかしいので、
早期トリガーとして5分ごとにウィンドウ内のデータをはきだすようにしています。

そんな感じで、Window + MapReduceを実現しています。

つかれた、また今度

超参考 :

catindog.hatenablog.com