Source code for goodfire.utils.comparison

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Generator

import ipywidgets
from IPython.display import display

from ..api.chat.interfaces import StreamingChatCompletionChunk


[docs] def StreamingMultiplexer( streaming_generators: list[Generator[StreamingChatCompletionChunk, Any, Any]], boxes_per_row: int = 2, ): """Display multiple streaming outputs simultaneously in a grid layout. Uses ThreadPoolExecutor to concurrently process multiple streaming generators and display their outputs in a grid of text areas. Each stream gets its own text area that updates in real-time. Args: streaming_generators: List of generators yielding StreamingChatCompletionChunks boxes_per_row: Number of text areas to display per row. Defaults to 2. Example: >>> generators = [chat.stream() for chat in chats] >>> StreamingMultiplexer(generators, boxes_per_row=3) """ def _stream_output( generator: Generator[StreamingChatCompletionChunk, Any, Any], output_widget: ipywidgets.Textarea, ): for chunk in generator: output_widget.value += chunk.choices[0].delta.content textareas = [ ipywidgets.Textarea( value="", placeholder=f"{index}", disabled=False, layout=ipywidgets.Layout(width="45%", height="200px"), ) for index in range(len(streaming_generators)) ] for textarea in range(0, len(textareas), boxes_per_row): hbox = ipywidgets.HBox(textareas[textarea : textarea + boxes_per_row]) display(hbox) with ThreadPoolExecutor(max_workers=len(streaming_generators)) as executor: futures = [] for index, generator in enumerate(streaming_generators): future = executor.submit(_stream_output, generator, textareas[index]) futures.append(future) for future in as_completed(futures): pass