# Install the packages
! pip3 install --upgrade google-cloud-aiplatform
! pip3 install "shapely<2.0.0"
10 Deep retrieval with nearest neighbours
Matching customer queries to products via embeddings and Retrieval Augmented Generation.
10.0.1 Overview
This notebook demonstrates one method of using large language models to interact with data. Using the Wayfair WANDS dataset of more than 42,000 products, we will go through the following steps:
- Download the data into a pandas dataframe
- Generate embeddings for the product descriptions
- Create and deploy an index of the embeddings on Vertex AI Matching Engine, a service which enables nearest neighbor search at scale
- Prompt an LLM to retrieve relevant product suggestions from the embedded data.
10.0.1.1 Please note:
Deployed indexes on Matching Engine can quickly accrue significant costs. Please see the final cells of the notebook for cleaning up the resources used.
10.0.2 Technologies
In this notebook, we will use:
Vertex AI’s language model
Vertex AI Matching Engine, a high-scale, low-latency vector database.
10.0.3 Colab only: Uncomment the following cell to restart the kernel
# Automatically restart kernel after installs so that your environment can access the new packages
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
Set your Google Cloud project id and region
= "<...>" # @param {type:"string"}
PROJECT_ID = "<..>" REGION
# Set the project id
! gcloud config set project {PROJECT_ID}
We will need a Cloud Storage bucket to store embeddings initially. Please create a bucket and add the URI below.
= "gs://<...>" BUCKET_URI
Authenticate your Google Cloud account. Depending on your Jupyter environment, you may have to authenticate manually. Follow the relevant instructions below.
- Vertex AI Workbench
Do nothing as you are already authenticated.
- Local JupyterLab instance, uncomment and run:
# ! gcloud auth login
- Colab, uncomment and run:
# from google.colab import auth
# auth.authenticate_user()
Install and initialize the SDK and language model. GCP uses the gecko model for text embeddings.
import vertexai
# The aiplatform client is used later for Matching Engine
from google.cloud import aiplatform

vertexai.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)
# Load the "Vertex AI Embeddings for Text" model
from vertexai.preview.language_models import TextEmbeddingModel
model = TextEmbeddingModel.from_pretrained("textembedding-gecko@001")
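As a quick sanity check, you can embed a single sentence and inspect the result. This is a minimal sketch; the example sentence is arbitrary, and the gecko model returns a 768-dimensional vector.
# Embed one arbitrary sentence and check the embedding dimensionality
sample_embedding = model.get_embeddings(["A mid-century modern armchair"])[0].values
print(len(sample_embedding))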
Now we’re ready to prepare the data.
import os
import pandas as pd
= "data"
path
os.path.exists(path)if not os.path.exists(path):
os.makedirs(path)print("data directory created")
else:
print("data directory found")
# download datasets
!wget -q https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/product.csv
!mv *.csv data/
!ls data
The dataset features a wealth of information. The queries (user searches), and the relevance ratings of the products returned for those queries, have been of particular interest to researchers. For this demo, however, we will focus on the product descriptions.
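If you would like to explore the queries and relevance labels as well, they can be downloaded from the same repository. This is a sketch, assuming the WANDS repository layout is unchanged; uncomment to use.
# !wget -q https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/query.csv
# !wget -q https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/label.csv
# !mv *.csv data/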
product_df = pd.read_csv("data/product.csv", sep='\t')
product_df
Filter the dataframe to keep only the product_id, product_name and product_description columns.
product_df = product_df.filter(["product_id", "product_name", "product_description"], axis=1)
product_df = product_df.rename(columns={"product_description": "product_text", "product_id": "id"})
product_df = product_df.dropna()
len(product_df)
The following cells contain functions adapted from a notebook in the vertex-ai-samples repository. encode_texts_to_embeddings will be used later to convert the product descriptions into embeddings.
from typing import List, Optional
# Define an embedding method that uses the model
def encode_texts_to_embeddings(text: List[str]) -> List[Optional[List[float]]]:
    try:
        embeddings = model.get_embeddings(text)
        return [embedding.values for embedding in embeddings]
    except Exception:
        return [None for _ in range(len(text))]
These helper functions achieve the following:
- generate_batches splits the product descriptions into batches of five, since the embeddings API accepts up to five text instances in each request.
- encode_text_to_embedding_batched calls the embeddings API and handles rate limiting using time.sleep.
import functools
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Generator, List, Tuple
import numpy as np
from tqdm.auto import tqdm
# Generator function to yield batches of sentences
def generate_batches(
    text: List[str], batch_size: int
) -> Generator[List[str], None, None]:
    for i in range(0, len(text), batch_size):
        yield text[i : i + batch_size]


def encode_text_to_embedding_batched(
    text: List[str], api_calls_per_second: int = 10, batch_size: int = 5
) -> Tuple[List[bool], np.ndarray]:

    embeddings_list: List[List[float]] = []

    # Prepare the batches using a generator
    batches = generate_batches(text, batch_size)

    seconds_per_job = 1 / api_calls_per_second

    with ThreadPoolExecutor() as executor:
        futures = []
        for batch in tqdm(
            batches, total=math.ceil(len(text) / batch_size), position=0
        ):
            futures.append(
                executor.submit(functools.partial(encode_texts_to_embeddings), batch)
            )
            time.sleep(seconds_per_job)

        for future in futures:
            embeddings_list.extend(future.result())

    is_successful = [
        embedding is not None for text, embedding in zip(text, embeddings_list)
    ]
    embeddings_list_successful = np.squeeze(
        np.stack([embedding for embedding in embeddings_list if embedding is not None])
    )
    return is_successful, embeddings_list_successful
Let’s encode a subset of the data and check that the distance metrics provide sensible product suggestions.
import math
# Encode a subset of products for validation
products = product_df.product_text.tolist()[:500]
is_successful, product_embeddings = encode_text_to_embedding_batched(
    text=product_df.product_text.tolist()[:500]
)

# Filter for successfully embedded sentences
products = np.array(products)[is_successful]

DIMENSIONS = len(product_embeddings[0])
print(DIMENSIONS)
This cell takes a description from the dataset (rather than from a user) and looks for relevant matches. The first result is likely to be the exact match.
import random
product_index = random.randint(0, 99)
print(f"Product query: {products[product_index]} \n")

scores = np.dot(product_embeddings[product_index], product_embeddings.T)

# Print top 3 matches
for index, (product, score) in enumerate(
    sorted(zip(products, scores), key=lambda x: x[1], reverse=True)[:3]
):
    print(f"\t{index}: \n {product}: \n {score} \n")
10.0.4 Data formatting for building an index
We need to save the embeddings, along with the id and product_name columns, in the JSON Lines format in order to create an index on Matching Engine. For more details, see the Vertex AI Matching Engine documentation.
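For reference, each record written later in this notebook has roughly the following shape (hypothetical, illustrative values; the real embedding list contains 768 floats):
# {"id": 42, "product_name": "example armchair",
#  "product_text": "an illustrative product description",
#  "embedding": [0.0123, -0.0456, 0.0789]}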
import tempfile
from pathlib import Path
# Create a temporary directory to write embeddings to
embeddings_file_path = Path(tempfile.mkdtemp())
print(f"Embeddings directory: {embeddings_file_path}")

product_embeddings = np.array(product_embeddings)
Let’s take a look at the shape and type of the embeddings. At the moment, product_embeddings is a numpy array. We will need to convert it to a list of dictionaries to use the embeddings as another column in a dataframe.
type(product_embeddings)
embeddings_list = product_embeddings.tolist()
embeddings_dicts = [{'embedding': embedding} for embedding in embeddings_list]

embeddings_df = product_df.merge(pd.DataFrame(embeddings_dicts), left_on='id', right_index=True)
embeddings_df
10.0.5 JSON Lines
Now we can convert the entire dataframe to JSON lines.
json_lines = embeddings_df.to_json(orient='records', lines=True)
json_lines
import json
output_file = 'merged_data.json'

with open(output_file, 'w') as file:
    for index, row in embeddings_df.iterrows():
        data = {
            'id': row['id'],
            'product_name': row['product_name'],
            'product_text': row['product_text'],
            'embedding': row['embedding']
        }
        json_line = json.dumps(data)
        file.write(json_line + '\n')
Copy the JSON lines file to Cloud Storage.
!gsutil cp merged_data.json {BUCKET_URI}
!gsutil ls {BUCKET_URI}
10.0.6 Creating the index in Matching Engine
Note: this is a long-running operation which can take up to an hour.
DIMENSIONS = 768

# Add a display name
DISPLAY_NAME = "wands_index"
DESCRIPTION = "products and descriptions from Wayfair"
remote_folder = BUCKET_URI

tree_ah_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    contents_delta_uri=remote_folder,
    dimensions=DIMENSIONS,
    approximate_neighbors_count=150,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=5,
    description=DESCRIPTION,
)
In the output of the cell above, make note of the information under this line:
To use this MatchingEngineIndex in another session:
If the Colab runtime resets, you will need that information to recreate the index variable:
index = aiplatform.MatchingEngineIndex(...)
Use gcloud to list indexes:
# Add your region below
!gcloud ai indexes list --region="<...>"
INDEX_RESOURCE_NAME = tree_ah_index.resource_name
10.0.7 Create an index endpoint
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name=DISPLAY_NAME,
    description=DISPLAY_NAME,
    public_endpoint_enabled=True,
)
- Note: here is how to get an existing MatchingEngineIndex (using the output of the MatchingEngineIndex.create cell above) and MatchingEngineIndexEndpoint (from another project, or if the Colab runtime resets).
# Fill in the values from the MatchingEngineIndex.create
# and MatchingEngineIndexEndpoint.create cells
# index = aiplatform.MatchingEngineIndex('<...>')
# my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint(
# index_endpoint_name = '<...>',
# )
# Write your own unique index name
= "<...>" DEPLOYED_INDEX_ID
10.0.8 Deploy the index to the endpoint
my_index_endpoint = my_index_endpoint.deploy_index(
    index=tree_ah_index, deployed_index_id=DEPLOYED_INDEX_ID
)
my_index_endpoint.deployed_indexes
10.0.9 Quick test query
Embedding a query should return relevant nearest neighbors.
test_embeddings = encode_texts_to_embeddings(text=["a midcentury modern dining table"])

# Test query
NUM_NEIGHBOURS = 5

response = my_index_endpoint.find_neighbors(
    deployed_index_id=DEPLOYED_INDEX_ID,
    queries=test_embeddings,
    num_neighbors=NUM_NEIGHBOURS,
)
response
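The response contains one list of neighbours per query, each neighbour carrying an id and a distance. A minimal sketch for inspecting the first query's neighbours, assuming the call above succeeded:
for neighbor in response[0]:
    print(neighbor.id, neighbor.distance)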
Now let’s make that information useful by creating helper functions to take the ids and match them to products.
# Get the ids of the nearest neighbor results
def get_nn_ids(response):
    id_list = [item.id for sublist in response for item in sublist]
    # Neighbour ids are returned as strings; convert them to integers
    id_list = [int(i) for i in id_list]
    print(id_list)
    results_df = product_df[product_df['id'].isin(id_list)]
    return results_df


# Create embeddings from a customer chat message
def get_embeddings(input_text):
    chat_embeddings = encode_texts_to_embeddings(text=[input_text])
    return chat_embeddings


# Retrieve the nearest neighbor lookups for
# the embedded customer message
NUM_NEIGHBOURS = 3

def get_nn_response(chat_embeddings):
    response = my_index_endpoint.find_neighbors(
        deployed_index_id=DEPLOYED_INDEX_ID,
        queries=chat_embeddings,
        num_neighbors=NUM_NEIGHBOURS,
    )
    return response
get_nn_ids (defined above) creates the dataframe of results. This will be the data on which we ask the language model to base its recommendations.
10.0.10 RAG using the LLM and embeddings
import vertexai
from vertexai.preview.language_models import ChatModel, InputOutputTextPair
chat_model = ChatModel.from_pretrained("chat-bison@001")
parameters = {
    "temperature": 0.1,
    "max_output_tokens": 1024,
    "top_p": 0.8,
    "top_k": 40
}
= """\
customer_message Interested in a persian style rug
"""
# Chain together the helper functions to get results
# from customer_message
results_df = get_nn_ids(get_nn_response(get_embeddings(customer_message)))
=f"""You are a customer service bot, writing in polite British English. \
service_context Suggest the top three relevant \
products only from {results_df}, mentioning:
product names and \
brief descriptions \
Number them and leave a line between suggestions. \
Preface the list of products with an introductory sentence such as \
'Here are some relevant products: ' \
Ensure each recommendation appears only once."""
chat = chat_model.start_chat(
    context=f"""{service_context}""",
)
response = chat.send_message(customer_message, **parameters)
print(f"Response from Model: \n {response.text}")
A user may ask follow-up questions, which the LLM can answer based on the information in the dataframe.
response = chat.send_message("""could you tell me more about the Octagon Senoia?""", **parameters)
print(f"Response from Model: {response.text}")
10.0.11 Cleaning up
To delete all the GCP resources used, uncomment and run the following cells.
# Force undeployment of indexes and delete endpoint
# my_index_endpoint.delete(force=True)
# Delete indexes
# tree_ah_index.delete()
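You may also want to remove the embeddings file copied to Cloud Storage earlier, assuming it is no longer needed.
# ! gsutil rm {BUCKET_URI}/merged_data.json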