In this tutorial, we’ll create a high-throughput, low-latency REST API for serving text-embedding models, rerankers, CLIP, CLAP, and ColPali. We will use the open-source framework Infinity. Infinity is very versatile and runs across different GPUs/CPUs as well as different frameworks: the inference server is built on top of PyTorch, optimum (ONNX/TensorRT) and CTranslate2, and uses FlashAttention to get the most out of your NVIDIA CUDA, AMD ROCm, CPU, AWS INF2, or Apple MPS accelerator. Infinity performs dynamic batching and runs tokenization in dedicated worker threads. You can see the final working version here on GitHub.

Project Setup

Make sure you have completed our quickstart so that you have the CLI installed and an account created.
  1. Run the command: cerebrium init infinity-throughput
This creates two files:
  • main.py: Where our entrypoint code lives
  • cerebrium.toml: Where we define our container image and auto-scaling parameters
We will start by defining the container environment our application runs in. Infinity publishes a public Docker image on Docker Hub that we will use. For Cerebrium to pull this image (even though it's public), you need to be signed in to Docker Hub. You can run the following command:
docker login -u your-dockerhub-username
# Enter your password or access token when prompted
Now that you are logged in, add the following to your cerebrium.toml:
[cerebrium.deployment]
name = "1-high-throughput"
python_version = "3.11"
docker_base_image_url = "michaelf34/infinity:0.0.77"
disable_auth = true
include = ['./*', 'main.py', 'cerebrium.toml']
exclude = ['.*']
Your autoscaling criteria will depend on the hardware type and model(s) you select. You can define them with the following sections in your cerebrium.toml:
[cerebrium.hardware]
cpu = 6.0
memory = 12.0
compute = "AMPERE_A10"
region = "us-east-1"

[cerebrium.scaling]
min_replicas = 0
max_replicas = 2
cooldown = 30
replica_concurrency = 500
scaling_metric = "concurrency_utilization"

[cerebrium.dependencies.pip]
numpy = "latest"
"infinity-emb[all]" = "0.0.77"
optimum = ">=1.24.0,<2.0.0"
transformers = "<4.49"
click = "==8.1.8"
fastapi = "latest"
uvicorn = "latest"
pandas = "latest"
We will run our models on an Ampere A10, which can handle up to 500 concurrent requests. In main.py, we start by creating a class that wraps all our embedding-model functionality using the Infinity framework. We load a few different models so we can test the functionality and results across a wide array of model types.
from infinity_emb import AsyncEngineArray, EngineArgs

class InfinityModel:
    def __init__(self):
        self.model_ids = [
            "jinaai/jina-clip-v1",                    # index 0: CLIP model (text + image embeddings)
            "michaelfeil/bge-small-en-v1.5",          # index 1: text embeddings
            "mixedbread-ai/mxbai-rerank-xsmall-v1",   # index 2: reranking
            "philschmid/tiny-bert-sst2-distilled"     # index 3: classification
        ]
        self.engine_array = None

    def _get_array(self):
        return AsyncEngineArray.from_args([
            EngineArgs(model_name_or_path=model, model_warmup=False)
            for model in self.model_ids
        ])

    async def setup(self):
        print(f"Setting up models: {self.model_ids}")
        self.engine_array = self._get_array()
        await self.engine_array.astart()
        print("All models loaded successfully!")


model = InfinityModel()
Since models can take a long time to load and become ready, we will use FastAPI, which gives us greater control over startup and readiness. Cerebrium allows you to bring your own ASGI server. Add the following to main.py:
from fastapi import FastAPI, Body

app = FastAPI(title="High-Throughput Embedding Service")

@app.on_event("startup")
async def startup_event():
    """Initialize models on container startup"""
    await model.setup()


@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/ready")
async def ready():
    """Readiness endpoint to report model initialization state."""
    is_ready = model.engine_array is not None
    return {"ready": is_ready}
Infinity supports the most common embedding functionality: text embeddings, image embeddings, reranking, and classification. We will create a separate endpoint for each:
def to_json(obj):
    """Convert numpy arrays (and lists of them) into JSON-serializable Python types."""
    if hasattr(obj, "tolist"):
        return obj.tolist()
    return [to_json(item) for item in obj] if isinstance(obj, (list, tuple)) else obj

@app.post("/embed")
async def embed(sentences: list[str] = Body(...), model_index: int = Body(1)):
    """Generate embeddings using the specified model."""
    engine = model.engine_array[model_index]
    embeddings, usage = await engine.embed(sentences=sentences)

    return {
        "embeddings": to_json(embeddings),
        "usage": to_json(usage),
        "model": model.model_ids[model_index]
    }


@app.post("/image_embed")
async def image_embed(image_urls: list[str] = Body(...), model_index: int = Body(0)):
    """Generate embeddings for images using CLIP model."""
    engine = model.engine_array[model_index]
    embeddings, usage = await engine.image_embed(images=image_urls)

    return {
        "embeddings": to_json(embeddings),
        "usage": to_json(usage),
        "model": model.model_ids[model_index]
    }


@app.post("/rerank")
async def rerank(query: str = Body(...), docs: list[str] = Body(...), model_index: int = Body(2)):
    """Rerank documents based on query relevance."""
    engine = model.engine_array[model_index]
    rankings, usage = await engine.rerank(query=query, docs=docs)

    return {
        "rankings": to_json(rankings),
        "usage": to_json(usage),
        "model": model.model_ids[model_index]
    }


@app.post("/classify")
async def classify(sentences: list[str] = Body(...), model_index: int = Body(3)):
    """Classify text sentiment."""
    engine = model.engine_array[model_index]
    classes, usage = await engine.classify(sentences=sentences)

    return {
        "classifications": to_json(classes),
        "usage": to_json(usage),
        "model": model.model_ids[model_index]
    }

Now you have a multi-purpose embedding server! Let's update cerebrium.toml to point Cerebrium at our FastAPI server. Add the following section:
[cerebrium.runtime.custom]
port = 5000
entrypoint = ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "5000"]
healthcheck_endpoint = "/health"
readycheck_endpoint = "/ready"
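Before deploying, you can sanity-check the server locally using the same uvicorn entrypoint (a rough sketch, assuming you have installed the pip dependencies from cerebrium.toml into a local Python 3.11 environment):
pip install "infinity-emb[all]==0.0.77" "optimum>=1.24.0,<2.0.0" "transformers<4.49" click==8.1.8 fastapi uvicorn numpy pandas
uvicorn main:app --host 0.0.0.0 --port 5000
# In another terminal, check that the models have finished loading:
curl http://localhost:5000/ready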
To deploy it, run cerebrium deploy. Once deployed, you should be able to call the API with a command similar to:
curl --location 'https://api.aws.us-east-1.cerebrium.ai/v4/p-xxxx/infinity-throughput/image_embed' \
--header 'Content-Type: application/json' \
--data '{"image_urls": ["https://www.borrowmydoggy.com/_next/image?url=https%3A%2F%2Fcdn.sanity.io%2Fimages%2F4ij0poqn%2Fproduction%2Fe24bfbd855cda99e303975f2bd2a1bf43079b320-800x600.jpg&w=1080&q=80"]}'
You should get a response like:
{
    "embeddings": [
        [
            -0.05284368246793747,
            0.0011637501884251833,
            -0.029046623036265373,
            ....
        ]
    ]
}
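The other endpoints follow the same pattern. For example, here are sketches of a text-embedding and a rerank request (the p-xxxx placeholder stands in for your project ID, and the model_index values match the defaults in the code above):
curl --location 'https://api.aws.us-east-1.cerebrium.ai/v4/p-xxxx/infinity-throughput/embed' \
--header 'Content-Type: application/json' \
--data '{"sentences": ["Infinity makes serving embedding models easy."], "model_index": 1}'

curl --location 'https://api.aws.us-east-1.cerebrium.ai/v4/p-xxxx/infinity-throughput/rerank' \
--header 'Content-Type: application/json' \
--data '{"query": "Where can I adopt a dog?", "docs": ["Local shelters list dogs available for adoption.", "The A10 GPU has 24GB of memory."], "model_index": 2}'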
Great! Now you have a scalable, multi-purpose embedding and reranking server.