KotlinでDataflow書きたくて、触ってみた

tl;dr

KotlinでDataflowをこれから書いていくことにした、多分

Cloud Dataflow(Apache Beam)

Cloud DataflowはGCPのサービスかつプログラミングモデル(SDK)で、
Compute Engineインスタンスを並列に立ち上げて、分散処理ができます
オープンソース化して、プロジェクト名はApache Beam

cloud.google.com

beam.apache.org

分散処理プログラミングモデルを、JavaもしくはPythonで高レイヤーな記述ができるSDKを提供しています
Pythonはβ版で、だいたいはJavaのほうで実装することになります

分散処理はGoogleが2004年くらいに論文で発表したMapReduceが根源とあって、
関数型プログラミングのMap関数とReduce関数の組み合わせを、
並列的に複数のサーバ上で実行すれば、どんな大きなサイズのデータに対しても計算ができるという、
最強のプログラミングモデルです
HadoopやSparkがその派生で、Dataflow(Beam Model)もそういえます。

  • Map関数は、配列のすべての要素に対して同じ処理を適用し、配列として返す
  • Reduce関数は、配列のすべての要素を累積したりする関数

ざっくりといってますが、あらゆるタスクはこのMapReduceのコンボで解けるというのが論文の主張です

そして、データフローというのは、データソース(ストレージやデータウェアハウス)から、
適切な形へ加工・集計しつつ、別のデータソースに格納する、という要件です。
たとえば、 アクセスログなどの非構造化データ(1アクセス1行の粒度) を加工し、1時間や1日の粒度で集計した結果をデータベースにいれる(データ1行がたとえば3:00~4:00の間の合計値など) などがあります
これを数学的に簡単に捉えると、 データ加工の流れは、関数写像の繰り返し じゃないかなと思えます

ちなみに、誰しも触ったことあるデータベースへのインターフェイスといえばSQLがあって、
SQLと同等なことをしたかったら、SQLのようなクエリを、
MapReduceモデルではプログラミング力で再現していく必要があるので、プログラミング力が必要になってきます

Cloud DataflowをKotlinで書く理由

Cloud Dataflowのプログラミングモデルは、そんな関数写像にピッタリで、Javaよりも、ScalaやKotlinのほうが、
モデルの適合性的にも、シンタックス的にも、いいんじゃないか、と思ったわけです
ScalaじゃなくてKotlinを選んだ理由は、ぶっちゃけどっちも触ったことがなかったし、
どうせなら興味のあるKotlinを選びました。モダンだし。

Kotlinの特徴は、

  • 型推論
  • Null安全
  • 高階関数とラムダ
  • JVMで走り、Javaと互換性100%(Javaのライブラリなどそのまま使えるし、Javaファイルと同プロジェクトで共存化)

Kotlinの素晴らしさは、この記事がすごく楽しく読めました

trapti.tech

とくにラムダ式がデータフローモデルにぴったりあってる気がします

この辺は、nardtreeさんの考え方に強く影響を受けてます

catindog.hatenablog.com

Kotlinの導入

KotlinはMacだとbrewでいいぽい

brew update
brew install kotlin

mavenを使って、JavaとKotlinを共存させた、Cloud Dataflowプロジェクト

pom.xmlはこんな感じにするとよし

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>xxxxxxxxxxx</groupId>
    <artifactId>xxxxxxxxxxxx</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <name>beacon_dataflow</name>
    <url>http://maven.apache.org</url>

    <properties>
        <kotlin.version>1.1.4-3</kotlin.version>
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <dependency>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-stdlib</artifactId>
        <version>${kotlin.version}</version>
        </dependency>
        <dependency>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-test</artifactId>
        <version>${kotlin.version}</version>
        <scope>test</scope>
        </dependency>
        <dependency>
        <groupId>com.google.cloud.dataflow</groupId>
        <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
        <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
        <groupId>org.hamcrest</groupId>
        <artifactId>hamcrest-all</artifactId>
        <version>1.3</version>
        <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- for Kotlin -->
            <plugin>
                <artifactId>kotlin-maven-plugin</artifactId>
                <groupId>org.jetbrains.kotlin</groupId>
                <version>${kotlin.version}</version>
                <executions>
                    <execution>
                        <id>compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <source>src/main/kotlin</source>
                            </sourceDirs>
                        </configuration>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>test-compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <source>src/main/kotlin</source>
                            </sourceDirs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- for .java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>none</phase>
                    </execution>
                    <execution>
                        <id>default-testCompile</id>
                        <phase>none</phase>
                    </execution>
                    <execution>
                        <id>compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>testCompile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

ここでブログ書くのが力尽きたので、後半は明日