Streaming Drafts Implementation Plan

Overview

Add Telegram sendMessageDraft streaming support to the Takopi bot, so users see Claude’s response text appear in real time (as with ChatGPT) instead of waiting for the full answer at the end.

Architecture Analysis

Current Flow

  1. Claude subprocess runs with --output-format stream-json
  2. ClaudeRunner reads JSONL lines, translates to TakopiEvents
  3. StreamTextBlock.text (full text per content block) is captured in state.last_assistant_text but NOT emitted as an event
  4. ProgressEdits periodically edits the progress message with action list (tools used, elapsed time)
  5. Only when CompletedEvent arrives does the final answer appear
  6. Current runner does NOT use --include-partial-messages flag

Key Insight: StreamTextBlock vs text_delta

There are two types of text events in Claude Code stream-json:

  • StreamTextBlock (in StreamAssistantMessage): Contains full accumulated text for a content block. Only emitted when the assistant message is complete (between tool uses). NOT incremental.
  • stream_event with text_delta (requires --include-partial-messages): Contains token-by-token incremental deltas. This is what we need for real streaming.

Decision: We must add --include-partial-messages to get true token-by-token streaming. The existing StreamTextBlock only fires at message boundaries and would give a poor streaming experience.
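
For concreteness, the two event shapes can be distinguished as below. The field names are assumptions modeled on the Anthropic streaming format and should be verified against the local schemas/claude.py definitions:

```python
import json

# Two hypothetical JSONL lines (shapes are assumed, not confirmed):
full_block = '{"type": "assistant", "message": {"content": [{"type": "text", "text": "Hello world"}]}}'
delta_event = '{"type": "stream_event", "event": {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hel"}}}'

def classify(line: str) -> str:
    """Return 'full' for a complete text block, 'delta' for an incremental chunk."""
    obj = json.loads(line)
    if obj.get("type") == "assistant":
        return "full"  # only arrives at message boundaries
    delta = obj.get("event", {}).get("delta", {})
    if delta.get("type") == "text_delta":
        return "delta"  # token-by-token, needs --include-partial-messages
    return "other"
```

Only the "delta" case gives the incremental stream we want; the "full" case is what the current runner already sees.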

sendMessageDraft API

Added in Bot API 9.3 (December 2025). The method allows bots to send draft message text that appears in the user’s chat as it’s being typed, similar to how ChatGPT shows streaming text.

Assumed parameters (based on Bot API patterns and available information):

  • chat_id (required): Target chat
  • text (required): Current draft text
  • reply_to_message_id (optional): Reply context
  • message_thread_id (optional): Forum topic
  • entities (optional): Text formatting entities
  • parse_mode (optional): Alternative to entities
  • Returns: True on success (no message_id — drafts are ephemeral)

Important limitations:

  • Drafts are ephemeral — they don’t create a message
  • The final answer must still be sent as a regular sendMessage
  • Drafts may not work in all chat types (groups may be limited)
  • No known rate limit documentation, but likely more lenient than editMessageText
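
Under the assumed method name and parameters above (all unconfirmed), the raw request would be built like any other Bot API call. This sketch only constructs the URL and form body; it does not send anything:

```python
from urllib.parse import urlencode

# "sendMessageDraft" and its parameter names are assumptions pending
# confirmation against the official Bot API documentation.
BASE = "https://api.telegram.org/bot{token}/{method}"

def build_draft_request(token: str, chat_id: int, text: str) -> tuple[str, bytes]:
    """Build the URL and urlencoded body for a hypothetical sendMessageDraft call."""
    url = BASE.format(token=token, method="sendMessageDraft")
    body = urlencode({"chat_id": chat_id, "text": text}).encode()
    return url, body
```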

Implementation Plan

Step 1: Add new event type TextDeltaEvent

File: /home/ubuntu/takopi/src/takopi/model.py

What: Add a new event type that carries incremental text deltas from the runner to the transport layer.

Changes:

# Add to TakopiEventType literal
type TakopiEventType = Literal[
    "started",
    "action",
    "completed",
    "text_delta",  # NEW
]
 
# Add new event dataclass
@dataclass(frozen=True, slots=True)
class TextDeltaEvent:
    type: Literal["text_delta"] = field(default="text_delta", init=False)
    engine: EngineId
    text: str           # incremental delta text
    accumulated: str    # full accumulated text so far
 
# Update union type
type TakopiEvent = StartedEvent | ActionEvent | CompletedEvent | TextDeltaEvent

Why: The domain model needs a way to carry streaming text from the runner layer through the event pipeline to the transport layer. Both the delta AND accumulated text are needed: delta for efficient diffing, accumulated for Telegram draft rendering.


Step 2: Add --include-partial-messages flag and emit TextDeltaEvent

File: /home/ubuntu/takopi/src/takopi/runners/claude.py

What:

  1. Add --include-partial-messages to Claude CLI args (conditionally, based on a new stream_text flag)
  2. Handle StreamEventMessage with text_delta events in translate_claude_event()
  3. Accumulate text in ClaudeStreamState and emit TextDeltaEvent

Changes to ClaudeStreamState:

@dataclass(slots=True)
class ClaudeStreamState:
    factory: EventFactory = field(default_factory=lambda: EventFactory(ENGINE))
    pending_actions: dict[str, Action] = field(default_factory=dict)
    last_assistant_text: str | None = None
    note_seq: int = 0
    # NEW fields for streaming
    accumulated_text: str = ""        # running accumulation of text deltas
    stream_text_enabled: bool = False # whether to emit TextDeltaEvent

Changes to _build_args:

def _build_args(self, prompt: str, resume: ResumeToken | None) -> list[str]:
    args: list[str] = ["-p", "--output-format", "stream-json", "--verbose"]
    if self.stream_text:  # NEW
        args.append("--include-partial-messages")
    # ... rest unchanged

Changes to translate_claude_event — add handling for StreamEventMessage:

case claude_schema.StreamEventMessage(event=event_data):
    if not state.stream_text_enabled:
        return []
    delta = event_data.get("delta")
    if not isinstance(delta, dict):
        return []
    if delta.get("type") != "text_delta":
        return []
    text_chunk = delta.get("text", "")
    if not text_chunk:
        return []
    state.accumulated_text += text_chunk
    state.last_assistant_text = state.accumulated_text
    return [TextDeltaEvent(
        engine=ENGINE,
        text=text_chunk,
        accumulated=state.accumulated_text,
    )]

Also: Reset accumulated_text when a new assistant turn begins (on StreamAssistantMessage with tool_use, to handle interleaved tool use + text).

Changes to ClaudeRunner:

@dataclass(slots=True)
class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner):
    # ... existing fields ...
    stream_text: bool = False  # NEW: whether to enable text streaming
 
    def new_state(self, prompt: str, resume: ResumeToken | None) -> ClaudeStreamState:
        state = ClaudeStreamState()
        state.stream_text_enabled = self.stream_text
        return state

Changes to build_runner:

def build_runner(config: EngineConfig, _config_path: Path) -> Runner:
    # ... existing code ...
    stream_text = config.get("stream_text") is True  # NEW
    return ClaudeRunner(
        # ... existing params ...
        stream_text=stream_text,
    )

Why: This is the source of streaming text data. We need token-by-token deltas from Claude CLI, which requires --include-partial-messages. The accumulated text is tracked in state so each TextDeltaEvent carries both the delta and full text so far.


Step 3: Add send_message_draft to BotClient layers

File: /home/ubuntu/takopi/src/takopi/telegram/client_api.py

What: Add send_message_draft() method to both BotClient protocol and HttpBotClient implementation.

Changes to BotClient protocol:

async def send_message_draft(
    self,
    chat_id: int,
    text: str,
    reply_to_message_id: int | None = None,
    message_thread_id: int | None = None,
    entities: list[dict] | None = None,
    parse_mode: str | None = None,
) -> bool: ...

Changes to HttpBotClient:

async def send_message_draft(
    self,
    chat_id: int,
    text: str,
    reply_to_message_id: int | None = None,
    message_thread_id: int | None = None,
    entities: list[dict] | None = None,
    parse_mode: str | None = None,
) -> bool:
    params: dict[str, Any] = {"chat_id": chat_id, "text": text}
    if reply_to_message_id is not None:
        params["reply_to_message_id"] = reply_to_message_id
    if message_thread_id is not None:
        params["message_thread_id"] = message_thread_id
    if entities is not None:
        params["entities"] = entities
    if parse_mode is not None:
        params["parse_mode"] = parse_mode
    result = await self._post("sendMessageDraft", params)
    return bool(result)

Why: Follows the existing pattern for every Telegram API method in the codebase. The BotClient protocol ensures testability.


Step 4: Add send_message_draft to TelegramClient with rate limiting

File: /home/ubuntu/takopi/src/takopi/telegram/client.py

What: Add send_message_draft() to TelegramClient with a dedicated outbox key pattern and a new priority level.

Changes to outbox.py — add new priority:

SEND_PRIORITY = 0
DELETE_PRIORITY = 1
EDIT_PRIORITY = 2
DRAFT_PRIORITY = 3  # NEW: lowest priority, can be dropped freely

Changes to TelegramClient:

async def send_message_draft(
    self,
    chat_id: int,
    text: str,
    reply_to_message_id: int | None = None,
    message_thread_id: int | None = None,
    entities: list[dict] | None = None,
    parse_mode: str | None = None,
    *,
    wait: bool = False,
) -> bool:
    async def execute() -> bool:
        return await self._client.send_message_draft(
            chat_id=chat_id,
            text=text,
            reply_to_message_id=reply_to_message_id,
            message_thread_id=message_thread_id,
            entities=entities,
            parse_mode=parse_mode,
        )
 
    # Key-based dedup: only latest draft per chat matters
    result = await self.enqueue_op(
        key=("draft", chat_id),
        label="send_message_draft",
        execute=execute,
        priority=DRAFT_PRIORITY,
        chat_id=chat_id,
        wait=wait,
    )
    return bool(result)
 
async def drop_pending_drafts(self, *, chat_id: int) -> None:
    await self._outbox.drop_pending(key=("draft", chat_id))

Why: Key-based deduplication with key ("draft", chat_id) means only the LATEST draft text survives in the outbox queue. If Claude generates 10 tokens before the outbox sends, it sends only the latest accumulated text, not 10 separate API calls. This is the core rate-limiting strategy. The DRAFT_PRIORITY ensures drafts never block sends, deletes, or edits.
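
The dedup behaviour the outbox is expected to provide can be modeled with a toy sketch (not the real Outbox API): a queued payload for a key is simply overwritten by a newer one, so a burst of drafts collapses into a single API call.

```python
class CoalescingOutbox:
    """Toy model of key-based dedup in the outbox queue."""

    def __init__(self) -> None:
        self._pending: dict[tuple, str] = {}

    def enqueue(self, key: tuple, text: str) -> None:
        # A newer draft for the same key replaces the older, unsent one.
        self._pending[key] = text

    def drain(self) -> list[tuple[tuple, str]]:
        # Simulate the sender loop flushing whatever survived coalescing.
        items = list(self._pending.items())
        self._pending.clear()
        return items
```

Enqueueing ten drafts for `("draft", chat_id)` before a flush results in one send carrying only the latest accumulated text.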


Step 5: Add streaming_drafts config flag

File: /home/ubuntu/takopi/src/takopi/settings.py

What: Add streaming_drafts: bool = False to TelegramTransportSettings.

Changes:

class TelegramTransportSettings(BaseModel):
    # ... existing fields ...
    streaming_drafts: bool = False  # NEW: enable sendMessageDraft streaming

Also update TelegramBridgeConfig in bridge.py:

@dataclass(frozen=True, slots=True)
class TelegramBridgeConfig:
    # ... existing fields ...
    streaming_drafts: bool = False  # NEW

And propagate in loop.py where TelegramBridgeConfig is constructed (find the constructor call and pass through the config value).

Also pass stream_text=True to the Claude runner config when streaming_drafts is enabled. This belongs in the engine config layer: find where build_runner is called and, when streaming_drafts is true, inject stream_text: true into the engine config dict before building the runner.
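
The injection step might look like the following (the function name is hypothetical; it does not exist in the codebase yet):

```python
def with_stream_text(engine_config: dict, *, streaming_drafts: bool) -> dict:
    """Return the engine config with stream_text injected when drafts are on."""
    if not streaming_drafts:
        return engine_config
    # Non-destructive copy so the original config dict is left untouched.
    return {**engine_config, "stream_text": True}
```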

Config example (~/.takopi/takopi.toml):

[transports.telegram]
bot_token = "..."
chat_id = 123
streaming_drafts = false  # set to true to enable

Why: Feature flag allows toggling without code changes. Hot-reload compatible since config changes are detected by config_watch.py. Default false is non-destructive.


Step 6: Add StreamingDraftEdits class

File: /home/ubuntu/takopi/src/takopi/runner_bridge.py

What: Add a new class StreamingDraftEdits that runs alongside ProgressEdits, listening for TextDeltaEvents and sending draft updates to Telegram.

class StreamingDraftEdits:
    """Sends sendMessageDraft updates as Claude generates text."""
 
    def __init__(
        self,
        *,
        bot: BotClient,  # TelegramClient with send_message_draft
        chat_id: int,
        reply_to_message_id: int | None,
        message_thread_id: int | None,
        presenter: Presenter,
        tracker: ProgressTracker,
        started_at: float,
        clock: Callable[[], float],
        min_draft_interval_s: float = 0.4,  # minimum time between drafts
        min_draft_chars: int = 20,           # don't send tiny drafts
    ) -> None:
        self.bot = bot
        self.chat_id = chat_id
        self.reply_to_message_id = reply_to_message_id
        self.message_thread_id = message_thread_id
        self.presenter = presenter
        self.tracker = tracker
        self.started_at = started_at
        self.clock = clock
        self.min_draft_interval_s = min_draft_interval_s
        self.min_draft_chars = min_draft_chars
 
        self._accumulated_text: str = ""
        self._last_sent_text: str = ""
        self._last_sent_at: float = 0.0
        self._draft_active: bool = False
        self._event_seq: int = 0
        self._rendered_seq: int = 0
        self.signal_send, self.signal_recv = anyio.create_memory_object_stream(1)
 
    async def run(self) -> None:
        while True:
            # Wait for new text
            while self._rendered_seq == self._event_seq:
                try:
                    await self.signal_recv.receive()
                except anyio.EndOfStream:
                    return
 
            seq_at_render = self._event_seq
            text = self._accumulated_text
 
            # Skip if text too short or same as last sent
            if len(text) < self.min_draft_chars or text == self._last_sent_text:
                self._rendered_seq = seq_at_render
                continue
 
            # Throttle: wait if too soon since last draft
            now = self.clock()
            since_last = now - self._last_sent_at
            if since_last < self.min_draft_interval_s:
                await anyio.sleep(self.min_draft_interval_s - since_last)
 
            # Render text with progress header for context
            # Use the accumulated text directly -- Telegram will show it as draft
            draft_text = self._render_draft_text(text)
 
            try:
                await self.bot.send_message_draft(
                    chat_id=self.chat_id,
                    text=draft_text,
                    reply_to_message_id=self.reply_to_message_id,
                    message_thread_id=self.message_thread_id,
                )
                self._last_sent_text = text
                self._last_sent_at = self.clock()
                self._draft_active = True
            except Exception:
                # Draft sending failed -- log but continue
                # Fallback: the progress edits still work
                logger.debug(
                    "streaming_draft.send_failed",
                    chat_id=self.chat_id,
                    text_len=len(text),
                )
 
            self._rendered_seq = seq_at_render
 
    def _render_draft_text(self, text: str) -> str:
        """Render draft text. Could add header/formatting later."""
        # For now, just send the raw text.
        # Telegram drafts likely don't support entities,
        # so we may need plain text only.
        return text
 
    def on_text_delta(self, event: TextDeltaEvent) -> None:
        """Called when a TextDeltaEvent arrives."""
        self._accumulated_text = event.accumulated
        self._event_seq += 1
        try:
            self.signal_send.send_nowait(None)
        except (anyio.WouldBlock, anyio.BrokenResourceError, anyio.ClosedResourceError):
            pass
 
    @property
    def draft_active(self) -> bool:
        """Whether any draft has been sent (for cleanup)."""
        return self._draft_active

Why: Mirrors the ProgressEdits pattern (signal-based debouncing, seq comparison). The key design decisions:

  • Separate from ProgressEdits: Drafts and progress edits serve different purposes and have different rate limits
  • Key-based outbox dedup handles rate limiting at the transport layer
  • min_draft_interval_s provides application-level throttling
  • min_draft_chars avoids sending tiny 1-2 character drafts
  • Graceful failure: If sendMessageDraft fails, progress edits still work
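
The application-level throttle reduces to a small computation, extracted here as a standalone sketch of the logic inside run():

```python
def throttle_delay(now: float, last_sent_at: float, min_interval_s: float = 0.4) -> float:
    """How long to sleep before the next draft may be sent (0.0 means send now)."""
    since_last = now - last_sent_at
    return max(0.0, min_interval_s - since_last)
```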

Step 7: Integrate StreamingDraftEdits into handle_message

File: /home/ubuntu/takopi/src/takopi/runner_bridge.py

What: Modify handle_message() to create and run StreamingDraftEdits when streaming is enabled. Wire TextDeltaEvent through both the existing ProgressEdits.on_event() and the new StreamingDraftEdits.on_text_delta().

Key changes to ProgressEdits.on_event:

async def on_event(self, evt: TakopiEvent) -> None:
    # TextDeltaEvent should not trigger progress re-renders
    if isinstance(evt, TextDeltaEvent):
        return
    if not self.tracker.note_event(evt):
        return
    # ... rest unchanged

Key changes to run_runner_with_cancel:

async def run_runner_with_cancel(
    runner: Runner,
    *,
    prompt: str,
    resume_token: ResumeToken | None,
    edits: ProgressEdits,
    draft_edits: StreamingDraftEdits | None,  # NEW parameter
    running_task: RunningTask | None,
    on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None,
) -> RunOutcome:
    outcome = RunOutcome()
    async with anyio.create_task_group() as tg:
        async def run_runner() -> None:
            try:
                async for evt in runner.run(prompt, resume_token):
                    _log_runner_event(evt)
                    if isinstance(evt, StartedEvent):
                        # ... existing logic ...
                    elif isinstance(evt, CompletedEvent):
                        outcome.resume = evt.resume or outcome.resume
                        outcome.completed = evt
                    elif isinstance(evt, TextDeltaEvent) and draft_edits is not None:
                        draft_edits.on_text_delta(evt)  # NEW
                    await edits.on_event(evt)
            finally:
                tg.cancel_scope.cancel()
        # ... rest unchanged

Key changes to handle_message:

async def handle_message(
    cfg: ExecBridgeConfig,
    *,
    runner: Runner,
    incoming: IncomingMessage,
    # ... existing params ...
    streaming_drafts: bool = False,  # NEW parameter
    telegram_bot: Any = None,       # NEW: TelegramClient for draft sending
) -> None:
    # ... existing setup ...
 
    # Create StreamingDraftEdits if enabled
    draft_edits: StreamingDraftEdits | None = None
    if streaming_drafts and telegram_bot is not None:
        draft_edits = StreamingDraftEdits(
            bot=telegram_bot,
            chat_id=cast(int, incoming.channel_id),
            reply_to_message_id=cast(int, incoming.message_id),
            message_thread_id=cast(int | None, incoming.thread_id),
            presenter=cfg.presenter,
            tracker=progress_tracker,
            started_at=started_at,
            clock=clock,
        )
 
    draft_scope = anyio.CancelScope()
 
    async def run_drafts() -> None:
        if draft_edits is None:
            return
        try:
            with draft_scope:
                await draft_edits.run()
        except cancel_exc_type:
            return
 
    async with anyio.create_task_group() as tg:
        if progress_ref is not None:
            tg.start_soon(run_edits)
        if draft_edits is not None:
            tg.start_soon(run_drafts)  # NEW
 
        try:
            outcome = await run_runner_with_cancel(
                runner,
                prompt=runner_text,
                resume_token=resume_token,
                edits=edits,
                draft_edits=draft_edits,  # NEW
                running_task=running_task,
                on_thread_known=on_thread_known,
            )
        except Exception as exc:
            error = exc
        finally:
            # ... existing cleanup ...
            draft_scope.cancel()  # NEW: stop draft sending
 
    # After completion, drop any pending drafts
    if draft_edits is not None and draft_edits.draft_active:
        try:
            await telegram_bot.drop_pending_drafts(
                chat_id=cast(int, incoming.channel_id)
            )
        except Exception:
            pass
 
    # ... rest of handle_message unchanged (sends final message as before)

Why: The integration follows the exact pattern of ProgressEdits — a background task with its own cancel scope that runs alongside the runner. TextDeltaEvents are routed to StreamingDraftEdits while ProgressEdits ignores them. The final message is still sent via the existing path.


Step 8: Thread config through the call chain

Files:

  • /home/ubuntu/takopi/src/takopi/telegram/loop.py (pass streaming_drafts and telegram_bot)
  • /home/ubuntu/takopi/src/takopi/telegram/commands/handlers.py (find run_engine and thread through)

What: The streaming_drafts flag and telegram_bot reference need to flow from TelegramBridgeConfig down to handle_message(). Trace the call chain:

run_main_loop → run_job → run_engine → handle_message

The ExecBridgeConfig already carries transport and presenter. We need to add:

  • streaming_drafts: bool to ExecBridgeConfig
  • Access to the TelegramClient (the bot from TelegramBridgeConfig) for send_message_draft

The TelegramClient is cfg.bot in TelegramBridgeConfig. It needs to be passed to handle_message via ExecBridgeConfig or as a separate parameter.

Cleanest approach: Add streaming_drafts: bool = False and draft_bot: Any | None = None to ExecBridgeConfig:

@dataclass(frozen=True, slots=True)
class ExecBridgeConfig:
    transport: Transport
    presenter: Presenter
    final_notify: bool
    streaming_drafts: bool = False  # NEW
    draft_bot: Any = None           # NEW: TelegramClient for drafts

Then in the Telegram bridge setup code where ExecBridgeConfig is constructed, pass through the values.

Also ensure stream_text=True is injected into the Claude runner config when streaming_drafts is enabled. Find where engine config is assembled and add the injection.


Step 9: Handle edge cases

Interleaved tool use + text: When Claude uses a tool between text blocks, the accumulated text resets. In translate_claude_event, when a StreamAssistantMessage with StreamToolUseBlock is seen, reset state.accumulated_text = "". The next text block starts fresh.

Empty text: TextDeltaEvent is only emitted for non-empty deltas (already checked).

Very long text: Telegram messages have a 4096 character limit. For drafts, truncate at ~4000 characters and append “…” to indicate continuation. The final message uses the existing split/trim logic.
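
A possible truncation helper; the 4000-character margin is a guess that leaves headroom under the 4096 limit:

```python
DRAFT_LIMIT = 4000  # headroom under Telegram's 4096-character message cap

def truncate_draft(text: str, limit: int = DRAFT_LIMIT) -> str:
    """Clip draft text and mark continuation with an ellipsis."""
    if len(text) <= limit:
        return text
    return text[:limit] + "…"
```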

Errors mid-stream: If the runner errors, the draft scope is cancelled. Any pending drafts are dropped. The error message is rendered via the existing path.

Cancellation: When cancel_requested fires, drafts stop. The cancelled progress message is shown as before.

Graceful fallback: If sendMessageDraft returns an error (API not supported, wrong chat type), the first failure is logged and StreamingDraftEdits can set an internal _disabled flag to stop trying. Progress edits continue working. The user experience degrades gracefully to the current behavior.

# In StreamingDraftEdits.run(), extending the bare `except Exception` above.
# Requires `self._failure_count = 0` to be initialized in __init__.
except Exception:
    self._failure_count += 1
    if self._failure_count >= 3:
        logger.warning(
            "streaming_draft.disabled",
            chat_id=self.chat_id,
            failures=self._failure_count,
        )
        return  # Stop trying after repeated failures

Architecture Decision: Drafts vs Progress Message

Decision: Drafts and progress message coexist. They serve different purposes:

  • Progress message (editMessageText): Shows actions list, elapsed time, cancel button. Keeps working as before.
  • Drafts (sendMessageDraft): Shows the actual response text appearing in real-time. Ephemeral, no message to manage.

When streaming completes:

  1. Drafts stop automatically (scope cancelled, pending drafts dropped)
  2. Final message is sent as before (sendMessage or editMessageText on the progress message)
  3. The draft “bubble” in Telegram disappears when the real message arrives

This means the user sees:

  • Progress message updating with actions (“reading file…”, “editing code…”)
  • Draft text appearing in the chat simultaneously with the partial answer
  • Final message replacing the progress message with the complete answer

Testing Strategy

  1. Unit test TextDeltaEvent: Verify it’s correctly constructed and serialized
  2. Unit test translate_claude_event: Mock StreamEventMessage with text_delta, verify TextDeltaEvent is emitted with correct accumulated text
  3. Unit test StreamingDraftEdits: Mock bot client, verify debouncing, verify draft text, verify graceful failure
  4. Unit test config: Verify streaming_drafts field in TelegramTransportSettings
  5. Integration test: Run with streaming_drafts = true against a test chat, verify drafts appear and final message is correct

File Summary

  • model.py: Add TextDeltaEvent dataclass and update union type
  • runners/claude.py: Add --include-partial-messages, handle StreamEventMessage, emit TextDeltaEvent
  • schemas/claude.py: No changes needed (StreamEventMessage already defined)
  • telegram/client_api.py: Add send_message_draft to BotClient + HttpBotClient
  • telegram/client.py: Add send_message_draft with outbox dedup
  • telegram/outbox.py: Add DRAFT_PRIORITY = 3
  • settings.py: Add streaming_drafts: bool = False
  • telegram/bridge.py: Add streaming_drafts to TelegramBridgeConfig
  • runner_bridge.py: Add StreamingDraftEdits class, integrate into handle_message and run_runner_with_cancel
  • telegram/loop.py: Thread config through to handle_message
  • events.py: No changes (TextDeltaEvent doesn’t use EventFactory)
  • progress.py: No changes (TextDeltaEvent is ignored by ProgressTracker)
  • presenter.py: No changes

Implementation Order

  1. model.py — add TextDeltaEvent (no dependencies)
  2. schemas/claude.py — verify StreamEventMessage structure (no changes needed)
  3. runners/claude.py — add --include-partial-messages, handle events, emit TextDeltaEvent
  4. telegram/outbox.py — add DRAFT_PRIORITY
  5. telegram/client_api.py — add send_message_draft method
  6. telegram/client.py — add send_message_draft with outbox
  7. settings.py — add streaming_drafts config field
  8. telegram/bridge.py — add to TelegramBridgeConfig
  9. runner_bridge.py — add StreamingDraftEdits, integrate into handle_message
  10. telegram/loop.py + handlers — thread config through call chain

Risk Mitigation

  • Feature flag (streaming_drafts = false default): Zero risk to existing behavior
  • Graceful fallback: If API fails, automatic disable after 3 failures
  • No existing code deleted: All changes are additive
  • Outbox dedup: Only latest draft survives, preventing rate limit issues
  • Independent of progress edits: Both systems work independently

Open Questions

  1. Exact sendMessageDraft parameters: The official API documentation was not fully accessible during research. The implementation assumes standard Telegram patterns. May need adjustment once exact API docs are consulted.
  2. Draft rate limits: Unknown. The outbox dedup + min_draft_interval_s should be sufficient, but may need tuning.
  3. Draft support in groups: May only work in private chats. The graceful fallback handles this.
  4. Entities support in drafts: Unknown if drafts support formatting entities. Initial implementation may use plain text only.