In this tutorial, I am going to create a real-time voice AI agent that can respond to any query via speech, in speech, in ~500ms. This is an extremely flexible implementation where you can swap in any Large Language model, Text-to-speech (TTS) model and Speech-to-text (STT) model of your liking. This is extremely useful for use cases involving voice such as customer service bots, receptionists and many more.

To create this application, we use the PipeCat, an open source framework for voice and multimodal conversational AI that handles some of the functionality we might need such as handling user interruptions, dealing with audio data etc. We will speak with our voice AI agent via a WebRTC transport, using Daily (the creators of Pipecat) and will deploy this application on Cerebrium to show how it handles deploying and scaling our application seamlessly.

You can find the final version of the code here

Cerebrium setup

If you don’t have a Cerebrium account, you can create one by signing up here and following the documentation here to get setup

In your IDE, run the following command to create our Cerebrium starter project: cerebrium init 2-realtime-voice-agent. This creates two files:

  • Main.py - Our entrypoint file where our code lives
  • cerebrium.toml - A configuration file that contains all our build and environment settings ‍ Add the following pip packages and hardware requirements near the bottom of your cerebrium.toml. This will be used in creating our deployment environment.
[cerebrium.deployment]
# existing values...
docker_base_image_url = "registry.cerebrium.ai/daily:latest"

[cerebrium.hardware]
region = "us-east-1"
provider = "aws"
gpu = "AMPERE_A10"
cpu = 4
memory = 18.0
gpu_count = 1

[cerebrium.dependencies.pip]
torch = ">=2.0.0"
"pipecat-ai[silero, daily, openai, deepgram]" = "latest"
aiohttp = "latest"
torchaudio = "latest"
vllm = "latest"
huggingface_hub = "latest"

You will also see we specify a Docker base image above. The reason for this is Daily has supplied a Docker image that contains local Deepgram Speech-to-Text (STT) and Text-to-Speech (TTS) models. This helps us achieve our low latency since everything is running locally and not going over the network.

Custom Docker files are not support yet but are rather in the works to be released soon. This is just a very early preview of how it would work.

Pipecat setup

In this example, we will be using Llama 3 8B as our LLM and serving it via vLLM. To use Llama 3, we need to be authenticated via Hugging Face.

To authenticate ourselves, we need to go to HuggingFace and accept the model permissions for Llama 8B if we haven’t already. It takes about 30 minutes or less for them to accept your request.

In your Cerebrium dashboard, you can add your HuggingFace token as a secret by navigating to “Secrets” in the sidebar. For the sake of this tutorial, I called mine “HF_TOKEN”. We can now access these values in our code at runtime without exposing them in our code.

You can then add the following code to your main.py:

from huggingface_hub import login
import subprocess

os.environ['OUTLINES_CACHE_DIR'] = '/tmp/.outlines'

login(token=get_secret('HF_TOKEN'))
# Run vllM Server in background process
def start_server():
    while True:
        process = subprocess.Popen(
            f"subprocess.Popen(f"python -m vllm.entrypoints.openai.api_server --port 5000 --model NousResearch/Meta-Llama-3-8B-Instruct --dtype bfloat16 --api-key {get_secret('HF_TOKEN')} --download-dir /persistent-storage/", shell=True)",
            shell=True
        )
        process.wait()  # Wait for the process to complete
        logger.error("Server process ended unexpectedly. Restarting in 5 seconds...")
        time.sleep(5)  # Wait before restarting

# Start the server in a separate process
server_process = Process(target=start_server, daemon=True)
server_process.start()

Pipecat currently doesn’t support locally instantiated models and requires them to follow the OpenAI compatible format. Therefore we run the vLLM server locally on our instance in a background process. We monitor the background process to make sure it launched successfully since there seems to be a bug with rapidly starting multiple vLLM instances. If it doesn’t launch correctly, we wait 5 seconds before trying again. We set the environment variable for OUTLINES_CACHE_DIR, this has to do with a disk I/O bug in outlines that vLLM uses. GitHub issue is here

