Kotlin + Dataflow(Apache Beam)でCloudSQLからデータを取り出す
tl;dr
Kotlinで書いてるDataflow(Apache Beam)にて、CloudSQLからデータを取り出してAvroフォーマットで保持した。
BeamにはJdbcIOが提供されていてそれを利用するのですがドキュメントもなく色々手こずったのでメモしておきます
また、例のごとくkotlinで書きます
CloudSQLへの接続
CloudSQLへDataflowへ接続するには、ソケットファクトリを利用します。
必要なモジュールをpom.xmlに追記します。
<!-- for CloudSQL --> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-jdbc</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>6.0.5</version> </dependency> <dependency> <groupId>com.google.cloud.sql</groupId> <artifactId>mysql-socket-factory-connector-j-6</artifactId> <version>1.0.3</version> </dependency>
例
例は、users
テーブルからBigInt型のid
, Char(100)型のname
, Char(100)型のaddress
をとりだすSQLです。
DataInfo
というAvroフォーマットの自作クラスで、データを保持するとします。
jdbcIOにjdbcURLやらを引数に与えます。
option周り
val jdbcUrl = buildString { append("jdbc:mysql://google/") append("<DATABASE_NAME>") append("?cloudSqlInstance=") append("<インスタンス接続名>") append("&socketFactory=com.google.cloud.sql.mysql.SocketFactory") } val jdbcInput = JdbcIO.read<DataInfo>() .withDataSourceConfiguration( JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", jdbcUrl) .withUsername("<USER>") .withPassword("<PASS>") ).withQuery("SELECT id, name, address FROM users") .withCoder(AvroCoder.of(DataInfo::class.java)) .withRowMapper(ReadUserFn()) val p: Pipeline = Pipeline.create(options) p.apply("Read cloudsql", jdbcInput) // ... p.run()
データ保持クラスDataInfo
定義、RowMapper実装
@DefaultCoder(AvroCoder::class) data class DataInfo ( var id: Long = 0L, var name: String = "", var address: String = "" ) class ReadUserFn : JdbcIO.RowMapper<DataInfo> { override fun mapRow(resultSet: ResultSet): DataInfo { val id = resultSet.getLong(1) val name = resultSet.getString(2) ?: "" val address = resultSet.getString(3) ?: "" return WidgetInfo(id, name, address) } }
data classのプロパティはvalでは実行時にエラーが起きたのでvarで持たせる必要があります。
おそらくCoderのほうでデフォルトコンストラクタを必要とする設計なためみたいです。
data classでもつと何が嬉しいかというと、次の処理ブロックにDataInfoクラスを伝搬させたあと、分解宣言ができます
class MapFn : DoFn<DataInfo, KV<String, String>>() { companion object { val LOG:Logger = LoggerFactory.getLogger(WidgetInfo::class.java) } @ProcessElement fun processElement(c: DoFn<DataInfo, KV<String, String>>.ProcessContext) { val (id, name, address) = c.element() // 分解宣言 c.output(KV.of(id.toString(), name + "_" + address)) } }
KotlinでDataflow書きたくて、触ってみた その3
tl;dr
ログの内容
前回、Apacheログをもとにパスそれぞれのカウント(アクセス数)を求めました ここで、動画広告の事業のほうからログ分析技術を紹介します
動画広告は、配信中にビーコン型のログを取得しています
つまり、ユーザの視聴を「動画再生」「15秒観た」「50%観た」「クリックした」「視聴完了した」という時点で
イベントが発火します。これをトラッキングといいます。
しくみ的にはイベント発火時にビーコンサーバへHTTPリクエストを飛ばしています。
このHTTPリクエストは1*1サイズのgifのロードであり、ネットワーク通信量的にも大したことなく、
HTTPリクエスト時のクエリパラメータに色々情報を乗っけています。
「http://<ビーコンサーバ>/impression.gif?data=1&data=2&data=3」みたいな感じです。
これをApacheをたてているビーコンサーバ側のアクセスログとして、ロギングを行なっています。
MapReduce
前回のは、pathをくっつけただけでしたので、?以降のクエリパラメータを除外して、
とにかくイベント内容ごとの粒度で振り分けてみようと思います
Map
class MapProc : DoFn<String, KV<String, Int>>() { @ProcessElement fun processElement(c: DoFn<String, KV<String, Int>>.ProcessContext) { val elem = c.element() val log: ApacheLog = Gson().fromJson(elem) val systime = log.systime ?: return val pathList = log.path?.split(Regex("\\?"), 2) ?: return val path = pathList[0] val timestamp = parseDateTime(systime).getMillis() val startDay = parseDateTime(systime).withTimeAtStartOfDay().getMillis() val outputKey = startDay.toString() + "#" + path c.outputWithTimestamp(KV.of(outputKey, 1), Instant(timestamp)) } override fun getAllowedTimestampSkew(): Duration { return Duration.millis(Long.MAX_VALUE); } }
前回とちがって、pathの?以降は除外しました
Reduce
class ReduceProc : CombineFn<Int, ReduceProc.Accum, Int>() { class Accum : Serializable { var count: Int = 0 } override fun createAccumulator(): Accum = Accum() override fun addInput(accum: Accum, value: Int): Accum { accum.count = value return accum } override fun mergeAccumulators(accums: Iterable<Accum>): Accum { val merged = createAccumulator() accums.filter { it.count > 0 }.forEach { merged.count += it.count } return merged } override fun extractOutput(accum: Accum): Int = accum.count override fun getAccumulatorCoder(registry: CoderRegistry, inputCoder: Coder<Int>): Coder<Accum> = SerializableCoder.of(Accum::class.java) }
なんかあんまり関数型っぽくかけてないので、せめて集計のところは
accums.filter{ it.count > 0}.forEach{ merged.count += it.count }
という風にkotlinのラムダ式使いました
kotlinではラムダ式の変数は指定しなければitですし、it->も省略できます
ログに出力してみた
keyは{yyyy-MM-dd 00:00:00のUNIX TIME}#{ファイルパス(イベントの種類)}
で、valueは集計値です。
これ乗せて大丈夫かな
KotlinでDataflow書きたくて、触ってみた その2
tl;dr
Apacheログから、その日のパス毎のアクセス数をリアルタイム分析(5分ごとに更新)するやつ をkotlinでかきました
(/users/1/ : 800view, /about.html : 2000000view的な)
unboundedとbounded
そもそもMapReduceはバッチ処理の手法です
バッチ処理というのはたとえば先月分のデータとか、データサイズが決まっているものに対して行う処理です
それにくらべて、今もこうやっているうちに、常に増え続けるデータなどはデータサイズが可変長です
Mapはデータがくるたびにできるけど、Reduceは集約処理なので、どのタイミングで、どのくらいのデータに対して適用するかは、データサイズが増え続けるんだったら不可能です。
そのためにBeamモデルでは、ウィンドウ処理という概念を用いて、
データサイズを擬似的に決めて、MapReduceを行います
(イメージはフーリエ変換の窓関数)
そこでBeamモデルでは、データサイズが固定なデータを制限ありデータ(bounded)、
データサイズが可変長なデータを制限なしデータ(unbounded)というふうに用語を定義しています。
ApacheログをPub/Subに
Pub/Subはいわゆるキューシステムで、とあるアプリケーションのアクセスログ(Apache)をPub/Subにどんどんいれてます
アクセスされるたびに、fluentdを使ってjsonにパースしたものを転送しています
<source> @type tail path /var/log/httpd/access_log tag apache.access_log pos_file /var/log/fluent/access_log.pos format /^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] \[(?<systime>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")$/ time_format %d/%b/%Y:%H:%M:%S %z </source> <match apache.access_log> @type gcloud_pubsub project <PROJECT_NAME> key <KEY-FILE PATH> topic projects/<PROJECT_NAME>/topics/<TOPIC_NAME> autocreate_topic false max_messages 1000 max_total_size 9800000 max_message_size 4000000 buffer_type file buffer_path /var/log/fluent/pubsub/ flush_interval 1s try_flush_interval 0.2 format json </match>
Dataflow
やりたいこと
pathのカウントをしましょう、1日(00:00 ~ 23:59)を粒度にします
だから、キーは(yyyy-MM-dd 00:00:00を表すUNIX TIME)#(pathのvalue)
とかにしておけばいいと思います。#はセパレータです。
Map処理
なので、アクセスログ一行ずつに、
というMap処理を行います
BeamモデルではParDo処理というのがMapに相当します
DoFnのサブクラスを実装します
data class ApacheLog ( val systime: String?, val path: String? ) class MapProc : DoFn<String, KV<String, Int>>() { @ProcessElement fun processElement(c: DoFn<String, KV<String, Int>>.ProcessContext) { val elem = c.element() val log = Gson().fromJson(elem, ApacheLog::class.java) val systime = log?.systime ?: return val path = log?.path ?: return val timestamp = parseDateTime(systime).getMillis() val startDay = parseDateTime(systime).withTimeAtStartOfDay().getMillis() val outputKey = startDay.toString() + "#" + path c.outputWithTimestamp(KV.of(outputKey, 1), Instant(timestamp)) } override fun getAllowedTimestampSkew(): Duration { return Duration.millis(Long.MAX_VALUE); } } fun parseDateTime(systime: String): DateTime { return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") .parseDateTime(systime) }
これで、PubSubからのデータ
c.outputWithTimestamp(Object, Instant)
は第二引数にorg.joda.time.Instant型の数値を渡すと、
その数値がそのデータのイベント時間として扱われます
なので、Apacheログのsystimeがイベント時間として適用されます
Window処理はイベント時間をもとにデータを時分割するためです
Reduce処理
BeamではCombineがReduceに相当します
CombineFnのサブクラスを実装します
class ReduceProc : CombineFn<Int, ReduceProc.Accum, Int>() { class Accum : Serializable { var count: Int = 0 } override fun createAccumulator(): Accum { return Accum() } override fun addInput(accum: Accum, value: Int): Accum { accum.count = value return accum } override fun mergeAccumulators(accums: Iterable<Accum>): Accum { val merged = createAccumulator() for (accum in accums) { merged.count += accum.count } return merged } override fun extractOutput(accum: Accum): Int { return accum.count } override fun getAccumulatorCoder(registry: CoderRegistry, inputCoder: Coder<Int>): Coder<Accum> { return SerializableCoder.of(Accum::class.java) } }
パイプラインの設計
それでは、実装したMapReduce関数をパイプラインに肉付けしていきたいのですが、
Pub/Subからのアクセスログデータは、常に増え続けるunboundedデータなので、Window処理を適用する必要があります
fun main(args: Array<String>) { val credentials = GoogleCredentials .fromStream(FileInputStream("<GCP認証のjsonファイルパス>")) .createScoped(StorageScopes.all()); val options = JavaDataflowUtils.getOptionInstance() options.setGcpCredential(credentials) options.setProject("<GCPのプロジェクト名>") options.setJobName("pubsub") options.setStreaming(true) val subscriptionPath = buildString { append("projects/") append("<GCPのプロジェクト名>").append("/subscriptions/") append("<Pub/Subのサブスクリプション名>") } runner(options, subscriptionPath) } fun runner(options: DataflowPipelineOptions, input: String) { val p:Pipeline = Pipeline.create(options) p.apply("read pubsub", PubsubIO.readStrings().fromSubscription(input)) .apply("Map", ParDo.of(MapProc())) .apply("Window", Window.into<KV<String, Int>>(FixedWindows.of(Duration.standardDays(1))) .triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(5)))) .withAllowedLateness(Duration.standardHours(1)) .accumulatingFiredPanes()) .apply("Reduce", Combine.perKey<String, Int, Int>(ReduceProc())) p.run() }
Pipelineクラスのオブジェクトにapplyメソッドで関数を適用していきます
このWindowは、1日(00:00~23:59)の固定ウィンドウですが、Reduceが1日ずっと待ち続けるのはちょっとおかしいので、
早期トリガーとして5分ごとにウィンドウ内のデータをはきだすようにしています。
そんな感じで、Window + MapReduceを実現しています。
つかれた、また今度
超参考 :
KotlinでDataflow書きたくて、触ってみた
tl;dr
KotlinでDataflowをこれから書いていくことにした、多分
Cloud Dataflow(Apache Beam)
Cloud DataflowはGCPのサービスかつプログラミングモデル(SDK)で、
Compute Engineインスタンスを並列に立ち上げて、分散処理ができます
オープンソース化して、プロジェクト名はApache Beam
分散処理プログラミングモデルを、JavaもしくはPythonで高レイヤーな記述ができるSDKを提供しています
Pythonはβ版で、だいたいはJavaのほうで実装することになります
分散処理はGoogleが2004年くらいに論文で発表したMapReduceが根源とあって、
関数型プログラミングのMap関数とReduce関数の組み合わせを、
並列的に複数のサーバ上で実行すれば、どんな大きなサイズのデータに対しても計算ができるという、
最強のプログラミングモデルです
HadoopやSparkがその派生で、Dataflow(Beam Model)もそういえます。
- Map関数は、配列のすべての要素に対して同じ処理を適用し、配列として返す
- Reduce関数は、配列のすべての要素を累積したりする関数
ざっくりといってますが、あらゆるタスクはこのMapReduceのコンボで解けるというのが論文の主張です
そして、データフローというのは、データソース(ストレージやデータウェアハウス)から、
適切な形へ加工・集計しつつ、別のデータソースに格納する、という要件です。
たとえば、 アクセスログなどの非構造化データ(1アクセス1行の粒度) を加工し、1時間や1日の粒度で集計した結果をデータベースにいれる(データ1行がたとえば3:00~4:00の間の合計値など) などがあります
これを数学的に簡単に捉えると、 データ加工の流れは、関数写像の繰り返し じゃないかなと思えます
ちなみに、誰しも触ったことあるデータベースへのインターフェイスといえばSQLがあって、
SQLと同等なことをしたかったら、SQLのようなクエリを、
MapReduceモデルではプログラミング力で再現していく必要があるので、プログラミング力が必要になってきます
Cloud DataflowをKotlinで書く理由
Cloud Dataflowのプログラミングモデルは、そんな関数写像にピッタリで、Javaよりも、ScalaやKotlinのほうが、
モデルの適合性的にも、シンタックス的にも、いいんじゃないか、と思ったわけです
ScalaじゃなくてKotlinを選んだ理由は、ぶっちゃけどっちも触ったことがなかったし、
どうせなら興味のあるKotlinを選びました。モダンだし。
Kotlinの特徴は、
Kotlinの素晴らしさは、この記事がすごく楽しく読めました
とくにラムダ式がデータフローモデルにぴったりあってる気がします
この辺は、nardtreeさんの考え方に強く影響を受けてます
Kotlinの導入
brew update brew install kotlin
mavenを使って、JavaとKotlinを共存させた、Cloud Dataflowプロジェクト
pom.xmlはこんな感じにするとよし
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>xxxxxxxxxxx</groupId> <artifactId>xxxxxxxxxxxx</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>beacon_dataflow</name> <url>http://maven.apache.org</url> <properties> <kotlin.version>1.1.4-3</kotlin.version> <junit.version>4.12</junit.version> </properties> <dependencies> <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-stdlib</artifactId> <version>${kotlin.version}</version> </dependency> <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-test</artifactId> <version>${kotlin.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>com.google.cloud.dataflow</groupId> <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> <version>1.3</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <!-- for Kotlin --> <plugin> <artifactId>kotlin-maven-plugin</artifactId> <groupId>org.jetbrains.kotlin</groupId> <version>${kotlin.version}</version> <executions> <execution> <id>compile</id> <phase>compile</phase> <goals> <goal>compile</goal> </goals> <configuration> <sourceDirs> <source>src/main/kotlin</source> </sourceDirs> </configuration> </execution> <execution> <id>test-compile</id> <phase>test-compile</phase> <goals> <goal>test-compile</goal> </goals> <configuration> <sourceDirs> <source>src/main/kotlin</source> </sourceDirs> </configuration> </execution> </executions> </plugin> <!-- for .java --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <executions> <execution> <id>default-compile</id> <phase>none</phase> </execution> <execution> <id>default-testCompile</id> <phase>none</phase> </execution> <execution> <id>compile</id> <phase>compile</phase> <goals> <goal>compile</goal> </goals> </execution> <execution> <id>testCompile</id> <phase>test-compile</phase> <goals> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
ここでブログ書くのが力尽きたので、後半は明日