Multi-Exchange¶
Stream and merge data from multiple cryptocurrency exchanges simultaneously. Perfect for price comparison, arbitrage monitoring, and unified data collection.
What is Multi-Exchange?¶
Multi-exchange streaming allows you to:
- Stream from multiple exchanges at once
- Merge streams into a unified pipeline
- Compare prices across exchanges
- Detect arbitrage opportunities
- Store in unified database with exchange labels
Merge Streams¶
Basic Merging¶
import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams
async def main():
    """Print every update from a merged Binance + OKX 1m stream for one pair."""
    # Binance subscription and runner
    binance_input = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")
    binance_runner = sf.BinanceRunner(stream_input=binance_input)

    # OKX subscription and runner (OKX symbols use the dash-separated format)
    okx_input = sf.DataInput(type="candle", symbols=["BTC-USDT"], timeframe="1m")
    okx_runner = sf.OKXRunner(stream_input=okx_input)

    # Consume both runners through a single merged iterator
    merged = merge_streams(binance_runner, okx_runner)
    async for data in merged:
        line = f"{data.source:8} | {data.symbol:10} | ${data.close:,.2f}"
        print(line)
asyncio.run(main())
Output:
Binance | BTCUSDT | $43,260.00
OKX | BTC-USDT | $43,255.00
Binance | BTCUSDT | $43,265.00
OKX | BTC-USDT | $43,258.00
Three Exchanges¶
async def three_exchanges():
    """Print every update from a merged Binance + OKX + Kraken 1m stream."""
    # One subscription per exchange, each using that exchange's own
    # stream type name and symbol notation.
    binance_input = sf.DataInput(
        type="kline", symbols=["BTCUSDT", "ETHUSDT"], timeframe="1m"
    )
    binance = sf.BinanceRunner(stream_input=binance_input)

    okx_input = sf.DataInput(
        type="candle", symbols=["BTC-USDT", "ETH-USDT"], timeframe="1m"
    )
    okx = sf.OKXRunner(stream_input=okx_input)

    kraken_input = sf.DataInput(
        type="ohlc", symbols=["BTC/USD", "ETH/USD"], timeframe="1m"
    )
    kraken = sf.KrakenRunner(stream_input=kraken_input)

    # Merge all 3 runners into one iterator
    merged = merge_streams(binance, okx, kraken)
    async for data in merged:
        line = f"📡 {data.source:8} | {data.symbol:12} | ${data.close:,.2f}"
        print(line)
asyncio.run(three_exchanges())
Symbol Format Mapping¶
Each exchange uses different symbol formats:
| Exchange | BTC/USD Format | ETH/USD Format |
|---|---|---|
| Binance | `BTCUSDT` | `ETHUSDT` |
| OKX | `BTC-USDT` | `ETH-USDT` |
| Kraken | `BTC/USD` | `ETH/USD` |
When merging, StreamForge preserves the original symbol format. For comparison, normalize symbols in your code:
def normalize_symbol(symbol: str, source: str) -> str:
    """Return *symbol* with the separator used by *source* removed.

    OKX symbols lose their "-" (BTC-USDT → BTCUSDT) and Kraken symbols lose
    their "/" (BTC/USD → BTCUSD). Binance symbols, and symbols from any
    other source, are returned unchanged.
    """
    # Separator each exchange puts between base and quote currency.
    separators = {"OKX": "-", "Kraken": "/"}
    separator = separators.get(source)
    if separator is None:
        return symbol
    return symbol.replace(separator, "")
