Streaming Output - Falcon 7B
In this tutorial, we will show you how to implement streaming so that results are returned to your users as soon as they are generated, using Server-Sent Events (SSE).
To see the final implementation, you can view it here
Basic Setup
The way you develop models using Cerebrium should feel identical to developing on a virtual machine or Google Colab, so converting this should be very easy! Please make sure you have the Cerebrium package installed and have logged in. If not, please take a look at our docs here
First we create our project:
cerebrium init streaming-falcon
Let us create our requirements.txt file and add the following packages:
git+https://github.com/huggingface/peft.git
git+https://github.com/huggingface/transformers.git
git+https://github.com/huggingface/accelerate.git
bitsandbytes
sentencepiece
torch
Our main.py file will contain our main Python code. This is a relatively simple implementation, so we can do everything in one file. We would like a user to send in a prompt along with optional generation parameters, and we will stream the model's answer back to them as it is generated. So let us define our request object.
from typing import Optional
from pydantic import BaseModel

class Item(BaseModel):
    prompt: str
    cutoff_len: Optional[int] = 256
    temperature: Optional[float] = 0.8
    top_p: Optional[float] = 0.75
    top_k: Optional[int] = 40
    max_new_tokens: Optional[int] = 250
Above, we use Pydantic as our data validation library. We specify which parameters are required and which are not (i.e. using the Optional keyword), and assign defaults to some values. prompt is the only required parameter, so if it is not present in the request, the user will automatically receive an error message.
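As a quick illustration of how this validation behaves (a local sketch, not part of the deployed code), instantiating Item without a prompt raises a ValidationError, while omitted optional fields fall back to their defaults:

from pydantic import ValidationError

# Optional fields fall back to their defaults
item = Item(prompt="What is the capital of France?", temperature=0.2)
print(item.max_new_tokens)  # 250

# Missing the required prompt field raises a validation error
try:
    Item(temperature=0.2)
except ValidationError as e:
    print(e)  # reports that the prompt field is required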
Falcon Implementation
Below, we load the Falcon-7B-Instruct model and its tokenizer from Hugging Face. The weights are loaded in bfloat16 and placed on the GPU automatically via device_map.
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    GenerationConfig,
    TextIteratorStreamer,
)
import torch

model_path = "tiiuae/falcon-7b-instruct"

# Loading in base model and tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype=torch.bfloat16,
    trust_remote_code=True,
    device_map="auto",
)
In the above, we simply import the required packages and instantiate the tokenizer and model. We do this outside the predict function so that we don't load our model weights onto the GPU with every request, but rather only on model startup.
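If you want to sanity-check the model before wiring up streaming, you can run a short non-streaming generation. This snippet is purely illustrative and not part of the deployed code:

# Quick check that the weights loaded and generation works on the GPU
inputs = tokenizer("Write a haiku about the ocean.", return_tensors="pt").to(model.device)
output_ids = model.generate(**inputs, max_new_tokens=30, pad_token_id=tokenizer.eos_token_id)
print(tokenizer.decode(output_ids[0], skip_special_tokens=True))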
Streaming Implementation
Below, we define our predict function, which will be responsible for our logic to stream results back from our endpoint.
from threading import Thread

def predict(item, run_id, logger):
    item = Item(**item)

    inputs = tokenizer(
        item.prompt, return_tensors="pt", max_length=512, truncation=True, padding=True
    )
    input_ids = inputs["input_ids"].to("cuda")

    streamer = TextIteratorStreamer(tokenizer)
    generation_config = GenerationConfig(
        temperature=item.temperature,
        top_p=item.top_p,
        top_k=item.top_k,
    )

    with torch.no_grad():
        generation_kwargs = {
            "input_ids": input_ids,
            "generation_config": generation_config,
            "return_dict_in_generate": True,
            "output_scores": True,
            "pad_token_id": tokenizer.eos_token_id,
            "max_new_tokens": item.max_new_tokens,
            "streamer": streamer,
        }
        # Run generation in a background thread so we can read from the
        # streamer while tokens are still being produced
        thread = Thread(target=model.generate, kwargs=generation_kwargs)
        thread.start()

        for text in streamer:
            yield text  # vital for streaming
Above, we receive our inputs from the request item we defined. We then use a TextIteratorStreamer, running model.generate in a background thread so that output can be read from the streamer while the model is still producing tokens. Lastly, and most importantly, we use the yield keyword to return output from our model as it is generated.
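To get a feel for the generator before deploying, you can iterate over it locally. This is just a sketch; the run_id and logger arguments are placeholders here, since Cerebrium supplies them at runtime:

# Print each chunk as soon as the generator yields it
for chunk in predict({"prompt": "Explain streaming in one sentence."}, run_id="local-test", logger=None):
    print(chunk, end="", flush=True)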
Deploy
Your config.yaml file is where you can set your compute/environment. Please make sure that the hardware you specify is an AMPERE_A5000 and that you have enough memory (RAM) on your instance to run the models. Your config.yaml file should look like:
%YAML 1.2
---
hardware: AMPERE_A5000
memory: 14
cpu: 2
min_replicas: 0
log_level: INFO
include: '[./*, main.py, requirements.txt, pkglist.txt, conda_pkglist.txt]'
exclude: '[./.*, ./__*]'
cooldown: 60
disable_animation: false
To deploy the model, use the following command:
cerebrium deploy streaming-falcon
Once deployed, we can make the following request:
Please note the end of the URL is set to stream instead of predict
curl --location --request POST 'https://run.cerebrium.ai/v3/p-xxxxxx/streaming-falcon/stream' \
--header 'Authorization: public-XXXXXXXXXXXX' \
--header 'Content-Type: application/json' \
--data-raw '{
    "prompt": "Write a short story about a robot learning to paint."
}'
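If you would rather consume the stream from code than from curl or Postman, here is a minimal sketch using the requests library. The endpoint URL and API key below are placeholders; substitute the values from your own deployment:

import requests

# Placeholder endpoint and key; replace with your own deployment values
url = "https://run.cerebrium.ai/v3/p-xxxxxx/streaming-falcon/stream"
headers = {
    "Authorization": "public-XXXXXXXXXXXX",
    "Content-Type": "application/json",
}
payload = {"prompt": "Write a short story about a robot learning to paint."}

# stream=True lets us read the response incrementally as chunks arrive
with requests.post(url, json=payload, headers=headers, stream=True) as response:
    for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
        if chunk:
            print(chunk, end="", flush=True)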
We then get the output of the model as SSE (Server-Sent Events). Below is an example from Postman: