Kotlin + Dataflow(Apache Beam)でCloudSQLからデータを取り出す

tl;dr

Kotlinで書いてるDataflow(Apache Beam)にて、CloudSQLからデータを取り出してAvroフォーマットで保持した。
BeamにはJdbcIOが提供されていてそれを利用するのですがドキュメントもなく色々手こずったのでメモしておきます
また、例のごとくkotlinで書きます

CloudSQLへの接続

CloudSQLへDataflowへ接続するには、ソケットファクトリを利用します。
必要なモジュールをpom.xmlに追記します。

     <!-- for CloudSQL -->
        <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-jdbc</artifactId>
        <version>2.0.0</version>
        </dependency>
        <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.5</version>
        </dependency>
        <dependency>
        <groupId>com.google.cloud.sql</groupId>
        <artifactId>mysql-socket-factory-connector-j-6</artifactId>
        <version>1.0.3</version>
        </dependency>

例は、usersテーブルからBigInt型のid, Char(100)型のname, Char(100)型のaddressをとりだすSQLです。
DataInfoというAvroフォーマットの自作クラスで、データを保持するとします。

jdbcIOにjdbcURLやらを引数に与えます。

option周り

   val jdbcUrl = buildString {
        append("jdbc:mysql://google/")
        append("<DATABASE_NAME>")
        append("?cloudSqlInstance=")
        append("<インスタンス接続名>")
        append("&socketFactory=com.google.cloud.sql.mysql.SocketFactory")
    }

    val jdbcInput = JdbcIO.read<DataInfo>()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", jdbcUrl)
            .withUsername("<USER>")
            .withPassword("<PASS>")
        ).withQuery("SELECT id, name, address FROM users")
        .withCoder(AvroCoder.of(DataInfo::class.java))
        .withRowMapper(ReadUserFn())

    val p: Pipeline = Pipeline.create(options)

    p.apply("Read cloudsql", jdbcInput) // ...

    p.run()

データ保持クラスDataInfo定義、RowMapper実装

@DefaultCoder(AvroCoder::class)
data class DataInfo (
    var id: Long = 0L,
    var name: String = "",
    var address: String = ""
)

class ReadUserFn : JdbcIO.RowMapper<DataInfo> {
    override fun mapRow(resultSet: ResultSet): DataInfo {
        val id = resultSet.getLong(1)
        val name = resultSet.getString(2) ?: ""
        val address = resultSet.getString(3) ?: ""
        return WidgetInfo(id, name, address)
    }
}

data classのプロパティはvalでは実行時にエラーが起きたのでvarで持たせる必要があります。
おそらくCoderのほうでデフォルトコンストラクタを必要とする設計なためみたいです。

data classでもつと何が嬉しいかというと、次の処理ブロックにDataInfoクラスを伝搬させたあと、分解宣言ができます

class MapFn : DoFn<DataInfo, KV<String, String>>() {
    companion object {
        val LOG:Logger = LoggerFactory.getLogger(WidgetInfo::class.java)
    }

    @ProcessElement fun processElement(c: DoFn<DataInfo, KV<String, String>>.ProcessContext) {
        val (id, name, address) = c.element() // 分解宣言
        c.output(KV.of(id.toString(), name + "_" + address))
    }
}