Turbocharge LangChain: guide to 20x faster embedding

By Amog Kamsetty and Philipp Moritz   

This is part 2(part 1 here) of a blog series. In this blog, we’ll show you how to turbocharge embeddings. In future parts, we will show you how to combine a vector database and an LLM to create a fact-based question answering service. Additionally, we will optimize the code and measure performance: cost, latency and throughput.

Generating embeddings from documents is a critical step for LLM workflows. Many LLM apps are being built today through retrieval based similarity search:

  1. Documents are embedded and stored in a vector database.  

  2. Incoming queries are used to pull semantically relevant passages from the vector database, and these passages are used as context for LLMs to answer the query.

In a previous blog post, we showed how to use LangChain to do step 1, and we also showed how to parallelize this step by using Ray tasks for faster embedding creation.

In this blog post, we take it one step further, scaling out to many more documents. Continue reading to see how to use Ray Data, a distributed data processing system that’s a part of the Ray framework, to generate and store embeddings for 2,000 PDF documents from cloud storage, parallelizing across 20 GPUs, all in under 4 minutes and in less than 100 lines of code.

While in this walkthrough we use Ray Data to read PDF files from S3 cloud storage, it also supports a wide number of other data formats like text data, parquet, images, and can read data from a variety of sources like MongoDB and SQL Databases.

LinkWhy do I need to parallelize this?

Embedding - Why do I need to parallelize this?

LangChain provides all the tools and the integrations for building LLM applications, including loading, embedding, and storing documents. While LangChain works great for quickly getting started with a handful of documents, when you want to scale your corpus up to thousands or more documents, this can quickly become unwieldy.

Naively using a for loop to do this for each document within a corpus of a 2,000 documents takes 75 minutes:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import os
from tqdm import tqdm

from langchain.document_loaders import PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=100,
    length_function=len,
)
model_name = "sentence-transformers/all-mpnet-base-v2"
model_kwargs = {"device": "cuda"}

hf = HuggingFaceEmbeddings(model_name=model_name, model_kwargs=model_kwargs)

# Put your directory containing PDFs here
directory = '/tmp/data/'
pdf_documents = [os.path.join(directory, filename) for filename in os.listdir(directory)]

langchain_documents = []
for document in tqdm(pdf_documents):
    try:
        loader = PyPDFLoader(document)
        data = loader.load()
        langchain_documents.extend(data)
    except Exception:
        continue

print("Num pages: ", len(langchain_documents))
print("Splitting all documents")
split_docs = text_splitter.split_documents(langchain_documents)

print("Embed and create vector index")
db = FAISS.from_documents(split_docs, embedding=hf)

Clearly, if you want to iterate quickly and try out different multiple document corpuses, splitting techniques, chunk sizes, or embedding models, just doing this in a for loop won’t cut it.

Instead, for faster development, you need to horizontally scale, and for this you need a framework to make this parallelization very easy. 

By using Ray Data, we can define our embedding generation pipeline and execute it in a few lines of code, and it will automatically scale out, leveraging the compute capabilities of all the CPUs and GPUs in our cluster.

LinkStages of our Data Pipeline

In this example, we want to generate embeddings for our document corpus consisting of the top 2,000 arxiv papers on “large language models”. There are over 30,000 pages in all these documents. The code for generating this dataset can be found here

Embedding - Stages of our Data Pipeline

Let’s take a look at the stages of our data pipeline:

  1. Load the PDF documents from our S3 bucket as raw bytes

  2. Use PyPDF to convert those bytes into string text

  3. Use LangChain’s text splitter to split the text into chunks

  4. Use a pre-trained sentence-transformers model to embed each chunk

  5. Store the embeddings and the original text into a FAISS vector store

The full data pipeline was run on 5 g4dn.12xlarge instances on AWS EC2, consisting of 20 GPUs in total. The code for the full data pipeline can be found here.

LinkStarting the Ray Cluster

Follow the steps here to set up a multi-node Ray cluster on AWS.

Ray clusters can also be started on GCP, Azure, or other cloud providers. See the Ray Cluster documentation for full info. 

