# Core Concepts
Understanding StreamForge's architecture will help you build robust, maintainable data pipelines.
## Architecture Overview
StreamForge follows a simple data flow:
```mermaid
graph LR
    A[Exchange API] --> B[Runner]
    B --> C[WebSocket Handler]
    C --> D[Normalizer]
    D --> E[Processor]
    E --> F[Aggregator]
    F --> G[Transformer]
    G --> H1[Emitter 1]
    G --> H2[Emitter 2]
    G --> H3[Emitter N]
```
Let's look at each component in turn:
## 1. DataInput
DataInput is your configuration object. It tells StreamForge what data to stream.
```python
import streamforge as sf

stream = sf.DataInput(
    type="kline",                        # Data type
    symbols=["BTCUSDT", "ETHUSDT"],      # Trading pairs
    timeframe="1m",                      # Candle interval
    aggregate_list=["5m", "15m", "1h"]   # Optional aggregation
)
```
### Attributes
| Attribute | Type | Description | Example |
|---|---|---|---|
| `type` | `str` | Data stream type | `"kline"`, `"candle"`, `"ohlc"` |
| `symbols` | `list[str]` | Trading pairs to track | `["BTCUSDT"]` |
| `timeframe` | `str` | Candle interval | `"1m"`, `"5m"`, `"1h"` |
| `aggregate_list` | `list[str]` | Higher timeframes (optional) | `["5m", "1h"]` |
### Supported Types
Different exchanges use different names:
| Exchange | Type Name | Description |
|---|---|---|
| Binance | `kline` | Candlestick/OHLC data |
| OKX | `candle` | Candlestick/OHLC data |
| Kraken | `ohlc` | Candlestick/OHLC data |
### Symbol Formats
Each exchange has its own symbol format:
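As a rough guide, the formats below follow common exchange conventions; they are illustrative assumptions, so verify the exact strings against each exchange's own documentation:

```python
# Illustrative symbol formats (assumed from common exchange conventions --
# check each exchange's docs for the authoritative format):
binance = sf.DataInput(type="kline",  symbols=["BTCUSDT"],  timeframe="1m")  # no separator
okx     = sf.DataInput(type="candle", symbols=["BTC-USDT"], timeframe="1m")  # dash-separated
kraken  = sf.DataInput(type="ohlc",   symbols=["BTC/USD"],  timeframe="1m")  # slash-separated
```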
## 2. Runner
Runner manages the entire data pipeline for a specific exchange.
### Responsibilities
- **Connection Management** - WebSocket lifecycle
- **Data Flow** - Coordinates all components
- **Error Handling** - Reconnection logic
- **Emitter Registry** - Manages output destinations
### Available Runners
```python
# Binance
runner = sf.BinanceRunner(stream_input=stream)

# Kraken
runner = sf.KrakenRunner(stream_input=stream)

# OKX
runner = sf.OKXRunner(stream_input=stream)
```
### Configuration Options
```python
runner = sf.BinanceRunner(
    stream_input=stream,
    active_warmup=True,   # Load today's data (used in aggregation)
    emit_warmup=False     # Don't emit warmup data
)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `stream_input` | `DataInput` | Required | Stream configuration |
| `active_warmup` | `bool` | `False` | Load historical data for context |
| `emit_warmup` | `bool` | `False` | Emit historical data too |
**Warmup**

Warmup loads historical data before streaming and is required for aggregation.

- `active_warmup=True` - Load history for context
- `emit_warmup=True` - Also emit the historical data
## 3. Emitters
Emitters define where data goes. You can register multiple emitters.
### Built-in Emitters
#### Logger
Print to console (debugging):
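A minimal sketch; the class name `LoggerEmitter` is an assumption, so check the emitters reference for the exact name in your StreamForge version:

```python
# `LoggerEmitter` is a placeholder name -- confirm the actual class
# in your StreamForge version before relying on it
logger = sf.LoggerEmitter()
runner.register_emitter(logger)
```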
#### CSV
Save to CSV file:
```python
csv = sf.CSVEmitter(
    source="Binance",
    symbol="BTCUSDT",
    timeframe="1m",
    file_path="data.csv"
)
runner.register_emitter(csv)
```
#### PostgreSQL
Save to database:
```python
postgres = sf.PostgresEmitter(
    host="localhost",
    dbname="crypto",
    user="postgres",
    password="secret"
)
postgres.set_model(MyTable)  # MyTable: your table model class, defined elsewhere
runner.register_emitter(postgres)
```
#### Kafka
Stream to Kafka:
```python
kafka = sf.KafkaEmitter(
    bootstrap_servers="localhost:9092",
    topic="crypto-stream"
)
runner.register_emitter(kafka)
```
### Multiple Emitters
Data flows to all registered emitters:
```python
# Register 4 emitters
runner.register_emitter(logger)
runner.register_emitter(csv)
runner.register_emitter(postgres)
runner.register_emitter(kafka)

# Data goes to all 4!
await runner.run()
```
## 4. Processors
Processors handle incoming data internally. You typically don't interact with them directly.
### What They Do
- Buffer incoming messages
- Parse WebSocket frames
- Convert to normalized format
- Aggregate timeframes
- Apply transformations
## 5. Normalizers
Normalizers convert exchange-specific formats to a unified Kline model.
### Purpose
Different exchanges send different formats:
```python
# Binance format
{
    "e": "kline",
    "s": "BTCUSDT",
    "k": {
        "t": 1735689600000,
        "o": "43250.00",
        "h": "43275.00",
        ...
    }
}

# Normalized format (all exchanges)
Kline(
    source="Binance",
    symbol="BTCUSDT",
    timeframe="1m",
    open_ts=1735689600000,
    open=43250.00,
    high=43275.00,
    ...
)
```
### Kline Data Model
```python
from typing import List, Any, Optional
from pydantic import BaseModel, Field, AliasChoices, field_validator

class Kline(BaseModel):
    # Metadata Variables
    source: Optional[str] = Field(None, alias="source", validation_alias=AliasChoices("source"))
    symbol: str = Field(alias="s", validation_alias=AliasChoices("s", "symbol", "ticker", "pair"))
    timeframe: str = Field(alias="i", validation_alias=AliasChoices("i", "timeframe", "tf"))

    # Time related Variables
    open_ts: int = Field(alias="t", validation_alias=AliasChoices("t", "interval_begin", "ts"))
    end_ts: int = Field(alias="T", validation_alias=AliasChoices("T", "timestamp"))

    # Price Variables
    open: float = Field(alias="o", validation_alias=AliasChoices("o", "open", "open_price"))
    high: float = Field(alias="h", validation_alias=AliasChoices("h", "high", "high_price"))
    low: float = Field(alias="l", validation_alias=AliasChoices("l", "low", "low_price"))
    close: float = Field(alias="c", validation_alias=AliasChoices("c", "close", "close_price"))

    # Volume Variables
    volume: float = Field(alias="v", validation_alias=AliasChoices("v", "volume", "base_volume"))
    quote_volume: Optional[float] = Field(None, alias="q", validation_alias=AliasChoices("q", "quote_volume", "Quote asset volume"))
    vwap: Optional[float] = Field(None, alias="vwap", validation_alias=AliasChoices("vwap", "volume_weighted_avg_price"))
    n_trades: Optional[int] = Field(None, alias="n", validation_alias=AliasChoices("n", "count", "trades"))
    is_closed: Optional[bool] = Field(None, alias="is_closed", validation_alias=AliasChoices("is_closed", "x"))

    ...  # methods
```
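Because every field declares `AliasChoices`, the same model validates raw exchange keys and normalized names alike. A minimal sketch using the model above (the timestamps and prices are illustrative values):

```python
# Raw Binance-style keys validate directly thanks to the alias choices above
raw = {
    "s": "BTCUSDT", "i": "1m",
    "t": 1735689600000, "T": 1735689659999,
    "o": "43250.00", "h": "43275.00", "l": "43240.00", "c": "43260.00",
    "v": "12.5",
}
k = Kline.model_validate(raw)
print(k.symbol, k.open, k.close)  # BTCUSDT 43250.0 43260.0
```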
## 6. Aggregators
Aggregators create higher timeframe candles from base candles.
### How It Works
Stream 1-minute candles, get 5m/15m/1h automatically:
```python
stream = sf.DataInput(
    type="kline",
    symbols=["BTCUSDT"],
    timeframe="1m",                     # Base timeframe
    aggregate_list=["5m", "15m", "1h"]  # Aggregate to these
)

runner = sf.BinanceRunner(
    stream_input=stream,
    active_warmup=True  # Required for aggregation!
)
```
### Rules
- The base timeframe must be smaller than every aggregate timeframe
- Aggregation preserves OHLCV integrity (first open, highest high, lowest low, last close, summed volume)
- Warmup is required for accurate aggregated candles
Example:

```python
# ✓ Valid
timeframe="1m", aggregate_list=["5m", "1h"]

# ✗ Invalid
timeframe="1h", aggregate_list=["5m"]  # Can't aggregate down
```
## 7. Transformers
Transformers modify data before it reaches emitters.
```python
def my_transformer(data: dict) -> dict:
    """Add computed fields"""
    return {
        **data,
        "price_range": data["high"] - data["low"],
        "is_bullish": data["close"] > data["open"]
    }

emitter.set_transformer(my_transformer)
```
### Use Cases
- Rename fields
- Add computed columns
- Convert units
- Filter data
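For example, a renaming transformer in the same dict-in, dict-out style as above (the output field names are illustrative, not a required schema):

```python
def rename_fields(data: dict) -> dict:
    """Example transformer: rename fields and make units explicit."""
    out = dict(data)
    out["close_price"] = out.pop("close")   # rename field
    out["volume_base"] = out.pop("volume")  # label volume as base-asset units
    return out

emitter.set_transformer(rename_fields)
```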
## 8. Backfilling
Backfilling loads historical data instead of streaming live data.
```python
backfiller = sf.BinanceBackfilling(
    symbol="BTCUSDT",
    timeframe="1m",
    from_date="2024-01-01",
    to_date="2024-12-31"
)
backfiller.register_emitter(postgres_emitter)
backfiller.run()  # Sync, not async!
```
## Data Flow Example
Let's trace a complete data flow:
```python
import asyncio
import streamforge as sf

async def main():
    # 1. Configure what to stream
    stream = sf.DataInput(
        type="kline",
        symbols=["BTCUSDT"],
        timeframe="1m"
    )

    # 2. Create runner
    runner = sf.BinanceRunner(stream_input=stream)

    # 3. Add emitter (any emitter instance, e.g. the CSV emitter above)
    runner.register_emitter(some_emitter)

    # 4. Start!
    await runner.run()

asyncio.run(main())
```
### What Happens
1. Runner connects to the Binance WebSocket
2. The WebSocket handler receives a raw JSON message
3. The Normalizer converts it to a `Kline` object
4. The Processor buffers and validates it
5. Each Emitter receives the `Kline` and logs it
## Execution Modes
StreamForge has three execution modes:
### 1. Internal Run Mode (async, `.run()`)
For real-time WebSocket streaming where registered emitters handle the data:
```python
async def main():
    runner = sf.BinanceRunner(stream_input=stream)
    runner.register_emitter(emitter)
    await runner.run()  # Async

asyncio.run(main())
```
### 2. Streaming Mode (async, `.stream()`)
For real-time WebSocket streaming and handling data manually:
```python
async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )

    # No emitters registered - we'll handle data manually
    async for kline in runner.stream():
        # Custom logic for each data point
        price = kline.close

        # Example: Only log when price crosses certain thresholds
        if price > 50000:
            print(f"🚀 BTC above $50k! Current: ${price:,.2f}")
        elif price < 30000:
            print(f"📉 BTC below $30k! Current: ${price:,.2f}")
        else:
            print(f"📊 BTC: ${price:,.2f}")

asyncio.run(main())
```
### 3. Backfill Mode (sync)
For historical data loading:
```python
def main():
    backfiller = sf.BinanceBackfilling(
        symbol="BTCUSDT",
        timeframe="1m",
        from_date="2024-01-01",
        to_date="2024-12-31"
    )
    backfiller.register_emitter(emitter)
    backfiller.run()  # Sync

main()
```
## Automatic Reconnection
WebSocket disconnects are handled automatically:
```python
# The runner automatically reconnects on:
# - Network errors
# - Exchange disconnects
# - Timeout errors
```
## Advanced: Stream Iterator
For advanced use cases, use the stream iterator:
```python
async def main():
    runner = sf.BinanceRunner(stream_input=stream)

    # Iterate over data manually
    async for kline in runner.stream():
        print(f"Received: {kline.symbol} @ ${kline.close}")

        # Custom logic here
        if kline.close > 50000:
            print("BTC above $50k!")

asyncio.run(main())
```
This gives you full control over data handling.
## Performance Tips
### 1. Use Appropriate Timeframes
Higher timeframes = less data:
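A rough sketch of the trade-off (exact message rates vary by exchange and market activity):

```python
# 1m candles: ~1,440 closed candles per symbol per day
detailed = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")

# 1h candles: only 24 closed candles per symbol per day
lighter = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1h")
```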
### 2. Limit Symbols
Only stream what you need:
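For example:

```python
# Subscribe to the two pairs you actually analyze...
stream = sf.DataInput(type="kline", symbols=["BTCUSDT", "ETHUSDT"], timeframe="1m")

# ...rather than dozens of pairs "just in case" -- every extra symbol
# adds WebSocket traffic and processing overhead
```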
## Key Takeaways
- **DataInput** - Configures what to stream
- **Runner** - Manages the pipeline
- **Emitters** - Define where data goes
- **Normalizers** - Unify exchange formats
- **Aggregators** - Create higher timeframes
- **Transformers** - Modify data
- **Backfilling** - Loads historical data
## What's Next?
- Deep dive into output destinations
- Learn data transformation
- Multi-timeframe streaming
- See it all in action