feat(SGLang): Add DP-aware routing and dp_rank propagation across Prefill/Decode #4221
👋 Hi YAMY1234! Thank you for contributing to ai-dynamo/dynamo.
Walkthrough
This PR implements data-parallel (DP) rank-aware routing in Dynamo. It changes the router to return 3-tuples that include dp_rank, registers prefill readiness with endpoints, exposes data_parallel_size in the runtime configuration, and propagates dp_rank through the decode and prefill request handlers. New tests validate dp_rank coverage and distribution.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks: ❌ Failed checks (1 warning) | ✅ Passed checks (2 passed)
Actionable comments posted: 1
🧹 Nitpick comments (3)
components/src/dynamo/sglang/request_handlers/llm/decode_handler.py (1)
205-210: Consider using explicit variable initialization instead of a `locals()` check.
The use of `'prefill_dp_rank' in locals()` to check if the variable was assigned works, but is fragile and unusual. Consider initializing `prefill_dp_rank = None` before the router response handling (around line 135) and then setting it conditionally.
Apply this diff to make the logic more explicit:
```diff
 if (
     self.prefill_router_client is not None
     and self.prefill_router_client.instance_ids()
 ):
+    prefill_dp_rank = None  # Will be set by router if DP-aware
     token_ids = request["token_ids"]
     stream = await self.prefill_router_client.generate(token_ids)
     result = await anext(stream)
```
Then simplify lines 205-210:
```diff
-# Use router-selected dp_rank (fallback to request-level if not provided)
-if "prefill_dp_rank" in locals() and prefill_dp_rank is not None:
-    effective_dp_rank = prefill_dp_rank
-elif data_parallel_rank is not None:
-    effective_dp_rank = data_parallel_rank
-else:
-    effective_dp_rank = None
+# Use router-selected dp_rank (fallback to request-level if not provided)
+effective_dp_rank = prefill_dp_rank if prefill_dp_rank is not None else data_parallel_rank
```
tests/router/test_dp_rank_routing.py (2)
101-143: Improve test hygiene with unused variable naming and exception logging.
The test logic correctly validates dp_rank bounds and type. However, consider these minor improvements:
- Use an underscore prefix for intentionally unused unpacked variables
- Use `logging.exception` instead of `logging.error` in exception handlers to include the traceback
Apply these diffs:
```diff
 if hasattr(kv_push_router, "best_worker"):
-    worker_id, dp_rank, overlap = await kv_push_router.best_worker(
+    _worker_id, dp_rank, _overlap = await kv_push_router.best_worker(
         [1, 2, 3, 4, 5]
     )
```
```diff
 except Exception as e:
-    logger.error(f"Test failed: {e}")
+    logger.exception(f"Test failed: {e}")
     raise
```
146-197: Apply same test hygiene improvements as the first test.
The coverage test correctly exercises the router with varied token sequences and validates distribution across DP ranks. The threshold of 2 ranks is conservative but reasonable given that routing depends on cache overlap patterns.
Apply the same improvements as in test_router_returns_valid_dp_rank:
```diff
 for i in range(50):
     test_tokens = list(range(i * 7, i * 7 + 10))
-    worker_id, dp_rank, overlap = await kv_push_router.best_worker(
+    _worker_id, dp_rank, _overlap = await kv_push_router.best_worker(
         test_tokens
     )
```
```diff
 except Exception as e:
-    logger.error(f"Test failed: {e}")
+    logger.exception(f"Test failed: {e}")
     raise
```
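The coverage check discussed in this comment can be tried without a live router. The sketch below is only an illustration: `FakeRouter` is a hypothetical stand-in whose `best_worker` returns the same `(worker_id, dp_rank, overlap)` shape, and its sum-modulo rule is an assumption, not how the real KV router selects ranks.

```python
import asyncio


class FakeRouter:
    """Hypothetical stand-in for KvPushRouter; not the real routing logic."""

    def __init__(self, dp_size: int) -> None:
        self.dp_size = dp_size

    async def best_worker(self, token_ids: list[int]) -> tuple[int, int, int]:
        # Assumption: derive the rank from the tokens so varied inputs spread out.
        dp_rank = sum(token_ids) % self.dp_size
        return 0, dp_rank, 0


async def collect_dp_ranks(router: FakeRouter, num_requests: int = 50) -> set[int]:
    """Route varied token sequences and record which DP ranks were selected."""
    seen: set[int] = set()
    for i in range(num_requests):
        test_tokens = list(range(i * 7, i * 7 + 10))
        # Underscore-prefixed names mark values that are intentionally unused.
        _worker_id, dp_rank, _overlap = await router.best_worker(test_tokens)
        assert isinstance(dp_rank, int)
        assert 0 <= dp_rank < router.dp_size
        seen.add(dp_rank)
    return seen


ranks = asyncio.run(collect_dp_ranks(FakeRouter(dp_size=4)))
```

A test built on this would then assert `len(ranks) >= 2`, matching the conservative threshold mentioned above.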
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- components/src/dynamo/router/__main__.py (1 hunks)
- components/src/dynamo/sglang/main.py (2 hunks)
- components/src/dynamo/sglang/register.py (1 hunks)
- components/src/dynamo/sglang/request_handlers/llm/decode_handler.py (4 hunks)
- components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py (1 hunks)
- tests/router/test_dp_rank_routing.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 2989
File: lib/llm/src/block_manager/distributed/transfer.rs:6-6
Timestamp: 2025-09-18T21:47:44.143Z
Learning: For PR ai-dynamo/dynamo#2989, the ConnectorTransferBatcher architectural issues will be addressed in a follow-up PR by removing the duplicate batching logic and integrating distributed transfers with the existing TransferBatcher + LocalTransferManager pipeline, rather than adding bounded concurrency primitives like Semaphore.
🧬 Code graph analysis (5)
components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py (2)
- lib/llm/src/block_manager/kv_consolidator/subscriber.rs (1): data_parallel_rank (40-42)
- components/src/dynamo/sglang/request_handlers/handler_base.py (1): _get_input_param (70-87)
components/src/dynamo/sglang/request_handlers/llm/decode_handler.py (2)
- lib/llm/src/block_manager/kv_consolidator/subscriber.rs (1): data_parallel_rank (40-42)
- components/src/dynamo/sglang/protocol.py (1): DisaggPreprocessedRequest (63-66)
components/src/dynamo/router/__main__.py (2)
- lib/bindings/python/rust/llm/kv.rs (1): best_worker (1186-1219)
- lib/bindings/python/src/dynamo/_core.pyi (1): best_worker (1404-1426)
tests/router/test_dp_rank_routing.py (2)
- lib/bindings/python/src/dynamo/_core.pyi (3): DistributedRuntime (35-65), KvPushRouter (1342-1520), KvRouterConfig (1044-1046)
- tests/utils/managed_process.py (1): ManagedProcess (71-568)
components/src/dynamo/sglang/main.py (2)
- components/src/dynamo/sglang/register.py (1): register_llm_with_readiness_gate (142-182)
- lib/bindings/python/src/dynamo/_core.pyi (2): ModelInput (1023-1025), ModelType (1027-1034)
🪛 Ruff (0.14.4)
components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py
79-82: Avoid specifying long messages outside the exception class
(TRY003)
components/src/dynamo/router/__main__.py
133-133: Avoid specifying long messages outside the exception class
(TRY003)
tests/router/test_dp_rank_routing.py
28-28: Standard pseudo-random generators are not suitable for cryptographic purposes
(S311)
35-35: Do not catch blind exception: Exception
(BLE001)
127-127: Unpacked variable worker_id is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
127-127: Unpacked variable overlap is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
142-142: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
177-177: Unpacked variable worker_id is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
177-177: Unpacked variable overlap is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
196-196: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build and Test - dynamo
🔇 Additional comments (11)
components/src/dynamo/sglang/register.py (1)
97-102: LGTM! Clean exposure of data_parallel_size for DP-aware routing.
The changes correctly extract dp_size from server_args with a sensible default of 1, assign it to runtime_config, and log when DP mode is active. This provides the necessary foundation for DP-aware routing decisions.
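As a rough illustration of that pattern, the sketch below reads `dp_size` with a default of 1 and logs only when DP is active. `resolve_dp_size` is a hypothetical helper, and the `SimpleNamespace` objects merely stand in for SGLang's server args and Dynamo's runtime config; the actual code in register.py may differ.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)


def resolve_dp_size(server_args) -> int:
    """Return the data-parallel size, defaulting to 1 when unset or missing."""
    dp_size = getattr(server_args, "dp_size", None)
    if dp_size is None:
        dp_size = 1
    if dp_size > 1:
        logger.info("DP mode active: data_parallel_size=%d", dp_size)
    return dp_size


# Hypothetical stand-ins for the real server args and runtime config objects.
server_args = SimpleNamespace(dp_size=4)
runtime_config = SimpleNamespace()
runtime_config.data_parallel_size = resolve_dp_size(server_args)
```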
components/src/dynamo/sglang/main.py (1)
113-113: Minor comment cleanup.
components/src/dynamo/sglang/request_handlers/llm/decode_handler.py (3)
118-124: LGTM! Correctly preserves dp_rank=0 with explicit None check.
The explicit check for the key's presence and non-None value ensures that dp_rank=0 is not misinterpreted as falsy. The fallback to dp_rank provides backward compatibility.
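To see why the explicit None check matters, here is a minimal sketch comparing it with a truthiness-based lookup. Both function names are hypothetical, and the request is assumed to be a plain dict carrying the `data_parallel_rank` and `dp_rank` keys mentioned above.

```python
from typing import Any, Optional


def read_dp_rank(request: dict[str, Any]) -> Optional[int]:
    """Prefer data_parallel_rank, fall back to dp_rank, and keep rank 0."""
    if "data_parallel_rank" in request and request["data_parallel_rank"] is not None:
        return request["data_parallel_rank"]
    return request.get("dp_rank")


def read_dp_rank_buggy(request: dict[str, Any]) -> Optional[int]:
    """Truthiness-based variant: rank 0 is falsy, so it is silently dropped."""
    return request.get("data_parallel_rank") or request.get("dp_rank")
```

With `{"data_parallel_rank": 0}` the first function returns `0`, while the second returns `None` and the request would lose its rank.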
135-171: Well-designed DP-rank injection with backward compatibility.
The implementation correctly:
- Handles both 3-tuple (new) and 2-tuple (legacy) router responses with appropriate fallback
- Logs DP routing status once to avoid log spam
- Injects dp_rank into the inner request after serialization to work around Pydantic's field validation
- Uses defensive type checking before dictionary access
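The first and third points can be sketched as follows. This is an illustration under assumptions, not the handler's actual code: the legacy 2-tuple is assumed to be `(worker_id, overlap)`, and the serialized payload is assumed to nest the inner request under a `"request"` key.

```python
from typing import Any, Optional, Sequence


def unpack_router_result(result: Sequence[Any]) -> tuple[Any, Optional[int], Any]:
    """Normalize a router reply to (worker_id, dp_rank, overlap)."""
    if len(result) == 3:
        worker_id, dp_rank, overlap = result
        return worker_id, dp_rank, overlap
    if len(result) == 2:
        # Assumed legacy shape without DP information.
        worker_id, overlap = result
        return worker_id, None, overlap
    raise ValueError(f"unexpected router result of length {len(result)}")


def inject_dp_rank(payload: dict[str, Any], dp_rank: Optional[int]) -> dict[str, Any]:
    """Set data_parallel_rank on the inner request of an already-serialized payload."""
    inner = payload.get("request")
    # Defensive type check before dictionary access.
    if dp_rank is not None and isinstance(inner, dict):
        inner["data_parallel_rank"] = dp_rank
    return payload
```

Injecting into the serialized dict rather than the model object sidesteps field validation, which is the workaround the review describes.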
225-242: LGTM! Consistent DP-rank propagation in aggregated mode.
The aggregated mode correctly mirrors the decode path pattern by conditionally including data_parallel_rank in generate_kwargs when present.
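A minimal sketch of that conditional inclusion is shown below. `build_generate_kwargs` is a hypothetical helper, and the `input_ids`, `sampling_params`, and `stream` keys are assumptions about what the handler passes to the engine.

```python
from typing import Any, Optional


def build_generate_kwargs(
    input_ids: list[int],
    sampling_params: dict[str, Any],
    data_parallel_rank: Optional[int] = None,
) -> dict[str, Any]:
    """Build engine kwargs, adding data_parallel_rank only when one was chosen."""
    kwargs: dict[str, Any] = {
        "input_ids": input_ids,
        "sampling_params": sampling_params,
        "stream": True,
    }
    # Compare against None so that rank 0 is still forwarded.
    if data_parallel_rank is not None:
        kwargs["data_parallel_rank"] = data_parallel_rank
    return kwargs
```

Omitting the key entirely when no rank is set leaves the engine free to pick a rank itself.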
components/src/dynamo/router/__main__.py (1)
126-138: LGTM! Clean delegation to best_worker for DP-aware routing.
The changes correctly:
- Update the method to call `best_worker()`, which returns a 3-tuple including dp_rank
- Update the docstring to reflect the new return signature
- Simplify error handling with direct RuntimeError raise
The decode handler (decode_handler.py lines 137-155) already handles backward compatibility by checking tuple length, so this breaking change is properly managed at the caller level.
components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py (2)
77-93: LGTM! Proper validation and DP-rank extraction for Prefill.
The changes correctly:
- Validate the disaggregated request format with clear error messages
- Extract inner_request and sampling_params
- Preserve dp_rank=0 with explicit None checking, consistent with decode_handler.py
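One way such validation could look is sketched below. The request shape (`"request"` and `"sampling_params"` keys), the `split_disagg_request` helper, and the `InvalidDisaggRequest` exception are all assumptions for illustration; the real handler may check different fields.

```python
from typing import Any, Optional


class InvalidDisaggRequest(ValueError):
    """Raised when a prefill request lacks the expected disaggregated shape."""


def split_disagg_request(request: Any) -> tuple[dict, dict, Optional[int]]:
    """Validate the outer request, then return (inner_request, sampling_params, dp_rank)."""
    if not isinstance(request, dict) or not isinstance(request.get("request"), dict):
        raise InvalidDisaggRequest("expected a mapping with a 'request' mapping")
    if "sampling_params" not in request:
        raise InvalidDisaggRequest("missing 'sampling_params'")
    inner_request = request["request"]
    sampling_params = request["sampling_params"]
    # .get returns None when the key is absent; a stored 0 is returned unchanged.
    dp_rank = inner_request.get("data_parallel_rank")
    return inner_request, sampling_params, dp_rank
```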
95-111: LGTM! Consistent DP-rank propagation to SGLang engine.
The generate_kwargs construction mirrors the decode handler pattern and correctly includes data_parallel_rank when present, with appropriate logging for DP-aware generation.
tests/router/test_dp_rank_routing.py (3)
26-28: LGTM! Simple random suffix generator for test isolation.
Note: Static analysis flagged S311 (non-cryptographic random), but this is acceptable for test namespace isolation where cryptographic security is not required.
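For reference, a suffix generator of this kind might look like the sketch below. The function name, suffix length, and namespace prefix are assumptions, not the test file's actual values.

```python
import random
import string


def random_suffix(length: int = 6) -> str:
    """Return a short lowercase suffix for isolating test namespaces (not for security)."""
    return "".join(random.choices(string.ascii_lowercase, k=length))


# Hypothetical namespace name; each test run gets its own suffix.
namespace = f"test-dp-rank-{random_suffix()}"
```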
31-43: LGTM! Robust runtime acquisition with fallback handling.
The broad exception handling (flagged by static analysis) is appropriate here for test utilities that need to handle various runtime scenarios gracefully.
46-98: LGTM! Well-structured mocker process wrapper for DP testing.
The MockerProcess class correctly:
- Configures the mocker with data-parallel-size for DP support
- Uses appropriate timeout and logging settings
- Implements context manager protocol for clean resource management
components/src/dynamo/sglang/main.py
Outdated
```python
# Register Prefill to expose dp_size to Router
await register_llm_with_readiness_gate(
    engine,
    generate_endpoint,
    server_args,
    dynamo_args,
    input_type=ModelInput.Tokens,
    output_type=ModelType.Chat | ModelType.Completions,
    readiness_gate=None,
)
```
🧩 Analysis chain
Verify registration failure handling in Prefill initialization.
The early registration exposes dp_size to the Router, which is essential for DP-aware routing. However, since this registration occurs before endpoint serving starts (line 174), any registration failure will prevent the Prefill worker from becoming available.
Run the following script to check if there are any error recovery mechanisms or if registration failures should be handled differently:
🏁 Script executed:
```shell
#!/bin/bash
# Check error handling patterns in other init functions
rg -A 5 "register_llm_with_readiness_gate" components/src/dynamo/sglang/main.py
```
Length of output: 1044
I need to gather more context to properly verify the concern. Let me check the surrounding code and error handling patterns.
---
🏁 Script executed:
```shell
#!/bin/bash
# Get context around lines 163-172 to see if there's error handling
sed -n '150,185p' components/src/dynamo/sglang/main.py
```
Length of output: 1075
🏁 Script executed:
```shell
#!/bin/bash
# Check the implementation of register_llm_with_readiness_gate
fd -t f "register.py" components/src/dynamo/sglang/ --exec cat {}
```
Length of output: 7565
🏁 Script executed:
```shell
#!/bin/bash
# Look for try-catch or exception handling around registration calls
rg -B 10 -A 10 "await register_llm_with_readiness_gate" components/src/dynamo/sglang/main.py | head -40
```
Length of output: 675
I need to check the outer context to see if there's higher-level error handling that catches exceptions from this registration call.
---
🏁 Script executed:
```shell
#!/bin/bash
# Find the function containing the registration call (lines 163-172)
rg -B 30 "await register_llm_with_readiness_gate" components/src/dynamo/sglang/main.py | grep -E "^(async )?def |--" | head -20
```
Length of output: 68
🏁 Script executed:
```shell
#!/bin/bash
# Get full context - check if there's try-catch wrapping this function
rg "def.*register_prefill" components/src/dynamo/sglang/main.py -A 200 | grep -E "(try:|except|raise RuntimeError)" | head -20
```
Length of output: 0
🏁 Script executed:
```shell
#!/bin/bash
# Check how other registration calls handle failures (look at all call sites)
rg "register_llm_with_readiness_gate" components/src/dynamo/sglang/main.py -B 2 -A 8 | grep -E "(try:|except|readiness_gate)"
```
Length of output: 437
---
🏁 Script executed:
```shell
#!/bin/bash
# Get the complete init_prefill function
sed -n '130,200p' components/src/dynamo/sglang/main.py
```
Length of output: 2169
🏁 Script executed:
```shell
#!/bin/bash
# Check if there's a wrapper around init_prefill that handles exceptions
rg "init_prefill\(" components/src/dynamo/sglang/main.py -B 5 -A 5
```
Length of output: 732
🏁 Script executed:
```shell
#!/bin/bash
# Get all worker init functions to compare error handling patterns
rg "^async def init_" components/src/dynamo/sglang/main.py
```
Length of output: 473
---
🏁 Script executed:
```shell
#!/bin/bash
# Compare registration error handling across all init functions
for func in init_embedding init_multimodal_encode_worker init_multimodal_prefill_worker; do
  echo "=== $func ==="
  rg "$func" components/src/dynamo/sglang/main.py -A 100 | grep -B 5 -A 5 "register_llm_with_readiness_gate"
done
```
Length of output: 2138
🏁 Script executed:
```shell
#!/bin/bash
# Check if there's a top-level try-catch around all init calls
rg "if config.serving_mode" components/src/dynamo/sglang/main.py -B 10 -A 10
```
Length of output: 2422
🏁 Script executed:
```shell
#!/bin/bash
# Check how the main entry point handles init function exceptions
rg "async def main" components/src/dynamo/sglang/main.py -A 30
```
Length of output: 0
---
🏁 Script executed:
```shell
#!/bin/bash
# Get full init_embedding to see the pattern with tasks
sed -n '200,280p' components/src/dynamo/sglang/main.py
```
Length of output: 2717
🏁 Script executed:
```shell
#!/bin/bash
# Search for the main entry point or where these init functions are called
rg "def main|if __name__" components/src/dynamo/sglang/main.py -A 20
```
Length of output: 76
🏁 Script executed:
```shell
#!/bin/bash
# Check if there's error handling around the caller (look for try/except at module level)
rg "(asyncio.run|try:|except)" components/src/dynamo/sglang/main.py | head -20
```
Length of output: 407
Add error handling around Prefill registration to match other worker patterns.
The verification confirms that init_prefill lacks error handling for registration failures, while init_embedding and other workers wrap registration in try/except blocks within asyncio.gather(). A registration failure will raise an uncaught RuntimeError that crashes the entire worker initialization.
Add a try/except around the registration call (lines 163-172), or refactor to use the concurrent pattern with asyncio.gather() and readiness_gate as init_embedding does (lines 220-237), so that requests can queue until registration completes.
🤖 Prompt for AI Agents
In components/src/dynamo/sglang/main.py around lines 163 to 172, the Prefill
registration call is unprotected and can raise an uncaught RuntimeError; wrap
the await register_llm_with_readiness_gate(...) in a try/except that logs the
error and sets/uses a readiness_gate (or refactor to start registration inside
an asyncio.gather() task like init_embedding does) so failures do not crash
initialization and requests can queue until registration completes; ensure the
except block logs the exception and marks readiness_gate as failed/completed
consistent with other workers.
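A minimal sketch of the suggested pattern follows, assuming an `asyncio.Event` serves as the readiness gate. `register_with_gate`, `serve`, and `init_worker` are hypothetical names, and `register` stands in for a call such as `register_llm_with_readiness_gate(...)`; this is not the code in main.py.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def register_with_gate(register, readiness_gate: asyncio.Event) -> bool:
    """Run a registration coroutine function; log failures instead of raising."""
    try:
        await register()
    except Exception:
        # logging.exception records the traceback along with the message.
        logger.exception("Prefill registration failed")
        return False
    readiness_gate.set()
    return True


async def serve(readiness_gate: asyncio.Event) -> str:
    """Placeholder for endpoint serving: wait until registration opens the gate."""
    await readiness_gate.wait()
    return "serving"


async def init_worker(register) -> str:
    """Start serving concurrently with registration and stop cleanly on failure."""
    gate = asyncio.Event()
    serve_task = asyncio.create_task(serve(gate))
    if not await register_with_gate(register, gate):
        serve_task.cancel()  # the gate will never open, so stop waiting
        return "registration-failed"
    return await serve_task
```

Whether a failed registration should stop the worker, retry, or keep serving unregistered is a design decision this sketch does not settle; it only shows that the failure can be caught and logged rather than crashing initialization.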
Hi @YAMY1234 thanks for the contribution. Could you please check the following
components/src/dynamo/sglang/main.py
Outdated
```python
health_check_payload = SglangPrefillHealthCheckPayload(engine).to_dict()

# Register Prefill to expose dp_size to Router
await register_llm_with_readiness_gate(
```
We should probably not register the prefill workers as LLMs for now, as there may be unintended consequences. For now, let's assume (and comment) that dp rank routing is not supported for disagg. (WIP to enable integration of sglang with our new disagg frontend)
I just realized the scope of this PR is to enable dp rank propagation for prefill, so please ignore my above comment. Can you briefly motivate why we would want decode and prefill to run on the same dp_rank?
Thanks for the question!
In this PR, prefill and decode currently run on the same dp_rank because SGLang shards the KV cache and other DP-scoped state per rank. Keeping the request on the same rank matches SGLang’s PD design and recent fixes: after sglang #10169, the decode side explicitly consumes the prefill’s dp_rank (prefill_dp_rank) and targets that rank in the decode stage. This is the path SGLang supports for PD correctness.
So this does not “force” a specific DP size; it just propagates and honors the rank that the router picked for prefill so that decode lands on the same rank, mirroring SGLang’s behavior.
Yeah but I think this is a really good point to think whether this design makes sense and can truly benefit in Dynamo...
In Dynamo the prefill and decode workers are not necessarily on the same GPU. Same dp_rank doesn’t imply same device in Dynamo. If prefill and decode are on different GPUs/nodes, we still minimize shuffle cardinality by sending the request to the decode worker that “owns” shard k. When co-location is possible (e.g., same worker/device), this naturally degrades to zero cross-rank/device copy; otherwise the transport performs a single well-defined transfer to the right shard instead of an arbitrary one.
Overview:
Enable DP-aware routing across Dynamo ↔ SGLang so Prefill and Decode run on the same data_parallel_rank, improving cache locality and correctness in disaggregated mode.
Details:
Router
- `StandaloneRouterHandler.best_worker_id()` now uses `best_worker()` and returns (worker_id, dp_rank, overlap_blocks).
Registration
- `data_parallel_size` in runtime config; log when `dp_size > 1`.
Handlers
- `data_parallel_rank` (preserving `0`), honor router-selected `dp_rank`, and inject it into the disaggregated prefill request (post-serialization).
- `data_parallel_rank` from inner request and pass to engine.
Init
- `dp_size`.
Tests
- `tests/router/test_dp_rank_routing.py`: validates dp_rank range and basic coverage across ranks.
Conducted sample verification on DP=4
Where should the reviewer start?
- `components/src/dynamo/router/__main__.py`
- `components/src/dynamo/sglang/request_handlers/llm/decode_handler.py`
- `components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py`
- `components/src/dynamo/sglang/register.py`, `components/src/dynamo/sglang/main.py`
- `tests/router/test_dp_rank_routing.py`
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)