Databricks DeltaテーブルからVector SearchとFaissでインデックスを作成してみる – CCCMKホールディングス TECH LABの Tech Blog

こんにちは、CCCMKホールディングスAIエンジニアの三浦です。

だんだん過ごしやすくなってきて、なんだかどこかに行きたいな、という気持ちになることが増えてきました。自分が知らない場所に行ったり知らない人に出会ったりすることは、自分にとってとてもよい経験なんだと思っています。

最近、Databricksで非構造化データの扱い方をいろいろと調べていて、その中でDeltaテーブルに格納されたドキュメントデータをベクトル検索するために、2種類の検索エンジン(Databricks Vector SearchとFaiss)を使ってインデックスを構築する方法を試していました。

今回はDatabricks Vector SearchとFaissそれぞれでインデックスを構築し、クエリを実行するまでの流れについてまとめてみたいと思います。

元になるDeltaテーブル

チーム内のLT会向けに作成したPPTXファイルから、MarkItDownというMicrosoftが開発した、さまざまなファイル形式(PowerPoint、PDF、画像など)をMarkdown形式に変換するためのツールを使ってテキスト情報を抽出し、スライドごとにレコードを分割してDeltaテーブルに格納しました。

pptxのテキスト情報を格納したDeltaテーブル

このDeltaテーブルを起点にし、埋め込みベクトルの出力をしたのち、Vector SearchとFaissでインデックスを構築します。

埋め込みベクトルの出力

どちらのエンジンもベクトル検索を行うため、埋め込みモデルを使ってテキストデータの埋め込みベクトルを出力しておきます。DatabricksはFoundation Modelで埋め込みモデルを提供しています。それらのモデルはAISQLの1つai_query関数で呼び出すことが出来るため、以下のようなシンプルなSQLで実現することが可能です。

spark.sql(f"""
CREATE OR REPLACE TABLE 
    {catalog_name}.{schema_name}.{emb_table} 
TBLPROPERTIES (delta.enableChangeDataFeed = true) AS
SELECT
    *,
    ai_query("databricks-bge-large-en", content) as embedding
FROM
    {catalog_name}.{schema_name}.{source_table}
""")

このクエリを実行すると、元のテーブルにテキストの埋め込みベクトルが格納された”embedding”というカラムが追加されたDeltaテーブルが新規作成されます。
TBLPROPERTIES (delta.enableChangeDataFeed = true)はDatabricks Vector Searchを作る際に必要になる設定です。

また、今回使用したFoundation Modelの埋め込みモデルは”databricks-bge-large-en”で、このモデルは1,024次元の埋め込みベクトルを生成することが出来ます。

埋め込みベクトル出力用関数

インデックスに入力する検索クエリの埋め込みベクトルを出力する関数を用意しておきます。api_keyにはDatabricksのトークンを指定します。

import os

from openai import OpenAI

def create_embedding(text):
    client = OpenAI(
    api_key=os.environ.get("API_KEY"),
    base_url=f"{workspace_url}/serving-endpoints"
    )

    embeddings = client.embeddings.create(
        input=text,
        model="databricks-bge-large-en"
    )

    return embeddings.data[0].embedding

Databricks Vector Searchによるインデックス構築

エンドポイントの作成

まずインデックスを格納するエンドポイントを作成します。

from databricks.vector_search.client import VectorSearchClient

client = VectorSearchClient()
client.create_endpoint(
    name=endpoint,
    endpoint_type="STANDARD" 
)

インデックスの作成

次にインデックスの作成です。source_table_nameは埋め込みベクトルを付与したDeltaテーブル名を指定し、index_nameは出力先のインデックス名を指定します。

index = client.create_delta_sync_index(
  endpoint_name=endpoint,
  source_table_name=f"{catalog_name}.{schema_name}.{emb_table}",
  index_name=f"{catalog_name}.{schema_name}.{vs_index_name}",
  pipeline_type="TRIGGERED",
  embedding_dimension=1024,
  primary_key="slide", 
  embedding_vector_column="embedding" 
)

クエリ実行

作成したインデックスに対し、クエリを実行してみます。

import pandas as pd

index = client.get_index(
  index_name=f"{catalog_name}.{schema_name}.{vs_index_name}"
)
all_columns = spark.table(
  f"{catalog_name}.{schema_name}.{emb_table}"
).columns

results = index.similarity_search(
  query_vector=create_embedding("multi modal"),
  columns=all_columns,
  num_results=5
)

pd.DataFrame(results["result"]["data_array"])

以下のように”multi modal”という単語に類似したテキストが含まれるレコードが取得されました。

Vector Searchのインデックスで”multi modal”を検索した結果

Faissによるインデックス構築

インデックスの作成

次にFaissのインデックスを作成します。最初に埋め込みベクトルを付与済みのDeltaテーブルをロードします。

df = spark.sql(f"""
SELECT 
  embedding, 
  content 
FROM 
  {catalog_name}.{schema_name}.{emb_table}
"""
)

ロードしたDataFrameから埋め込みベクトルを取得し、インデックスを作成します。

import faiss
import numpy as np
import pandas as pd


pdf = df.toPandas()


embeddings = np.array(pdf['embedding'].tolist()).astype('float32')
contents = pdf['content'].tolist()


dimension = embeddings.shape[1]


index = faiss.IndexFlatL2(dimension)


index.add(embeddings)

クエリ実行

Faissでのクエリ実行を試してみます。

query_vector = np.array(
    create_embedding("multi modal"),
    dtype="float32"
).reshape(1,-1)


k = 5
distances, indices = index.search(query_vector, k)

results = []

for i, idx in enumerate(indices[0]):
    results.append(contents[idx])


pd.DataFrame({"results":results})

こちらも次のように”multi modal”に関連するテキストを検索することが出来ました。

Faissのインデックスに”multi modal”で検索した結果。Vector Searchと同じ結果になりました。

まとめ

今回は同じDeltaテーブルを起点とし、Databricks Vector SearchとFaissでそれぞれインデックスを作成し、クエリを実行するまでの手順をまとめてみました。状況に応じて検索エンジンを使い分けることが出来るようになり、検証してみてよかったと思いました。

それぞれのインデックスを用いてRAGを実装してみて、精度やパフォーマンスの検証も今後試してみたいと思っています!




Source link

関連記事

コメント

この記事へのコメントはありません。