Alternatively, you can use Workspaces on Anyscale to manage your Ray clusters.

Now that we have the cluster setup, let’s go through the steps in our script.

LinkInstalling Dependencies

First, we need to install the necessary dependencies on all the nodes in our Ray cluster. We can do this via Ray’s runtime environment feature.

1
2
3
import ray

ray.init(runtime_env={"pip": ["langchain", "pypdf", "sentence_transformers", "transformers"]})

LinkStage 1 - Loading the Documents

Load the 2,143 documents from our S3 bucket as raw bytes.The S3 bucket contains unmodified PDF files that have been downloaded from arxiv.

We can easily do this via Ray Data’s read APIs, which creates a Ray Dataset object. Ray Datasets are lazy. Further operations can be chained and the stages are run only when execution is triggered.

1
2
3
4
from ray.data.datasource import FileExtensionFilter

# Filter out non-PDF files.
ds = ray.data.read_binary_files("s3://ray-llm-batch-inference/", partition_filter=FileExtensionFilter("pdf"))

LinkStage 2 - Converting

Use PyPDF to load in the raw bytes and parse them as string text. We also skip over any documents or pages that are unparseable. Even after skipping these, we still have over 33,642 pages in our dataset.

We use the pypdf library directly to read PDFs directly from bytes rather than file paths. Once https://github.com/hwchase17/langchain/pull/3915 is merged, LangChain’s PyPdfLoader can be used directly.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def convert_to_text(pdf_bytes: bytes):
    pdf_bytes_io = io.BytesIO(pdf_bytes)

    try:
        pdf_doc = PdfReader(pdf_bytes_io)
    except pypdf.errors.PdfStreamError:
        # Skip pdfs that are not readable.
        # We still have over 30,000 pages after skipping these.
        return []

    text = []
    for page in pdf_doc.pages:
        try:
            text.append(page.extract_text())
        except binascii.Error:
            # Skip all pages that are not parseable due to malformed characters.
            print("parsing failed")
    return text

# We use `flat_map` as `convert_to_text` has a 1->N relationship.
# It produces N strings for each PDF (one string per page).
# Use `map` for 1->1 relationship.
ds = ds.flat_map(convert_to_text)

LinkStage 3 - Splitting

Split the text into chunks using LangChain’s TextSplitter abstraction. After applying this transformation, the 33,642 pages are split into 144,411 chunks. Each chunk will then be encoded into an embedding in Step 4.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from langchain.text_splitter import RecursiveCharacterTextSplitter

def split_text(page_text: str):
    # Use chunk_size of 1000.
    # We felt that the answer we would be looking for would be 
    # around 200 words, or around 1000 characters.
    # This parameter can be modified based on your documents and use case.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=100, length_function=len
    )
    split_text: List[str] = text_splitter.split_text(page_text)

    split_text = [text.replace("\n", " ") for text in split_text]
    return split_text

# We use `flat_map` as `split_text` has a 1->N relationship.
# It produces N output chunks for each input string.
# Use `map` for 1->1 relationship.
ds = ds.flat_map(split_text)

LinkStage 4 - Embedding

Then, we can embed each of our chunks using a pre-trained sentence transformer model on GPUs. Here, we leverage Ray Actors for stateful computation, allowing us to initialize a model only once per GPU, rather than for every single batch.

At the end of this stage, we have 144,411 encodings by running 20 model replicas across 20 GPUs, each processing a batch of 100 chunks at a time to maximize GPU utilization.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Embed:
    def __init__(self):
        # Specify "cuda" to move the model to GPU.
        self.transformer = SentenceTransformer(model_name, device="cuda")

    def __call__(self, text_batch: List[str]):
        # We manually encode using sentence_transformer since LangChain
        # HuggingfaceEmbeddings does not support specifying a batch size yet.
        embeddings = self.transformer.encode(
            text_batch,
            batch_size=100,  # Large batch size to maximize GPU utilization.
            device="cuda",
        ).tolist()

        return list(zip(text_batch, embeddings))

