Large-scale Document Retrieval with Elasticsearch
A tutorial on retrieving documents/items with Elasticsearch and vector-indexing (ANN) methods.
- Retrieval Flow Overview
- Part 1 - Setting up Elasticsearch
- Part 2 - Walking through an embedding-based retrieval system
- Part 3 - Approximate Nearest Neighbor (ANN) Algorithms

# download and extract Elasticsearch 7.11.1
!wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.11.1-linux-x86_64.tar.gz
!tar -xzvf elasticsearch-7.11.1-linux-x86_64.tar.gz
# hand the install to the daemon user (Elasticsearch refuses to run as root)
!chown -R daemon:daemon elasticsearch-7.11.1
# start the Elasticsearch server as a background subprocess,
# dropping root privileges via setuid (uid 1 is the daemon user)
import os
from subprocess import Popen, PIPE, STDOUT

es_subprocess = Popen(['elasticsearch-7.11.1/bin/elasticsearch'],
                      stdout=PIPE, stderr=STDOUT,
                      preexec_fn=lambda: os.setuid(1))
# the server takes a little while to boot; check that it responds before continuing
!curl -X GET "localhost:9200/"
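Rather than waiting a fixed few minutes, you can poll the port until the server answers. A small sketch (assuming the requests package, which ships with Colab):
import time
import requests

# poll localhost:9200 until Elasticsearch responds, giving up after ~60 seconds
for _ in range(30):
  try:
    if requests.get('http://localhost:9200/').ok:
      break
  except requests.exceptions.ConnectionError:
    pass
  time.sleep(2)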
# install the Elasticsearch Python client, pinned to match the 7.x server
!pip install -q "elasticsearch>=7,<8"
# check if elasticsearch server is properly running in the background
from elasticsearch import Elasticsearch, helpers
es_client = Elasticsearch(['localhost'])
es_client.info()
!wget https://files.grouplens.org/datasets/movielens/ml-25m.zip --no-check-certificate
!unzip ml-25m.zip
import pandas as pd

# movies.csv has columns: movieId, title, genres (pipe-separated)
data = pd.read_csv('ml-25m/movies.csv').drop_duplicates()
data.head()
import tensorflow_hub as hub
from timeit import default_timer as timer
import json

# the Universal Sentence Encoder (large) maps text to 512-dimensional vectors
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder-large/5")

INDEX_NAME = "movie_title"  # name of the Elasticsearch index
BATCH_SIZE = 200            # documents per bulk-indexing request
SEARCH_SIZE = 10            # number of hits to return per query
MAPPINGS = {
    'mappings': {
        '_source': {'enabled': 'true'},
        'dynamic': 'true',
        'properties': {
            'title_vector': {'type': 'dense_vector', 'dims': 512},
            'movie_id': {'type': 'keyword'},
            'genres': {'type': 'keyword'},
        },
    },
    'settings': {'number_of_replicas': 1, 'number_of_shards': 2},
}
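The two helpers invoked next, index_movie_lens and return_top_movies, are called below but not defined in this section. A minimal sketch consistent with those calls, assuming bulk indexing via the helpers module and a script_score query with cosineSimilarity (the bodies are reconstructions, not the original code):

def index_movie_lens(df, num_doc=500):
  print('creating the {} index.'.format(INDEX_NAME))
  es_client.indices.delete(index=INDEX_NAME, ignore=[404])
  es_client.indices.create(index=INDEX_NAME, body=json.dumps(MAPPINGS))

  bulk_requests = []
  indexing_start = timer()
  for _, row in df[:num_doc].iterrows():
    doc = {
        '_index': INDEX_NAME,
        '_id': str(row['movieId']),
        'movie_id': int(row['movieId']),
        'title': row['title'],
        'genres': row['genres'].split('|'),
        # embed one title at a time for clarity; batch for speed in practice
        'title_vector': embed([row['title']]).numpy()[0].tolist(),
    }
    bulk_requests.append(doc)
    if len(bulk_requests) == BATCH_SIZE:
      helpers.bulk(es_client, bulk_requests)
      bulk_requests = []
  if bulk_requests:
    helpers.bulk(es_client, bulk_requests)
  es_client.indices.refresh(index=INDEX_NAME)
  print('indexed {} documents in {:.1f}s.'.format(num_doc, timer() - indexing_start))

def return_top_movies(query):
  embedding_start = timer()
  query_vector = embed([query]).numpy()[0].tolist()
  embedding_time = timer() - embedding_start

  # rank every document by cosine similarity to the query vector;
  # +1.0 keeps scores non-negative (Elasticsearch rejects negative scores)
  script_query = {
      'script_score': {
          'query': {'match_all': {}},
          'script': {
              'source': "cosineSimilarity(params.query_vector, 'title_vector') + 1.0",
              'params': {'query_vector': query_vector},
          },
      }
  }

  search_start = timer()
  response = es_client.search(
      index=INDEX_NAME,
      body={'size': SEARCH_SIZE,
            'query': script_query,
            '_source': {'includes': ['title', 'genres']}})
  search_time = timer() - search_start

  print('embedding time: {:.2f} ms'.format(embedding_time * 1000))
  print('search time: {:.2f} ms'.format(search_time * 1000))
  for hit in response['hits']['hits']:
    print('id: {}, score: {:.3f}, title: {}'.format(
        hit['_id'], hit['_score'], hit['_source']['title']))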
index_movie_lens(data, num_doc=2000)
return_top_movies("war")
# the maintained PyPI build of faiss is faiss-cpu (plain "faiss" is a stale package)
!pip install faiss-cpu
!pip install nmslib
!apt-get install libomp-dev
import faiss
import nmslib

documents = data['title'].to_list()[:2000]
# embedding all documents in one call can OOM for a large corpus; 2,000 titles is fine
embeddings = embed(documents).numpy()
embeddings.shape  # (2000, 512)
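For a much larger corpus, embed in chunks instead of a single call. A minimal batching sketch (embed_in_batches and the chunk size of 256 are illustrative, not from the original):
import numpy as np

def embed_in_batches(texts, batch_size=256):
  # embed texts chunk by chunk so peak memory stays bounded
  return np.concatenate(
      [embed(texts[i:i + batch_size]).numpy()
       for i in range(0, len(texts), batch_size)], axis=0)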
class DemoIndexLSH():
  def __init__(self, dimension, documents, embeddings):
    self.dimension = dimension
    self.documents = documents
    self.embeddings = embeddings

  def build(self, num_bits=8):
    # hash each vector into a num_bits binary code; search compares Hamming distances
    self.index = faiss.IndexLSH(self.dimension, num_bits)
    self.index.add(self.embeddings)

  def query(self, input_embedding, k=5):
    distances, indices = self.index.search(input_embedding, k)
    return [(distance, self.documents[index])
            for distance, index in zip(distances[0], indices[0])]
index_lsh = DemoIndexLSH(512, documents, embeddings)
index_lsh.build(num_bits=16)
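More hash bits make codes more discriminative at the cost of index size; for example, a 64-bit variant (a separate instance, so the 16-bit index above is left untouched):
index_lsh64 = DemoIndexLSH(512, documents, embeddings)
index_lsh64.build(num_bits=64)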
class DemoIndexIVFPQ():
  def __init__(self, dimension, documents, embeddings):
    self.dimension = dimension
    self.documents = documents
    self.embeddings = embeddings

  def build(self,
            number_of_partition=2,
            number_of_subquantizers=2,
            subvector_bits=4):
    # the coarse quantizer assigns each vector to one of number_of_partition cells
    quantizer = faiss.IndexFlatL2(self.dimension)
    # product quantization then compresses each vector into
    # number_of_subquantizers codes of subvector_bits each
    self.index = faiss.IndexIVFPQ(quantizer,
                                  self.dimension,
                                  number_of_partition,
                                  number_of_subquantizers,
                                  subvector_bits)
    # IVFPQ must be trained (k-means for cells and codebooks) before adding vectors
    self.index.train(self.embeddings)
    self.index.add(self.embeddings)

  def query(self, input_embedding, k=5):
    distances, indices = self.index.search(input_embedding, k)
    return [(distance, self.documents[index])
            for distance, index in zip(distances[0], indices[0])]
index_pq = DemoIndexIVFPQ(512, documents, embeddings)
index_pq.build()
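At search time, IVF visits only nprobe partitions (1 by default); raising it trades speed for recall:
# probe both partitions instead of the default 1
# (exhaustive here, since the index was built with only 2 partitions)
index_pq.index.nprobe = 2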
class DemoHNSW():
  def __init__(self, dimension, documents, embeddings):
    self.dimension = dimension
    self.documents = documents
    self.embeddings = embeddings

  def build(self):
    # build a hierarchical navigable small-world graph over cosine distance
    self.index = nmslib.init(method='hnsw', space='cosinesimil')
    self.index.addDataPointBatch(self.embeddings)
    self.index.createIndex({'post': 2}, print_progress=True)

  def query(self, input_embedding, k=5):
    # note: nmslib returns (indices, distances), the reverse of faiss
    indices, distances = self.index.knnQuery(input_embedding, k)
    return [(distance, self.documents[index])
            for distance, index in zip(distances, indices)]
index_hnsw = DemoHNSW(512, documents, embeddings)
index_hnsw.build()
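HNSW recall can be tuned at query time through efSearch, the size of the candidate list kept during graph traversal:
# a larger candidate list improves recall at some cost in query latency
index_hnsw.index.setQueryTimeParams({'efSearch': 100})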
class DemoIndexFlatL2():
  def __init__(self, dimension, documents, embeddings):
    self.dimension = dimension
    self.documents = documents
    self.embeddings = embeddings

  def build(self):
    # exact (brute-force) L2 search: the accuracy baseline for the ANN indexes above
    self.index = faiss.IndexFlatL2(self.dimension)
    self.index.add(self.embeddings)

  def query(self, input_embedding, k=5):
    distances, indices = self.index.search(input_embedding, k)
    return [(distance, self.documents[index])
            for distance, index in zip(distances[0], indices[0])]
index_flat = DemoIndexFlatL2(512, documents, embeddings)
index_flat.build()
def return_ann_top_movies(ann_index, query, k=SEARCH_SIZE):
  # embed the free-text query, then look up its k nearest movie titles
  query_vector = embed([query]).numpy()
  search_start = timer()
  top_docs = ann_index.query(query_vector, k)
  search_time = timer() - search_start
  print("search time: {:.2f} ms".format(search_time * 1000))
  return top_docs
return_ann_top_movies(index_flat, "romance")
return_ann_top_movies(index_lsh, "romance")
return_ann_top_movies(index_pq, "romance")
return_ann_top_movies(index_hnsw, "romance")
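Since IndexFlatL2 searches exhaustively, its results are exact and can serve as ground truth for the approximate indexes. A hypothetical recall check (recall_at_k is not part of the original tutorial):
def recall_at_k(ann_index, exact_index, query, k=SEARCH_SIZE):
  # fraction of the exact top-k that the approximate index also returns
  ann_titles = {title for _, title in return_ann_top_movies(ann_index, query, k)}
  exact_titles = {title for _, title in return_ann_top_movies(exact_index, query, k)}
  return len(ann_titles & exact_titles) / k

recall_at_k(index_hnsw, index_flat, "romance")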