
こんにちは、CCCMKホールディングスAIエンジニアの三浦です。
だんだん過ごしやすくなってきて、なんだかどこかに行きたいな、という気持ちになることが増えてきました。自分が知らない場所に行ったり知らない人に出会ったりすることは、自分にとってとてもよい経験なんだと思っています。
最近、Databricksで非構造化データの扱い方をいろいろと調べていて、その中でDeltaテーブルに格納されたドキュメントデータをベクトル検索するために、2種類の検索エンジン(Databricks Vector SearchとFaiss)を使ってインデックスを構築する方法を試していました。
今回はDatabricks Vector SearchとFaissそれぞれでインデックスを構築し、クエリを実行するまでの流れについてまとめてみたいと思います。
元になるDeltaテーブル
チーム内のLT会向けに作成したPPTXファイルから、MarkItDownというMicrosoftが開発した、さまざまなファイル形式(PowerPoint、PDF、画像など)をMarkdown形式に変換するためのツールを使ってテキスト情報を抽出し、スライドごとにレコードを分割してDeltaテーブルに格納しました。

このDeltaテーブルを起点にし、埋め込みベクトルの出力をしたのち、Vector SearchとFaissでインデックスを構築します。
埋め込みベクトルの出力
どちらのエンジンもベクトル検索を行うため、埋め込みモデルを使ってテキストデータの埋め込みベクトルを出力しておきます。DatabricksはFoundation Modelで埋め込みモデルを提供しています。それらのモデルはAISQLの1つai_query関数で呼び出すことが出来るため、以下のようなシンプルなSQLで実現することが可能です。
spark.sql(f""" CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.{emb_table} TBLPROPERTIES (delta.enableChangeDataFeed = true) AS SELECT *, ai_query("databricks-bge-large-en", content) as embedding FROM {catalog_name}.{schema_name}.{source_table} """)
このクエリを実行すると、元のテーブルにテキストの埋め込みベクトルが格納された”embedding”というカラムが追加されたDeltaテーブルが新規作成されます。
TBLPROPERTIES (delta.enableChangeDataFeed = true)はDatabricks Vector Searchを作る際に必要になる設定です。
また、今回使用したFoundation Modelの埋め込みモデルは”databricks-bge-large-en”で、このモデルは1,024次元の埋め込みベクトルを生成することが出来ます。
埋め込みベクトル出力用関数
インデックスに入力する検索クエリの埋め込みベクトルを出力する関数を用意しておきます。api_keyにはDatabricksのトークンを指定します。
import os from openai import OpenAI def create_embedding(text): client = OpenAI( api_key=os.environ.get("API_KEY"), base_url=f"{workspace_url}/serving-endpoints" ) embeddings = client.embeddings.create( input=text, model="databricks-bge-large-en" ) return embeddings.data[0].embedding
Databricks Vector Searchによるインデックス構築
エンドポイントの作成
まずインデックスを格納するエンドポイントを作成します。
from databricks.vector_search.client import VectorSearchClient client = VectorSearchClient() client.create_endpoint( name=endpoint, endpoint_type="STANDARD" )
インデックスの作成
次にインデックスの作成です。source_table_nameは埋め込みベクトルを付与したDeltaテーブル名を指定し、index_nameは出力先のインデックス名を指定します。
index = client.create_delta_sync_index( endpoint_name=endpoint, source_table_name=f"{catalog_name}.{schema_name}.{emb_table}", index_name=f"{catalog_name}.{schema_name}.{vs_index_name}", pipeline_type="TRIGGERED", embedding_dimension=1024, primary_key="slide", embedding_vector_column="embedding" )
クエリ実行
作成したインデックスに対し、クエリを実行してみます。
import pandas as pd index = client.get_index( index_name=f"{catalog_name}.{schema_name}.{vs_index_name}" ) all_columns = spark.table( f"{catalog_name}.{schema_name}.{emb_table}" ).columns results = index.similarity_search( query_vector=create_embedding("multi modal"), columns=all_columns, num_results=5 ) pd.DataFrame(results["result"]["data_array"])
以下のように”multi modal”という単語に類似したテキストが含まれるレコードが取得されました。

Faissによるインデックス構築
インデックスの作成
次にFaissのインデックスを作成します。最初に埋め込みベクトルを付与済みのDeltaテーブルをロードします。
df = spark.sql(f""" SELECT embedding, content FROM {catalog_name}.{schema_name}.{emb_table} """ )
ロードしたDataFrameから埋め込みベクトルを取得し、インデックスを作成します。
import faiss import numpy as np import pandas as pd pdf = df.toPandas() embeddings = np.array(pdf['embedding'].tolist()).astype('float32') contents = pdf['content'].tolist() dimension = embeddings.shape[1] index = faiss.IndexFlatL2(dimension) index.add(embeddings)
クエリ実行
Faissでのクエリ実行を試してみます。
query_vector = np.array(
create_embedding("multi modal"),
dtype="float32"
).reshape(1,-1)
k = 5
distances, indices = index.search(query_vector, k)
results = []
for i, idx in enumerate(indices[0]):
results.append(contents[idx])
pd.DataFrame({"results":results})
こちらも次のように”multi modal”に関連するテキストを検索することが出来ました。

まとめ
今回は同じDeltaテーブルを起点とし、Databricks Vector SearchとFaissでそれぞれインデックスを作成し、クエリを実行するまでの手順をまとめてみました。状況に応じて検索エンジンを使い分けることが出来るようになり、検証してみてよかったと思いました。
それぞれのインデックスを用いてRAGを実装してみて、精度やパフォーマンスの検証も今後試してみたいと思っています!
コメント