async for data in merge_streams(binance, okx, kraken):
normalized = normalize_symbol(data.symbol, data.source)
print(f"{data.source} {normalized}: ${data.close:,.2f}")
Unified Database¶
Store data from all exchanges in one table:
import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Float, BigInteger
Base = declarative_base()
class UnifiedKlineTable(Base):
    """Single candle table shared by every exchange.

    A row is identified by the composite key
    (source, symbol, timeframe, open_ts).
    """

    __tablename__ = 'unified_klines'

    # --- composite primary key ---
    source = Column(String, primary_key=True)  # exchange name
    symbol = Column(String, primary_key=True)
    timeframe = Column(String, primary_key=True)
    open_ts = Column(BigInteger, primary_key=True)

    # --- candle payload ---
    end_ts = Column(BigInteger)
    open = Column(Float)
    high = Column(Float)
    low = Column(Float)
    close = Column(Float)
    volume = Column(Float)
async def unified_database():
    """Write merged Binance and OKX candles through one shared Postgres emitter."""
    # Build the shared emitter step by step: connection settings, target
    # model, then the conflict key columns.
    emitter = sf.PostgresEmitter(
        host="localhost",
        dbname="crypto",
        user="postgres",
        password="secret",
    )
    emitter = emitter.set_model(UnifiedKlineTable)
    postgres = emitter.on_conflict(["source", "symbol", "timeframe", "open_ts"])
    await postgres.connect()

    # Create one runner per exchange
    binance_input = sf.DataInput(
        type="kline", symbols=["BTCUSDT", "ETHUSDT"], timeframe="1m"
    )
    binance = sf.BinanceRunner(stream_input=binance_input)

    okx_input = sf.DataInput(
        type="candle", symbols=["BTC-USDT", "ETH-USDT"], timeframe="1m"
    )
    okx = sf.OKXRunner(stream_input=okx_input)

    # Every merged item goes to the same emitter, whichever exchange sent it
    merged = merge_streams(binance, okx)
    async for data in merged:
        await postgres.emit(data)
        print(f"💾 Saved {data.source} {data.symbol}")
asyncio.run(unified_database())
Query the unified table:
-- All BTC data across exchanges
-- Returns the 10 rows with the highest open_ts whose symbol contains "BTC",
-- regardless of which exchange (source) produced them.
SELECT source, symbol, close, volume
FROM unified_klines
WHERE symbol LIKE '%BTC%'
ORDER BY open_ts DESC
LIMIT 10;
-- Compare prices by exchange
-- One row per exchange with the average, maximum and minimum close of the
-- 1m rows whose symbol contains "BTC" and whose open_ts falls in the last hour.
SELECT
source,
AVG(close) as avg_price,
MAX(close) as max_price,
MIN(close) as min_price
FROM unified_klines
WHERE symbol LIKE '%BTC%'
AND timeframe = '1m'
-- EXTRACT(EPOCH ...) yields seconds; the * 1000 means open_ts is compared as milliseconds.
AND open_ts > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 hour') * 1000
GROUP BY source;
Price Comparison¶
Track price differences in real-time:
import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams
async def price_comparison():
    """Print the Binance/OKX close-price gap each time either exchange updates."""
    # Most recent close per exchange name
    latest_prices = {}

    binance_input = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")
    binance = sf.BinanceRunner(stream_input=binance_input)

    okx_input = sf.DataInput(type="candle", symbols=["BTC-USDT"], timeframe="1m")
    okx = sf.OKXRunner(stream_input=okx_input)

    async for tick in merge_streams(binance, okx):
        latest_prices[tick.source] = tick.close

        # Nothing to compare until two sources have reported
        if len(latest_prices) < 2:
            continue

        binance_price = latest_prices.get("Binance", 0)
        okx_price = latest_prices.get("OKX", 0)
        # Skip while either side is missing or zero
        if not (binance_price and okx_price):
            continue

        # Gap is expressed relative to the Binance price
        diff = binance_price - okx_price
        diff_pct = (diff / binance_price) * 100

        print(f"\n💰 Price Comparison:")
        print(f" Binance: ${binance_price:,.2f}")
        print(f" OKX: ${okx_price:,.2f}")
        print(f" Diff: ${diff:+.2f} ({diff_pct:+.4f}%)")

        # Alert on large difference
        is_large_gap = abs(diff_pct) > 0.1
        if is_large_gap:
            print(f" 🚨 ARBITRAGE OPPORTUNITY!")
