【Unit1 ブログリレー7日目】
こんにちは、CTOの大垣です。
このブログはUnit1ブログリレー 7日目の記事です。前日は木田さんの、大規模メンテナンスを振り返る 〜「やってよかった」準備集でした。大規模な作業だからこそ、リハーサルや実況などワイワイやることの良さが光りますね!
さて、今日の話題は、本番稼働している定期実行バッチの安全性についてです。
皆さんの環境でも、きっとたくさんのバッチが動いてますよね?
エムスリーのMR君サービスを支えるUnit1でも400以上のバッチが動いています。パーソナライズされたコンテンツの生成だったり、KPIの計測だったり、データの健全性チェックだったり、さまざまなものがあります。
特に、大規模データを扱うからオンデマンドじゃなくてバッチで事前計算する、という目的のもの、たくさんありますよね。
そして大規模データだからこそ。。。これらのバッチの入力や途中結果のデータが壊れたら大惨事ですよね!!!
ということで、今日はバッチが扱うデータの健全性。特にバッチならではの”一貫性”に着目したOSSを開発してみました。
Dataframe(df)の定期実行での誤った変更(drift)を検知するということでdfdriftです。
こだわりとしては、型をサボっちゃう我々を救うために作ったので、なんと、import dfdrift.pandas as pdだけで使い始められる最高にサボれるインタフェースにしました。

やりたいこと
RDB、BigQuery、Google Spreadsheet、ユーザーがアップロードしたcsvやexcelなど、多様なデータソース全てについて、不用意なデータ変更によってバッチが壊れたのを早めに検知する。
いくらコードが正しくても、データが変わってしまってはエラーになるので、プログラム自体のテストとは別に、特に本番データでのデータの健全性を確認し続けることはとても重要です。
そこで、本番のデータで、ロジックへの変更は最小で、データソースのスキーマの一貫性を確認し続ける、というのをdfdriftでは目指しました。
なお、エムスリーでは今回のスキーマ変更検知以外にも、データソースのテストとして以下のように常に重要な課題として取り組み続けています。
これらの記事もご一緒に読んでみてください。
本番データの不用意な修正を監視する話
BigQueryに依存するPythonバッチをモックデータでテストする
動作
dfdriftの機能は以下です。
1. Pythonのバッチで実行されたdataframeのスキーマをGCSにアップロードする

validationが実行されると、図のようなjsonが作成されます。
置き場はもちろんGCSじゃなくてオブジェクトストレージやBQなど、どこでもいいです。おそらくCronjobはサーバレスな構成で実行していることが多いと思うので、ローカル以外の永続化できる場所に、ファイル名+行番号:スキーマという形で保存しておきます。
ここで、どのデータフレームのスキーマか、という識別子としてファイル名+行番号を使ったことがポイントです。保存用の名前をつけずとも、自動で作られる識別子とできるからです。
もちろんコード自体が変更されて新しくリリースされた場合は行番号は変わってしまうんですが、dfdriftはリリース後のモニタリングが目的なのでリリース前後比較は不要だから問題ありません。
2. 前日の実行とスキーマが変更されていないかをバリデーション
バリデーションプロセスでは、1で保存されている前日のスキーマと、今日これから保存しようとしているのスキーマが一致していることを確認します。
どこまで一致を見るかは悩ましいんですが、一旦、カラム名と型の一致を確認しています。
なお、説明の都合上1を先に説明しましたが、もちろん動作としては、バリデーションしてから今日の実行結果のスキーマで上書き保存、です。
3. スキーマに変更があった場合slack通知

