Kotlin + Dataflow(Apache Beam)でCloudSQLからデータを取り出す

tl;dr

Kotlinで書いてるDataflow(Apache Beam)にて、CloudSQLからデータを取り出してAvroフォーマットで保持した。
BeamにはJdbcIOが提供されていてそれを利用するのですがドキュメントもなく色々手こずったのでメモしておきます
また、例のごとくkotlinで書きます

CloudSQLへの接続

CloudSQLへDataflowへ接続するには、ソケットファクトリを利用します。
必要なモジュールをpom.xmlに追記します。

     <!-- for CloudSQL -->
        <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-jdbc</artifactId>
        <version>2.0.0</version>
        </dependency>
        <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.5</version>
        </dependency>
        <dependency>
        <groupId>com.google.cloud.sql</groupId>
        <artifactId>mysql-socket-factory-connector-j-6</artifactId>
        <version>1.0.3</version>
        </dependency>

例は、usersテーブルからBigInt型のid, Char(100)型のname, Char(100)型のaddressをとりだすSQLです。
DataInfoというAvroフォーマットの自作クラスで、データを保持するとします。

jdbcIOにjdbcURLやらを引数に与えます。

option周り

   val jdbcUrl = buildString {
        append("jdbc:mysql://google/")
        append("<DATABASE_NAME>")
        append("?cloudSqlInstance=")
        append("<インスタンス接続名>")
        append("&socketFactory=com.google.cloud.sql.mysql.SocketFactory")
    }

    val jdbcInput = JdbcIO.read<DataInfo>()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", jdbcUrl)
            .withUsername("<USER>")
            .withPassword("<PASS>")
        ).withQuery("SELECT id, name, address FROM users")
        .withCoder(AvroCoder.of(DataInfo::class.java))
        .withRowMapper(ReadUserFn())

    val p: Pipeline = Pipeline.create(options)

    p.apply("Read cloudsql", jdbcInput) // ...

    p.run()

データ保持クラスDataInfo定義、RowMapper実装

@DefaultCoder(AvroCoder::class)
data class DataInfo (
    var id: Long = 0L,
    var name: String = "",
    var address: String = ""
)

class ReadUserFn : JdbcIO.RowMapper<DataInfo> {
    override fun mapRow(resultSet: ResultSet): DataInfo {
        val id = resultSet.getLong(1)
        val name = resultSet.getString(2) ?: ""
        val address = resultSet.getString(3) ?: ""
        return WidgetInfo(id, name, address)
    }
}

data classのプロパティはvalでは実行時にエラーが起きたのでvarで持たせる必要があります。
おそらくCoderのほうでデフォルトコンストラクタを必要とする設計なためみたいです。

data classでもつと何が嬉しいかというと、次の処理ブロックにDataInfoクラスを伝搬させたあと、分解宣言ができます

class MapFn : DoFn<DataInfo, KV<String, String>>() {
    companion object {
        val LOG:Logger = LoggerFactory.getLogger(WidgetInfo::class.java)
    }

    @ProcessElement fun processElement(c: DoFn<DataInfo, KV<String, String>>.ProcessContext) {
        val (id, name, address) = c.element() // 分解宣言
        c.output(KV.of(id.toString(), name + "_" + address))
    }
}

KotlinでDataflow書きたくて、触ってみた その3

tl;dr

ログの内容

前回、Apacheログをもとにパスそれぞれのカウント(アクセス数)を求めました   ここで、動画広告の事業のほうからログ分析技術を紹介します  

動画広告は、配信中にビーコン型のログを取得しています
つまり、ユーザの視聴を「動画再生」「15秒観た」「50%観た」「クリックした」「視聴完了した」という時点で
イベントが発火します。これをトラッキングといいます。
しくみ的にはイベント発火時にビーコンサーバへHTTPリクエストを飛ばしています。
このHTTPリクエストは1*1サイズのgifのロードであり、ネットワーク通信量的にも大したことなく、
HTTPリクエスト時のクエリパラメータに色々情報を乗っけています。
http://<ビーコンサーバ>/impression.gif?data=1&data=2&data=3」みたいな感じです。
これをApacheをたてているビーコンサーバ側のアクセスログとして、ロギングを行なっています。

f:id:i101330:20170926030709p:plain

MapReduce

前回のは、pathをくっつけただけでしたので、?以降のクエリパラメータを除外して、
とにかくイベント内容ごとの粒度で振り分けてみようと思います

Map

