In this tutorial, we will show you how to implement streaming to return results to your users as soon as possible with the use of SSE.

To see the final implementation, you can view it here

Basic Setup

It is important to think of the way you develop models using Cerebrium should be identical to developing on a virtual machine or Google Colab - so converting this should be very easy! Please make sure you have the Cerebrium package installed and have logged in. If not, please take a look at our docs here

First we create our project:

cerebrium init streaming-falcon

It is important to think of the way you develop models using Cerebrium should be identical to developing on a virtual machine or Google Colab - so converting this should be very easy!

Let us add the following packages to the [cerebrium.dependencies.pip] section of our cerebrium.toml file:

[cerebrium.dependencies.pip]
peft = "git+https://github.com/huggingface/peft.git"
transformers = "git+https://github.com/huggingface/transformers.git"
accelerate = "git+https://github.com/huggingface/accelerate.git"
bitsandbytes = "latest"
sentencepiece = "latest"
torch = "2.1.0"

Our main.py file will contain our main Python code. This is a relatively simple implementation, so we can do everything in 1 file. We would like a user to send in a link to a YouTube video with a question and return to them the answer as well as the time segment of where we got that response. So let us define our request object.

from pydantic import BaseModel

class Item(BaseModel):
    prompt: str
    cutoff_len: Optional[int] = 256
    temperature: Optional[float] = 0.8
    top_p: Optional[float] = 0.75
    top_k: Optional[float] = 40
    max_new_tokens: Optional[int] = 250

Above, we use Pydantic as our data validation library. We specify the parameters that are required as well as the parameters that are not (ie: using the Optional keyword) as well as assign defaults to some values. Prompt is the only required parameter so if it is not present in the request, the user will automatically receive an error message.

Falcon Implementation

Below, we will use the Whisper model from OpenAI to convert the video audio to text. We will then split the text into its phrase segments with its respective timings, so we know the exact source of where our model got the answer from.

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    GenerationConfig,
    TextIteratorStreamer,
)
import torch

modal_path = "tiiuae/falcon-7b-instruct"

# Loading in base model and tokenizer
tokenizer = AutoTokenizer.from_pretrained(modal_path)
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
    modal_path,
    torch_dtype=torch.bfloat16,
    trust_remote_code=True,
    device_map="auto",
)

In the above, we simply import the required packages and instantiate the tokenizer and model. We do this outside the predict function, so we don’t load our model weights onto the GPU with every request but rather only on model startup.

Streaming Implementation

Below, we define our predict function, which will be responsible for our logic to stream results back from our endpoint.

def predict(item, run_id, logger):
    item = Item(**item)
    inputs = tokenizer(
      item.prompt, return_tensors="pt", max_length=512, truncation=True, padding=True
    )
    input_ids = inputs["input_ids"].to("cuda")

    streamer = TextIteratorStreamer(tokenizer)
    generation_config = GenerationConfig(
        temperature=item.temperature,
        top_p=item.top_p,
        top_k=item.top_k,
    )
    with torch.no_grad():
       generation_kwargs = {
          "input_ids": input_ids,
          "generation_config": generation_config,
          "return_dict_in_generate": True,
          "output_scores": True,
          "pad_token_id": tokenizer.eos_token_id,
          "max_new_tokens": item.max_new_tokens,
          "streamer": streamer,
      }
       model.generate(**generation_kwargs)
       for text in streamer:
        yield text #vital for streaming

Above, we receive our inputs from the request item we defined. We then implement a TextIteratorStreamer to stream output from the model as it’s ready. Lastly and most importantly, we use the yield keyword to return output from our model as its generated.

Deploy

Your cerebrium.toml file is where you can set your compute/environment. Please make sure that the GPU you specify is a AMPERE_A5000 and that you have enough memory (RAM) on your instance to run the models. You cerebrium.toml file should look like:


[cerebrium.build]
predict_data = "{\"prompt\": \"Here is some example predict data for your cerebrium.toml which will be used to test your predict function on build.\"}"
force_rebuild = false
disable_animation = false
log_level = "INFO"
disable_deployment_confirmation = false

[cerebrium.deployment]
name = "streaming-falcon"
python_version = "3.10"
include = "[./*, main.py]"
exclude = "[./.*, ./__*]"

[cerebrium.hardware]
gpu = "AMPERE_A5000"
cpu = 2
memory = 16.0
gpu_count = 1

[cerebrium.scaling]
min_replicas = 0
cooldown = 60

[cerebrium.dependencies.apt]

[cerebrium.dependencies.pip]
peft = "git+https://github.com/huggingface/peft.git"
transformers = "git+https://github.com/huggingface/transformers.git"
accelerate = "git+https://github.com/huggingface/accelerate.git"
bitsandbytes = "latest"
sentencepiece = "latest"
torch = "2.1.0"

[cerebrium.dependencies.conda]

To deploy the model use the following command:

cerebrium deploy streaming-falcon

Once deployed, we can make the following request:

Please note the end of the URL is set to stream instead of predict

curl --location --request POST 'https://run.cerebrium.ai/v3/p-xxxxxx/streaming-falcon/stream' \
--header 'Authorization: public-XXXXXXXXXXXX' \
--header 'Content-Type: application/json' \
--data-raw '{
    "url": "https://www.youtube.com/watch?v=UF8uR6Z6KLc&ab_channel=Stanford",
    "question": "How old was Steve Jobs when started Apple?"
}'

We then get the output of the model as SSE (Server Side events). Below is an example from postman:

Streaming