# Streaming Drafts Implementation Plan

## Overview

Add Telegram `sendMessageDraft` streaming support to the Takopi bot, so users see Claude's response text appear in real time (like ChatGPT) instead of waiting for the full answer at the end.
## Architecture Analysis

### Current Flow

- Claude subprocess runs with `--output-format stream-json`
- `ClaudeRunner` reads JSONL lines and translates them to `TakopiEvent`s
- `StreamTextBlock.text` (full text per content block) is captured in `state.last_assistant_text` but NOT emitted as an event
- `ProgressEdits` periodically edits the progress message with the action list (tools used, elapsed time)
- Only when `CompletedEvent` arrives does the final answer appear
- The current runner does NOT use the `--include-partial-messages` flag
### Key Insight: StreamTextBlock vs text_delta

There are two types of text events in Claude Code stream-json:

- `StreamTextBlock` (in `StreamAssistantMessage`): contains the full accumulated text for a content block. Only emitted when the assistant message is complete (between tool uses). NOT incremental.
- `stream_event` with `text_delta` (requires `--include-partial-messages`): contains token-by-token incremental deltas. This is what we need for real streaming.

**Decision**: We must add `--include-partial-messages` to get true token-by-token streaming. The existing `StreamTextBlock` only fires at message boundaries and would give a poor streaming experience.
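To make the distinction concrete, here is a sketch of the two line shapes as the Step 2 translation code assumes them. The exact field layout is an assumption inferred from the `event_data.get("delta")` access in that step, not verified against real CLI output:

```python
import json

# Assumed shape: full text block, only emitted at message boundaries
stream_text_block_line = json.dumps({
    "type": "assistant",
    "message": {"content": [{"type": "text", "text": "The full answer so far."}]},
})

# Assumed shape: incremental delta, per token, with --include-partial-messages
stream_event_line = json.dumps({
    "type": "stream_event",
    "event": {"delta": {"type": "text_delta", "text": "The "}},
})

def is_text_delta(line: str) -> bool:
    """True if a stream-json line carries an incremental text delta."""
    obj = json.loads(line)
    delta = obj.get("event", {}).get("delta")
    return isinstance(delta, dict) and delta.get("type") == "text_delta"
```

Only the second shape is useful for streaming; the first arrives too late to render incrementally.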
## sendMessageDraft API

Added in Bot API 9.3 (December 2025). The method lets bots send draft message text that appears in the user's chat as it is being typed, similar to how ChatGPT shows streaming text.

Assumed parameters (based on Bot API patterns and available information):

- `chat_id` (required): target chat
- `text` (required): current draft text
- `reply_to_message_id` (optional): reply context
- `message_thread_id` (optional): forum topic
- `entities` (optional): text formatting entities
- `parse_mode` (optional): alternative to entities
- Returns: `True` on success (no message_id — drafts are ephemeral)

Important limitations:

- Drafts are ephemeral — they don't create a message
- The final answer must still be sent as a regular `sendMessage`
- Drafts may not work in all chat types (groups may be limited)
- No known rate limit documentation, but likely more lenient than `editMessageText`
## Implementation Plan

### Step 1: Add new event type TextDeltaEvent

**File**: `/home/ubuntu/takopi/src/takopi/model.py`

**What**: Add a new event type that carries incremental text deltas from the runner to the transport layer.

**Changes**:

```python
# Add to TakopiEventType literal
type TakopiEventType = Literal[
    "started",
    "action",
    "completed",
    "text_delta",  # NEW
]

# Add new event dataclass
@dataclass(frozen=True, slots=True)
class TextDeltaEvent:
    type: Literal["text_delta"] = field(default="text_delta", init=False)
    engine: EngineId
    text: str  # incremental delta text
    accumulated: str  # full accumulated text so far

# Update union type
type TakopiEvent = StartedEvent | ActionEvent | CompletedEvent | TextDeltaEvent
```

**Why**: The domain model needs a way to carry streaming text from the runner layer through the event pipeline to the transport layer. Both the delta AND the accumulated text are needed: the delta for efficient diffing, the accumulated text for Telegram draft rendering.
### Step 2: Add --include-partial-messages flag and emit TextDeltaEvent

**File**: `/home/ubuntu/takopi/src/takopi/runners/claude.py`

**What**:

- Add `--include-partial-messages` to the Claude CLI args (conditionally, based on a new `stream_text` flag)
- Handle `StreamEventMessage` with `text_delta` events in `translate_claude_event()`
- Accumulate text in `ClaudeStreamState` and emit `TextDeltaEvent`
**Changes to `ClaudeStreamState`**:

```python
@dataclass(slots=True)
class ClaudeStreamState:
    factory: EventFactory = field(default_factory=lambda: EventFactory(ENGINE))
    pending_actions: dict[str, Action] = field(default_factory=dict)
    last_assistant_text: str | None = None
    note_seq: int = 0
    # NEW fields for streaming
    accumulated_text: str = ""  # running accumulation of text deltas
    stream_text_enabled: bool = False  # whether to emit TextDeltaEvent
```

**Changes to `_build_args`**:
```python
def _build_args(self, prompt: str, resume: ResumeToken | None) -> list[str]:
    args: list[str] = ["-p", "--output-format", "stream-json", "--verbose"]
    if self.stream_text:  # NEW
        args.append("--include-partial-messages")
    # ... rest unchanged
```

**Changes to `translate_claude_event`** — add handling for `StreamEventMessage`:
```python
case claude_schema.StreamEventMessage(event=event_data):
    if not state.stream_text_enabled:
        return []
    delta = event_data.get("delta")
    if not isinstance(delta, dict):
        return []
    if delta.get("type") != "text_delta":
        return []
    text_chunk = delta.get("text", "")
    if not text_chunk:
        return []
    state.accumulated_text += text_chunk
    state.last_assistant_text = state.accumulated_text
    return [TextDeltaEvent(
        engine=ENGINE,
        text=text_chunk,
        accumulated=state.accumulated_text,
    )]
```

**Also**: Reset `accumulated_text` when a new assistant turn begins (on `StreamAssistantMessage` with `tool_use`, to handle interleaved tool use + text).
**Changes to `ClaudeRunner`**:

```python
@dataclass(slots=True)
class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner):
    # ... existing fields ...
    stream_text: bool = False  # NEW: whether to enable text streaming

    def new_state(self, prompt: str, resume: ResumeToken | None) -> ClaudeStreamState:
        state = ClaudeStreamState()
        state.stream_text_enabled = self.stream_text
        return state
```

**Changes to `build_runner`**:
```python
def build_runner(config: EngineConfig, _config_path: Path) -> Runner:
    # ... existing code ...
    stream_text = config.get("stream_text") is True  # NEW
    return ClaudeRunner(
        # ... existing params ...
        stream_text=stream_text,
    )
```

**Why**: This is the source of streaming text data. We need token-by-token deltas from the Claude CLI, which requires `--include-partial-messages`. The accumulated text is tracked in state so each `TextDeltaEvent` carries both the delta and the full text so far.
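The translation logic above can be exercised in isolation. Below is a minimal sketch that replays stream-json lines and reproduces the filter-and-accumulate behavior; the line shapes are assumptions matching the case arm above, not verified CLI output, and `extract_deltas` is a hypothetical test helper:

```python
import json

def extract_deltas(jsonl: str) -> tuple[list[str], str]:
    """Replay stream-json lines, returning (delta_chunks, accumulated_text).

    Mirrors the Step 2 state machine: ignore non-delta lines, skip
    empty chunks, and keep a running accumulation.
    """
    chunks: list[str] = []
    accumulated = ""
    for line in jsonl.splitlines():
        if not line.strip():
            continue
        obj = json.loads(line)
        if obj.get("type") != "stream_event":
            continue
        delta = obj.get("event", {}).get("delta")
        if not isinstance(delta, dict) or delta.get("type") != "text_delta":
            continue
        text = delta.get("text", "")
        if not text:
            continue
        chunks.append(text)
        accumulated += text
    return chunks, accumulated
```

This is essentially the pure core of the `StreamEventMessage` case arm, lifted out of the `ClaudeStreamState` plumbing.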
### Step 3: Add send_message_draft to BotClient layers

**File**: `/home/ubuntu/takopi/src/takopi/telegram/client_api.py`

**What**: Add a `send_message_draft()` method to both the `BotClient` protocol and the `HttpBotClient` implementation.

**Changes to the `BotClient` protocol**:

```python
async def send_message_draft(
    self,
    chat_id: int,
    text: str,
    reply_to_message_id: int | None = None,
    message_thread_id: int | None = None,
    entities: list[dict] | None = None,
    parse_mode: str | None = None,
) -> bool: ...
```

**Changes to `HttpBotClient`**:
```python
async def send_message_draft(
    self,
    chat_id: int,
    text: str,
    reply_to_message_id: int | None = None,
    message_thread_id: int | None = None,
    entities: list[dict] | None = None,
    parse_mode: str | None = None,
) -> bool:
    params: dict[str, Any] = {"chat_id": chat_id, "text": text}
    if reply_to_message_id is not None:
        params["reply_to_message_id"] = reply_to_message_id
    if message_thread_id is not None:
        params["message_thread_id"] = message_thread_id
    if entities is not None:
        params["entities"] = entities
    if parse_mode is not None:
        params["parse_mode"] = parse_mode
    result = await self._post("sendMessageDraft", params)
    return bool(result)
```

**Why**: Follows the existing pattern for every Telegram API method in the codebase. The `BotClient` protocol ensures testability.
### Step 4: Add send_message_draft to TelegramClient with rate limiting

**File**: `/home/ubuntu/takopi/src/takopi/telegram/client.py`

**What**: Add `send_message_draft()` to `TelegramClient` with a dedicated outbox key pattern and a new priority level.

**Changes to `outbox.py`** — add a new priority:

```python
SEND_PRIORITY = 0
DELETE_PRIORITY = 1
EDIT_PRIORITY = 2
DRAFT_PRIORITY = 3  # NEW: lowest priority, can be dropped freely
```

**Changes to `TelegramClient`**:
```python
async def send_message_draft(
    self,
    chat_id: int,
    text: str,
    reply_to_message_id: int | None = None,
    message_thread_id: int | None = None,
    entities: list[dict] | None = None,
    parse_mode: str | None = None,
    *,
    wait: bool = False,
) -> bool:
    async def execute() -> bool:
        return await self._client.send_message_draft(
            chat_id=chat_id,
            text=text,
            reply_to_message_id=reply_to_message_id,
            message_thread_id=message_thread_id,
            entities=entities,
            parse_mode=parse_mode,
        )

    # Key-based dedup: only the latest draft per chat matters
    result = await self.enqueue_op(
        key=("draft", chat_id),
        label="send_message_draft",
        execute=execute,
        priority=DRAFT_PRIORITY,
        chat_id=chat_id,
        wait=wait,
    )
    return bool(result)

async def drop_pending_drafts(self, *, chat_id: int) -> None:
    await self._outbox.drop_pending(key=("draft", chat_id))
```

**Why**: Key-based deduplication with key `("draft", chat_id)` means only the LATEST draft text survives in the outbox queue. If Claude generates 10 tokens before the outbox sends, it sends only the latest accumulated text, not 10 separate API calls. This is the core rate-limiting strategy. `DRAFT_PRIORITY` ensures drafts never block sends, deletes, or edits.
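The latest-wins behavior described above can be modeled in a few lines. This `KeyedOutbox` is a toy stand-in to illustrate the dedup semantics, not the real outbox API:

```python
from typing import Any, Callable

class KeyedOutbox:
    """Toy model of key-based dedup: one pending op per key, latest wins."""

    def __init__(self) -> None:
        self._pending: dict[Any, Callable[[], str]] = {}

    def enqueue(self, key: Any, op: Callable[[], str]) -> None:
        # Replaces any queued op with the same key; earlier drafts are dropped
        self._pending[key] = op

    def drop_pending(self, key: Any) -> None:
        self._pending.pop(key, None)

    def flush(self) -> list[str]:
        """Execute whatever survived dedup, in one pass."""
        results = [op() for op in self._pending.values()]
        self._pending.clear()
        return results
```

However many delta events arrive between flushes, only one `sendMessageDraft` call per chat reaches the API, which is exactly the rate-limiting property the plan relies on.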
### Step 5: Add streaming_drafts config flag

**File**: `/home/ubuntu/takopi/src/takopi/settings.py`

**What**: Add `streaming_drafts: bool = False` to `TelegramTransportSettings`.

**Changes**:

```python
class TelegramTransportSettings(BaseModel):
    # ... existing fields ...
    streaming_drafts: bool = False  # NEW: enable sendMessageDraft streaming
```

Also update `TelegramBridgeConfig` in `bridge.py`:

```python
@dataclass(frozen=True, slots=True)
class TelegramBridgeConfig:
    # ... existing fields ...
    streaming_drafts: bool = False  # NEW
```

And propagate in `loop.py` where `TelegramBridgeConfig` is constructed (find the constructor call and pass through the config value).

And pass `stream_text=True` to the Claude runner config when `streaming_drafts` is enabled. This needs to happen in the engine config layer — find where `build_runner` is called and ensure the engine config includes `stream_text: true` when `streaming_drafts` is enabled. The cleanest approach: when `streaming_drafts` is true, inject `stream_text: true` into the engine config dict before building the runner.

Config example (`~/.takopi/takopi.toml`):

```toml
[transports.telegram]
bot_token = "..."
chat_id = 123
streaming_drafts = false  # set to true to enable
```

**Why**: The feature flag allows toggling without code changes. It is hot-reload compatible since config changes are detected by `config_watch.py`. The default of `false` is non-destructive.
### Step 6: Add StreamingDraftEdits class

**File**: `/home/ubuntu/takopi/src/takopi/runner_bridge.py`

**What**: Add a new class `StreamingDraftEdits` that runs alongside `ProgressEdits`, listening for `TextDeltaEvent`s and sending draft updates to Telegram.

```python
class StreamingDraftEdits:
    """Sends sendMessageDraft updates as Claude generates text."""

    def __init__(
        self,
        *,
        bot: BotClient,  # TelegramClient with send_message_draft
        chat_id: int,
        reply_to_message_id: int | None,
        message_thread_id: int | None,
        presenter: Presenter,
        tracker: ProgressTracker,
        started_at: float,
        clock: Callable[[], float],
        min_draft_interval_s: float = 0.4,  # minimum time between drafts
        min_draft_chars: int = 20,  # don't send tiny drafts
    ) -> None:
        self.bot = bot
        self.chat_id = chat_id
        self.reply_to_message_id = reply_to_message_id
        self.message_thread_id = message_thread_id
        self.presenter = presenter
        self.tracker = tracker
        self.started_at = started_at
        self.clock = clock
        self.min_draft_interval_s = min_draft_interval_s
        self.min_draft_chars = min_draft_chars
        self._accumulated_text: str = ""
        self._last_sent_text: str = ""
        self._last_sent_at: float = 0.0
        self._draft_active: bool = False
        self._event_seq: int = 0
        self._rendered_seq: int = 0
        self.signal_send, self.signal_recv = anyio.create_memory_object_stream(1)

    async def run(self) -> None:
        while True:
            # Wait for new text
            while self._rendered_seq == self._event_seq:
                try:
                    await self.signal_recv.receive()
                except anyio.EndOfStream:
                    return
            seq_at_render = self._event_seq
            text = self._accumulated_text
            # Skip if text too short or same as last sent
            if len(text) < self.min_draft_chars or text == self._last_sent_text:
                self._rendered_seq = seq_at_render
                continue
            # Throttle: wait if too soon since last draft
            now = self.clock()
            since_last = now - self._last_sent_at
            if since_last < self.min_draft_interval_s:
                await anyio.sleep(self.min_draft_interval_s - since_last)
            # Use the accumulated text directly -- Telegram will show it as a draft
            draft_text = self._render_draft_text(text)
            try:
                await self.bot.send_message_draft(
                    chat_id=self.chat_id,
                    text=draft_text,
                    reply_to_message_id=self.reply_to_message_id,
                    message_thread_id=self.message_thread_id,
                )
                self._last_sent_text = text
                self._last_sent_at = self.clock()
                self._draft_active = True
            except Exception:
                # Draft sending failed -- log but continue.
                # Fallback: the progress edits still work.
                logger.debug(
                    "streaming_draft.send_failed",
                    chat_id=self.chat_id,
                    text_len=len(text),
                )
            self._rendered_seq = seq_at_render

    def _render_draft_text(self, text: str) -> str:
        """Render draft text. Could add header/formatting later."""
        # For now, just send the raw text.
        # Telegram drafts likely don't support entities,
        # so we may need plain text only.
        return text

    def on_text_delta(self, event: TextDeltaEvent) -> None:
        """Called when a TextDeltaEvent arrives."""
        self._accumulated_text = event.accumulated
        self._event_seq += 1
        try:
            self.signal_send.send_nowait(None)
        except (anyio.WouldBlock, anyio.BrokenResourceError, anyio.ClosedResourceError):
            pass

    @property
    def draft_active(self) -> bool:
        """Whether any draft has been sent (for cleanup)."""
        return self._draft_active
```

**Why**: Mirrors the `ProgressEdits` pattern (signal-based debouncing, seq comparison). The key design decisions:
- Separate from ProgressEdits: Drafts and progress edits serve different purposes and have different rate limits
- Key-based outbox dedup handles rate limiting at the transport layer
- min_draft_interval_s provides application-level throttling
- min_draft_chars avoids sending tiny 1-2 character drafts
- Graceful failure: If sendMessageDraft fails, progress edits still work
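The interval and size gating inside `run()` can be isolated for unit testing with an injected clock. `DraftGate` below is a hypothetical helper sketching just those checks, not a class from the plan:

```python
from typing import Callable

class DraftGate:
    """Decides whether a draft snapshot should be sent, mirroring the
    min_draft_interval_s / min_draft_chars checks in StreamingDraftEdits."""

    def __init__(
        self,
        clock: Callable[[], float],
        min_interval_s: float = 0.4,
        min_chars: int = 20,
    ) -> None:
        self.clock = clock
        self.min_interval_s = min_interval_s
        self.min_chars = min_chars
        self._last_text = ""
        self._last_at = float("-inf")

    def should_send(self, text: str) -> bool:
        if len(text) < self.min_chars or text == self._last_text:
            return False  # too short, or an unchanged duplicate
        if self.clock() - self._last_at < self.min_interval_s:
            return False  # too soon since the last draft
        self._last_text = text
        self._last_at = self.clock()
        return True
```

The real class sleeps instead of dropping when the interval has not elapsed, but the gating conditions are the same.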
### Step 7: Integrate StreamingDraftEdits into handle_message

**File**: `/home/ubuntu/takopi/src/takopi/runner_bridge.py`

**What**: Modify `handle_message()` to create and run `StreamingDraftEdits` when streaming is enabled. Wire `TextDeltaEvent` through both the existing `ProgressEdits.on_event()` and the new `StreamingDraftEdits.on_text_delta()`.

**Key changes to `ProgressEdits.on_event`**:

```python
async def on_event(self, evt: TakopiEvent) -> None:
    # TextDeltaEvent should not trigger progress re-renders
    if isinstance(evt, TextDeltaEvent):
        return
    if not self.tracker.note_event(evt):
        return
    # ... rest unchanged
```

**Key changes to `run_runner_with_cancel`**:
```python
async def run_runner_with_cancel(
    runner: Runner,
    *,
    prompt: str,
    resume_token: ResumeToken | None,
    edits: ProgressEdits,
    draft_edits: StreamingDraftEdits | None,  # NEW parameter
    running_task: RunningTask | None,
    on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None,
) -> RunOutcome:
    outcome = RunOutcome()
    async with anyio.create_task_group() as tg:

        async def run_runner() -> None:
            try:
                async for evt in runner.run(prompt, resume_token):
                    _log_runner_event(evt)
                    if isinstance(evt, StartedEvent):
                        ...  # existing logic
                    elif isinstance(evt, CompletedEvent):
                        outcome.resume = evt.resume or outcome.resume
                        outcome.completed = evt
                    elif isinstance(evt, TextDeltaEvent) and draft_edits is not None:
                        draft_edits.on_text_delta(evt)  # NEW
                    await edits.on_event(evt)
            finally:
                tg.cancel_scope.cancel()
    # ... rest unchanged
```

**Key changes to `handle_message`**:
```python
async def handle_message(
    cfg: ExecBridgeConfig,
    *,
    runner: Runner,
    incoming: IncomingMessage,
    # ... existing params ...
    streaming_drafts: bool = False,  # NEW parameter
    telegram_bot: Any = None,  # NEW: TelegramClient for draft sending
) -> None:
    # ... existing setup ...

    # Create StreamingDraftEdits if enabled
    draft_edits: StreamingDraftEdits | None = None
    if streaming_drafts and telegram_bot is not None:
        draft_edits = StreamingDraftEdits(
            bot=telegram_bot,
            chat_id=cast(int, incoming.channel_id),
            reply_to_message_id=cast(int, incoming.message_id),
            message_thread_id=cast(int | None, incoming.thread_id),
            presenter=cfg.presenter,
            tracker=progress_tracker,
            started_at=started_at,
            clock=clock,
        )

    draft_scope = anyio.CancelScope()

    async def run_drafts() -> None:
        if draft_edits is None:
            return
        try:
            with draft_scope:
                await draft_edits.run()
        except cancel_exc_type:
            return

    async with anyio.create_task_group() as tg:
        if progress_ref is not None:
            tg.start_soon(run_edits)
        if draft_edits is not None:
            tg.start_soon(run_drafts)  # NEW
        try:
            outcome = await run_runner_with_cancel(
                runner,
                prompt=runner_text,
                resume_token=resume_token,
                edits=edits,
                draft_edits=draft_edits,  # NEW
                running_task=running_task,
                on_thread_known=on_thread_known,
            )
        except Exception as exc:
            error = exc
        finally:
            # ... existing cleanup ...
            draft_scope.cancel()  # NEW: stop draft sending

    # After completion, drop any pending drafts
    if draft_edits is not None and draft_edits.draft_active:
        try:
            await telegram_bot.drop_pending_drafts(
                chat_id=cast(int, incoming.channel_id)
            )
        except Exception:
            pass

    # ... rest of handle_message unchanged (sends final message as before)
```

**Why**: The integration follows the exact pattern of `ProgressEdits` — a background task with its own cancel scope that runs alongside the runner. `TextDeltaEvent`s are routed to `StreamingDraftEdits` while `ProgressEdits` ignores them. The final message is still sent via the existing path.
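The lifecycle above — a background drafts task that consumes updates and is cancelled once the runner finishes — can be sketched with the standard library. The plan uses anyio; this stand-in uses asyncio purely to illustrate the pattern:

```python
import asyncio

async def run_with_background(main_steps: list[str]) -> tuple[list[str], list[str]]:
    """Run a 'runner' while a background drafts task consumes updates,
    cancelling the background task when the runner completes (an asyncio
    stand-in for the anyio task group + CancelScope in the plan)."""
    updates: asyncio.Queue[str] = asyncio.Queue()
    drafts: list[str] = []

    async def draft_task() -> None:
        while True:  # runs until cancelled, like StreamingDraftEdits.run()
            drafts.append(await updates.get())

    async def runner() -> list[str]:
        out: list[str] = []
        for step in main_steps:
            await updates.put(step)
            await asyncio.sleep(0)  # yield so the draft task can consume
            out.append(step)
        return out

    bg = asyncio.create_task(draft_task())
    result = await runner()
    await asyncio.sleep(0)  # let any final update drain
    bg.cancel()  # mirror of draft_scope.cancel() in the finally block
    try:
        await bg
    except asyncio.CancelledError:
        pass
    return result, drafts
```

The key property is the same as in `handle_message`: the runner's completion, success or failure, always tears down the drafts task, so no stray draft updates outlive the run.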
### Step 8: Thread config through the call chain

**Files**:

- `/home/ubuntu/takopi/src/takopi/telegram/loop.py` (pass `streaming_drafts` and `telegram_bot`)
- `/home/ubuntu/takopi/src/takopi/telegram/commands/handlers.py` (find `run_engine` and thread through)

**What**: The `streaming_drafts` flag and the `telegram_bot` reference need to flow from `TelegramBridgeConfig` down to `handle_message()`. Trace the call chain:

```
run_main_loop → run_job → run_engine → handle_message
```

`ExecBridgeConfig` already carries `transport` and `presenter`. We need to add:

- `streaming_drafts: bool` to `ExecBridgeConfig`
- Access to the `TelegramClient` (the `bot` from `TelegramBridgeConfig`) for `send_message_draft`

The `TelegramClient` is `cfg.bot` in `TelegramBridgeConfig`. It needs to be passed to `handle_message` via `ExecBridgeConfig` or as a separate parameter.
**Cleanest approach**: Add `streaming_drafts: bool = False` and `draft_bot: Any | None = None` to `ExecBridgeConfig`:

```python
@dataclass(frozen=True, slots=True)
class ExecBridgeConfig:
    transport: Transport
    presenter: Presenter
    final_notify: bool
    streaming_drafts: bool = False  # NEW
    draft_bot: Any = None  # NEW: TelegramClient for drafts
```

Then, in the Telegram bridge setup code where `ExecBridgeConfig` is constructed, pass through the values.

Also ensure `stream_text=True` is injected into the Claude runner config when `streaming_drafts` is enabled. Find where the engine config is assembled and add the injection.
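The injection step can be a small pure function, which keeps it easy to test and avoids mutating shared config dicts. `inject_stream_text` is a hypothetical helper name, not an existing function:

```python
from typing import Any

def inject_stream_text(
    engine_config: dict[str, Any], streaming_drafts: bool
) -> dict[str, Any]:
    """Return a copy of the engine config with stream_text set when
    streaming drafts are enabled; never mutate the input dict."""
    if not streaming_drafts:
        return dict(engine_config)
    return {**engine_config, "stream_text": True}
```

Returning a copy matters because the same engine config dict may be shared across runner rebuilds (e.g. on hot reload).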
### Step 9: Handle edge cases

**Interleaved tool use + text**: When Claude uses a tool between text blocks, the accumulated text resets. In `translate_claude_event`, when a `StreamAssistantMessage` with a `StreamToolUseBlock` is seen, reset `state.accumulated_text = ""`. The next text block starts fresh.

**Empty text**: `TextDeltaEvent` is only emitted for non-empty deltas (already checked).

**Very long text**: Telegram messages have a 4096-character limit. For drafts, truncate at ~4000 characters and append "…" to indicate continuation. The final message uses the existing split/trim logic.
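The truncation rule is a one-liner; `truncate_draft` and `DRAFT_LIMIT` are hypothetical names for this sketch:

```python
DRAFT_LIMIT = 4000  # stay safely under Telegram's 4096-character message limit

def truncate_draft(text: str, limit: int = DRAFT_LIMIT) -> str:
    """Clamp draft text, appending an ellipsis to signal continuation."""
    if len(text) <= limit:
        return text
    return text[: limit - 1] + "…"
```

Only drafts need this; the final message goes through the existing split/trim path.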
**Errors mid-stream**: If the runner errors, the draft scope is cancelled. Any pending drafts are dropped. The error message is rendered via the existing path.

**Cancellation**: When `cancel_requested` fires, drafts stop. The cancelled progress message is shown as before.

**Graceful fallback**: If `sendMessageDraft` returns an error (API not supported, wrong chat type), the first failure is logged and `StreamingDraftEdits` can set an internal `_disabled` flag to stop trying. Progress edits continue working. The user experience degrades gracefully to the current behavior.

```python
# In StreamingDraftEdits.run():
except Exception as exc:
    self._failure_count += 1
    if self._failure_count >= 3:
        logger.warning(
            "streaming_draft.disabled",
            chat_id=self.chat_id,
            failures=self._failure_count,
        )
        return  # Stop trying
```

## Architecture Decision: Drafts vs Progress Message
**Decision**: Drafts and the progress message coexist. They serve different purposes:

- **Progress message** (`editMessageText`): shows the actions list, elapsed time, and cancel button. Keeps working as before.
- **Drafts** (`sendMessageDraft`): shows the actual response text appearing in real time. Ephemeral, no message to manage.
When streaming completes:
- Drafts stop automatically (scope cancelled, pending drafts dropped)
- Final message is sent as before (sendMessage or editMessageText on the progress message)
- The draft “bubble” in Telegram disappears when the real message arrives
This means the user sees:

- The progress message updating with actions ("reading file…", "editing code…")
- Draft text appearing in the chat simultaneously with the partial answer
- The final message replacing the progress message with the complete answer
## Testing Strategy

- **Unit test `TextDeltaEvent`**: verify it is correctly constructed and serialized
- **Unit test `translate_claude_event`**: mock a `StreamEventMessage` with `text_delta`; verify a `TextDeltaEvent` is emitted with the correct accumulated text
- **Unit test `StreamingDraftEdits`**: mock the bot client; verify debouncing, draft text, and graceful failure
- **Unit test config**: verify the `streaming_drafts` field in `TelegramTransportSettings`
- **Integration test**: run with `streaming_drafts = true` against a test chat; verify drafts appear and the final message is correct
## File Summary

| File | Change |
|---|---|
| `model.py` | Add `TextDeltaEvent` dataclass and update union type |
| `runners/claude.py` | Add `--include-partial-messages`, handle `StreamEventMessage`, emit `TextDeltaEvent` |
| `schemas/claude.py` | No changes needed (`StreamEventMessage` already defined) |
| `telegram/client_api.py` | Add `send_message_draft` to `BotClient` + `HttpBotClient` |
| `telegram/client.py` | Add `send_message_draft` with outbox dedup |
| `telegram/outbox.py` | Add `DRAFT_PRIORITY = 3` |
| `settings.py` | Add `streaming_drafts: bool = False` |
| `telegram/bridge.py` | Add `streaming_drafts` to `TelegramBridgeConfig` |
| `runner_bridge.py` | Add `StreamingDraftEdits` class; integrate into `handle_message` and `run_runner_with_cancel` |
| `telegram/loop.py` | Thread config through to `handle_message` |
| `events.py` | No changes (`TextDeltaEvent` doesn't use `EventFactory`) |
| `progress.py` | No changes (`TextDeltaEvent` is ignored by `ProgressTracker`) |
| `presenter.py` | No changes |
## Implementation Order

1. `model.py` — add `TextDeltaEvent` (no dependencies)
2. `schemas/claude.py` — verify `StreamEventMessage` structure (no changes needed)
3. `runners/claude.py` — add `--include-partial-messages`, handle events, emit `TextDeltaEvent`
4. `telegram/outbox.py` — add `DRAFT_PRIORITY`
5. `telegram/client_api.py` — add `send_message_draft` method
6. `telegram/client.py` — add `send_message_draft` with outbox
7. `settings.py` — add `streaming_drafts` config field
8. `telegram/bridge.py` — add to `TelegramBridgeConfig`
9. `runner_bridge.py` — add `StreamingDraftEdits`, integrate into `handle_message`
10. `telegram/loop.py` + handlers — thread config through the call chain
## Risk Mitigation

- **Feature flag** (`streaming_drafts = false` default): zero risk to existing behavior
- **Graceful fallback**: if the API fails, automatic disable after 3 failures
- **No existing code deleted**: all changes are additive
- **Outbox dedup**: only the latest draft survives, preventing rate limit issues
- **Independent of progress edits**: both systems work independently
## Open Questions

- **Exact `sendMessageDraft` parameters**: The official API documentation was not fully accessible during research. The implementation assumes standard Telegram patterns and may need adjustment once the exact API docs are consulted.
- **Draft rate limits**: Unknown. The outbox dedup plus `min_draft_interval_s` should be sufficient, but may need tuning.
- **Draft support in groups**: May only work in private chats. The graceful fallback handles this.
- **Entities support in drafts**: Unknown whether drafts support formatting entities. The initial implementation may use plain text only.