Quick Start¶
Get streaming in 5 minutes! This guide walks you through your first StreamForge application.
Prerequisites¶
Make sure you have StreamForge installed:
Your First Stream¶
Let's create a simple script that streams Bitcoin price data from Binance.
Step 1: Create a Python File¶
Create a new file called my_first_stream.py
:
import asyncio
import streamforge as sf
async def main():
# Configure what to stream
stream = sf.DataInput(
type="kline", # Candlestick/OHLC data
symbols=["BTCUSDT"], # Bitcoin/USDT pair
timeframe="1m" # 1-minute candles
)
# Create Binance runner
runner = sf.BinanceRunner(stream_input=stream)
# Add logger to see output
runner.register_emitter(sf.Logger(prefix="Binance"))
# Start streaming!
await runner.run()
if __name__ == "__main__":
asyncio.run(main())
Step 2: Run It¶
Step 3: See the Magic!¶
You'll see live Bitcoin data streaming to your console:
2025-10-14 16:21:32 - INFO - Aggregation Deactivated
2025-10-14 16:21:33 - INFO - Binance | Subscribed Successful to params: {'method': 'SUBSCRIBE', 'params': ['btcusdt@kline_1m'], 'id': 999} | Websocket Input: DataInput(type='kline', symbols=['BTCUSDT'], timeframe='1m', aggregate_list=[]).
2025-10-14 16:21:33 - INFO - Binance | Websocket Connection established successfully!
2025-10-14 16:22:00 - INFO - Binance | Data Received: source='binance' symbol='BTCUSDT' timeframe='1m' open_ts=1760469660 end_ts=1760469719 open=113329.98 high=113411.45 low=113329.98 close=113383.03 volume=11.95122 quote_volume=1355147.9103971 vwap=None n_trades=5228 is_closed=True
2025-10-14 16:22:00 - INFO - Binance | Received Data | source='binance' symbol='BTCUSDT' timeframe='1m' open_ts=1760469660 end_ts=1760469719 open=113329.98 high=113411.45 low=113329.98 close=113383.03 volume=11.95122 quote_volume=1355147.9103971 vwap=None n_trades=5228 is_closed=True
...
Press Ctrl+C
to stop.
Congratulations!
You just streamed live cryptocurrency data! 🎉
Understanding the Code¶
Let's break down what each part does:
1. DataInput - Configure What to Stream¶
stream = sf.DataInput(
type="kline", # What type of data
symbols=["BTCUSDT"], # Which trading pairs
timeframe="1m" # Time interval
)
DataInput tells StreamForge:
- type: What kind of data (
kline
,trade
,depth
, etc.) - symbols: Which cryptocurrencies to track
- timeframe: For OHLC data, the candle interval
2. Runner - Connect to Exchange¶
The Runner manages the WebSocket connection and data flow from the exchange.
Each exchange has its own runner:
BinanceRunner
- For BinanceKrakenRunner
- For KrakenOKXRunner
- For OKX
3. Emitter - Output the Data¶
Emitters determine where data goes:
Logger
- Print to console (debugging)CSVEmitter
- Save to CSV filePostgresEmitter
- Save to databaseKafkaEmitter
- Stream to Kafka
You can register multiple emitters!
4. Run - Start Streaming¶
This starts the WebSocket connection and begins streaming data.
Save to CSV¶
Let's modify the script to save data to a CSV file:
import asyncio
import streamforge as sf
async def main():
stream = sf.DataInput(
type="kline",
symbols=["BTCUSDT"],
timeframe="1m"
)
runner = sf.BinanceRunner(stream_input=stream)
# Add CSV emitter
csv_emitter = sf.CSVEmitter(
source="Binance",
symbol="BTCUSDT",
timeframe="1m",
file_path="btc_data.csv"
)
runner.register_emitter(csv_emitter)
await runner.run()
if __name__ == "__main__":
asyncio.run(main())
Now data is saved to btc_data.csv
!
Multiple Symbols¶
Stream multiple cryptocurrencies at once:
stream = sf.DataInput(
type="kline",
symbols=["BTCUSDT", "ETHUSDT", "SOLUSDT"], # 3 symbols!
timeframe="1m"
)
runner = sf.BinanceRunner(stream_input=stream)
runner.register_emitter(kafka_emitter)
await runner.run()
Multiple Emitters¶
Send data to multiple destinations:
import asyncio
import streamforge as sf
async def main():
stream = sf.DataInput(
type="kline",
symbols=["BTCUSDT"],
timeframe="1m"
)
runner = sf.BinanceRunner(stream_input=stream)
# Register multiple emitters
runner.register_emitter(sf.Logger(prefix="Binance"))
runner.register_emitter(sf.CSVEmitter(
source="Binance",
symbol="BTCUSDT",
timeframe="1m",
file_path="btc_data.csv"
))
# Data goes to BOTH logger AND CSV!
await runner.run()
if __name__ == "__main__":
asyncio.run(main())
Different Timeframes¶
Change the candle interval:
# 5-minute candles
stream = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="5m")
# 1-hour candles
stream = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1h")
# Daily candles
stream = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1d")
Supported timeframes: 1m
, 5m
, 15m
, 30m
, 1h
, 4h
, 1d
Different Exchanges¶
StreamForge supports multiple exchanges with the same API:
Symbol Formats
Each exchange uses different symbol formats:
- Binance:
BTCUSDT
- OKX:
BTC-USDT
- Kraken:
BTC/USD
See Exchange Guides for details.
Common Patterns¶
Pattern 1: Debug with Logger¶
Use Logger
to see what data looks like:
Pattern 2: Save Everything to CSV¶
Simple data collection:
csv = sf.CSVEmitter(
source="Binance",
symbol="BTCUSDT",
timeframe="1m",
file_path="all_data.csv"
)
runner.register_emitter(csv)
Pattern 3: Logger + CSV¶
Debug while saving:
Stopping Gracefully¶
StreamForge runs forever until interrupted. To stop:
- Keyboard: Press
Ctrl+C
- Programmatically: Use
asyncio
timeout
import asyncio
async def main():
runner = sf.BinanceRunner(stream_input=stream)
runner.register_emitter(sf.Logger())
# Run for 60 seconds
try:
await asyncio.wait_for(runner.run(), timeout=60)
except asyncio.TimeoutError:
print("Done!")
asyncio.run(main())
Troubleshooting¶
Connection Errors¶
If you can't connect:
- Check your internet connection
- Verify the exchange is not down
- Try a different symbol
No Data Appearing¶
If nothing prints:
- Make sure you registered an emitter
- Check the symbol format for your exchange
- Verify the timeframe is valid
Import Errors¶
If imports fail:
What's Next?¶
Now that you've created your first stream:
-
Understand how StreamForge works
-
Save to databases, Kafka, and more
-
See more complete examples
-
Learn exchange-specific details
Complete Example¶
Here's a complete, production-ready example:
"""
Production-ready StreamForge example
Streams BTC and ETH data to both console and CSV
"""
import asyncio
import streamforge as sf
from datetime import datetime
async def main():
print(f"Starting StreamForge at {datetime.now()}")
print("Press Ctrl+C to stop\n")
# Configure stream
stream = sf.DataInput(
type="kline",
symbols=["BTCUSDT", "ETHUSDT"],
timeframe="1m"
)
# Create runner
runner = sf.BinanceRunner(stream_input=stream)
# Add emitters
runner.register_emitter(sf.Logger(prefix="💰 Binance"))
runner.register_emitter(sf.CSVEmitter(
source="Binance",
symbol="BTCUSDT",
timeframe="1m",
file_path="binance_data.csv"
))
# Start streaming
try:
await runner.run()
except KeyboardInterrupt:
print("\n\nStopped by user")
except Exception as e:
print(f"\n\nError: {e}")
if __name__ == "__main__":
asyncio.run(main())
Save and run:
Happy streaming! 🚀