This example is only compatible with CLI v1.20 and later. If you are using an older version of the CLI, run pip install --upgrade cerebrium to upgrade to the latest version.

In this tutorial, we show how to implement streaming with Server-Sent Events (SSE) so that results reach your users as soon as they are generated.

To see the final implementation, you can view it here

Basic Setup

Developing models with Cerebrium should feel identical to developing on a virtual machine or Google Colab, so converting existing code should be very easy. Please make sure you have the Cerebrium package installed and have logged in. If not, please take a look at our docs here

First we create our project:

cerebrium init 5-streaming-endpoint

Let us add the following packages to the [cerebrium.dependencies.pip] section of our cerebrium.toml file:

[cerebrium.dependencies.pip]
peft = "git+https://github.com/huggingface/peft.git"
transformers = "git+https://github.com/huggingface/transformers.git"
accelerate = "git+https://github.com/huggingface/accelerate.git"
bitsandbytes = "latest"
sentencepiece = "latest"
pydantic = "latest"
torch = "2.1.0"

Our main.py file will contain our main Python code. This is a relatively simple implementation, so we can do everything in one file. We would like a user to send in a prompt, optionally with generation parameters, and have the model's response streamed back to them. So let us define our request object.

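from typing import Optional

from pydantic import BaseModel


class Item(BaseModel):
    prompt: str  # the only required field
    cutoff_len: Optional[int] = 256
    temperature: Optional[float] = 0.8
    top_p: Optional[float] = 0.75
    top_k: Optional[int] = 40  # top_k is a token count, so an int
    max_new_tokens: Optional[int] = 250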

Above, we use Pydantic as our data validation library. We specify which parameters are required and which are not (i.e., those using the Optional keyword), and assign defaults to some values. prompt is the only required parameter, so if it is missing from the request, the user automatically receives an error message.
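
As a quick check of the validation behaviour described above, the following minimal sketch declares a trimmed-down request model and shows that omitting prompt raises a ValidationError, while the optional fields fall back to their defaults. The class name ItemSketch is hypothetical (chosen so it does not clash with Item in main.py), and the default values are assumptions borrowed from the stream function signature used later in this tutorial.

```python
from typing import Optional

from pydantic import BaseModel, ValidationError


class ItemSketch(BaseModel):
    """Hypothetical, trimmed-down version of the request model."""

    prompt: str  # required: no default given
    temperature: Optional[float] = 0.8  # assumed default
    max_new_tokens: Optional[int] = 250  # assumed default


# A request that only supplies the prompt is accepted.
ok = ItemSketch(prompt="Tell me a story")

# A request without a prompt is rejected by Pydantic.
try:
    ItemSketch()
    missing_prompt_rejected = False
except ValidationError:
    missing_prompt_rejected = True
```

Here ok.temperature and ok.max_new_tokens hold the defaults, and missing_prompt_rejected is True.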

Falcon Implementation

Below, we load the Falcon-7B-Instruct model and its tokenizer from Hugging Face. We will use this model to generate text from the prompt in the request.

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    GenerationConfig,
    TextIteratorStreamer,
)
import torch

model_path = "tiiuae/falcon-7b-instruct"

# Loading in base model and tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype=torch.bfloat16,
    trust_remote_code=True,
    device_map="auto",
)

In the above, we import the required packages and instantiate the tokenizer and model. We do this outside the stream function so that the model weights are loaded onto the GPU only once, at model startup, rather than on every request.

Streaming Implementation

Below, we define our stream function, which contains the logic to stream results back from our endpoint.

from threading import Thread


def stream(prompt, cutoff_len=256, temperature=0.8, top_p=0.75, top_k=40, max_new_tokens=250):
    item = Item(
        prompt=prompt,
        cutoff_len=cutoff_len,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        max_new_tokens=max_new_tokens,
    )
    inputs = tokenizer(
        item.prompt, return_tensors="pt", max_length=512, truncation=True, padding=True
    )
    input_ids = inputs["input_ids"].to("cuda")

    streamer = TextIteratorStreamer(tokenizer)
    generation_config = GenerationConfig(
        temperature=item.temperature,
        top_p=item.top_p,
        top_k=item.top_k,
    )
    generation_kwargs = {
        "input_ids": input_ids,
        "generation_config": generation_config,
        "return_dict_in_generate": True,
        "output_scores": True,
        "pad_token_id": tokenizer.eos_token_id,
        "max_new_tokens": item.max_new_tokens,
        "streamer": streamer,
    }
    # generate() blocks until generation finishes, so run it in a background
    # thread and read from the streamer here as text becomes available.
    # generate() already disables gradient tracking internally.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for text in streamer:
        yield text  # vital for streaming
    thread.join()

Above, we receive our inputs from the request item we defined. We then use a TextIteratorStreamer to stream output from the model as soon as it is ready. Lastly, and most importantly, we use the yield keyword to return output from our model as it is generated.
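
To illustrate the yield-based pattern without a GPU, here is a minimal sketch that uses only the standard library. A producer thread pushes chunks onto a queue while a generator yields them as they arrive. The fake_generate and stream_words functions are hypothetical stand-ins for the model and the endpoint function; this is not the transformers implementation, only an analogy for how a streamer can hand text to a generator.

```python
from queue import Queue
from threading import Thread

_END = object()  # sentinel marking the end of the stream


def fake_generate(words, queue):
    """Stand-in for a model: pushes one chunk at a time onto the queue."""
    for word in words:
        queue.put(word + " ")
    queue.put(_END)


def stream_words(words):
    """Yield chunks as soon as the producer thread makes them available."""
    queue = Queue()
    worker = Thread(target=fake_generate, args=(words, queue))
    worker.start()
    while True:
        chunk = queue.get()  # blocks until the next chunk is ready
        if chunk is _END:
            break
        yield chunk
    worker.join()


chunks = list(stream_words(["Once", "upon", "a", "time"]))
```

Because the consumer yields each chunk as soon as it is dequeued, a caller iterating over stream_words can forward partial output immediately instead of waiting for the full result.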

Deploy

Your cerebrium.toml file is where you set your compute and environment. Your cerebrium.toml file should look like:

[cerebrium.build]
predict_data = "{\"prompt\": \"Here is some example predict data for your config.yaml which will be used to test your predict function on build.\"}"
hide_public_endpoint = false
disable_animation = false
disable_build_logs = false
disable_syntax_check = false
disable_predict = false
log_level = "INFO"
disable_confirmation = false

[cerebrium.deployment]
name = "5-streaming-endpoint"
python_version = "3.11"
include = ["./*", "main.py", "cerebrium.toml"]
exclude = ["./example_exclude"]
docker_base_image_url = "nvidia/cuda:12.1.1-runtime-ubuntu22.04"

[cerebrium.hardware]
region = "us-east-1"
provider = "aws"
compute = "AMPERE_A10"
cpu = 2
memory = 16.0
gpu_count = 1

[cerebrium.scaling]
min_replicas = 0
max_replicas = 5
cooldown = 60

[cerebrium.dependencies.pip]
peft = "git+https://github.com/huggingface/peft.git"
transformers = "git+https://github.com/huggingface/transformers.git"
accelerate = "git+https://github.com/huggingface/accelerate.git"
bitsandbytes = "latest"
sentencepiece = "latest"
pydantic = "latest"
torch = "2.1.0"

[cerebrium.dependencies.conda]

[cerebrium.dependencies.apt]

To deploy the model, use the following command:

cerebrium deploy streaming-falcon

Once deployed, we can make the following request:

Please note that our function is called stream, so this must be the final path segment in your URL.

curl --location --request POST 'https://api.cortex.cerebrium.ai/v4/p-<YOUR PROJECT ID>/5-streaming-endpoint/stream' \
--header 'Authorization: Bearer <YOUR TOKEN HERE>' \
--header 'Content-Type: application/json' \
--data-raw '{
    "prompt": "Tell me a story"
}'
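
The same request could also be made from Python. The sketch below is an assumption-laden illustration rather than an official client: iter_sse_data and stream_from_endpoint are hypothetical helper names, the parser handles only the standard `data:` field of the SSE format and treats each line as its own chunk, and the exact payload the endpoint sends is not confirmed here. It relies on the third-party requests library for the network call.

```python
def iter_sse_data(lines):
    """Yield the payload of each `data:` field from an iterable of SSE lines."""
    for raw in lines:
        line = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        if line.startswith("data:"):
            payload = line[len("data:"):]
            # The SSE format allows one optional space after the colon.
            if payload.startswith(" "):
                payload = payload[1:]
            yield payload


def stream_from_endpoint(url, token, prompt):
    """Hypothetical helper: POST the prompt and yield chunks as they arrive."""
    import requests  # third-party; imported lazily so the parser works without it

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    with requests.post(url, headers=headers, json={"prompt": prompt}, stream=True) as resp:
        resp.raise_for_status()
        yield from iter_sse_data(resp.iter_lines())


# Offline check of the parser on hand-written sample lines.
sample = [b"data: Once", b"", b"data: upon a time", b": keep-alive comment"]
parsed = list(iter_sse_data(sample))
```

To use it against a deployment, call stream_from_endpoint with the URL and token from the curl command above and print each chunk as it is yielded.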

We then get the output of the model as Server-Sent Events (SSE). Below is an example from Postman: