
Streaming Data

xbbg provides two interfaces for real-time Bloomberg data: stream() / astream() for simple iteration, and subscribe() / asubscribe() for full lifecycle control. All four accept the same parameters; the a-prefixed variants are the canonical async implementations — the sync variants run them in a background thread.

| Use case | Function |
| --- | --- |
| Simple iteration, no dynamic changes | stream() (sync) or astream() (async) |
| Dynamic add/remove, health inspection | subscribe() (sync) or asubscribe() (async) |
| Running inside an existing async event loop | astream() or asubscribe() |
| Script with no event loop | stream() or subscribe() |
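
To make the "sync variants run the async ones in a background thread" idea concrete, here is a minimal sketch of that general pattern using only the standard library. It is not xbbg's actual implementation; `iterate_in_thread` and `fake_ticks` are hypothetical names introduced for illustration.

```python
import asyncio
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def iterate_in_thread(make_async_iter):
    """Run an async iterator on a background thread and yield its items synchronously."""
    results = queue.Queue()
    stop = threading.Event()

    async def pump():
        try:
            async for item in make_async_iter():
                if stop.is_set():  # the sync consumer has gone away
                    break
                results.put(item)
        finally:
            results.put(_DONE)

    # The worker thread owns its own event loop via asyncio.run().
    worker = threading.Thread(target=asyncio.run, args=(pump(),), daemon=True)
    worker.start()
    try:
        while True:
            item = results.get()
            if item is _DONE:
                break
            yield item
    finally:
        stop.set()  # reached on normal exhaustion or when the caller breaks early


async def fake_ticks():
    """Hypothetical stand-in for an async data source."""
    for i in range(3):
        await asyncio.sleep(0)
        yield i


received = list(iterate_in_thread(fake_ticks))
print(received)  # [0, 1, 2]
```

In this sketch the stop flag is only checked when the next item arrives, so a quiet source keeps the worker alive until then. A production bridge would also need to propagate exceptions to the caller.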

Every subscription is isolated — each subscribe() / asubscribe() call gets its own Bloomberg session from the pool, so topic streams cannot cross-contaminate one another.


stream() and astream() are context-manager generators. Break out of the loop to stop the subscription cleanly.

import asyncio
from xbbg import blp

async def main():
    async for batch in blp.astream(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE', 'BID', 'ASK']):
        # batch is a Narwhals DataFrame by default (or xbbg.ArrowRecordBatch if raw=True)
        print(batch)

asyncio.run(main())

Each batch is a Narwhals DataFrame by default. It is backed by PyArrow when PyArrow is installed; otherwise xbbg falls back through the same backend chain as the request endpoints and emits a warning. Pass backend='native' for raw xbbg.ArrowTable batches, backend='pyarrow' for a real pyarrow.Table, or another backend for explicit conversion. Use raw=True to receive raw xbbg.ArrowRecordBatch objects for maximum throughput.

# With raw=True, each batch is an xbbg.ArrowRecordBatch
async for batch in blp.astream(['AAPL US Equity'], ['LAST_PRICE'], raw=True):
    rows = batch.to_pylist()

asubscribe() returns a Subscription handle that you iterate separately. This gives you dynamic add/remove and access to health metadata.

from xbbg import blp

async def main():
    sub = await blp.asubscribe(
        ['AAPL US Equity', 'MSFT US Equity'],
        ['LAST_PRICE', 'BID', 'ASK'],
    )
    async for batch in sub:
        print(batch)

Use async with to guarantee cleanup on exit or exception:

async with await blp.asubscribe(['AAPL US Equity'], ['LAST_PRICE']) as sub:
    count = 0
    async for batch in sub:
        print(batch)
        count += 1
        if count >= 100:
            break
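
The count-and-break pattern above can be factored into a small reusable helper. The sketch below works on any async iterable. `take` and `fake_batches` are hypothetical names, and the fake source stands in for a live subscription so the example runs without a Bloomberg connection.

```python
import asyncio


async def take(source, limit):
    """Yield at most `limit` items from any async iterable, then stop."""
    if limit <= 0:
        return
    count = 0
    async for item in source:
        yield item
        count += 1
        if count >= limit:
            break


async def fake_batches():
    """Hypothetical stand-in for a live subscription: an endless batch source."""
    n = 0
    while True:
        yield {"batch": n}
        n += 1
        await asyncio.sleep(0)


async def demo():
    return [batch async for batch in take(fake_batches(), 3)]


collected = asyncio.run(demo())
print(collected)  # [{'batch': 0}, {'batch': 1}, {'batch': 2}]
```

With a real subscription this would read `async for batch in take(sub, 100)` inside the `async with` block. The helper only stops iterating, so the context manager remains responsible for cleanup.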

Tickers can be added while the subscription is active. The new topics are subscribed on the same Bloomberg session:

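sub = await blp.asubscribe(['AAPL US Equity'], ['LAST_PRICE', 'BID', 'ASK'])
should_add_more = True  # example trigger; replace with your own condition
async for batch in sub:
    print(batch)
    if should_add_more:
        await sub.add(['MSFT US Equity', 'GOOG US Equity'])
        should_add_more = False

# Separately: remove a ticker while the others keep streaming
await sub.remove(['AAPL US Equity'])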

Removed tickers stop delivering updates immediately. The remaining tickers continue unaffected.
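
When the target universe changes as a whole, it can help to compute the difference first and then issue a single add and a single remove. The helper below is a hypothetical convenience, not part of xbbg. It only does list arithmetic on the values you would read from sub.tickers.

```python
def plan_ticker_changes(current, desired):
    """Return (to_add, to_remove) lists that turn `current` into `desired`."""
    current_set, desired_set = set(current), set(desired)
    # dict.fromkeys() removes duplicates while preserving order
    to_add = [t for t in dict.fromkeys(desired) if t not in current_set]
    to_remove = [t for t in dict.fromkeys(current) if t not in desired_set]
    return to_add, to_remove


to_add, to_remove = plan_ticker_changes(
    current=['AAPL US Equity', 'MSFT US Equity'],
    desired=['MSFT US Equity', 'GOOG US Equity'],
)
print(to_add)     # ['GOOG US Equity']
print(to_remove)  # ['AAPL US Equity']

# With a live subscription (not run here):
#     if to_add:
#         await sub.add(to_add)
#     if to_remove:
#         await sub.remove(to_remove)
```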

# Clean close — discard buffered data
await sub.unsubscribe()
# Drain — return any remaining buffered batches before closing
remaining = await sub.unsubscribe(drain=True)

| Property | Type | Description |
| --- | --- | --- |
| sub.tickers | list[str] | Currently subscribed tickers |
| sub.fields | list[str] | Subscribed fields |
| sub.is_active | bool | Whether the subscription is open |
| sub.all_failed | bool | True if every ticker has failed or terminated |

Per-subscription tuning is done via keyword arguments on asubscribe() and astream().

By default, a subscription returns only the fields you requested. Setting all_fields=True exposes all top-level scalar fields Bloomberg sends (including INITPAINT summary fields for the initial snapshot):

# Receive INITPAINT fields and any extra scalar fields Bloomberg sends
sub = await blp.asubscribe(
    ['XBTUSD Curncy'],
    ['LAST_PRICE', 'BID', 'ASK'],
    all_fields=True,
)

flush_threshold, stream_capacity, overflow_policy


These control the backpressure pipeline between the Bloomberg SDK event thread and your Python consumer.

from xbbg import blp, configure

# Global defaults (applied to all new subscriptions)
configure(
    subscription_flush_threshold=10,   # buffer up to 10 ticks before flushing
    subscription_stream_capacity=512,  # internal channel depth
    overflow_policy='drop_newest',     # what to do when the channel is full
)

You can also pass them per-subscription to asubscribe() and astream():

sub = await blp.asubscribe(
    ['AAPL US Equity'],
    ['LAST_PRICE', 'BID', 'ASK'],
    flush_threshold=5,
    stream_capacity=1024,
    overflow_policy='block',  # block the Rust thread until Python catches up
)

| Parameter | Default | Description |
| --- | --- | --- |
| flush_threshold | 1 | Ticks buffered before flushing to Python. Increase for throughput, decrease for latency. |
| stream_capacity | 256 | Backpressure buffer size (number of batches). |
| overflow_policy | 'drop_newest' | Slow-consumer policy: 'drop_newest' or 'block'. |
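
The difference between the two overflow policies is easiest to see in a toy model. The sketch below uses a bounded asyncio.Queue as a stand-in for the internal channel. It illustrates the semantics only and says nothing about how xbbg's Rust pipeline is built; `run_pipeline` is a hypothetical name.

```python
import asyncio


async def run_pipeline(n_batches, capacity, overflow_policy):
    """Push n_batches through a bounded queue; return (delivered, dropped_count)."""
    if overflow_policy not in ("drop_newest", "block"):
        raise ValueError(f"unknown overflow_policy: {overflow_policy!r}")
    channel = asyncio.Queue(maxsize=capacity)
    delivered, dropped = [], 0

    async def consumer():
        while True:
            item = await channel.get()
            if item is None:  # sentinel: producer has finished
                return
            delivered.append(item)

    task = asyncio.create_task(consumer())
    for i in range(n_batches):
        if overflow_policy == "block":
            await channel.put(i)  # waits for free space, so nothing is lost
        else:
            try:
                channel.put_nowait(i)
            except asyncio.QueueFull:
                dropped += 1  # channel full: the incoming batch is discarded
    await channel.put(None)
    await task
    return delivered, dropped


drop_result = asyncio.run(run_pipeline(10, capacity=3, overflow_policy="drop_newest"))
block_result = asyncio.run(run_pipeline(10, capacity=3, overflow_policy="block"))
print(drop_result)   # ([0, 1, 2], 7)
print(block_result)  # ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 0)
```

In the model, the producer under 'drop_newest' never waits, so a burst larger than the channel loses its tail. Under 'block' the producer is paced by the consumer and every batch arrives.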

xbbg provides dedicated functions for Bloomberg’s specialized subscription services. All return a Subscription handle and follow the same async iteration pattern. Sync wrappers (vwap, mktbar, depth, chains) are also available.

avwap() streams Volume Weighted Average Price (VWAP) via //blp/mktvwap.

# Basic: default fields are RT_PX_VWAP, RT_VWAP_VOLUME
sub = await blp.avwap(['AAPL US Equity'])
async for batch in sub:
    print(batch)
await sub.unsubscribe()

# Custom time window
sub = await blp.avwap(
    ['AAPL US Equity', 'MSFT US Equity'],
    start_time='09:30',
    end_time='16:00',
)

# Request additional VWAP fields
sub = await blp.avwap(
    'AAPL US Equity',
    ['RT_PX_VWAP', 'RT_VWAP_VOLUME', 'RT_VWAP_TURNOVER'],
)

| Parameter | Default | Description |
| --- | --- | --- |
| tickers | required | One or more securities |
| fields | ['RT_PX_VWAP', 'RT_VWAP_VOLUME'] | Fields to subscribe to |
| start_time | None | VWAP calculation start (e.g., '09:30') |
| end_time | None | VWAP calculation end (e.g., '16:00') |

amktbar() streams real-time OHLCV bars via //blp/mktbar. It is similar to bdib(), but delivers bars live as they form.

Default fields: OPEN, HIGH, LOW, CLOSE, VOLUME, NUM_TRADES.

# 1-minute bars (default interval)
sub = await blp.amktbar('AAPL US Equity')
async for batch in sub:
    print(batch)

# 5-minute bars with a session window
async with await blp.amktbar(
    ['AAPL US Equity', 'MSFT US Equity'],
    interval=5,
    start_time='09:30',
    end_time='16:00',
) as sub:
    async for batch in sub:
        print(batch)

| Parameter | Default | Description |
| --- | --- | --- |
| tickers | required | One or more securities |
| interval | 1 | Bar interval in minutes |
| start_time | None | Session start in HH:MM format |
| end_time | None | Session end in HH:MM format |
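
Because start_time and end_time are plain HH:MM strings, it may be worth validating them before opening a subscription. The sketch below is a hypothetical pre-flight check using only the standard library. The rule that the start must precede the end is an assumption for same-day sessions, not something xbbg is known to enforce.

```python
from datetime import datetime, time


def parse_hhmm(value):
    """Parse an 'HH:MM' 24-hour string into a datetime.time (ValueError if malformed)."""
    return datetime.strptime(value, "%H:%M").time()


def check_window(start_time, end_time):
    """Validate a same-day session window and return it as (start, end) time objects."""
    start, end = parse_hhmm(start_time), parse_hhmm(end_time)
    if start >= end:  # assumption: the window does not span midnight
        raise ValueError(f"start_time {start_time!r} must be before end_time {end_time!r}")
    return start, end


window = check_window('09:30', '16:00')
print(window)  # (datetime.time(9, 30), datetime.time(16, 0))
```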

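adepth() streams Level 2 order book data via //blp/mktdepth. Fields are implicit (provided by Bloomberg), so you pass only the ticker. The example below catches BlpBPipeError, which it treats as the signal that a B-PIPE connection is required.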

from xbbg import blp
from xbbg.exceptions import BlpBPipeError

try:
    async with await blp.adepth('AAPL US Equity') as sub:
        async for batch in sub:
            print(batch)  # Bid/ask levels and sizes
except BlpBPipeError as e:
    print(f"B-PIPE required: {e}")

Option and Futures Chains — achains() / chains()


Streaming option or futures chain updates via //blp/mktlist.

# Option chain (default)
async with await blp.achains('AAPL US Equity') as sub:
    async for batch in sub:
        print(batch)

# Futures chain
sub = await blp.achains('ES1 Index', chain_type='FUTURES')
async for batch in sub:
    print(batch)
await sub.unsubscribe()

| Parameter | Default | Description |
| --- | --- | --- |
| underlying | required | Underlying security identifier |
| chain_type | 'OPTIONS' | 'OPTIONS' or 'FUTURES' |

The Subscription object exposes several properties for runtime inspection. None of these require pausing iteration — they are safe to read at any point.

sub.status returns a combined snapshot of the subscription’s operational state:

print(sub.status)
# {
#     'active': True,
#     'all_failed': False,
#     'tickers': ['AAPL US Equity', 'MSFT US Equity'],
#     'failed_tickers': [],
#     'topic_states': {
#         'AAPL US Equity': {'state': 'SUBSCRIBED', 'last_change_us': 1711000000000},
#         'MSFT US Equity': {'state': 'SUBSCRIBED', 'last_change_us': 1711000000001},
#     },
#     'session': {...},
#     'admin': {...},
#     'services': {...},
# }

sub.topic_states maps each ticker string to its lifecycle state. It is useful for confirming that tickers are in the SUBSCRIBED state before processing data:

for ticker, state_info in sub.topic_states.items():
    print(f"{ticker}: {state_info['state']}")

sub.failed_tickers lists the tickers Bloomberg rejected or terminated. In a mixed subscription (some tickers valid, some invalid), it is non-empty while is_active remains True, as long as at least one ticker is healthy:

if sub.failed_tickers:
    print("Failed:", sub.failed_tickers)

sub.failures holds detailed failure records, each containing ticker, reason, and kind ("failure" or "terminated"):

for f in sub.failures:
    print(f"{f['ticker']}: {f['reason']} ({f['kind']})")
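
Since each record carries a kind, it is straightforward to separate tickers that never subscribed from those that were terminated later. The sketch below assumes only the three documented keys. The `split_failures` helper and the sample records (including their reason strings) are made up for illustration.

```python
def split_failures(failures):
    """Group failed tickers by record kind ('failure' or 'terminated')."""
    by_kind = {"failure": [], "terminated": []}
    for record in failures:
        # setdefault tolerates a kind this sketch does not know about
        by_kind.setdefault(record["kind"], []).append(record["ticker"])
    return by_kind


# Hypothetical records shaped like the entries of sub.failures
sample_failures = [
    {"ticker": "INVALID_TICKER", "reason": "example: unknown security", "kind": "failure"},
    {"ticker": "OLD US Equity", "reason": "example: feed ended", "kind": "terminated"},
]
grouped = split_failures(sample_failures)
print(grouped)  # {'failure': ['INVALID_TICKER'], 'terminated': ['OLD US Equity']}
```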

sub.events holds a bounded history of lifecycle events for the subscription. Each entry contains at_us (microsecond timestamp), category, level, message_type, topic, and detail:

for event in sub.events:
    print(f"[{event['level']}] {event['message_type']}: {event['detail']}")

sub.stats reports subscription throughput metrics:

print(sub.stats)
# {
#     'messages_received': 4821,
#     'dropped_batches': 0,
#     'batches_sent': 4821,
#     'slow_consumer': False,
#     'data_loss_events': 0,
#     'last_message_us': 1711000123456,
#     'effective_overflow_policy': 'drop_newest',
# }

| Key | Description |
| --- | --- |
| messages_received | Total messages received from Bloomberg |
| dropped_batches | Batches dropped due to overflow |
| batches_sent | Batches successfully delivered to Python |
| slow_consumer | True if Bloomberg signalled DATALOSS |
| data_loss_events | Total Bloomberg data-loss signals observed |
| effective_overflow_policy | Runtime policy used by the Rust stream |
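
These counters can be reduced to a single health signal. The sketch below computes a drop ratio and flags a subscription that needs attention. It assumes batches_sent and dropped_batches count disjoint sets of batches, and the 1% threshold is an arbitrary illustrative default. Both helper names are hypothetical.

```python
def drop_ratio(stats):
    """Fraction of batches dropped, out of all batches produced (sent + dropped)."""
    total = stats["batches_sent"] + stats["dropped_batches"]
    return stats["dropped_batches"] / total if total else 0.0


def needs_attention(stats, max_drop_ratio=0.01):
    """True if the stats dict shows data loss or too many dropped batches."""
    return bool(
        stats["slow_consumer"]
        or stats["data_loss_events"] > 0
        or drop_ratio(stats) > max_drop_ratio
    )


healthy = {"batches_sent": 4821, "dropped_batches": 0,
           "slow_consumer": False, "data_loss_events": 0}
congested = {"batches_sent": 900, "dropped_batches": 100,  # made-up numbers
             "slow_consumer": False, "data_loss_events": 0}

print(drop_ratio(healthy), needs_attention(healthy))      # 0.0 False
print(drop_ratio(congested), needs_attention(congested))  # 0.1 True
```

If the check fires repeatedly, the tuning parameters described earlier (a larger stream_capacity, a higher flush_threshold, or overflow_policy='block') are the natural levers to try.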

Bloomberg’s subscription model isolates failures at the topic level. If one ticker is invalid or gets terminated, the other tickers in the same subscription continue delivering data. Your loop keeps running; only failed_tickers grows.

sub = await blp.asubscribe(
    ['AAPL US Equity', 'INVALID_TICKER'],
    ['LAST_PRICE'],
)
async for batch in sub:
    # AAPL ticks still arrive; INVALID_TICKER never produces data
    print(batch)
    print("Failed so far:", sub.failed_tickers)

Use all_failed to detect the total-failure case and exit cleanly:

sub = await blp.asubscribe(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE'])
async for batch in sub:
    process(batch)
    if sub.all_failed:
        print("All tickers failed:", sub.failed_tickers)
        break
await sub.unsubscribe()

Check topic_states after a short wait to confirm which tickers successfully subscribed before committing downstream resources:

import asyncio

sub = await blp.asubscribe(
    ['AAPL US Equity', 'INVALID_TICKER', 'MSFT US Equity'],
    ['LAST_PRICE', 'BID', 'ASK'],
)

# Brief wait for Bloomberg's INITPAINT / subscription status events
await asyncio.sleep(2)

active = [t for t, s in sub.topic_states.items() if s['state'] == 'SUBSCRIBED']
failed = sub.failed_tickers
print(f"Active: {active}")
print(f"Failed: {failed}")

if not active:
    await sub.unsubscribe()
    raise RuntimeError("No tickers subscribed successfully")

async for batch in sub:
    process(batch)
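
A fixed sleep either waits too long or not long enough. One alternative is to poll until every expected ticker has either reached SUBSCRIBED or shown up in failed_tickers, with a timeout as a safety net. The sketch below relies only on the topic_states and failed_tickers properties described above. `wait_until_settled` is a hypothetical helper, and the SimpleNamespace object stands in for a real Subscription so the example runs offline.

```python
import asyncio
import time
from types import SimpleNamespace


async def wait_until_settled(sub, expected, timeout=5.0, poll=0.05):
    """Poll until each expected ticker is SUBSCRIBED or failed; return (active, failed)."""
    deadline = time.monotonic() + timeout
    while True:
        states = sub.topic_states
        failed_now = set(sub.failed_tickers)
        failed = [t for t in expected if t in failed_now]
        active = [
            t for t in expected
            if t not in failed_now and states.get(t, {}).get("state") == "SUBSCRIBED"
        ]
        settled = len(active) + len(failed) == len(expected)
        if settled or time.monotonic() >= deadline:
            return active, failed
        await asyncio.sleep(poll)


# Stand-in object exposing the two properties the helper reads
fake_sub = SimpleNamespace(
    topic_states={"AAPL US Equity": {"state": "SUBSCRIBED"}},
    failed_tickers=["INVALID_TICKER"],
)
active, failed = asyncio.run(
    wait_until_settled(fake_sub, ["AAPL US Equity", "INVALID_TICKER"])
)
print(active, failed)  # ['AAPL US Equity'] ['INVALID_TICKER']
```

If the timeout expires first, the helper returns whatever has settled so far, so the caller can still decide whether a partial subscription is acceptable.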

Each subscription session is isolated. If the session drops and reconnects, Bloomberg will resend INITPAINT (initial snapshot) events. The events property records session lifecycle transitions you can inspect to detect reconnect cycles.
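
As a rough illustration of reconnect detection, the sketch below counts down-then-up transitions in an event history. The message type names 'SessionConnectionDown' and 'SessionConnectionUp' are Bloomberg session message types, but whether sub.events reports them under exactly those message_type values is an assumption. They are therefore parameters, and the sample events are fabricated for the demo.

```python
def count_reconnects(events, down="SessionConnectionDown", up="SessionConnectionUp"):
    """Count completed down -> up cycles in a list of lifecycle event dicts."""
    reconnects, is_down = 0, False
    for event in sorted(events, key=lambda e: e["at_us"]):
        if event["message_type"] == down:
            is_down = True
        elif event["message_type"] == up and is_down:
            reconnects += 1  # connection was restored after a drop
            is_down = False
    return reconnects


# Fabricated history shaped like sub.events entries (only the keys used here)
sample_events = [
    {"at_us": 1, "message_type": "SessionConnectionUp"},    # initial connect
    {"at_us": 2, "message_type": "SessionConnectionDown"},
    {"at_us": 3, "message_type": "SessionConnectionUp"},    # first reconnect
    {"at_us": 4, "message_type": "SessionConnectionDown"},  # still down
]
reconnects = count_reconnects(sample_events)
print(reconnects)  # 1
```

Because a reconnect triggers fresh INITPAINT events, a consumer that tracks this count can treat the next batches as a snapshot refresh rather than as new ticks.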