Note, we are running the vLLM server on port 5000 (8000 is automatically used by Cerebrium) and we set the download directory of the model so that subsequent cold starts can be much quicker.

Now we implement the Pipecat framework by instantiating the various components. Create a function call main with the following code:

import aiohttp
import os
import sys
import subprocess
import time
import requests
import asyncio
from multiprocessing import Process
from loguru import logger

from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame

from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator, LLMUserResponseAggregator
)

from helpers import (
    ClearableDeepgramTTSService,
    AudioVolumeTimer,
    TranscriptionTimingLogger
)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

deepgram_voice: str = "aura-asteria-en"

async def main(room_url: str, token: str = None):

    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            room_url,
            token if token else get_secret("DAILY_TOKEN"),
            "Respond bots",
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=False,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(params=VADParams(
                    stop_secs=0.2
                )),
                vad_audio_passthrough=True
            )
        )

        stt = DeepgramSTTService(
            name="STT",
            api_key=None,
            url='ws://127.0.0.1:8082/v1/listen'
        )

        tts = ClearableDeepgramTTSService(
            name="Voice",
            aiohttp_session=session,
            api_key=None,
            voice=deepgram_voice,
            base_url="http://127.0.0.1:8082/v1/speak"
        )

        llm = OpenAILLMService(
            name="LLM",
            api_key=get_secret("HF_TOKEN"),
            model="NousResearch/Meta-Llama-3-8B-Instruct",
            base_url="http://0.0.0.0:5000/v1"
        )

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        avt = AudioVolumeTimer()
        tl = TranscriptionTimingLogger(avt)

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline([
            transport.input(),   # Transport user input
            avt,                 # Audio volume timer
            stt,                 # Speech-to-text
            tl,                  # Transcription timing logger
            tma_in,              # User responses
            llm,                 # LLM
            tts,                 # TTS
            transport.output(),  # Transport bot output
            tma_out,             # Assistant spoken responses
        ])

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                report_only_initial_ttfb=True
            ))

First, in our main function, we initialize the daily transport layer to receive/send the audio/video data from the Daily room we will connect to. You can see we pass the room_url we would like to join as well as a token to authenticate us programmatically joining. We also set our VAD stop seconds which is the amount of time we wait for a pause before our bot will respond - in this example, we set it to 200 milliseconds.

Next we connect to our locally running Deepgram models that are part of our Docker base image we specified in our cerebrium.toml - these are running on port 8082. This is where the Pipecat framework helps convert audio data to text and vice versa. We then follow the same patten to connect our locally running LLM model from our vLLM server.

Lastly, we then put this all together as a PipelineTask which is what Pipecat runs all together. The make up of a task is completely customizable and has support for Image and Vision use cases. You can read more here. Pipeline tasks come with a parameters that make it easy to handle interruptions, swap models to our preference and much more only changing a few lines of code.

In the code above, we are importing some helper functions at the top of our file to help with our implementation. You can copy the file from the github repository here. Make sure to name the file helpers.py.

Daily Event Webhooks

The Daily Python SDK comes with a lot of event webhooks where you can trigger functionality based on events occurring within your Daily room. We would like to handle events such as a user leaving/joining a call. Continue to add the following code to the main() function.

# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    # Kick off the conversation.
    messages.append(
        {"role": "system", "content": "Please introduce yourself to the user."})
    await task.queue_frame(LLMMessagesFrame(messages))

# When the participant leaves, we exit the bot.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
    await task.queue_frame(EndFrame())

# If the call is ended make sure we quit as well.
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
    if state == "left":
        await task.queue_frame(EndFrame())

runner = PipelineRunner()

await runner.run(task)
await session.close()

Above we handle the following events:

  • When the first participant joins, we get the bot to introduce itself to the user. We do this by adding a message to the conversation.
  • We add support for multiple participants to join and listen/respond to the bot.
  • When a participant leaves or the call is ended, we get the bot to terminate itself.

From the code above, you will see the events are attached to “Transport”, which is the method of communication - in this case the meeting room. We then pass in our defined Pipeline task to our pipeline runner which executes indefinitely until we signal it to exit which in this case happens when a call ends. If you want to read further about the PipeCat infrastructure you can read more here

