Controlling throughput and latency with rate limits

PY
S
Python

# Demo: how rate limits shape throughput and latency.
#
# The MockElevenLabs class exposes two fake endpoints:
#   - clone_voice() (simulates 0.8-10 s of work)
#   - tts() (simulates 0.2-0.4 s of work)
#
# Each endpoint gets limits that look like the real ones:
#   - Voice-clone: only 1 job may run at a time, 1 request per second.
#   - TTS: up to 4 jobs in parallel and 5 requests per second.
#
# How those limits are enforced:
#   - “Token bucket” → counts how many calls have happened in the last second (RPS).
#   - “Semaphore” → counts how many calls are running right now (concurrency).
#   - If either limit is reached, the coroutine just awaits asyncio.sleep(0.05)
#     and keeps trying until a slot opens.
#
# Fire a bunch of users at it:
#   - simulate_batch() starts 2 or 5 concurrent coroutines that call either
#     clone_voice or tts.
#   - run_all() does this for both endpoints and prints how long the whole
#     group had to wait.
#
# Why the times are what they are:
#   - Voice-clone requests are forced to run one after another, so 5 users
#     need roughly 5 × (0.8-10 s).
#   - TTS requests can overlap (max 4 at once) and start 5 per second, so they
#     finish much faster.
#
# TL;DR: a throttle layer plus a quick load test that shows how rate limits
# shape throughput and latency.

1import asyncio, time, random, hashlib, collections
2from typing import Literal, NamedTuple, List
3import nest_asyncio, asyncio
4nest_asyncio.apply()
5
6# ------------------------------------------------------------------ #
7# ------------------------------------------------------------------ #
8
class RateLimit(NamedTuple):
    """Throttle settings for one endpoint."""

    # Maximum number of requests admitted per second.
    rps: int
    # Maximum number of requests allowed to be running at the same time.
    concurrency: int
12
class MockElevenLabs:
    """In-memory stand-in for a rate-limited API with two endpoints.

    Extremely simplified model of the real rules:
      • /voice/clone   : 1 request per second, 1 job in flight
      • /tts           : 5 requests per second, 4 jobs in flight

    Each endpoint is throttled by a sliding one-second log of admission
    timestamps (requests per second) and by a counter of calls that are
    currently running (concurrency). A call that cannot be admitted
    polls every ``_POLL_INTERVAL`` seconds until both limits have room.
    """

    CLONE_LIMIT = RateLimit(rps=1, concurrency=1)  # 1 job at a time
    TTS_LIMIT = RateLimit(rps=5, concurrency=4)

    # Seconds a blocked caller sleeps before re-checking the limits.
    _POLL_INTERVAL = 0.05

    def __init__(self):
        # Admission timestamps from the last second, one deque per endpoint.
        self._buckets = {
            "clone": collections.deque(),
            "tts": collections.deque(),
        }
        # Number of calls currently running, per endpoint.
        self._inflight = {"clone": 0, "tts": 0}

    # ------------------------------------------------------------------ #
    async def clone_voice(self, voice_name: str) -> str:
        """Simulate a voice-clone job, subject to ``CLONE_LIMIT``.

        Args:
            voice_name: Name used to derive the returned id.

        Returns:
            ``"voice_id::"`` followed by the first 8 hex digits of the
            SHA-1 of ``voice_name``.
        """
        print("clone_voice", voice_name)
        await self._acquire("clone")
        try:
            # Simulated work: between 0.8 s and 10.2 s.
            await asyncio.sleep(random.uniform(0.8, 10.2))
            return f"voice_id::{hashlib.sha1(voice_name.encode()).hexdigest()[:8]}"
        finally:
            # Always free the concurrency slot, even on cancellation.
            self._release("clone")

    async def tts(self, voice_id: str, text: str) -> bytes:
        """Simulate a text-to-speech call, subject to ``TTS_LIMIT``.

        Args:
            voice_id: Voice identifier; its first 4 characters are echoed.
            text: Text to "speak"; its first 10 characters are echoed.

        Returns:
            Fake audio payload ``b"AUDIO(<voice>):<text>..."``.
        """
        print("tts", voice_id, text)
        await self._acquire("tts")
        try:
            # Simulated work: between 0.2 s and 0.4 s.
            await asyncio.sleep(random.uniform(0.2, 0.4))
            return f"AUDIO({voice_id[:4]}):{text[:10]}...".encode()
        finally:
            # Always free the concurrency slot, even on cancellation.
            self._release("tts")

    # ------------------------------------------------------------------ #
    async def _acquire(self, endpoint: Literal["clone", "tts"]) -> None:
        """Wait until ``endpoint`` has room under both limits, then claim it.

        On return, one admission timestamp has been recorded and the
        in-flight counter has been incremented; the caller must pair this
        with ``_release``.
        """
        limit = self.CLONE_LIMIT if endpoint == "clone" else self.TTS_LIMIT
        bucket = self._buckets[endpoint]
        while True:
            # Monotonic clock: the window must not be distorted by
            # wall-clock adjustments.
            now = time.monotonic()

            # Forget admissions that have left the one-second window.
            while bucket and now - bucket[0] > 1:
                bucket.popleft()

            # Check both limits BEFORE recording the admission, so that a
            # caller blocked on concurrency does not use up a
            # requests-per-second slot it never gets to spend. There is no
            # await between the check and the update, so no other coroutine
            # can interleave.
            has_rate_room = len(bucket) < limit.rps
            has_slot = self._inflight[endpoint] < limit.concurrency
            if has_rate_room and has_slot:
                bucket.append(now)
                self._inflight[endpoint] += 1
                return

            await asyncio.sleep(self._POLL_INTERVAL)

    def _release(self, endpoint: Literal["clone", "tts"]) -> None:
        """Give back the concurrency slot claimed by ``_acquire``."""
        self._inflight[endpoint] -= 1
