KotlinでDataflow書きたくて、触ってみた その3

tl;dr

ログの内容

前回、Apacheログをもとにパスそれぞれのカウント(アクセス数)を求めました   ここで、動画広告の事業のほうからログ分析技術を紹介します  

動画広告は、配信中にビーコン型のログを取得しています
つまり、ユーザの視聴を「動画再生」「15秒観た」「50%観た」「クリックした」「視聴完了した」という時点で
イベントが発火します。これをトラッキングといいます。
しくみ的にはイベント発火時にビーコンサーバへHTTPリクエストを飛ばしています。
このHTTPリクエストは1*1サイズのgifのロードであり、ネットワーク通信量的にも大したことなく、
HTTPリクエスト時のクエリパラメータに色々情報を乗っけています。
http://<ビーコンサーバ>/impression.gif?data=1&data=2&data=3」みたいな感じです。
これをApacheをたてているビーコンサーバ側のアクセスログとして、ロギングを行なっています。

f:id:i101330:20170926030709p:plain

MapReduce

前回のは、pathをくっつけただけでしたので、?以降のクエリパラメータを除外して、
とにかくイベント内容ごとの粒度で振り分けてみようと思います

Map

class MapProc : DoFn<String, KV<String, Int>>() {
    @ProcessElement fun processElement(c: DoFn<String, KV<String, Int>>.ProcessContext) {
        val elem = c.element()
        val log: ApacheLog = Gson().fromJson(elem)
        val systime = log.systime ?: return
        val pathList = log.path?.split(Regex("\\?"), 2) ?: return
        val path = pathList[0]
        
        val timestamp = parseDateTime(systime).getMillis()
        val startDay = parseDateTime(systime).withTimeAtStartOfDay().getMillis()
        val outputKey = startDay.toString() + "#" + path
        c.outputWithTimestamp(KV.of(outputKey, 1), Instant(timestamp))
    }

    override fun getAllowedTimestampSkew(): Duration {
        return Duration.millis(Long.MAX_VALUE);
    }
}

前回とちがって、pathの?以降は除外しました

Reduce

class ReduceProc : CombineFn<Int, ReduceProc.Accum, Int>() {
    class Accum : Serializable {
        var count: Int = 0
    }

    override fun createAccumulator(): Accum = Accum()

    override fun addInput(accum: Accum, value: Int): Accum {
        accum.count = value
        return accum
    }

    override fun mergeAccumulators(accums: Iterable<Accum>): Accum {
        val merged = createAccumulator()
        accums.filter { it.count > 0 }.forEach { merged.count += it.count }
        return merged
    }

    override fun extractOutput(accum: Accum): Int = accum.count

    override fun getAccumulatorCoder(registry: CoderRegistry, inputCoder: Coder<Int>): Coder<Accum> = SerializableCoder.of(Accum::class.java)
}

なんかあんまり関数型っぽくかけてないので、せめて集計のところは
accums.filter{ it.count > 0}.forEach{ merged.count += it.count }という風にkotlinのラムダ式使いました
kotlinではラムダ式の変数は指定しなければitですし、it->も省略できます

ログに出力してみた

keyは{yyyy-MM-dd 00:00:00のUNIX TIME}#{ファイルパス(イベントの種類)}で、valueは集計値です。

f:id:i101330:20170926031623p:plain

これ乗せて大丈夫かな