KotlinでDataflow書きたくて、触ってみた その3
tl;dr
ログの内容
前回、Apacheログをもとにパスそれぞれのカウント(アクセス数)を求めました ここで、動画広告の事業のほうからログ分析技術を紹介します
動画広告は、配信中にビーコン型のログを取得しています
つまり、ユーザの視聴を「動画再生」「15秒観た」「50%観た」「クリックした」「視聴完了した」という時点で
イベントが発火します。これをトラッキングといいます。
しくみ的にはイベント発火時にビーコンサーバへHTTPリクエストを飛ばしています。
このHTTPリクエストは1*1サイズのgifのロードであり、ネットワーク通信量的にも大したことなく、
HTTPリクエスト時のクエリパラメータに色々情報を乗っけています。
「http://<ビーコンサーバ>/impression.gif?data=1&data=2&data=3」みたいな感じです。
これをApacheをたてているビーコンサーバ側のアクセスログとして、ロギングを行なっています。
MapReduce
前回のは、pathをくっつけただけでしたので、?以降のクエリパラメータを除外して、
とにかくイベント内容ごとの粒度で振り分けてみようと思います
Map
class MapProc : DoFn<String, KV<String, Int>>() { @ProcessElement fun processElement(c: DoFn<String, KV<String, Int>>.ProcessContext) { val elem = c.element() val log: ApacheLog = Gson().fromJson(elem) val systime = log.systime ?: return val pathList = log.path?.split(Regex("\\?"), 2) ?: return val path = pathList[0] val timestamp = parseDateTime(systime).getMillis() val startDay = parseDateTime(systime).withTimeAtStartOfDay().getMillis() val outputKey = startDay.toString() + "#" + path c.outputWithTimestamp(KV.of(outputKey, 1), Instant(timestamp)) } override fun getAllowedTimestampSkew(): Duration { return Duration.millis(Long.MAX_VALUE); } }
前回とちがって、pathの?以降は除外しました
Reduce
class ReduceProc : CombineFn<Int, ReduceProc.Accum, Int>() { class Accum : Serializable { var count: Int = 0 } override fun createAccumulator(): Accum = Accum() override fun addInput(accum: Accum, value: Int): Accum { accum.count = value return accum } override fun mergeAccumulators(accums: Iterable<Accum>): Accum { val merged = createAccumulator() accums.filter { it.count > 0 }.forEach { merged.count += it.count } return merged } override fun extractOutput(accum: Accum): Int = accum.count override fun getAccumulatorCoder(registry: CoderRegistry, inputCoder: Coder<Int>): Coder<Accum> = SerializableCoder.of(Accum::class.java) }
なんかあんまり関数型っぽくかけてないので、せめて集計のところは
accums.filter{ it.count > 0}.forEach{ merged.count += it.count }
という風にkotlinのラムダ式使いました
kotlinではラムダ式の変数は指定しなければitですし、it->も省略できます
ログに出力してみた
keyは{yyyy-MM-dd 00:00:00のUNIX TIME}#{ファイルパス(イベントの種類)}
で、valueは集計値です。
これ乗せて大丈夫かな