76
77# ------------------------------------------------------------------ #
78# ------------------------------------------------------------------ #
79
async def simulate_batch(
    api: MockElevenLabs,
    n_users: int,
    mode: Literal["clone", "tts"],
) -> float:
    """Fire ``n_users`` simultaneous calls at one endpoint and time them.

    Args:
        api: Mock service whose ``clone_voice`` or ``tts`` is called.
        n_users: Number of coroutines started together.
        mode: ``"clone"`` calls ``clone_voice``; any other value calls ``tts``.

    Returns:
        Seconds elapsed (per ``time.perf_counter``) until every call finished.
    """
    began = time.perf_counter()

    async def fire(user_no: int) -> None:
        # Pick the endpoint call for this simulated user, then wait for it.
        call = (
            api.clone_voice(f"user{user_no}")
            if mode == "clone"
            else api.tts("voice123", f"hello from user {user_no}")
        )
        await call

    pending = [fire(user_no) for user_no in range(n_users)]
    await asyncio.gather(*pending)

    elapsed = time.perf_counter() - began
    return elapsed
99
100
101# ------------------------------------------------------------------ #
102# ------------------------------------------------------------------ #
103
def run_all():
    """Run the load test: batches of 2 and 5 callers against each endpoint.

    A single ``MockElevenLabs`` instance is shared by every batch. Each
    batch is driven by its own ``asyncio.run`` call and its duration is
    printed.
    """
    service = MockElevenLabs()
    batch_sizes = (2, 5)
    for endpoint in ("clone", "tts"):
        print(f"\n=== {endpoint.upper()} ===")
        for size in batch_sizes:
            elapsed = asyncio.run(simulate_batch(service, size, endpoint))
            print(f"{size:>2} concurrent → finished in {elapsed:5.2f} s")


# Kick off the demo as soon as the module/cell is executed.
run_all()

Created on 6/30/2025