Streaming Data
xbbg provides two interfaces for real-time Bloomberg data: stream() / astream() for simple iteration, and subscribe() / asubscribe() for full lifecycle control. All four accept the same parameters; the a-prefixed variants are the canonical async implementations — the sync variants run them in a background thread.
Choosing an Interface
Section titled “Choosing an Interface”| Use case | Function |
|---|---|
| Simple iteration, no dynamic changes | stream() (sync) or astream() (async) |
| Dynamic add/remove, health inspection | subscribe() (sync) or asubscribe() (async) |
| Running inside an existing async event loop | astream() or asubscribe() |
| Script with no event loop | stream() or subscribe() |
Every subscription is isolated — each subscribe() / asubscribe() call gets its own Bloomberg session from the pool, so topic streams cannot cross-contaminate one another.
Basic Streaming
Section titled “Basic Streaming”stream() and astream() are context-manager generators. Break out of the loop to stop the subscription cleanly.
from xbbg import blp
async def main(): async for batch in blp.astream(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE', 'BID', 'ASK']): # batch is an xbbg.ArrowTable by default (or xbbg.ArrowRecordBatch if raw=True) print(batch)from xbbg import blp
for batch in blp.stream(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE', 'BID', 'ASK']): print(batch)Each batch is a Narwhals DataFrame by default, backed by PyArrow when PyArrow is installed and otherwise by the same warned fallback chain as request endpoints. Pass backend='native' for raw xbbg.ArrowTable batches, backend='pyarrow' for a real pyarrow.Table, or another backend for explicit conversion. Use raw=True to receive raw xbbg.ArrowRecordBatch objects for maximum throughput.
async for batch in blp.astream(['AAPL US Equity'], ['LAST_PRICE'], raw=True): rows = batch.to_pylist()Full Subscription Lifecycle
Section titled “Full Subscription Lifecycle”asubscribe() returns a Subscription handle that you iterate separately. This gives you dynamic add/remove and access to health metadata.
Create and iterate
Section titled “Create and iterate”from xbbg import blp
async def main(): sub = await blp.asubscribe( ['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE', 'BID', 'ASK'], )
async for batch in sub: print(batch)Context manager
Section titled “Context manager”Use async with to guarantee cleanup on exit or exception:
async with await blp.asubscribe(['AAPL US Equity'], ['LAST_PRICE']) as sub: count = 0 async for batch in sub: print(batch) count += 1 if count >= 100: breakAdd tickers dynamically
Section titled “Add tickers dynamically”Tickers can be added while the subscription is active. The new topics are subscribed on the same Bloomberg session:
sub = await blp.asubscribe(['AAPL US Equity'], ['LAST_PRICE', 'BID', 'ASK'])
async for batch in sub: print(batch) if should_add_more: await sub.add(['MSFT US Equity', 'GOOG US Equity']) should_add_more = FalseRemove tickers
Section titled “Remove tickers”await sub.remove(['AAPL US Equity'])Removed tickers stop delivering updates immediately. The remaining tickers continue unaffected.
Unsubscribe
Section titled “Unsubscribe”# Clean close — discard buffered dataawait sub.unsubscribe()
# Drain — return any remaining buffered batches before closingremaining = await sub.unsubscribe(drain=True)Subscription properties
Section titled “Subscription properties”| Property | Type | Description |
|---|---|---|
sub.tickers | list[str] | Currently subscribed tickers |
sub.fields | list[str] | Subscribed fields |
sub.is_active | bool | Whether the subscription is open |
sub.all_failed | bool | True if every ticker has failed or terminated |
Subscription Configuration
Section titled “Subscription Configuration”Per-subscription tuning is done via keyword arguments on asubscribe() and astream().
all_fields
Section titled “all_fields”By default, Bloomberg delivers only the fields you requested. Setting all_fields=True exposes all top-level scalar fields Bloomberg sends (including INITPAINT summary fields for the initial snapshot):
# Receive INITPAINT fields and any extra scalar fields Bloomberg sendssub = await blp.asubscribe( ['XBTUSD Curncy'], ['LAST_PRICE', 'BID', 'ASK'], all_fields=True,)flush_threshold, stream_capacity, overflow_policy
Section titled “flush_threshold, stream_capacity, overflow_policy”These control the backpressure pipeline between the Bloomberg SDK event thread and your Python consumer.
from xbbg import blp, configure
# Global defaults (applied to all new subscriptions)configure( subscription_flush_threshold=10, # buffer up to 10 ticks before flushing subscription_stream_capacity=512, # internal channel depth overflow_policy='drop_newest', # what to do when the channel is full)You can also pass them per-subscription to asubscribe() and astream():
sub = await blp.asubscribe( ['AAPL US Equity'], ['LAST_PRICE', 'BID', 'ASK'], flush_threshold=5, stream_capacity=1024, overflow_policy='block', # block the Rust thread until Python catches up)| Parameter | Default | Description |
|---|---|---|
flush_threshold | 1 | Ticks buffered before flushing to Python. Increase for throughput, decrease for latency. |
stream_capacity | 256 | Backpressure buffer size (number of batches). |
overflow_policy | 'drop_newest' | Slow-consumer policy: 'drop_newest' or 'block'. |
Specialized Streams
Section titled “Specialized Streams”xbbg provides dedicated functions for Bloomberg’s specialized subscription services. All return a Subscription handle and follow the same async iteration pattern. Sync wrappers (vwap, mktbar, depth, chains) are also available.
VWAP — avwap() / vwap()
Section titled “VWAP — avwap() / vwap()”Streaming Volume Weighted Average Price via //blp/mktvwap.
# Basic — default fields: RT_PX_VWAP, RT_VWAP_VOLUMEsub = await blp.avwap(['AAPL US Equity'])async for batch in sub: print(batch)await sub.unsubscribe()# Custom time windowsub = await blp.avwap( ['AAPL US Equity', 'MSFT US Equity'], start_time='09:30', end_time='16:00',)# Request additional VWAP fieldssub = await blp.avwap( 'AAPL US Equity', ['RT_PX_VWAP', 'RT_VWAP_VOLUME', 'RT_VWAP_TURNOVER'],)| Parameter | Default | Description |
|---|---|---|
tickers | required | One or more securities |
fields | ['RT_PX_VWAP', 'RT_VWAP_VOLUME'] | Fields to subscribe to |
start_time | None | VWAP calculation start (e.g., '09:30') |
end_time | None | VWAP calculation end (e.g., '16:00') |
Market Bars — amktbar() / mktbar()
Section titled “Market Bars — amktbar() / mktbar()”Streaming real-time OHLCV bars via //blp/mktbar. Like bdib() but live as bars form.
Default fields: OPEN, HIGH, LOW, CLOSE, VOLUME, NUM_TRADES.
# 1-minute bars (default interval)sub = await blp.amktbar('AAPL US Equity')async for batch in sub: print(batch)# 5-minute bars with a session windowasync with await blp.amktbar( ['AAPL US Equity', 'MSFT US Equity'], interval=5, start_time='09:30', end_time='16:00',) as sub: async for batch in sub: print(batch)| Parameter | Default | Description |
|---|---|---|
tickers | required | One or more securities |
interval | 1 | Bar interval in minutes |
start_time | None | Session start in HH:MM format |
end_time | None | Session end in HH:MM format |
Market Depth — adepth() / depth()
Section titled “Market Depth — adepth() / depth()”Streaming Level 2 order book data via //blp/mktdepth. Fields are implicit (provided by Bloomberg).
from xbbg.exceptions import BlpBPipeErrorfrom xbbg import blp
try: async with await blp.adepth('AAPL US Equity') as sub: async for batch in sub: print(batch) # Bid/ask levels and sizesexcept BlpBPipeError as e: print(f"B-PIPE required: {e}")Option and Futures Chains — achains() / chains()
Section titled “Option and Futures Chains — achains() / chains()”Streaming option or futures chain updates via //blp/mktlist.
# Option chain (default)async with await blp.achains('AAPL US Equity') as sub: async for batch in sub: print(batch)# Futures chainsub = await blp.achains('ES1 Index', chain_type='FUTURES')async for batch in sub: print(batch)await sub.unsubscribe()| Parameter | Default | Description |
|---|---|---|
underlying | required | Underlying security identifier |
chain_type | 'OPTIONS' | 'OPTIONS' or 'FUTURES' |
Health and Observability
Section titled “Health and Observability”The Subscription object exposes several properties for runtime inspection. None of these require pausing iteration — they are safe to read at any point.
status
Section titled “status”A combined snapshot of the subscription’s operational state:
print(sub.status)# {# 'active': True,# 'all_failed': False,# 'tickers': ['AAPL US Equity', 'MSFT US Equity'],# 'failed_tickers': [],# 'topic_states': {# 'AAPL US Equity': {'state': 'SUBSCRIBED', 'last_change_us': 1711000000000},# 'MSFT US Equity': {'state': 'SUBSCRIBED', 'last_change_us': 1711000000001},# },# 'session': {...},# 'admin': {...},# 'services': {...},# }topic_states
Section titled “topic_states”Per-ticker lifecycle state keyed by ticker string. Useful for confirming that tickers are in SUBSCRIBED state before processing data:
for ticker, state_info in sub.topic_states.items(): print(f"{ticker}: {state_info['state']}")failed_tickers
Section titled “failed_tickers”List of tickers Bloomberg rejected or terminated. In a mixed-subscription scenario (some tickers valid, some invalid), this will be non-empty while is_active remains True as long as at least one ticker is healthy:
if sub.failed_tickers: print("Failed:", sub.failed_tickers)failures
Section titled “failures”Detailed failure records, each containing ticker, reason, and kind ("failure" or "terminated"):
for f in sub.failures: print(f"{f['ticker']}: {f['reason']} ({f['kind']})")events
Section titled “events”Bounded lifecycle event history for the subscription. Each entry contains at_us (microsecond timestamp), category, level, message_type, topic, and detail:
for event in sub.events: print(f"[{event['level']}] {event['message_type']}: {event['detail']}")Subscription throughput metrics:
print(sub.stats)# {# 'messages_received': 4821,# 'dropped_batches': 0,# 'batches_sent': 4821,# 'slow_consumer': False,# 'data_loss_events': 0,# 'last_message_us': 1711000123456,# 'effective_overflow_policy': 'drop_newest',# }| Key | Description |
|---|---|
messages_received | Total messages received from Bloomberg |
dropped_batches | Batches dropped due to overflow |
batches_sent | Batches successfully delivered to Python |
slow_consumer | True if Bloomberg signalled DATALOSS |
data_loss_events | Total Bloomberg data-loss signals observed |
effective_overflow_policy | Runtime policy used by the Rust stream |
Error Handling
Section titled “Error Handling”Topic failure isolation
Section titled “Topic failure isolation”Bloomberg’s subscription model isolates failures at the topic level. If one ticker is invalid or gets terminated, the other tickers in the same subscription continue delivering data. Your loop keeps running; only failed_tickers grows.
sub = await blp.asubscribe( ['AAPL US Equity', 'INVALID_TICKER'], ['LAST_PRICE'],)
async for batch in sub: # AAPL ticks still arrive; INVALID_TICKER never produces data print(batch) print("Failed so far:", sub.failed_tickers)Stopping when all tickers fail
Section titled “Stopping when all tickers fail”Use all_failed to detect the total-failure case and exit cleanly:
sub = await blp.asubscribe(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE'])
async for batch in sub: process(batch) if sub.all_failed: print("All tickers failed:", sub.failed_tickers) break
await sub.unsubscribe()Handling partial failures at startup
Section titled “Handling partial failures at startup”Check topic_states after a short wait to confirm which tickers successfully subscribed before committing downstream resources:
import asyncio
sub = await blp.asubscribe( ['AAPL US Equity', 'INVALID_TICKER', 'MSFT US Equity'], ['LAST_PRICE', 'BID', 'ASK'],)
# Brief wait for Bloomberg's INITPAINT / subscription status eventsawait asyncio.sleep(2)
active = [t for t, s in sub.topic_states.items() if s['state'] == 'SUBSCRIBED']failed = sub.failed_tickersprint(f"Active: {active}")print(f"Failed: {failed}")
if not active: await sub.unsubscribe() raise RuntimeError("No tickers subscribed successfully")
async for batch in sub: process(batch)Connection interruption
Section titled “Connection interruption”Each subscription session is isolated. If the session drops and reconnects, Bloomberg will resend INITPAINT (initial snapshot) events. The events property records session lifecycle transitions you can inspect to detect reconnect cycles.
See Also
Section titled “See Also”- API Reference —
subscribe/asubscribe - API Reference —
stream/astream - Engine Configuration for
subscription_pool_size, global flush/overflow defaults