# Use `map_batches` since we want to specify a batch size to maximize GPU utilization.
ds = ds.map_batches(
    Embed,
    # Large batch size to maximize GPU utilization.
    # Too large a batch size may result in GPU running out of memory.
    # If the chunk size is increased, then decrease batch size.
    # If the chunk size is decreased, then increase batch size.
    batch_size=100,  # Large batch size to maximize GPU utilization.
    compute=ray.data.ActorPoolStrategy(min_size=20, max_size=20),  # I have 20 GPUs in my cluster
    num_gpus=1,  # 1 GPU for each actor.
)

We use the `sentence_transformers` library directly so that we can provide a specific batch size. Once https://github.com/hwchase17/langchain/pull/3914 is merged, LangChain’s `HuggingfaceEmbeddings` can be used instead.

LinkStage 5 - Storing

Finally, we can execute this Data Pipeline by iterating through it, and we store the results in a persisted FAISS vector database for future querying.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain import FAISS
from langchain.embeddings import HuggingFaceEmbeddings

text_and_embeddings = []
for output in ds.iter_rows():
    text_and_embeddings.append(output)

vectore_store = FAISS.from_embeddings(
    text_and_embeddings,
    # Provide the embedding model to embed the query.
    # The documents are already embedded.
    embedding=HuggingFaceEmbeddings(model_name=model_name)
)

# Persist the vector store.
vectore_store.save_local("faiss_index")

LinkExecution

Executing this code, we see that all 20 GPUs are utilized at near 100% utilization. And what would normally take over an hour to run, can now be done in under 4 minutes! If you use AWS spot instances, this would only cost $0.95 total.

Embedding - Execution-1
Embedding - Execution 2

LinkQuerying the Vector Database

We can now load in our persisted LangChain FAISS database, and query it for similarity search. Let’s see the top document that’s most relevant to the “prompt engineering” query:

1
2
3
4
5
6
7
8
from langchain.embeddings import HuggingFaceEmbeddings
from langchain import FAISS

model_name = "sentence-transformers/all-mpnet-base-v2"
query_embedding = HuggingFaceEmbeddings(model_name=model_name)
db = FAISS.load_local("faiss_index", query_embedding)
documents = db.similarity_search(query="prompt engineering", k=1)
[doc.page_content for doc in documents]
1
2
3
4
5
6
7
8
9
10
11
12
13
['Prompt Engineering for Job Classification 7 5 LLMs & Prompt Engineering Table '
 '3. Overview of the various prompt modifications explored in thi s study. '
 'Short name Description Baseline Provide a a job posting and asking if it is '
 'fit for a graduate. CoT Give a few examples of accurate classification before '
 'queryi ng. Zero-CoT Ask the model to reason step-by-step before providing '
 'its an swer. rawinst Give instructions about its role and the task by adding '
 'to the user msg. sysinst Give instructions about its role and the task as a '
 'system msg. bothinst Split instructions with role as a system msg and task '
 'as a user msg. mock Give task instructions by mocking a discussion where it '
 'ackn owledges them. reit Reinforce key elements in the instructions by '
 'repeating the m. strict Ask the model to answer by strictly following a '
 'given templat e. loose Ask for just the final answer to be given following a '
 'given temp late. right Asking the model to reach the right conclusion.']

LinkNext Steps

See part 3 here.

Review the code and data used in this blog in the following Github repo.

See our earlier blog series on solving Generative AI infrastructure with Ray.

If you are interested in learning more about Ray, see Ray.io and Docs.Ray.io.

To connect with the Ray community join #LLM on the Ray Slack or our Discuss forum.

If you are interested in our Ray hosted service for ML Training and Serving, see Anyscale.com/Platform and click the 'Try it now' button

Ray Summit 2023: If you are interested to learn much more about how Ray can be used to build performant and scalable LLM applications and fine-tune/train/serve LLMs on Ray, join Ray Summit on September 18-20th! We have a set of great keynote speakers including John Schulman from OpenAI and Aidan Gomez from Cohere, community and tech talks about Ray as well as practical training focused on LLMs.