2で不一致が発生した時はslackでアラートがなります。
アラートを受け取った開発者は、以下の振る舞いをすることを期待しています。
まず、validateだけじゃなくてバッチ全体も失敗している場合はここが原因で障害が発生したということなので、直ちにデータソースの作成者と相談します。
もし、今回のバッチには問題がなく、バッチの動作も正しかった場合は、問題ないデータ変更なのかもしれませんが、それでも潜在的にバグになり得るので、型変換や不要なカラムのdropなど、より厳密なスキーマでデータ読み出しをするようにします(ここがポイント)。
後で詳しく議論しますが、今回はアラートを通知することだけが目的なので、スキーマに変更があった場合、slack通知はしますが、その後変更後のスキーマで上書きする実装にしています。
ここは直感的ではないかもですが、前述の通り、スキーマ変更は必ずしもエラーではないので、その場合はスルーするという選択肢も取れるように、です。
どのようにデータのバリデーションを行うか
ここからは、データソースの変更に強くするためにどうしてこの実装にしたか、他のもっと美しい方法がなかったかのディスカッションを記します。
A: 当然全てのdfにpanderaを設定するのがよい

- メリット: バリデーションが明瞭。コード中に型が明示されるため開発も高速化される。
- デメリット: 流石にすでに存在する全てのDataFrameに型をつけるまでを必須にするのは変更が大きい。また、将来的にどれくらいデータソースの変化を許容するべきかの見積もりが初動では取れないこともあり、緩い制約にしていることもある。
まず基本として、エムスリーではPythonのバッチ実装についてはpanderaやpydanticを使って型をつけるよう推奨しています。
特にDBやcsvからの読み出しでは一度pandasのDataframeにすることが多いので、読み出し後すぐにpanderaで型をつけるのが良い振る舞いと考えます。
しかしながら、今動いている既存のすべてのバッチがそうなっているわけではないですし、全部書き直すとなるとちょっと大変な部分もあります。
B: panderaのスキーマを自動生成する?
- メリット: panderaのエコシステムに乗ることができる
- デメリット: 実行系からソースにPRが出されるのは若干アクロバティック。実行されているデータから作られるスキーマは、将来を考えるとstrictすぎる、という可能性もある。
dfdriftの機能1でスキーマをアップロードする代わりに、もっと攻めて、実行するとpanderaのスキーマを自動生成して、それをGithubのソースに自動Pull Requestする仕組み、というのが、人間が楽をしつつAの仕組みに乗れる方法と考えられます。しかし、バージョン管理システムやリリース手順も巻き込んだ仕組みとなり、単発の実行内で完結しないのが、ちょっとやりたいことに対して大きいかなと考えました。
C(採用): 一貫性のバリデーションだけなら、前回実行との差分チェックで良い

- メリット: ストレージさえ用意すれば、あとはPythonの実行の中でバリデーションも更新も完結してポータブル。コードを追加で書かなくていい。
- デメリット: もちろん初回実行ではなにもチェックされない。コードで明示しないロジックでのアラートが出ることになる。
正しいデータ、という観点は今まで通り必要な部分での手動で書いたpanderaで良いと考え、その上で、”データの突然の変更”だけに着目するなら、前回実行を記録してその差分で良い、という議論です。
例外にするかwarningにするか
前回実行との差分が発生した時の開発者への報告として、例外を発生させてバッチを落とすか、なんらかのアラートにするか、強制力の観点で悩んだんですが、今回はslackにアラートを出す実装にしました*1。
理由は、今回スキーマの変化として、カラムの追加もアラートしています。
しかし、実際には、このバッチで使わないカラムが追加されただけなら、一旦実行には影響が出ないことも多いからです。
とはいえメモリ観点でも潜在的なバグ防止のためにも不要なカラムはdropしておいて欲しいので、不要なカラムが増えた、というアラートにも対応することが推奨です。
なお、差分が出た時に、保存したスキーマを更新せずにその後毎日アラートされる方向で行くか、アラートしつつも上書きして、それを新たな基本スキーマとするかも悩んだんですが、対応不要な変更もあるので、差分があってもスキーマは更新し、翌日からは新しいスキーマとの差分を見る実装にしました。
さらにサボりたい人向けのimplicitなインタフェース、import dfdrift.pandas as pd

