はじめに
マイクロアドでサーバーサイドエンジニアをしている高橋です。
Apache Spark SQL connector for Google BigQuery(以下spark-bigquery-connector)でSparkからBigQueryにデータを転送する際に、ドキュメント通りでもうまくいかない部分がありました。
そこで本記事では実際に転送できた設定例を紹介します。
タイトルのとおりですが、本記事ではGCP(Google Cloud Platform)外で動作するSparkからBigQueryへのデータ転送について扱います。
経緯
マイクロアドでは機械学習基盤としてGCPを使用しており、学習に使用するデータなどはBigQueryに蓄積されている必要があります。
一方で、広告配信のログやそれを集計したデータなどの学習で使用したいデータは主にオンプレ環境に構築されているデータ基盤に蓄積されています。
このような経緯で社内のデータ基盤からBigQueryへの転送処理が必要になっています。
マイクロアドの機械学習基盤についての記事もありますので、こちらも併せてご一読ください。
使用するライブラリや環境
Spark
PySpark、さらに言えばSpark Connectを使用しています。
それぞれのバージョンは以下です。
- クライアント
- Python 3.11
- PySpark 3.5.1
- Spark Connect server
今回のコードの例はPython + PySpark(Spark Connect)になりますが、設定内容等はSparkでも概ね同じはずです。
また、Spark Connect serverはKubernetesクラスタ上で起動させます。
spark-bigquery-connector
ドキュメントの「Downloading and Using the Connector」セクションの表を見ると、jar毎に対応しているSparkやScalaのバージョンがわかります。
今回はScala 2.13とSpark 3.5に対応しているものを使いますが、ここは環境に合わせて選定してください。
実際に使用したのはcom.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0です。
0.41.0の部分は検証時点での最新バージョンとなっています。
検証時にポイントとなったのが、依存も含む(spark-bigquery-with-dependencies)jarを使用することでした。
このjarはSpark Connect serverで使用するため、クラスパスが通っているディレクトリに配置した状態でDockerイメージをビルドします。
その他のjar
spark-bigquery-connector以外に使用したjarは以下です。
こちらもバージョンなどは環境に合わせて選定してください。
com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.26:jar:shadedorg.apache.spark:spark-avro_2.13:3.5.1
これらが必要となった理由は後述します。
こちらも依存を含む(shaded)jarを使用するのがおすすめです。
これらのjarもSpark Connect serverで使用するため、クラスパスが通っているディレクトリに配置した状態でDockerイメージをビルドします。
BigQueryへの転送
書き込み方法の指定
BigQueryへのデータの書き込み方法はwriteMethodというオプションで指定でき、directとindirectの2種類用意されています。
directではBigQuery Storage Write APIを使ってBigQueryに直接データを書き込みます。
indirectではGCS(Google Cloud Storage)バケットにParquetやAvroなどの形式で一度データを書き込み、BigQueryのデータ読み込み機能でこのファイルを読み込むことでテーブルにデータを書き込みます。
今回はデフォルト値になっているindirectを使用します。
設定ファイルで必要な項目
検証時に以下の項目はspark-defaults.confなどに設定が必要でした。
spark.hadoop.fs.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem spark.hadoop.google.cloud.auth.service.account.enable true spark.hadoop.google.cloud.auth.service.account.json.keyfile /path/to/keyfile.json
indirectではGCS経由でBigQueryにデータを転送するので、SparkとしてはGCSバケットにデータを書き込むことになります。
この書き込みにcom.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.26:jar:shadedが必要になります。
設定の書き方がGoogle Cloud Storage Connector for Spark and Hadoopのドキュメントなどを見ても正確にはわからなかったので試行錯誤しました。
spark.hadoop.google.cloud.auth.service.account.json.keyfileについてはサービスアカウントのJSONファイルへのパスを指定します。
Spark Connect serverからJSONファイルを参照する必要があったので、KubernetesのシークレットとしてJSONファイルを配置し、それをSpark Connect serverのPodでマウントして使用しています。
クライアント側
クライアント側の書き込み時のコード例は以下になります。
このコードを実行すると、以下のような動作になります。
- テーブルが存在しなかった場合は作成
- テーブルが存在し、パーティションにデータがない場合は追加
- テーブルが存在し、パーティションにデータがある場合は上書き
また、転送後の型の対応についてはドキュメントに書かれているのでそちらをご参照ください。
(
df.write.format('bigquery')
.option('parentProject', 'xxx-project')
.option('writeMethod', 'indirect')
.option('partitionType', 'HOUR')
.option('datePartition', '2025083109')
.option('partitionExpirationMs', 86400000)
.option('temporaryGcsBucket', 'xxx-bucket')
.option('intermediateFormat', 'avro')
.mode("overwrite")
.save('xxx_dataset.yyy_table')
)
それぞれのオプションについて、以下で説明していきます。
partitionTypeのオプションで選べる選択肢とそのときのdatePartitionのフォーマットは以下になります。
partitionType |
datePartitionのフォーマット |
|---|---|
HOUR |
YYYYMMDDHH |
DAY |
YYYYMMDD |
MONTH |
YYYYMM |
YEAR |
YYYY |
HOURを使う場合のみですが、BigQueryでは時刻はUTC基準となるため9時間ズラさなければならないという点には注意が必要です。
サンプルコードの例では2025083109を指定しているため、日本時間では2025-08-31 18:00:00相当のパーティションということになります。
partitionExpirationMsについてはパーティションの有効期限を設定できるオプションですが、datePartitionの日時からpartitionExpirationMsが経過するとパーティションが削除されるというものになっています。
過去のデータを転送するなどdatePartitionで既に有効期限が切れているパーティションを登録した場合にはすぐに消えてしまうという現象が発生します。
intermediateFormatについて、今回はAvroを指定しています。
その理由としてはARRAYのようなComplex型を含むデータの場合、中間フォーマットがParquetだとエラーが発生してしまったためです。
jarとして追加しているorg.apache.spark:spark-avro_2.13:3.5.1はこのAvro書き込みのために使用しています。
また、サンプルコードでは指定していませんがspark.sql.sources.partitionOverwriteModeというオプションについても注意点があります。
Sparkの通常の書き込み時においてはこのオプションをstaticやdynamicのように小文字で指定しても問題ないのですが、spark-bigquery-connectorでは大文字しか対応していないため、STATICまたはDYNAMICで指定する必要があります。
まとめ
GCP外のSparkからBigQueryにデータを転送する際に必要となる認証情報の追加などをやる部分でつまづいたので、同じような人の助けになればと思い執筆しました。
ここで挙げた設定や構成のうち省けるもの・追加で必要なものも環境によってはありますが、検証した環境ではこれでうまくいきました。
また、spark-bigquery-connectorを使ってBigQueryからデータを読み込んでSparkで操作することもできるのでそちらは機会があれば記事にさせていただきます。
コメント