asyncio.run(price_comparison())
Arbitrage Detection¶
Detect and alert on arbitrage opportunities:
import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams
from collections import defaultdict
class ArbitrageDetector:
    """Track the latest close per exchange and report cross-exchange price gaps.

    Prices are grouped under a normalized ``BASE/QUOTE`` symbol so the same
    market reported in different notations (``BTCUSDT``, ``BTC-USDT``) lands
    in the same bucket. Quote currencies are not merged: ``BTC/USD`` and
    ``BTC/USDT`` are tracked as separate markets.
    """

    # Quote currencies recognised at the end of separator-less symbols.
    _QUOTE_SUFFIXES = ("USDT", "USDC", "USD")

    def __init__(self, threshold_pct=0.1):
        """Create a detector.

        Args:
            threshold_pct: Minimum spread, as a percentage of the lowest
                price, that triggers an alert.
        """
        self.threshold = threshold_pct
        self.prices = defaultdict(dict)  # {symbol: {exchange: price}}

    def update(self, data):
        """Record ``data.close`` for ``data.source`` and re-check that symbol.

        Args:
            data: Object exposing ``symbol``, ``source`` and ``close``.
        """
        # Normalize symbol so all exchanges share one key
        symbol = self.normalize_symbol(data.symbol)
        self.prices[symbol][data.source] = data.close
        self.check_arbitrage(symbol)

    def normalize_symbol(self, symbol: str) -> str:
        """Return *symbol* in ``BASE/QUOTE`` form (e.g. ``BTC/USDT``).

        Symbols that already contain a separator only have "-" replaced by
        "/". Separator-less symbols are split in front of a known quote
        suffix; anything else is returned unchanged.
        """
        if "-" in symbol or "/" in symbol:
            # Already delimited: inserting another "/" here would produce
            # "BTC//USDT" and stop it matching other exchanges.
            return symbol.replace("-", "/")
        for quote in self._QUOTE_SUFFIXES:
            if symbol.endswith(quote) and len(symbol) > len(quote):
                return f"{symbol[:-len(quote)]}/{quote}"
        return symbol

    def check_arbitrage(self, symbol: str):
        """Print an alert when the spread for *symbol* exceeds the threshold.

        Does nothing until at least two exchanges have reported a price, or
        when the lowest price is not positive.
        """
        prices = self.prices[symbol]
        if len(prices) < 2:
            return

        # Find highest and lowest
        highest_exchange = max(prices, key=prices.get)
        lowest_exchange = min(prices, key=prices.get)
        highest_price = prices[highest_exchange]
        lowest_price = prices[lowest_exchange]

        # A zero/negative price cannot serve as the percentage base
        if lowest_price <= 0:
            return

        # Calculate difference relative to the cheapest venue
        diff = highest_price - lowest_price
        diff_pct = (diff / lowest_price) * 100

        # Alert if above threshold
        if diff_pct > self.threshold:
            print(f"\n🚨 ARBITRAGE ALERT: {symbol}")
            print(f" Buy @ {lowest_exchange:8}: ${lowest_price:,.2f}")
            print(f" Sell @ {highest_exchange:8}: ${highest_price:,.2f}")
            print(f" Profit: ${diff:,.2f} ({diff_pct:.2f}%)")
