In this tutorial, we’ll create a real-time voice AI agent that responds to queries via speech in ~500ms. This flexible implementation lets you swap in any Large Language Model, Text-to-Speech (TTS) model, and Speech-to-Text (STT) model. It’s ideal for voice-based use cases like customer service bots and receptionists.

To create this app, we use PipeCat, an open-source framework for voice and multimodal conversational AI that handles user interruptions, audio data, and other essential functions. We communicate with our voice AI agent over WebRTC transport using Daily (the creators of PipeCat) and run the app on Cerebrium for seamless deployment and scaling.

You can find the final version of the code here.

Cerebrium setup

If you don’t have a Cerebrium account, you can create one by signing up here and following the documentation here to get set up.

In your IDE, run the following command to create our Cerebrium starter project: cerebrium init 2-realtime-voice-agent. This creates two files:

  • main.py: Our entrypoint file where our code lives.
  • cerebrium.toml: A configuration file that contains all our build and environment settings.

Add the following pip packages and hardware requirements near the bottom of your cerebrium.toml to create your deployment environment:

[cerebrium.deployment]
# existing values...
docker_base_image_url = "registry.cerebrium.ai/daily:latest"

[cerebrium.hardware]
region = "us-east-1"
provider = "aws"
gpu = "AMPERE_A10"
cpu = 4
memory = 18.0
gpu_count = 1

[cerebrium.dependencies.pip]
torch = ">=2.0.0"
"pipecat-ai[silero, daily, openai, deepgram]" = "latest"
aiohttp = "latest"
torchaudio = "latest"
vllm = "latest"
huggingface_hub = "latest"

We specify a Docker base image that contains local Deepgram Speech-to-Text (STT) and Text-to-Speech (TTS) models, provided by Daily. Running everything locally instead of over the network helps achieve low latency.

Custom Dockerfiles are not supported yet, but they are in the works and will be released soon; this base image is an early preview of how they will work.

PipeCat setup

In this example, we will be using Llama 3 8B as our LLM and serving it via vLLM. To use Llama 3, we need to be authenticated via Hugging Face.

To authenticate ourselves, we need to go to Hugging Face and accept the model permissions for Llama 3 8B if we haven’t already. Approval usually takes 30 minutes or less.

In your Cerebrium dashboard, you can add your Hugging Face token as a secret by navigating to “Secrets” in the sidebar. For the sake of this tutorial, I called mine “HF_TOKEN”. We can now access this value at runtime without exposing it in our code.
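
Cerebrium exposes secrets to your application as environment variables at runtime, so reading them back is a one-liner. Here is a minimal sketch, assuming the secret names used in this tutorial (HF_TOKEN above, and DAILY_TOKEN which we add later):

import os

# Secrets added in the Cerebrium dashboard are injected as environment
# variables at runtime, so they never need to live in source control.
hf_token = os.environ.get("HF_TOKEN")        # Hugging Face token added above
daily_token = os.environ.get("DAILY_TOKEN")  # Daily API key, added later in this tutorial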

You can then add the following code to your main.py:

from huggingface_hub import login
from multiprocessing import Process
from loguru import logger
import subprocess
import os
import time

os.environ['OUTLINES_CACHE_DIR'] = '/tmp/.outlines'

login(token=os.environ.get('HF_TOKEN'))

# Run the vLLM server in a background process and restart it if it exits
def start_server():
    while True:
        process = subprocess.Popen(
            f"python -m vllm.entrypoints.openai.api_server --port 5000 --model NousResearch/Meta-Llama-3-8B-Instruct --dtype bfloat16 --api-key {os.environ.get('HF_TOKEN')} --download-dir /persistent-storage/",
            shell=True
        )
        process.wait()  # Wait for the process to complete
        logger.error("Server process ended unexpectedly. Restarting in 5 seconds...")
        time.sleep(5)  # Wait before restarting

# Start the server in a separate process
server_process = Process(target=start_server, daemon=True)
server_process.start()

Since PipeCat requires models to follow the OpenAI-compatible format and doesn’t support local instantiation, we run the vLLM server locally in a background process. We monitor the process and restart it after 5 seconds if it exits unexpectedly, which works around a known issue when multiple vLLM instances are started in quick succession. We also set the OUTLINES_CACHE_DIR environment variable to address a disk I/O bug in outlines, a library used by vLLM (see the GitHub issue here).

Note that we run the vLLM server on port 5000 (port 8000 is automatically used by Cerebrium) and set the model download directory to persistent storage so that subsequent cold starts are much quicker.
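
Because vLLM exposes an OpenAI-compatible API, you can sanity-check the local server with any OpenAI client once the model has finished loading. This is a minimal sketch, assuming the openai Python client (pulled in by pipecat-ai[openai]) and the server started above:

import os
from openai import OpenAI

# Point a standard OpenAI client at the local vLLM server on port 5000.
# The API key is the same HF_TOKEN we passed to the server via --api-key.
client = OpenAI(
    base_url="http://0.0.0.0:5000/v1",
    api_key=os.environ.get("HF_TOKEN"),
)

response = client.chat.completions.create(
    model="NousResearch/Meta-Llama-3-8B-Instruct",
    messages=[{"role": "user", "content": "Say hello in one short sentence."}],
)
print(response.choices[0].message.content)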

Now we implement the PipeCat framework by instantiating its various components. Create a function called main with the following code:

import aiohttp
import os
import sys
import subprocess
import time
import requests
import asyncio
from multiprocessing import Process
from loguru import logger

from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame

from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator, LLMUserResponseAggregator
)

from helpers import (
    ClearableDeepgramTTSService,
    AudioVolumeTimer,
    TranscriptionTimingLogger
)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

deepgram_voice: str = "aura-asteria-en"

async def main(room_url: str, token: str = None):

    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            room_url,
            token if token else os.environ.get("DAILY_TOKEN"),
            "Respond bots",
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=False,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(params=VADParams(
                    stop_secs=0.2
                )),
                vad_audio_passthrough=True
            )
        )

        stt = DeepgramSTTService(
            name="STT",
            api_key=None,
            url='ws://127.0.0.1:8082/v1/listen'
        )

        tts = ClearableDeepgramTTSService(
            name="Voice",
            aiohttp_session=session,
            api_key=None,
            voice=deepgram_voice,
            base_url="http://127.0.0.1:8082/v1/speak"
        )

        llm = OpenAILLMService(
            name="LLM",
            api_key=os.environ.get("HF_TOKEN"),
            model="NousResearch/Meta-Llama-3-8B-Instruct",
            base_url="http://0.0.0.0:5000/v1"
        )

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        avt = AudioVolumeTimer()
        tl = TranscriptionTimingLogger(avt)

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline([
            transport.input(),   # Transport user input
            avt,                 # Audio volume timer
            stt,                 # Speech-to-text
            tl,                  # Transcription timing logger
            tma_in,              # User responses
            llm,                 # LLM
            tts,                 # TTS
            transport.output(),  # Transport bot output
            tma_out,             # Assistant spoken responses
        ])

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                report_only_initial_ttfb=True
            ))

In our main function, we initialize the Daily transport layer to handle audio/video data from the Daily room. We pass the room URL and an authentication token so the bot can join programmatically. We set the VAD stop time to 200 milliseconds, which is how long the user must pause before the bot responds.

Next, we connect to our locally running Deepgram models from our Docker base image on port 8082. The PipeCat framework handles audio-to-text and text-to-audio conversion. We then connect to our local LLM model from the vLLM server using the same pattern.

Finally, we combine everything into a PipelineTask, which PipeCat executes. Tasks are fully customizable and support Image and Vision use cases (learn more here). Pipeline tasks include parameters for handling interruptions, swapping models, and other features with minimal code changes.

In the code above, we import some helper functions at the top of our file to help with our implementation. You can copy the file from the GitHub repository here. Make sure to name the file helpers.py.

Daily Event Webhooks

The Daily Python SDK comes with a number of event webhooks that let you trigger functionality based on events occurring within your Daily room. We want to handle events such as a user joining or leaving the call. Continue by adding the following code to the main() function.

        # When the first participant joins, the bot should introduce itself.
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            # Kick off the conversation.
            messages.append(
                {"role": "system", "content": "Please introduce yourself to the user."})
            await task.queue_frame(LLMMessagesFrame(messages))

        # When the participant leaves, we exit the bot.
        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            await task.queue_frame(EndFrame())

        # If the call is ended make sure we quit as well.
        @transport.event_handler("on_call_state_updated")
        async def on_call_state_updated(transport, state):
            if state == "left":
                await task.queue_frame(EndFrame())

        runner = PipelineRunner()

        await runner.run(task)
        await session.close()

Above we handle the following events:

  • When the first participant joins, we get the bot to introduce itself to the user. We do this by adding a message to the conversation.
  • We add support for multiple participants to join and listen/respond to the bot.
  • When a participant leaves or the call is ended, we get the bot to terminate itself.

From the code above, you will see that the events are attached to the transport, which is the method of communication, in this case the Daily meeting room. We then pass our defined pipeline task to the pipeline runner, which executes indefinitely until we signal it to exit, which in this case happens when the call ends. If you want to read further about the PipeCat infrastructure, you can do so here.

Starting Bot

We set min_replicas in our cerebrium.toml so warm instances are always available, which ensures an optimal user experience while still supporting autoscaling. Before the bot joins a meeting, we verify that the vLLM server is running with a local GET request; note that the model takes about 40 seconds to load into VRAM from disk.

We run the bot in a separate process to prevent multiple PipeCat instances from interfering with each other. This background process is launched by the entry point of our REST API endpoint, and when the call ends and the bot process returns, we send a response back to the caller. We therefore create the following functions:

def check_vllm_model_status():
    url = "http://0.0.0.0:5000/v1/models"
    headers = {
        "Authorization": f"Bearer {os.environ.get('HF_TOKEN')}"
    }
    max_retries = 8
    for _ in range(max_retries):
        try:
            response = requests.get(url, headers=headers)
            if response.status_code == 200:
                return True
        except requests.exceptions.ConnectionError:
            pass  # Server is not accepting connections yet
        time.sleep(15)
    return False

def start_bot(room_url: str, token: str = None):

    def target():
        asyncio.run(main(room_url, token))

    check_vllm_model_status()
    process = Process(target=target)
    process.start()
    process.join()  # Wait for the process to complete
    return {"message": "session finished"}

That’s it! You now have a fully functioning AI bot that can interact with a user through speech in ~500ms. Imagine the possibilities!

Let us now create a user-facing UI so that you can interact with this bot.

Creating a Meeting Room

Cerebrium can run any Python code, not just AI workloads. For our demo, we define two functions that use the Daily REST API to create a room and temporary token, both valid for 5 minutes.

Get your Daily developer token from your profile. If you don’t have an account, sign up here (they offer a generous free tier). Navigate to the “developers” tab to get your API key and add it to your Cerebrium Secrets.

Below, we create a room that only lasts 5 minutes and a temporary token to access it:

def create_room():
    url = "https://api.daily.co/v1/rooms/"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ.get('DAILY_TOKEN')}"
    }
    data = {
        "properties": {
            "exp": int(time.time()) + 60*5 ##5 mins
        }
    }

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        room_info = response.json()
        token = create_token(room_info['name'])
        if token and 'token' in token:
            room_info['token'] = token['token']
        else:
            logger.error("Failed to create token")
            return {"message": 'There was an error creating your room', "status_code": 500}
        return room_info
    else:
        logger.error(f"Failed to create room: {response.status_code}")
        return {"message": 'There was an error creating your room', "status_code": 500}

def create_token(room_name: str):
    url = "https://api.daily.co/v1/meeting-tokens"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ.get('DAILY_TOKEN')}"
    }
    data = {
        "properties": {
            "room_name": room_name
        }
    }

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        token_info = response.json()
        return token_info
    else:
        logger.error(f"Failed to create token: {response.status_code}")
        return None

Deploy to Cerebrium

To deploy this application to Cerebrium, you can simply run the command cerebrium deploy in your terminal.

If it deployed successfully, you should see something like this:

We will add these endpoints to our frontend interface.
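
For reference, here is a hedged sketch of how any HTTP client could call the deployed functions. The project ID, app name, JWT, and room values below are placeholders you should replace with your own, and the exact response schema may differ:

import requests

base_url = "https://api.cortex.cerebrium.ai/v4/p-xxxxx/2-realtime-voice-agent"
headers = {
    "Authorization": "Bearer <CEREBRIUM_JWT>",
    "Content-Type": "application/json",
}

# Create a Daily room and meeting token (both expire after 5 minutes).
room = requests.post(f"{base_url}/create_room", headers=headers, json={})
print(room.json())

# Start the bot in that room; this request returns once the call has ended.
bot = requests.post(
    f"{base_url}/start_bot",
    headers=headers,
    json={"room_url": "<ROOM_URL>", "token": "<ROOM_TOKEN>"},
)
print(bot.json())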

Connect frontend

We created a public fork of the PipeCat frontend to show you a nice demo of this application. You can clone the repo here.

Follow the instructions in the README.md and then populate the following variables in your .env.development.local:

VITE_SERVER_URL=https://api.cortex.cerebrium.ai/v4/p-xxxxx/<APP_NAME> #This is the base url. Do not include the function names
VITE_SERVER_AUTH= #This is the JWT token you can get from the API Keys section of your Cerebrium Dashboard.

You can now run yarn dev and go to the URL: http://localhost:5173/ to test your application!

Conclusion

This tutorial provides a foundation for implementing voice AI agents in your app and extending into image and vision capabilities. PipeCat is an extensible, open-source framework for building generative AI apps, while Cerebrium provides seamless deployment and autoscaling with pay-as-you-go compute.

Tag us as @cerebriumai to showcase your work and join our Slack or Discord communities for questions and feedback.