StreamForge¶
Real-time cryptocurrency and financial data ingestion made simple
What is StreamForge?¶
StreamForge is a unified, async-first framework for ingesting real-time market data from cryptocurrency exchanges. Built with Python's asyncio, it offers high-performance data streaming, normalization, and multiple output formats.
Key Features¶
- Real-time WebSocket Streaming - Live market data from multiple exchanges
- Multiple Output Formats - CSV, PostgreSQL, Kafka
- Multi-Exchange Support - Binance, Kraken, OKX with unified API
- Timeframe Aggregation - Automatic aggregation to higher timeframes
- Historical Backfilling - Load months of historical data effortlessly
- Data Transformation - Built-in transformers for custom data processing
- Stream Merging - Combine multiple exchanges into unified streams
- Type-Safe - Full type hints and Pydantic validation
Quick Start¶
Install StreamForge in seconds:
Stream Bitcoin data in a few lines:
import asyncio
import streamforge as sf

async def main():
    # Configure what to stream
    stream = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")
    # Create runner and add logger
    runner = sf.BinanceRunner(stream_input=stream)
    runner.register_emitter(sf.Logger(prefix="Binance"))
    # Start streaming!
    await runner.run()

asyncio.run(main())
Output:
2025-10-14 16:21:32 - INFO - Aggregation Deactivated
2025-10-14 16:21:33 - INFO - Binance | Subscribed Successful to params: {'method': 'SUBSCRIBE', 'params': ['btcusdt@kline_1m'], 'id': 999} | Websocket Input: DataInput(type='kline', symbols=['BTCUSDT'], timeframe='1m', aggregate_list=[]).
2025-10-14 16:21:33 - INFO - Binance | Websocket Connection established successfully!
2025-10-14 16:22:00 - INFO - Binance | Data Received: source='binance' symbol='BTCUSDT' timeframe='1m' open_ts=1760469660 end_ts=1760469719 open=113329.98 high=113411.45 low=113329.98 close=113383.03 volume=11.95122 quote_volume=1355147.9103971 vwap=None n_trades=5228 is_closed=True
2025-10-14 16:22:00 - INFO - Binance | Received Data | source='binance' symbol='BTCUSDT' timeframe='1m' open_ts=1760469660 end_ts=1760469719 open=113329.98 high=113411.45 low=113329.98 close=113383.03 volume=11.95122 quote_volume=1355147.9103971 vwap=None n_trades=5228 is_closed=True
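The logged record reports `vwap=None`, but the other fields are enough to sanity-check a candle by hand. The sketch below copies the values from the log line above into a plain dictionary (a stand-in, not StreamForge's actual record class) and derives the candle's UTC window and an approximate volume-weighted average price as `quote_volume / volume`. Treating the timestamps as Unix seconds is an assumption based on their magnitude.

```python
from datetime import datetime, timezone

# Field values copied from the log line above; the dict itself is a stand-in
# for illustration, not StreamForge's record type.
kline = {
    "symbol": "BTCUSDT",
    "open_ts": 1760469660,
    "end_ts": 1760469719,
    "high": 113411.45,
    "low": 113329.98,
    "volume": 11.95122,
    "quote_volume": 1355147.9103971,
}

def to_utc(ts: int) -> datetime:
    """Interpret a timestamp as Unix seconds (assumed) in UTC."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)

def approx_vwap(record: dict) -> float:
    """Quote volume divided by base volume approximates the candle's VWAP."""
    return record["quote_volume"] / record["volume"]

opened = to_utc(kline["open_ts"])
closed = to_utc(kline["end_ts"])
vwap = approx_vwap(kline)

print(f"{kline['symbol']} candle {opened:%H:%M:%S} to {closed:%H:%M:%S} UTC")
print(f"approximate VWAP: {vwap:,.2f}")
```

The derived price should always fall between the candle's low and high, which makes it a cheap consistency check on incoming records.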
Supported Exchanges¶
StreamForge supports Binance, Kraken, and OKX through a unified API.
Use Cases¶
Stream to Database¶
Continuously save market data to PostgreSQL:
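# Assumes `stream` (an sf.DataInput) and `KlineTable` (the table model) are defined elsewhere
emitter = (
    sf.PostgresEmitter(host="localhost", dbname="crypto")
    .set_model(KlineTable)
    .on_conflict(["source", "symbol", "timeframe", "open_ts"])
)
runner = sf.BinanceRunner(stream_input=stream)
runner.register_emitter(emitter)
await runner.run()  # call from inside an async function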
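The column list passed to `on_conflict` names the columns that identify a unique candle. As a rough illustration of what such a key buys you, the sketch below uses the standard-library `sqlite3` module instead of PostgreSQL and a hand-written `ON CONFLICT ... DO UPDATE` statement. The table layout, the `upsert_kline` helper, and the choice to update on conflict are assumptions for illustration; the text above does not say which SQL StreamForge generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE klines (
        source TEXT, symbol TEXT, timeframe TEXT, open_ts INTEGER,
        close REAL, is_closed INTEGER,
        PRIMARY KEY (source, symbol, timeframe, open_ts)
    )
    """
)

def upsert_kline(row: dict) -> None:
    """Insert a candle, or overwrite the stored one that shares its key."""
    conn.execute(
        """
        INSERT INTO klines (source, symbol, timeframe, open_ts, close, is_closed)
        VALUES (:source, :symbol, :timeframe, :open_ts, :close, :is_closed)
        ON CONFLICT (source, symbol, timeframe, open_ts)
        DO UPDATE SET close = excluded.close, is_closed = excluded.is_closed
        """,
        row,
    )

base = {"source": "binance", "symbol": "BTCUSDT", "timeframe": "1m", "open_ts": 1760469660}
upsert_kline({**base, "close": 113350.00, "is_closed": 0})  # hypothetical in-progress update
upsert_kline({**base, "close": 113383.03, "is_closed": 1})  # final candle, same key

rows = conn.execute("SELECT close, is_closed FROM klines").fetchall()
print(rows)
```

With this key, replaying the same candle after a reconnect or a repeated backfill leaves one row per candle instead of duplicates.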
Multi-Timeframe Aggregation¶
Stream 1-minute data and auto-aggregate to higher timeframes:
stream = sf.DataInput(
    type="kline",
    symbols=["BTCUSDT"],
    timeframe="1m",
    aggregate_list=["5m", "15m", "1h", "4h"]  # Auto-aggregate!
)
runner = sf.BinanceRunner(stream_input=stream, active_warmup=True)
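Aggregating to a higher timeframe follows the usual OHLCV rules: the open comes from the first candle, the close from the last, the high and low are the extremes, and volumes add up. The sketch below applies those rules to five 1-minute candles. The `Candle` dataclass, the `aggregate` helper, and the sample prices are made up for illustration and are not StreamForge's internal aggregator.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Candle:
    open_ts: int  # Unix seconds at the start of the candle
    open: float
    high: float
    low: float
    close: float
    volume: float

def aggregate(candles: List[Candle]) -> Candle:
    """Combine consecutive candles into one candle covering their whole span."""
    if not candles:
        raise ValueError("need at least one candle")
    ordered = sorted(candles, key=lambda c: c.open_ts)
    return Candle(
        open_ts=ordered[0].open_ts,
        open=ordered[0].open,
        high=max(c.high for c in ordered),
        low=min(c.low for c in ordered),
        close=ordered[-1].close,
        volume=sum(c.volume for c in ordered),
    )

# Five made-up 1-minute candles starting at a 5-minute boundary.
start = 1760469600
one_minute = [
    Candle(start + 0, 100.0, 101.0, 99.5, 100.5, 2.0),
    Candle(start + 60, 100.5, 102.0, 100.0, 101.5, 1.0),
    Candle(start + 120, 101.5, 101.8, 98.0, 99.0, 3.0),
    Candle(start + 180, 99.0, 100.0, 98.5, 99.5, 1.5),
    Candle(start + 240, 99.5, 100.2, 99.0, 100.0, 2.5),
]

five_minute = aggregate(one_minute)
print(five_minute)
```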
Historical Backfilling¶
Load months of historical data:
backfiller = sf.BinanceBackfilling(
    symbol="BTCUSDT",
    timeframe="1m",
    from_date="2024-01-01",
    to_date="2025-01-01"
)
backfiller.register_emitter(postgres_emitter)
backfiller.run()
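To get a feel for the size of such a backfill, the sketch below counts the 1-minute candles in the range above and splits it into fixed-size request windows. The `plan_windows` helper and the figure of 1000 candles per request are assumptions for illustration (exchange REST endpoints typically cap each response); how StreamForge actually pages through history is not described here. Treating `to_date` as exclusive is also an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple

def plan_windows(
    start: datetime, end: datetime, step: timedelta, candles_per_request: int
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield (window_start, window_end) pairs covering [start, end)."""
    span = step * candles_per_request
    cursor = start
    while cursor < end:
        window_end = min(cursor + span, end)
        yield cursor, window_end
        cursor = window_end

start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2025, 1, 1, tzinfo=timezone.utc)
step = timedelta(minutes=1)

total_candles = (end - start) // step
windows = list(plan_windows(start, end, step, candles_per_request=1000))

print(f"{total_candles} candles in {len(windows)} requests")
```

A year of 1-minute data is therefore hundreds of thousands of rows per symbol, which is worth keeping in mind when sizing the destination table.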
Multi-Exchange Comparison¶
Merge streams from multiple exchanges:
from streamforge.merge_stream import merge_streams

# Assumes `binance_stream` and `okx_stream` are sf.DataInput objects defined elsewhere
binance_runner = sf.BinanceRunner(stream_input=binance_stream)
okx_runner = sf.OKXRunner(stream_input=okx_stream)
# `async for` must run inside an async function
async for data in merge_streams(binance_runner, okx_runner):
    print(f"{data.source} | {data.symbol} | ${data.close:,.2f}")
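Conceptually, merging means consuming several asynchronous sources at once and yielding items in arrival order. The sketch below shows one common way to do that with an `asyncio.Queue`. The `fake_feed` and `merge` functions and the sample prices are hypothetical stand-ins, not the implementation of `merge_streams`.

```python
import asyncio
from typing import AsyncIterator, List, Tuple

async def fake_feed(
    source: str, prices: List[float], delay: float
) -> AsyncIterator[Tuple[str, float]]:
    """Stand-in for an exchange stream: yields (source, price) pairs."""
    for price in prices:
        await asyncio.sleep(delay)
        yield source, price

async def merge(*feeds: AsyncIterator) -> AsyncIterator:
    """Yield items from all feeds as they arrive, until every feed ends."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of one feed

    async def pump(feed: AsyncIterator) -> None:
        async for item in feed:
            await queue.put(item)
        await queue.put(done)

    tasks = [asyncio.create_task(pump(feed)) for feed in feeds]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
    finally:
        # Stop any feed still running if the consumer exits early.
        for task in tasks:
            task.cancel()

async def collect() -> List[Tuple[str, float]]:
    feeds = (
        fake_feed("binance", [113383.03, 113390.10], delay=0.01),
        fake_feed("okx", [113381.50, 113389.90], delay=0.015),
    )
    return [item async for item in merge(*feeds)]

merged = asyncio.run(collect())
for source, price in merged:
    print(f"{source} | ${price:,.2f}")
```

Items from different feeds interleave according to timing, but each feed's own items keep their original order.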
Architecture¶
StreamForge uses a simple, composable architecture:
graph LR
A[Exchange WebSocket] --> B[Runner]
B --> C[Normalizer]
C --> D[Processor]
D --> E[Aggregator]
E --> F[Transformer]
F --> G1[CSV Emitter]
F --> G2[PostgreSQL Emitter]
F --> G3[Kafka Emitter]
F --> G4[Custom Emitter]
- Runner - Manages WebSocket connections and coordinates data flow
- Normalizer - Standardizes data format across exchanges
- Processor - Buffers and processes incoming data
- Aggregator - Aggregates to higher timeframes (optional)
- Transformer - Applies custom transformations (optional)
- Emitter - Outputs data to your destination(s)
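As an illustration of the Normalizer step, the sketch below maps a Binance kline WebSocket message onto the unified field names that appear in the log output earlier on this page. The `normalize_binance_kline` function is hypothetical, and the sample payload is abridged and hand-written to match that logged candle; StreamForge's own normalizer may differ in detail.

```python
from typing import Any, Dict

def normalize_binance_kline(message: Dict[str, Any]) -> Dict[str, Any]:
    """Map a Binance kline event onto exchange-neutral field names."""
    k = message["k"]
    return {
        "source": "binance",
        "symbol": k["s"],
        "timeframe": k["i"],
        "open_ts": k["t"] // 1000,  # Binance sends milliseconds
        "end_ts": k["T"] // 1000,
        "open": float(k["o"]),  # prices and volumes arrive as strings
        "high": float(k["h"]),
        "low": float(k["l"]),
        "close": float(k["c"]),
        "volume": float(k["v"]),
        "quote_volume": float(k["q"]),
        "n_trades": k["n"],
        "is_closed": k["x"],
    }

# Abridged, hand-written payload shaped like a Binance kline event.
raw = {
    "e": "kline",
    "s": "BTCUSDT",
    "k": {
        "t": 1760469660000, "T": 1760469719999, "s": "BTCUSDT", "i": "1m",
        "o": "113329.98", "h": "113411.45", "l": "113329.98", "c": "113383.03",
        "v": "11.95122", "q": "1355147.9103971", "n": 5228, "x": True,
    },
}

record = normalize_binance_kline(raw)
print(record)
```

Once every exchange is mapped onto the same field names, the later stages (aggregation, transformation, emitting) do not need exchange-specific logic.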
Why StreamForge?¶
- Simple & Intuitive - A clean, Pythonic API that is easy to learn. Get started in minutes, not hours.
- Production Ready - The async-first architecture handles high-frequency data streams efficiently.
- Extensible - Create custom emitters, transformers, and processors to fit your pipeline.
- Type Safe - Full type hints let static checkers catch mistakes before runtime, and Pydantic validates data as it arrives.
What's Next?¶
- Get StreamForge installed and ready to use
- Your first stream in 5 minutes
- Deep dive into features and capabilities
- Copy-paste examples for common tasks
- Complete API documentation
Community & Support¶
- Documentation: You're reading it!
- GitHub: Issues & Discussions
- PyPI: Package Repository
- License: MIT License
Requirements¶
- Python 3.8+
- asyncio support
- Modern Python async libraries (aiohttp, websockets)
Ready to start? Install StreamForge →