async def arbitrage_monitor():
    """Feed merged Binance, OKX and Kraken updates into an ArbitrageDetector."""
    detector = ArbitrageDetector(threshold_pct=0.1)

    binance_input = sf.DataInput(
        type="kline", symbols=["BTCUSDT", "ETHUSDT"], timeframe="1m"
    )
    binance = sf.BinanceRunner(stream_input=binance_input)

    okx_input = sf.DataInput(
        type="candle", symbols=["BTC-USDT", "ETH-USDT"], timeframe="1m"
    )
    okx = sf.OKXRunner(stream_input=okx_input)

    kraken_input = sf.DataInput(
        type="ohlc", symbols=["BTC/USD", "ETH/USD"], timeframe="1m"
    )
    kraken = sf.KrakenRunner(stream_input=kraken_input)

    # The detector does the bookkeeping and alerting for each update
    merged = merge_streams(binance, okx, kraken)
    async for tick in merged:
        detector.update(tick)
asyncio.run(arbitrage_monitor())
Complete Examples¶
Example 1: Multi-Symbol Multi-Exchange¶
import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams
async def multi_everything():
    """Stream three symbols from two exchanges and print each merged update."""
    binance_input = sf.DataInput(
        type="kline",
        symbols=["BTCUSDT", "ETHUSDT", "SOLUSDT"],
        timeframe="1m",
    )
    binance = sf.BinanceRunner(stream_input=binance_input)

    okx_input = sf.DataInput(
        type="candle",
        symbols=["BTC-USDT", "ETH-USDT", "SOL-USDT"],
        timeframe="1m",
    )
    okx = sf.OKXRunner(stream_input=okx_input)

    # Startup banner, one line per print call
    banner = (
        "✓ Streaming:",
        " - 3 symbols (BTC, ETH, SOL)",
        " - 2 exchanges (Binance, OKX)",
        " = 6 data streams merged!\n",
    )
    for banner_line in banner:
        print(banner_line)

    async for tick in merge_streams(binance, okx):
        print(f"📈 {tick.source:8} | {tick.symbol:10} | ${tick.close:>10,.2f} | Vol: {tick.volume:>10,.2f}")
asyncio.run(multi_everything())
Example 2: Logger Per Exchange¶
Separate loggers for each exchange:
import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams
async def separate_loggers():
    """Attach a differently-prefixed Logger to each runner, then drain the merge."""
    # Binance runner and its logger
    binance_input = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")
    binance = sf.BinanceRunner(stream_input=binance_input)
    binance_logger = sf.Logger(prefix="🟡 Binance")
    binance.register_emitter(binance_logger)

    # OKX runner and its logger
    okx_input = sf.DataInput(type="candle", symbols=["BTC-USDT"], timeframe="1m")
    okx = sf.OKXRunner(stream_input=okx_input)
    okx_logger = sf.Logger(prefix="🔵 OKX")
    okx.register_emitter(okx_logger)

    # The loop body is intentionally empty: output comes from the loggers
    # registered above, iterating only keeps the merged stream running.
    async for _item in merge_streams(binance, okx):
        pass
asyncio.run(separate_loggers())
Example 3: CSV Per Exchange¶
Separate CSV files per exchange:
import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams
async def csv_per_exchange():
    """Register one CSVEmitter per runner so each exchange gets its own file."""
    # Binance → binance_btc.csv
    binance_input = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")
    binance = sf.BinanceRunner(stream_input=binance_input)
    binance_csv = sf.CSVEmitter(
        source="Binance",
        symbol="BTCUSDT",
        timeframe="1m",
        file_path="binance_btc.csv",
    )
    binance.register_emitter(binance_csv)

    # OKX → okx_btc.csv
    okx_input = sf.DataInput(type="candle", symbols=["BTC-USDT"], timeframe="1m")
    okx = sf.OKXRunner(stream_input=okx_input)
    okx_csv = sf.CSVEmitter(
        source="OKX",
        symbol="BTC-USDT",
        timeframe="1m",
        file_path="okx_btc.csv",
    )
    okx.register_emitter(okx_csv)

    # The loop body is intentionally empty: writing is left to the emitters
    # registered above, iterating only keeps the merged stream running.
    async for _item in merge_streams(binance, okx):
        pass