class MapProc : DoFn<String, KV<String, Int>>() {
    @ProcessElement fun processElement(c: DoFn<String, KV<String, Int>>.ProcessContext) {
        val elem = c.element()
        val log: ApacheLog = Gson().fromJson(elem)
        val systime = log.systime ?: return
        val pathList = log.path?.split(Regex("\\?"), 2) ?: return
        val path = pathList[0]
        
        val timestamp = parseDateTime(systime).getMillis()
        val startDay = parseDateTime(systime).withTimeAtStartOfDay().getMillis()
        val outputKey = startDay.toString() + "#" + path
        c.outputWithTimestamp(KV.of(outputKey, 1), Instant(timestamp))
    }

    override fun getAllowedTimestampSkew(): Duration {
        return Duration.millis(Long.MAX_VALUE);
    }
}

前回とちがって、pathの?以降は除外しました

Reduce

class ReduceProc : CombineFn<Int, ReduceProc.Accum, Int>() {
    class Accum : Serializable {
        var count: Int = 0
    }

    override fun createAccumulator(): Accum = Accum()

    override fun addInput(accum: Accum, value: Int): Accum {
        accum.count = value
        return accum
    }

    override fun mergeAccumulators(accums: Iterable<Accum>): Accum {
        val merged = createAccumulator()
        accums.filter { it.count > 0 }.forEach { merged.count += it.count }
        return merged
    }

    override fun extractOutput(accum: Accum): Int = accum.count

    override fun getAccumulatorCoder(registry: CoderRegistry, inputCoder: Coder<Int>): Coder<Accum> = SerializableCoder.of(Accum::class.java)
}

なんかあんまり関数型っぽくかけてないので、せめて集計のところは
accums.filter{ it.count > 0}.forEach{ merged.count += it.count }という風にkotlinのラムダ式使いました
kotlinではラムダ式の変数は指定しなければitですし、it->も省略できます

ログに出力してみた

keyは{yyyy-MM-dd 00:00:00のUNIX TIME}#{ファイルパス(イベントの種類)}で、valueは集計値です。

f:id:i101330:20170926031623p:plain

これ乗せて大丈夫かな

KotlinでDataflow書きたくて、触ってみた その2

tl;dr

Apacheログから、その日のパス毎のアクセス数をリアルタイム分析(5分ごとに更新)するやつ をkotlinでかきました
(/users/1/ : 800view, /about.html : 2000000view的な)

unboundedとbounded

そもそもMapReduceバッチ処理の手法です
バッチ処理というのはたとえば先月分のデータとか、データサイズが決まっているものに対して行う処理です
それにくらべて、今もこうやっているうちに、常に増え続けるデータなどはデータサイズが可変長です
Mapはデータがくるたびにできるけど、Reduceは集約処理なので、どのタイミングで、どのくらいのデータに対して適用するかは、データサイズが増え続けるんだったら不可能です。
そのためにBeamモデルでは、ウィンドウ処理という概念を用いて、
データサイズを擬似的に決めて、MapReduceを行います
(イメージはフーリエ変換の窓関数)

そこでBeamモデルでは、データサイズが固定なデータを制限ありデータ(bounded)、
データサイズが可変長なデータを制限なしデータ(unbounded)というふうに用語を定義しています。

ApacheログをPub/Subに

Pub/Subはいわゆるキューシステムで、とあるアプリケーションのアクセスログ(Apache)をPub/Subにどんどんいれてます
アクセスされるたびに、fluentdを使ってjsonにパースしたものを転送しています

f:id:i101330:20170923234932p:plain

