10  Deep retrieval with nearest neighbours

Matching customer queries to products via embeddings and Retrieval-Augmented Generation (RAG).

10.0.1 Overview

This notebook demonstrates one method of using large language models to interact with data. Using the Wayfair WANDS dataset of more than 42,000 products, we will go through the following steps:

  • Download the data into a pandas dataframe

  • Generate embeddings for the product descriptions

  • Create and deploy an index of the embeddings on Vertex AI Matching Engine, a service that enables nearest-neighbor search at scale

  • Prompt an LLM to retrieve relevant product suggestions from the embedded data.

10.0.1.1 Please note:

Deployed indexes on Matching Engine can quickly accrue significant costs. Please see the final cell of the notebook for cleaning up the resources used.

Images from wayfair.co.uk

10.0.2 Technologies

In this notebook, we will use:

  • Vertex AI’s language model

  • Vertex AI Matching Engine, a high-scale, low-latency vector database.

# Install the packages
! pip3 install --upgrade google-cloud-aiplatform
! pip3 install "shapely<2.0.0"

10.0.3 Colab only: Uncomment the following cell to restart the kernel

# Automatically restart kernel after installs so that your environment can access the new packages
# import IPython
#
# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

Set your Google Cloud project id and region

PROJECT_ID = "<...>"  # @param {type:"string"}
REGION = "<..>"
# Set the project id
! gcloud config set project {PROJECT_ID}

We will need a Cloud Storage bucket to store embeddings initially. Please create a bucket and add the URI below.

BUCKET_URI = "gs://<...>"

Authenticate your Google Cloud account

Depending on your Jupyter environment, you may have to authenticate manually. Follow the relevant instructions below.

  1. Vertex AI Workbench

Do nothing, as you are already authenticated.

  2. Local JupyterLab instance: uncomment and run:
# ! gcloud auth login
  3. Colab: uncomment and run:
# from google.colab import auth
# auth.authenticate_user()

Initialize the SDK and load the language model. GCP uses the gecko model for text embeddings.

import vertexai

vertexai.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)
# Load the "Vertex AI Embeddings for Text" model
from vertexai.preview.language_models import TextEmbeddingModel

model = TextEmbeddingModel.from_pretrained("textembedding-gecko@001")

Now we’re ready to prepare the data

import os
import pandas as pd

path = "data"

if not os.path.exists(path):
  os.makedirs(path)
  print("data directory created")
else:
  print("data directory found")
# download datasets
!wget -q https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/product.csv

!mv *.csv data/
!ls data

The dataset features a wealth of information. The queries (user searches) and the ratings of the responses to those queries have been particularly interesting to researchers. For this demo, however, we will focus on the product descriptions.

product_df = pd.read_csv("data/product.csv", sep='\t')
product_df

Filter the dataframe to consider product_id, product_name, product_description.

product_df = product_df.filter(["product_id", "product_name", "product_description"], axis=1)
product_df = product_df.rename(columns={"product_description": "product_text", "product_id": "id"})
product_df = product_df.dropna()
len(product_df)
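
As a minimal sketch of what the filter, rename and dropna steps do, the toy dataframe below stands in for product.csv. The rows and the extra product_class column are made up for illustration. Note that after dropna the position of a row no longer has to match its id.

```python
import pandas as pd

# Toy stand-in for product.csv; the values are made up for illustration.
toy_df = pd.DataFrame(
    {
        "product_id": [0, 1, 2],
        "product_name": ["platform bed", "slow cooker", "area rug"],
        "product_description": ["solid wood frame", None, "hand-tufted wool"],
        "product_class": ["Beds", "Slow Cookers", "Rugs"],
    }
)

# Keep three columns, rename two of them, and drop rows with missing values.
toy_df = toy_df.filter(["product_id", "product_name", "product_description"], axis=1)
toy_df = toy_df.rename(columns={"product_description": "product_text", "product_id": "id"})
toy_df = toy_df.dropna()

# The row with the missing description is gone, so id 2 now sits at position 1.
print(toy_df["id"].tolist())
```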

The following three cells contain functions from a notebook in the vertex-ai-samples repository.

encode_texts_to_embeddings will be used later to convert the product descriptions into embeddings.

from typing import List, Optional

# Define an embedding method that uses the model
def encode_texts_to_embeddings(text: List[str]) -> List[Optional[List[float]]]:
    try:
        embeddings = model.get_embeddings(text)
        return [embedding.values for embedding in embeddings]
    except Exception:
        return [None for _ in range(len(text))]

These helper functions achieve the following:

  • generate_batches splits the product descriptions into batches of five, since the embeddings API accepts up to five text instances in each request.

  • encode_text_to_embedding_batched calls the embeddings API and handles rate limiting using time.sleep.

import functools
import math
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Generator, List, Tuple

import numpy as np
from tqdm.auto import tqdm


# Generator function to yield batches of sentences
def generate_batches(
    text: List[str], batch_size: int
) -> Generator[List[str], None, None]:
    for i in range(0, len(text), batch_size):
        yield text[i : i + batch_size]


def encode_text_to_embedding_batched(
    text: List[str], api_calls_per_second: int = 10, batch_size: int = 5
) -> Tuple[List[bool], np.ndarray]:

    embeddings_list: List[List[float]] = []

    # Prepare the batches using a generator
    batches = generate_batches(text, batch_size)

    seconds_per_job = 1 / api_calls_per_second

    with ThreadPoolExecutor() as executor:
        futures = []
        for batch in tqdm(
            batches, total=math.ceil(len(text) / batch_size), position=0
        ):
            futures.append(
                executor.submit(functools.partial(encode_texts_to_embeddings), batch)
            )
            time.sleep(seconds_per_job)

        for future in futures:
            embeddings_list.extend(future.result())

    is_successful = [embedding is not None for embedding in embeddings_list]
    embeddings_list_successful = np.squeeze(
        np.stack([embedding for embedding in embeddings_list if embedding is not None])
    )
    return is_successful, embeddings_list_successful
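
To see how the batching and failure handling behave without calling the API, here is a minimal sketch with a stubbed embedder. fake_embed is a hypothetical stand-in, not part of the Vertex AI SDK, and it omits the thread pool and rate limiting. It illustrates one consequence of the helper above: when a request fails, every text in that batch is marked as unsuccessful.

```python
import math
from typing import List, Optional

import numpy as np


def fake_embed(batch: List[str]) -> List[Optional[List[float]]]:
    """Hypothetical stand-in for the embeddings API: fails on any batch containing ''."""
    if any(t == "" for t in batch):
        return [None for _ in batch]
    return [[float(len(t)), 1.0] for t in batch]


texts = ["sofa", "oak table", "", "rug", "lamp", "desk", "bed"]
batch_size = 5

# Split into batches of at most batch_size texts.
batches = [texts[i : i + batch_size] for i in range(0, len(texts), batch_size)]
assert len(batches) == math.ceil(len(texts) / batch_size)

results: List[Optional[List[float]]] = []
for batch in batches:
    results.extend(fake_embed(batch))

# Keep a mask so the original texts can be filtered to match the embeddings.
is_successful = [r is not None for r in results]
embeddings = np.stack([r for r in results if r is not None])

print(is_successful)
print(embeddings.shape)
```

The mask has one entry per input text, which is what allows np.array(texts)[is_successful] to stay aligned with the rows of the embeddings array.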

Let’s encode a subset of the data and check that the similarity scores produce sensible product suggestions.

import math

# Encode a subset of product descriptions for validation
products = product_df.product_text.tolist()[:500]
is_successful, product_embeddings = encode_text_to_embedding_batched(text=products)

# Filter for successfully embedded descriptions
products = np.array(products)[is_successful]
DIMENSIONS = len(product_embeddings[0])

print(DIMENSIONS)

The following cell takes a description from the dataset (rather than from a user) and looks for relevant matches. The first result is likely to be the exact match, since the query itself is among the embedded products.

import random

product_index = random.randint(0, len(products) - 1)

print(f"Product query: {products[product_index]} \n")

scores = np.dot(product_embeddings[product_index], product_embeddings.T)

# Print top 3 matches
for index, (product, score) in enumerate(
    sorted(zip(products, scores), key=lambda x: x[1], reverse=True)[:3]
):
    print(f"\t{index}: \n {product}: \n {score} \n")
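
The ranking above relies on the dot product as a similarity score. The dot product matches cosine similarity only when the vectors have unit length, and the notebook does not state whether the gecko embeddings are normalized, so the sketch below normalizes explicitly. The vectors are random stand-ins, not API output.

```python
import numpy as np

rng = np.random.default_rng(0)
vectors = rng.normal(size=(6, 4))  # made-up embeddings, not API output

# Scale each row to unit length so the dot product equals cosine similarity.
unit = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)

query_index = 2
scores = unit @ unit[query_index]

# argsort is ascending, so reverse it and keep the first three indices.
top3 = np.argsort(scores)[::-1][:3]
print(top3, scores[top3])
```

The query vector ranks first with a score of 1, which mirrors the exact match seen in the cell above.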

10.0.4 Data formatting for building an index

We need to save the embeddings, together with the id and product_name columns, in the JSON lines format in order to create an index on Matching Engine. For more details, see the documentation here.

import tempfile
from pathlib import Path

# Create temporary file to write embeddings to
embeddings_file_path = Path(tempfile.mkdtemp())

print(f"Embeddings directory: {embeddings_file_path}")
product_embeddings = np.array(product_embeddings)
!touch json_output.json

Let’s take a look at the type of the embeddings. At the moment, product_embeddings is a NumPy array. We will convert it to a list of Python lists so that each embedding can be stored as a value in a new dataframe column.

type(product_embeddings)
embeddings_list = product_embeddings.tolist()
# The embeddings follow the order of the first rows of product_df, so align
# them by position (after dropping failed rows) rather than by product id.
embeddings_df = product_df.iloc[: len(is_successful)][np.array(is_successful)].copy()
embeddings_df["embedding"] = pd.Series(embeddings_list, index=embeddings_df.index)
embeddings_df

10.0.5 JSON Lines

Now we can convert the entire dataframe to JSON lines.

json_lines = embeddings_df.to_json(orient='records', lines=True)
json_lines
import json

output_file = 'merged_data.json'
with open(output_file, 'w') as file:
    for index, row in embeddings_df.iterrows():
        data = {
            'id': row['id'],
            'product_name': row['product_name'],
            'product_text': row['product_text'],
            'embedding': row['embedding']
        }
        json_line = json.dumps(data)
        file.write(json_line + '\n')
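
For reference, here is a small sketch of what a JSON lines file looks like: one standalone JSON object per line. The two records are made up, and the choice to store id as a string alongside an embedding array is an assumption about the expected input format that should be checked against the Matching Engine documentation.

```python
import json
import os
import tempfile

# Two made-up records; real embeddings would have 768 values each.
records = [
    {"id": "0", "embedding": [0.1, 0.2, 0.3]},
    {"id": "7", "embedding": [0.4, 0.5, 0.6]},
]

path = os.path.join(tempfile.mkdtemp(), "sample.json")
with open(path, "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

# Read the file back line by line to confirm each line parses on its own.
with open(path) as f:
    loaded = [json.loads(line) for line in f]

print(loaded[1]["id"], len(loaded[1]["embedding"]))
```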

Copy the JSON lines file to Cloud Storage.

!gsutil cp merged_data.json {BUCKET_URI}/
!head -c 500 merged_data.json

10.0.6 Creating the index in Matching Engine

Note: this is a long-running operation that can take up to an hour.

from google.cloud import aiplatform

DIMENSIONS = 768
# Add a display name
DISPLAY_NAME = "wands_index"
DESCRIPTION = "products and descriptions from Wayfair"
remote_folder = BUCKET_URI

tree_ah_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    contents_delta_uri=remote_folder,
    dimensions=DIMENSIONS,
    approximate_neighbors_count=150,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=5,
    description=DESCRIPTION,
)

In the results of the cell above, make note of the information under this line:

To use this MatchingEngineIndex in another session:

If the Colab runtime resets, you will need this line to set the index variable:

index = aiplatform.MatchingEngineIndex(...)

Use gcloud to list indexes

# Add your region below
!gcloud ai indexes list --region="<...>"
INDEX_RESOURCE_NAME = tree_ah_index.resource_name

10.0.7 Create an index endpoint

my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name=DISPLAY_NAME,
    description=DESCRIPTION,
    public_endpoint_enabled=True,
)

  • Note: the cell below shows how to load an existing MatchingEngineIndex (using the output of the MatchingEngineIndex.create cell above) and an existing MatchingEngineIndexEndpoint, for example from another project or after the Colab runtime resets.
# Fill in the values from the MatchingEngineIndex.create
# and MatchingEngineIndexEndpoint.create cells

# index = aiplatform.MatchingEngineIndex('<...>')

# my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint(
#     index_endpoint_name = '<...>',
# )
# Write your own unique index name
DEPLOYED_INDEX_ID = "<...>"

10.0.8 Deploy the index to the endpoint

# Pass `index` instead of `tree_ah_index` if you reloaded the index in a new session
my_index_endpoint = my_index_endpoint.deploy_index(
    index=tree_ah_index, deployed_index_id=DEPLOYED_INDEX_ID
)

my_index_endpoint.deployed_indexes

10.0.9 Quick test query

Embedding a query should return relevant nearest neighbors.

test_embeddings = encode_texts_to_embeddings(text=["a midcentury modern dining table"])
# Test query
NUM_NEIGHBOURS = 5

response = my_index_endpoint.find_neighbors(
    deployed_index_id=DEPLOYED_INDEX_ID,
    queries=test_embeddings,
    num_neighbors=NUM_NEIGHBOURS,
)

response

Now let’s make that information useful, by creating helper functions to take the ids and match them to products.

# Create embeddings from a customer chat message
def get_embeddings(input_text):
  chat_embeddings = encode_texts_to_embeddings(text=[input_text])
  return chat_embeddings
# Retrieve the nearest neighbor lookups for
# the embedded customer message

NUM_NEIGHBOURS = 3

def get_nn_response(chat_embeddings):
  response = my_index_endpoint.find_neighbors(
    deployed_index_id=DEPLOYED_INDEX_ID,
    queries=chat_embeddings,
    num_neighbors=NUM_NEIGHBOURS,
  )
  return response
# Create a dataframe of results. This will be the data on which we
# ask the language model to base its recommendations
def get_nn_ids(response):
  id_list = [item.id for sublist in response for item in sublist]
  # The ids come back as strings; int() parses them without the risks of eval()
  id_list = [int(i) for i in id_list]
  print(id_list)
  results_df = product_df[product_df['id'].isin(id_list)]
  return results_df
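
One detail worth knowing: filtering with isin returns rows in dataframe order, not in nearest-neighbour order. If the ranking matters, the rows can be reordered by the returned ids, as in the sketch below. FakeNeighbor is a hypothetical stand-in that only assumes each neighbour exposes id and distance attributes, and all values are made up.

```python
from dataclasses import dataclass

import pandas as pd


@dataclass
class FakeNeighbor:
    """Hypothetical stand-in for one neighbour in the find_neighbors response."""

    id: str
    distance: float


# One inner list per query, closest match first (made-up values).
fake_response = [
    [FakeNeighbor("7", 0.91), FakeNeighbor("2", 0.88), FakeNeighbor("5", 0.80)]
]

toy_products = pd.DataFrame(
    {"id": [2, 5, 7, 9], "product_name": ["rug", "lamp", "sofa", "desk"]}
)

id_list = [int(item.id) for sublist in fake_response for item in sublist]

# isin() keeps dataframe order; selecting by id with .loc restores the ranking.
unranked = toy_products[toy_products["id"].isin(id_list)]
ranked = toy_products.set_index("id").loc[id_list].reset_index()

print(unranked["product_name"].tolist())
print(ranked["product_name"].tolist())
```

Note that .loc raises a KeyError if an id is missing from the dataframe, so this variant assumes every returned id exists in the product table.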

10.0.10 RAG using the LLM and embeddings

import vertexai
from vertexai.preview.language_models import ChatModel, InputOutputTextPair

chat_model = ChatModel.from_pretrained("chat-bison@001")
parameters = {
    "temperature": 0.1,
    "max_output_tokens": 1024,
    "top_p": 0.8,
    "top_k": 40
}

customer_message = """\
Interested in a persian style rug
"""

# Chain together the helper functions to get results
# from customer_message
results_df = get_nn_ids(get_nn_response(get_embeddings(customer_message)))

service_context=f"""You are a customer service bot, writing in polite British English. \
    Suggest the top three relevant \
    products only from {results_df}, mentioning:
     product names and \
     brief descriptions \
    Number them and leave a line between suggestions. \
    Preface the list of products with an introductory sentence such as \
    'Here are some relevant products: ' \
    Ensure each recommendation appears only once."""


chat = chat_model.start_chat(
    context=service_context,
)
response = chat.send_message(customer_message, **parameters)
print(f"Response from Model: \n {response.text}")
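
The prompt above interpolates {results_df} directly, which inserts the dataframe's display string. By default pandas shortens long cell values in that display, so long product descriptions may reach the model truncated. One possible alternative, sketched here with made-up rows standing in for results_df, is to build the context text row by row.

```python
import pandas as pd

# Made-up rows standing in for results_df.
toy_results = pd.DataFrame(
    {
        "id": [7, 2],
        "product_name": ["persian style rug", "oriental area rug"],
        "product_text": ["a" * 120, "hand-knotted wool rug"],
    }
)

# Build one line per product so the full description text is preserved.
context_lines = [
    f"{row.product_name}: {row.product_text}"
    for row in toy_results.itertuples(index=False)
]
context_block = "\n".join(context_lines)

print(len(context_block))
```

The resulting context_block could then be placed in the prompt in place of the dataframe itself.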

A user may ask follow-up questions, which the LLM can answer based on the information in the dataframe.

response = chat.send_message("""could you tell me more about the Octagon Senoia?""", **parameters)
print(f"Response from Model: {response.text}")

10.0.11 Cleaning up

To delete all the GCP resources used, uncomment and run the following cells.

# Force undeployment of indexes and delete endpoint
# my_index_endpoint.delete(force=True)
# Delete indexes
# tree_ah_index.delete()