Use this SDK to add realtime video, audio and data features to your Python app. By connecting to LiveKit Cloud or a self-hosted server, you can quickly build applications such as multi-modal AI, live streaming, or video calls with just a few lines of code.
This repo contains two packages:
- livekit: Real-time SDK for connecting to LiveKit as a participant
- livekit-api: Access token generation and server APIs
$ pip install livekit-api
from livekit import api
import os
# will automatically use the LIVEKIT_API_KEY and LIVEKIT_API_SECRET env vars
token = api.AccessToken() \
    .with_identity("python-bot") \
    .with_name("Python Bot") \
    .with_grants(api.VideoGrants(
        room_join=True,
        room="my-room",
    )).to_jwt()
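If you prefer not to rely on environment variables, credentials can also be passed directly. A minimal sketch (the api_key/api_secret keyword arguments are an assumption about the constructor):

token = api.AccessToken(
    api_key="your-api-key",        # assumption: explicit credentials instead of env vars
    api_secret="your-api-secret",
).with_identity("python-bot").to_jwt()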
RoomService uses asyncio and aiohttp to make API calls, so it must be used from within an event loop.
from livekit import api
import asyncio
async def main():
    lkapi = api.LiveKitAPI("https://my-project.livekit.cloud")
    room_info = await lkapi.room.create_room(
        api.CreateRoomRequest(name="my-room"),
    )
    print(room_info)
    results = await lkapi.room.list_rooms(api.ListRoomsRequest())
    print(results)
    await lkapi.aclose()

asyncio.run(main())
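Like AccessToken, LiveKitAPI falls back to the LIVEKIT_API_KEY and LIVEKIT_API_SECRET environment variables when credentials are not supplied. A minimal sketch of passing them explicitly (the keyword names are an assumption):

lkapi = api.LiveKitAPI(
    "https://my-project.livekit.cloud",
    api_key="your-api-key",        # assumption: explicit credentials instead of env vars
    api_secret="your-api-secret",
)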
Services can be accessed via the LiveKitAPI object.
lkapi = api.LiveKitAPI("https://my-project.livekit.cloud")
# Room Service
room_svc = lkapi.room
# Egress Service
egress_svc = lkapi.egress
# Ingress Service
ingress_svc = lkapi.ingress
# SIP Service
sip_svc = lkapi.sip
# Agent Dispatch
dispatch_svc = lkapi.agent_dispatch
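Each handle exposes the corresponding server API as async request methods, used the same way as the room examples above. A minimal sketch of listing the participants in a room via the room service (the ListParticipantsRequest usage mirrors the room-creation example and is an assumption):

# minimal sketch; assumes a room named "my-room" already exists
participants = await lkapi.room.list_participants(
    api.ListParticipantsRequest(room="my-room")
)
print(participants)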
$ pip install livekit
See room_example for a full example.
from livekit import rtc
import asyncio
import logging

async def main():
    room = rtc.Room()

    @room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant):
        logging.info(
            "participant connected: %s %s", participant.sid, participant.identity)

    async def receive_frames(stream: rtc.VideoStream):
        async for frame in stream:
            # received a video frame from the track, process it here
            pass

    # track_subscribed is emitted whenever the local participant is subscribed to a new track
    @room.on("track_subscribed")
    def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant):
        logging.info("track subscribed: %s", publication.sid)
        if track.kind == rtc.TrackKind.KIND_VIDEO:
            video_stream = rtc.VideoStream(track)
            asyncio.ensure_future(receive_frames(video_stream))

    # By default, autosubscribe is enabled. The participant will be subscribed to
    # all published tracks in the room
    await room.connect(URL, TOKEN)
    logging.info("connected to room %s", room.name)

    # participants and tracks that are already available in the room
    # participant_connected and track_published events will *not* be emitted for them
    for identity, participant in room.remote_participants.items():
        print(f"identity: {identity}")
        print(f"participant: {participant}")
        for tid, publication in participant.track_publications.items():
            print(f"\ttrack id: {tid}")
room = rtc.Room()
...
chat = rtc.ChatManager(room)
# receiving chat
@chat.on("message_received")
def on_message_received(msg: rtc.ChatMessage):
print(f"message received: {msg.participant.identity}: {msg.message}")
# sending chat
await chat.send_message("hello world")
Perform your own predefined method calls from one participant to another.
This feature is especially powerful when used with Agents, for instance to forward LLM function calls to your client application.
The participant who implements the method and will receive its calls must first register support:
@room.local_participant.register_rpc_method("greet")
async def handle_greet(data: RpcInvocationData):
print(f"Received greeting from {data.caller_identity}: {data.payload}")
return f"Hello, {data.caller_identity}!"
In addition to the payload, your handler will also receive response_timeout, which informs you of the maximum time available to return a response. If you are unable to respond in time, the call will result in an error on the caller's side.
The caller may then initiate an RPC call like so:
try:
    response = await room.local_participant.perform_rpc(
        destination_identity='recipient-identity',
        method='greet',
        payload='Hello from RPC!'
    )
    print(f"RPC response: {response}")
except Exception as e:
    print(f"RPC call failed: {e}")
You may find it useful to adjust the response_timeout parameter, which indicates the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application.
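A minimal sketch of an RPC call with an explicit timeout (passing response_timeout as a keyword argument in seconds is an assumption):

response = await room.local_participant.perform_rpc(
    destination_identity='recipient-identity',
    method='greet',
    payload='Hello from RPC!',
    response_timeout=5.0,  # assumption: seconds to wait before the call fails
)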
LiveKit is a dynamic realtime environment and calls can fail for various reasons. You may raise errors of the type RpcError with a string message in an RPC method handler, and they will be received on the caller's side with the message intact. Other errors will not be transmitted and will instead arrive to the caller as 1500 ("Application Error"). Other built-in errors are detailed in RpcError.
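A minimal sketch of a handler that surfaces a custom error to the caller (the RpcError import path and constructor arguments are assumptions; check the class for the exact signature):

from livekit.rtc import RpcError, RpcInvocationData

@room.local_participant.register_rpc_method("divide")
async def handle_divide(data: RpcInvocationData):
    try:
        numerator, denominator = map(float, data.payload.split("/"))
        return str(numerator / denominator)
    except ZeroDivisionError:
        # assumption: RpcError accepts an application-defined code and a message
        raise RpcError(code=2000, message="division by zero")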
- Face landmark: use MediaPipe to detect face landmarks (eyes, nose, ...)
- Basic room: Connect to a room
- Publish hue: Publish a rainbow video track
- Publish wave: Publish a sine wave
Please join us on Slack to get help from our devs and community members. We welcome your contributions (PRs), and details can be discussed there.
| LiveKit Ecosystem | |
|---|---|
| Realtime SDKs | React Components · Browser · Swift Components · iOS/macOS/visionOS · Android · Flutter · React Native · Rust · Node.js · Python · Unity (web) · Unity (beta) |
| Server APIs | Node.js · Golang · Ruby · Java/Kotlin · Python · Rust · PHP (community) |
| Agents Frameworks | Python · Playground |
| Services | LiveKit server · Egress · Ingress · SIP |
| Resources | Docs · Example apps · Cloud · Self-hosting · CLI |