asyncio.run(csv_per_exchange())
Use Cases¶
1. Price Comparison¶
Monitor price differences for trading decisions (see the Price Comparison section above).
2. Arbitrage Trading¶
Detect profitable price differences (see the Arbitrage Detection section above).
3. Data Redundancy¶
Multiple sources for reliability:
# If one exchange goes down, others continue
async for data in merge_streams(binance, okx, kraken):
save_to_database(data)
4. Volume Analysis¶
Compare trading volumes across exchanges:
volumes = {}
async for data in merge_streams(binance, okx):
volumes[data.source] = volumes.get(data.source, 0) + data.volume
5. Exchange Health Monitoring¶
Track exchange uptime and data quality (see "Monitor Connection Health" under Best Practices below).
Performance Considerations¶
Memory Usage¶
Each runner maintains its own connection and buffers:
# 3 runners = 3 WebSocket connections
binance = sf.BinanceRunner(...)
okx = sf.OKXRunner(...)
kraken = sf.KrakenRunner(...)
Network Load¶
Each exchange has its own WebSocket:
# Each runner connects independently
# Total connections = number of runners
async for data in merge_streams(r1, r2, r3): # 3 connections
pass
Data Rate¶
Merged stream combines data rates:
# Binance: ~60 updates/min for 1m candles
# OKX: ~60 updates/min for 1m candles
# Merged: ~120 updates/min total
Best Practices¶
1. Use Consistent Timeframes¶
# ✓ Good - same timeframe on both exchanges, so candles line up one-to-one
binance = sf.BinanceRunner(
    stream_input=sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")
)
okx = sf.OKXRunner(
    stream_input=sf.DataInput(type="candle", symbols=["BTC-USDT"], timeframe="1m")
)
# ⚠️ Mixed - harder to compare (1m candles against 5m candles)
binance = sf.BinanceRunner(
    stream_input=sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")
)
okx = sf.OKXRunner(
    stream_input=sf.DataInput(type="candle", symbols=["BTC-USDT"], timeframe="5m")
)
2. Normalize Symbols¶
def normalize_symbol(symbol: str) -> str:
    """Return *symbol* with every "-" and "/" removed (BTC-USDT → BTCUSDT)."""
    # Deletion table: maps both separator characters to nothing.
    drop_separators = str.maketrans("", "", "-/")
    return symbol.translate(drop_separators)
3. Handle Exchange-Specific Features¶
# Some exchanges may have features others don't
if data.source == "Binance":
# Binance-specific logic
pass
elif data.source == "OKX":
# OKX-specific logic
pass
4. Monitor Connection Health¶
from collections import defaultdict
from datetime import datetime, timedelta
last_update = defaultdict(lambda: datetime.now())
async for data in merge_streams(binance, okx, kraken):
last_update[data.source] = datetime.now()
# Check for stale connections
for exchange, last_time in last_update.items():
if datetime.now() - last_time > timedelta(minutes=5):
print(f"⚠️ {exchange} hasn't sent data in 5 minutes!")
Troubleshooting¶
Different Update Rates¶
Problem: Exchanges update at different rates.
Solution: This is normal. Exchanges have different internal mechanisms.
Symbol Format Confusion¶
Problem: Same asset has different symbols.
Solution: Normalize symbols for comparison (see "Normalize Symbols" under Best Practices above).
Missing Data from One Exchange¶
Problem: One exchange stops sending data.
Solution: Add timeout detection:
from datetime import datetime, timedelta
last_seen = {}
async for data in merge_streams(binance, okx):
last_seen[data.source] = datetime.now()
# Alert if an exchange goes silent
for exchange in ["Binance", "OKX"]:
if exchange not in last_seen:
continue
if datetime.now() - last_seen[exchange] > timedelta(minutes=2):
print(f"⚠️ {exchange} connection may be down!")
Next Steps¶
- Examples → See multi-exchange examples
- API Reference → Complete runner documentation
- Exchange Guides → Exchange-specific details