<source>
  @type tail
  path /var/log/httpd/access_log
  tag apache.access_log
  pos_file /var/log/fluent/access_log.pos
  format /^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] \[(?<systime>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")$/
  time_format %d/%b/%Y:%H:%M:%S %z
</source>

<match apache.access_log>
  @type gcloud_pubsub
  project <PROJECT_NAME>
  key <KEY-FILE PATH> 
  topic projects/<PROJECT_NAME>/topics/<TOPIC_NAME>
  autocreate_topic false
  max_messages 1000
  max_total_size 9800000
  max_message_size 4000000
  buffer_type file
  buffer_path /var/log/fluent/pubsub/
  flush_interval 1s
  try_flush_interval 0.2
  format json
</match>

Dataflow

やりたいこと

pathのカウントをしましょう、1日(00:00 ~ 23:59)を粒度にします
だから、キーは(yyyy-MM-dd 00:00:00を表すUNIX TIME)#(pathのvalue)とかにしておけばいいと思います。#はセパレータです。

Map処理

なので、アクセスログ一行ずつに、

  1. systimeから、yyyy-MM-dd 00:00:00のUNIX TIMEを計算する
  2. pathと文字列結合してValueは1をもつKeyValue形式にする

というMap処理を行います

BeamモデルではParDo処理というのがMapに相当します
DoFnのサブクラスを実装します

data class ApacheLog (
    val systime: String?,
    val path: String?
)

class MapProc : DoFn<String, KV<String, Int>>() {
    @ProcessElement fun processElement(c: DoFn<String, KV<String, Int>>.ProcessContext) {
        val elem = c.element()
        val log = Gson().fromJson(elem, ApacheLog::class.java)
        val systime = log?.systime ?: return
        val path = log?.path ?: return
        
        val timestamp = parseDateTime(systime).getMillis()
        val startDay = parseDateTime(systime).withTimeAtStartOfDay().getMillis()
        val outputKey = startDay.toString() + "#" + path
        c.outputWithTimestamp(KV.of(outputKey, 1), Instant(timestamp))
    }

    override fun getAllowedTimestampSkew(): Duration {
        return Duration.millis(Long.MAX_VALUE);
    }
}

fun parseDateTime(systime: String): DateTime { 
    return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
        .parseDateTime(systime)
}

これで、PubSubからのデータを、<KV<String, Int>>に変換するMapクラスができあがりです
c.outputWithTimestamp(Object, Instant)は第二引数にorg.joda.time.Instant型の数値を渡すと、
その数値がそのデータのイベント時間として扱われます
なので、Apacheログのsystimeがイベント時間として適用されます
Window処理はイベント時間をもとにデータを時分割するためです

Reduce処理

BeamではCombineがReduceに相当します
CombineFnのサブクラスを実装します

class ReduceProc : CombineFn<Int, ReduceProc.Accum, Int>() {
    class Accum : Serializable {
        var count: Int = 0
    }

    override fun createAccumulator(): Accum {
        return Accum()
    }

    override fun addInput(accum: Accum, value: Int): Accum {
        accum.count = value
        return accum
    }

    override fun mergeAccumulators(accums: Iterable<Accum>): Accum {
        val merged = createAccumulator()
        for (accum in accums) {
            merged.count += accum.count
        }
        return merged
    }

    override fun extractOutput(accum: Accum): Int {
        return accum.count
    }
    override fun getAccumulatorCoder(registry: CoderRegistry, inputCoder: Coder<Int>): Coder<Accum> {
        return SerializableCoder.of(Accum::class.java)
    }
}

パイプラインの設計

それでは、実装したMapReduce関数をパイプラインに肉付けしていきたいのですが、
Pub/Subからのアクセスログデータは、常に増え続けるunboundedデータなので、Window処理を適用する必要があります

fun main(args: Array<String>) {

    val credentials = GoogleCredentials
        .fromStream(FileInputStream("<GCP認証のjsonファイルパス>"))
        .createScoped(StorageScopes.all());

    val options = JavaDataflowUtils.getOptionInstance()
    options.setGcpCredential(credentials)
    options.setProject("<GCPのプロジェクト名>")
    options.setJobName("pubsub")
    options.setStreaming(true)

    val subscriptionPath = buildString {
        append("projects/")
        append("<GCPのプロジェクト名>").append("/subscriptions/")
        append("<Pub/Subのサブスクリプション名>")
    }

    runner(options, subscriptionPath)
}

fun runner(options: DataflowPipelineOptions, input: String) {
    val p:Pipeline = Pipeline.create(options)

    p.apply("read pubsub", PubsubIO.readStrings().fromSubscription(input))
        .apply("Map", ParDo.of(MapProc()))
        .apply("Window", Window.into<KV<String, Int>>(FixedWindows.of(Duration.standardDays(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(5))))
            .withAllowedLateness(Duration.standardHours(1))
            .accumulatingFiredPanes())
        .apply("Reduce", Combine.perKey<String, Int, Int>(ReduceProc()))

    p.run()
}

Pipelineクラスのオブジェクトにapplyメソッドで関数を適用していきます
このWindowは、1日(00:00~23:59)の固定ウィンドウですが、Reduceが1日ずっと待ち続けるのはちょっとおかしいので、
早期トリガーとして5分ごとにウィンドウ内のデータをはきだすようにしています。

そんな感じで、Window + MapReduceを実現しています。

つかれた、また今度

超参考 :

catindog.hatenablog.com

KotlinでDataflow書きたくて、触ってみた

tl;dr

KotlinでDataflowをこれから書いていくことにした、多分

Cloud Dataflow(Apache Beam)

Cloud DataflowはGCPのサービスかつプログラミングモデル(SDK)で、
Compute Engineインスタンスを並列に立ち上げて、分散処理ができます
オープンソース化して、プロジェクト名はApache Beam

cloud.google.com

beam.apache.org

分散処理プログラミングモデルを、JavaもしくはPythonで高レイヤーな記述ができるSDKを提供しています
Pythonはβ版で、だいたいはJavaのほうで実装することになります

分散処理はGoogleが2004年くらいに論文で発表したMapReduceが根源とあって、
関数型プログラミングのMap関数とReduce関数の組み合わせを、
並列的に複数のサーバ上で実行すれば、どんな大きなサイズのデータに対しても計算ができるという、
最強のプログラミングモデルです
HadoopやSparkがその派生で、Dataflow(Beam Model)もそういえます。

  • Map関数は、配列のすべての要素に対して同じ処理を適用し、配列として返す
  • Reduce関数は、配列のすべての要素を累積したりする関数

ざっくりといってますが、あらゆるタスクはこのMapReduceのコンボで解けるというのが論文の主張です

そして、データフローというのは、データソース(ストレージやデータウェアハウス)から、
適切な形へ加工・集計しつつ、別のデータソースに格納する、という要件です。
たとえば、 アクセスログなどの非構造化データ(1アクセス1行の粒度) を加工し、1時間や1日の粒度で集計した結果をデータベースにいれる(データ1行がたとえば3:00~4:00の間の合計値など) などがあります
これを数学的に簡単に捉えると、 データ加工の流れは、関数写像の繰り返し じゃないかなと思えます

ちなみに、誰しも触ったことあるデータベースへのインターフェイスといえばSQLがあって、
SQLと同等なことをしたかったら、SQLのようなクエリを、
MapReduceモデルではプログラミング力で再現していく必要があるので、プログラミング力が必要になってきます

Cloud DataflowをKotlinで書く理由

Cloud Dataflowのプログラミングモデルは、そんな関数写像にピッタリで、Javaよりも、ScalaやKotlinのほうが、
モデルの適合性的にも、シンタックス的にも、いいんじゃないか、と思ったわけです
ScalaじゃなくてKotlinを選んだ理由は、ぶっちゃけどっちも触ったことがなかったし、
どうせなら興味のあるKotlinを選びました。モダンだし。

Kotlinの特徴は、

  • 型推論
  • Null安全
  • 高階関数とラムダ
  • JVMで走り、Javaと互換性100%(Javaのライブラリなどそのまま使えるし、Javaファイルと同プロジェクトで共存化)

Kotlinの素晴らしさは、この記事がすごく楽しく読めました

trapti.tech

とくにラムダ式がデータフローモデルにぴったりあってる気がします

この辺は、nardtreeさんの考え方に強く影響を受けてます

catindog.hatenablog.com

Kotlinの導入

KotlinはMacだとbrewでいいぽい

brew update
brew install kotlin

mavenを使って、JavaとKotlinを共存させた、Cloud Dataflowプロジェクト

pom.xmlはこんな感じにするとよし

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>xxxxxxxxxxx</groupId>
    <artifactId>xxxxxxxxxxxx</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <name>beacon_dataflow</name>
    <url>http://maven.apache.org</url>

    <properties>
        <kotlin.version>1.1.4-3</kotlin.version>
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <dependency>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-stdlib</artifactId>
        <version>${kotlin.version}</version>
        </dependency>
        <dependency>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-test</artifactId>
        <version>${kotlin.version}</version>
        <scope>test</scope>
        </dependency>
        <dependency>
        <groupId>com.google.cloud.dataflow</groupId>
        <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
        <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
        <groupId>org.hamcrest</groupId>
        <artifactId>hamcrest-all</artifactId>
        <version>1.3</version>
        <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- for Kotlin -->
            <plugin>
                <artifactId>kotlin-maven-plugin</artifactId>
                <groupId>org.jetbrains.kotlin</groupId>
                <version>${kotlin.version}</version>
                <executions>
                    <execution>
                        <id>compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <source>src/main/kotlin</source>
                            </sourceDirs>
                        </configuration>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>test-compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <source>src/main/kotlin</source>
                            </sourceDirs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- for .java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>none</phase>
                    </execution>
                    <execution>
                        <id>default-testCompile</id>
                        <phase>none</phase>
                    </execution>
                    <execution>
                        <id>compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>testCompile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

ここでブログ書くのが力尽きたので、後半は明日