import部分のimport pandas as pdをimport dfdrift.pandas as pdと書き換えるだけで、DataFrameを作っている部分全てでバリデーションされるようなインタフェースを用意しました!
個人的には、implicitなインタフェースはバグった時に調査が面倒になるという観点で避けるようにしてるんですが、今回はこのライブラリは処理を行わないし例外も吐かないのでエラーには繋がらない、ということからこのインタフェースを許容しています。
それよりも、既存のスクリプトも一括でpdを置き換えらることで、もれなくバリデーションされる、というメリットを取りました。
ちなみにdfdrift.patch()とかを呼んだらpd.Dataframe.newとかにモンキーパッチするのも検討しましたが、流石に実装が汚くなりそうだったのでその方針ではなく、 import dfdrift.pandas as pdを各ファイルに書く方針にしています。
なお、この方針はfireducksを参考にさせてもらいました!
一括オーバーライドの実装のテクニック
import dfdrift.pandas as pdでpdを置き換える、という方針はいいとして、それをどう実装するか、という話ですが、以下のコードで割と変なことはせずに実装できます。
まず、pd.DataFrame()のコンストラクタでの作成については、もちろんdfdrift.pandas.DataFrameがpandas.DataFrameを継承すればいいだけです。
ただし、DBやcsvからの作成は、pd.read_sqlやpd.read_csvなど、ヘルパー関数から作られることが一般的です。
ここもサポートするためには、pandasのトップレベルに置かれている関数たちもオーバーライドする必要がありますね。
ところで、pdを置き換えちゃってるんだから、pdの他の全ての関数も置き換える必要があります。
ここで出てくるのがdef __getattr__(name: str) -> Any:で、Pythonではモジュールレベルでもgetattrが名前解決を担っているので、ここでpandasへの移譲を行えば良いです。
なお、ここより上に定義されている関数を呼び出す時はgetattrを呼ばれないので、read_csvなどのオーバーライドもちゃんと成り立ちます。
import pandas as _pd class DataFrame(_pd.DataFrame): def __init__(self, data=None, index=None, columns=None, dtype=None, copy=None): super().__init__(data=data, index=index, columns=columns, dtype=dtype, copy=copy) _validate_dataframe(self) def read_csv(*args, **kwargs): df = _pd.read_csv(*args, **kwargs) _validate_dataframe(df) return df def __getattr__(name: str) -> Any: return getattr(_pd, name)
小話: Claude Codeへの指示を細かいステップに分ける & レポジトリに記録する

今回開発にはClaude Codeを利用したのですが、最近こういう小さいツールを作る時には、以下の戦略を取るようにしてみています。
- 小さいステップで指示する
- 作りたいもの全体と今回のステップでやりたいことを両方指示に入れる
- 各ステップの指示は1つずつmarkdownでレポジトリに保存する
細かく分けるのは、コミットを小さくすることでやり直しやすくするため、全体の設計も入れるのは、ステップの指示を実行するせいで全体設計を壊さないようにするため、プロンプトもコミットするのは他の開発者に引き継ぎやすくするためです。指示とコミットを小さくするのはみなさんやってるかと思いますが、継続的な開発とAIエージェントを組み合わせる方法論は発展途上だと思うので、この辺もブラッシュアップしていきたいです。
まとめ
バッチ処理を安全に行うために、データに正確な型をつけて回る作業は重要なのですが、本番のバッチで最も重要なのは一貫性だと言えます。
前回実行との型の差分をチェックするというアプローチは、手軽さと安全性のバランスで一定効果のありそうなアプローチに思います。
DataFrameは便利な一方で、無理やり動いてバグを生む可能性もはらんでいるので、安全なDataFrame活用を引き続き議論していきたいです*2!
dfdriftは先週から本番に入れてるのですが、幸いにもまだアラートはなっておらず、ある意味本番検証できてない状況なので、まだまだ機能不足などあると思いますが、今後アラートに対応しながら改善していきたいです!
We are Hiring!
エムスリーでは一貫したデータを作るエンジニア、一貫したデータから価値を生み出すエンジニアを募集しています!!
コメント