Starting Bot

We can run our instance with a minimum number of instances by settings the “min_replicas” in our cerebrium.toml for the optimal user experience however we do also want to handle autoscaling use cases. We want to make sure the vLLM server is live before the bot joins the meeting and so we make a local GET request to check this. These models take about 40s to load into VRAM from disk.

Additionally, we need to run the above code in a separate execution environment so PipeCat does not get instantiate multiple instances. To do this, we need to run the above code as a background process. This will be the entry point of our REST API endpoint to start the PipeCat bot. Once the pipecat bot has returned (ie: the call has ended) then we will return a response to our API endpoint. We therefore create the following function:

def check_vllm_model_status():
    url = "http://0.0.0.0:5000/v1/models"
    headers = {
        "Authorization": f"Bearer {get_secret('HF_TOKEN')}"
    }
    max_retries = 8
    for _ in range(max_retries):
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            return True
        time.sleep(15)
    return False

def start_bot(room_url: str, token: str = None):

    def target():
        asyncio.run(main(room_url, token))

    check_vllm_model_status()
    process = Process(target=target)
    process.start()
    process.join()  # Wait for the process to complete
    return {"message": "session finished"}

That’s it! You now have a fully functioning AI bot that can interact with a user through speech in ~500ms. Imagine the possibilities!

Let us now create a user facing UI in order for you to interface with this bot.

Creating Meeting Room

Cerebrium doesn’t only have to be used to run AI heavy workloads, it can run any Python code. Therefore we define two functions for our demo that will create a room to join programmatically and a temporary token, both of which will only be usable for 5 minutes. To implement this, we use the Daily REST API.

We need to get our Daily developer token from our profile. If, you don’t have an account you can sign up for one here (they have a generous free tier). You can then go to the “developers” tab to fetch your API key - add this to your Cerebrium Secrets.

Below we create a room that only lasts 5 minutes and a temporary token to access it

def create_room():
    url = "https://api.daily.co/v1/rooms/"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {get_secret('DAILY_TOKEN')}"
    }
    data = {
        "properties": {
            "exp": int(time.time()) + 60*5 ##5 mins
        }
    }

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        room_info = response.json()
        token = create_token(room_info['name'])
        if token and 'token' in token:
            room_info['token'] = token['token']
        else:
            logger.error("Failed to create token")
            return {"message": 'There was an error creating your room', "status_code": 500}
        return room_info
    else:
        logger.error(f"Failed to create room: {response.status_code}")
        return {"message": 'There was an error creating your room', "status_code": 500}

def create_token(room_name: str):
    url = "https://api.daily.co/v1/meeting-tokens"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {get_secret('DAILY_TOKEN')}"
    }
    data = {
        "properties": {
            "room_name": room_name
        }
    }

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        token_info = response.json()
        return token_info
    else:
        logger.error(f"Failed to create token: {response.status_code}")
        return None

Deploy to Cerebrium

To deploy this application to Cerebrium you can simply run the command: cerebrium deploy in your terminal.

If it deployed successfully, you should see something like this:

We will add these endpoints to our frontend interface.

Connect frontend

We created a public fork of the PipeCat frontend to show you a nice demo of this application. You can clone the repo here.

Follow the instructions in the README.md and then populate the following variables in your .env.development.local

VITE_SERVER_URL=https://api.cortex.cerebrium.ai/v4/p-xxxxx/<APP_NAME> #This is the base url. Do not include the function names
VITE_SERVER_AUTH= #This is the JWT token you can get from the API Keys section of your Cerebrium Dashboard.

You can now run yarn dev and go to the URL: http://localhost:5173/ to test your application!

Conclusion

This tutorial acts as a good starting point for you to implement voice AI agents into your application as well as extend it into image and vision capabilities. Pipecat is an extensible and open-source framework that makes it easy to build applications using generative AI and Cerebrium makes the process seamless to deploy and auto scale while only paying for the compute you need.

Tag us as @cerebriumai so we can see what you build and please feel free to ask questions/send feedback to us on Slack or Discord communities