# Demo: how rate limits shape throughput and latency.
#
# - MockElevenLabs class with two fake endpoints:
#     - clone_voice()  (simulates 0.8-10 s of work)
#     - tts()          (simulates 0.2-0.4 s of work)
# - Each endpoint gets limits that resemble the real ones:
#     - Voice clone: only 1 job may run at a time, 1 request per second.
#     - TTS: up to 4 jobs in parallel and 5 requests per second.
# - How those limits are enforced:
#     - "Token bucket"  -> counts how many calls happened in the last second (RPS).
#     - "Semaphore"     -> counts how many calls are running right now (concurrency).
#     - If either bucket is full, the coroutine simply does
#       `await asyncio.sleep(0.05)` and keeps retrying until a slot opens.
# - Fire a bunch of users at it:
#     - simulate_batch() starts 2 or 5 concurrent coroutines that call either
#       clone_voice or tts.
#     - run_all() does this for both endpoints and prints how long the whole
#       group had to wait.
# - Why the times are what they are:
#     - Voice-clone requests are forced to run one after another, so 5 users
#       need roughly 5 x (0.8-10 s).
#     - TTS requests can overlap (max 4 at once) and start 5 per second, so
#       they finish much faster.
#
# TL;DR: a throttle layer plus a quick load test that shows how rate limits
# shape throughput and latency.
# Standard library (asyncio was previously imported twice; deduplicated).
import asyncio
import collections
import hashlib
import random
import time
from typing import List, Literal, NamedTuple

# Third-party: patches the running event loop so asyncio.run() works even
# inside an already-running loop (e.g. Jupyter).
import nest_asyncio

nest_asyncio.apply()
5
6# ------------------------------------------------------------------ #
7# ------------------------------------------------------------------ #
8
class RateLimit(NamedTuple):
    """Throttle settings for one endpoint.

    Attributes:
        rps: maximum requests admitted within any rolling one-second window.
        concurrency: maximum requests allowed in flight at the same time.
    """

    # Requests-per-second cap (sliding one-second window).
    rps: int
    # Simultaneous in-flight request cap.
    concurrency: int
12
class MockElevenLabs:
    """In-memory stand-in for the ElevenLabs API with simulated rate limits.

    Extremely simplified model of the real rules:
      • /voice/clone : 1 request per second, 1 job in flight at a time
      • /tts         : 5 requests per second, up to 4 jobs in flight

    RPS is enforced with a sliding-window token bucket (a deque of admission
    timestamps from the last second); concurrency with a plain in-flight
    counter.  Blocked callers poll every 50 ms until both limits have room.
    """

    CLONE_LIMIT = RateLimit(rps=1, concurrency=1)  # 1 clone job at a time
    TTS_LIMIT = RateLimit(rps=5, concurrency=4)

    def __init__(self):
        # Token bucket for RPS, one per endpoint: timestamps of the calls
        # admitted during the last second.
        self._buckets = {
            "clone": collections.deque(),
            "tts": collections.deque(),
        }
        # Number of requests currently executing, per endpoint.
        self._inflight = {"clone": 0, "tts": 0}

    # ------------------------------------------------------------------ #
    async def clone_voice(self, voice_name: str) -> str:
        """Simulate a voice-clone job (0.8–10.2 s) and return a fake voice id."""
        print("clone_voice", voice_name)
        await self._acquire("clone")
        try:
            # Simulated processing time: up to 10.2 s.
            await asyncio.sleep(random.uniform(0.8, 10.2))
            return f"voice_id::{hashlib.sha1(voice_name.encode()).hexdigest()[:8]}"
        finally:
            self._release("clone")

    async def tts(self, voice_id: str, text: str) -> bytes:
        """Simulate a TTS request (0.2–0.4 s) and return fake audio bytes."""
        print("tts", voice_id, text)
        await self._acquire("tts")
        try:
            # Simulated processing time: up to 0.4 s.
            await asyncio.sleep(random.uniform(0.2, 0.4))
            return f"AUDIO({voice_id[:4]}):{text[:10]}...".encode()
        finally:
            self._release("tts")

    # ------------------------------------------------------------------ #
    async def _acquire(self, endpoint: Literal["clone", "tts"]):
        """Block until both an RPS token and a concurrency slot are free.

        Bug fix vs. the original: the RPS token is consumed only once, at
        admission time, and only when BOTH limits have room.  Previously the
        token was appended before the concurrency check, so a caller stuck
        waiting on a busy endpoint burned a fresh token on every 50 ms retry,
        starving other waiters of RPS capacity.
        """
        limit = self.CLONE_LIMIT if endpoint == "clone" else self.TTS_LIMIT
        bucket = self._buckets[endpoint]
        while True:
            now = time.time()

            # ---- enforce RPS: drop timestamps older than 1 s ----------------
            while bucket and now - bucket[0] > 1:
                bucket.popleft()

            # Admit only when both the RPS window and the concurrency slot
            # have room; record the token exactly once, at admission.
            if len(bucket) < limit.rps and self._inflight[endpoint] < limit.concurrency:
                bucket.append(now)
                self._inflight[endpoint] += 1
                return

            await asyncio.sleep(0.05)  # poll until a slot frees up

    def _release(self, endpoint: str):
        """Give back the concurrency slot taken by `_acquire`."""
        self._inflight[endpoint] -= 1
76
77# ------------------------------------------------------------------ #
78# ------------------------------------------------------------------ #
79
async def simulate_batch(
    api: MockElevenLabs,
    n_users: int,
    mode: Literal["clone", "tts"],
) -> float:
    """Fire `n_users` simultaneous requests of type `mode` at `api`.

    Returns the wall-clock duration (seconds) until every request finished.
    """
    t0 = time.perf_counter()

    async def _one_user(uid: int):
        # Each simulated user issues exactly one request of the chosen kind.
        if mode == "clone":
            await api.clone_voice(f"user{uid}")
        else:
            await api.tts("voice123", f"hello from user {uid}")

    jobs = [_one_user(i) for i in range(n_users)]
    await asyncio.gather(*jobs)
    return time.perf_counter() - t0
99
100
101# ------------------------------------------------------------------ #
102# ------------------------------------------------------------------ #
103
def run_all():
    """Load-test both endpoints with small concurrent batches, printing timings."""
    api = MockElevenLabs()
    for mode in ("clone", "tts"):
        banner = f"\n=== {mode.upper()} ==="
        print(banner)
        for n in (2, 5):
            elapsed = asyncio.run(simulate_batch(api, n, mode))
            print(f"{n:>2} concurrent → finished in {elapsed:5.2f} s")

run_all()
# Created on 6/30/2025