Runners API
Auto-generated API documentation for StreamForge runners.
BinanceRunner
BinanceRunner(stream_input: DataInput, websocket_client=BinanceWS, processor_class=BinanceProcessor, source='Binance', active_warmup=True, emit_warmup=False, emit_only_closed_candles=True, verbose=False)
Bases: Runner
Runner for Binance exchange data ingestion.
BinanceRunner streams real-time market data from Binance, including kline (candlestick) data, and supports aggregation into multiple timeframes and multiple output formats.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream_input | DataInput | DataInput configuration specifying what to stream | required |
| websocket_client | | WebSocket handler class | BinanceWS |
| processor_class | | Data processor class | BinanceProcessor |
| source | | Exchange name | 'Binance' |
| active_warmup | | Whether to fetch historical data on startup | True |
| emit_warmup | | Whether to emit warmup data to outputs | False |
| emit_only_closed_candles | | Only emit completed candles | True |
| verbose | | Enable verbose logging | False |
Examples:
Basic usage:
>>> import asyncio
>>> from streamforge.ingestion.binance.runner import BinanceRunner
>>> from streamforge.base.stream_input import DataInput
>>> from streamforge.base.emitters import CSVEmitter
>>>
>>> async def main():
...     # Configure stream
...     stream = DataInput(
...         type="kline",
...         symbols=["BTCUSDT"],
...         timeframe="1m"
...     )
...
...     # Create runner
...     runner = BinanceRunner(stream_input=stream)
...
...     # Add CSV output
...     runner.register_emitter(CSVEmitter(file_path="btc_1m.csv"))
...
...     # Start streaming
...     await runner.run()
>>>
>>> asyncio.run(main())
With aggregation:
>>> stream = DataInput(
...     type="kline",
...     symbols=["BTCUSDT", "ETHUSDT"],
...     timeframe="1m",
...     aggregate_list=["5m", "15m", "1h"]
... )
>>> runner = BinanceRunner(stream_input=stream, active_warmup=True)
>>> await runner.run()  # must run inside a coroutine, as in main() above
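To illustrate what the aggregate_list option asks for, the following minimal sketch rolls 1-minute candles up into one larger candle. It is not StreamForge's implementation: the Candle dataclass and the aggregate function are hypothetical names used only for illustration, and the usual OHLCV rules are assumed (first open, highest high, lowest low, last close, summed volume).

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class Candle:
    """Hypothetical OHLCV candle; not a StreamForge type."""
    open_time: int  # start of the candle, in epoch milliseconds
    open: float
    high: float
    low: float
    close: float
    volume: float


def aggregate(candles: Sequence[Candle]) -> Candle:
    """Merge consecutive candles (oldest first) into one larger candle."""
    if not candles:
        raise ValueError("need at least one candle to aggregate")
    return Candle(
        open_time=candles[0].open_time,         # window starts with the first candle
        open=candles[0].open,                   # first open
        high=max(c.high for c in candles),      # highest high
        low=min(c.low for c in candles),        # lowest low
        close=candles[-1].close,                # last close
        volume=sum(c.volume for c in candles),  # total volume
    )


# Five 1-minute candles that together cover one 5-minute window.
one_minute = [
    Candle(0, 100.0, 101.0, 99.5, 100.5, 2.0),
    Candle(60_000, 100.5, 102.0, 100.0, 101.5, 1.5),
    Candle(120_000, 101.5, 101.8, 100.8, 101.0, 3.0),
    Candle(180_000, 101.0, 103.0, 100.9, 102.5, 2.5),
    Candle(240_000, 102.5, 102.6, 101.2, 101.4, 1.0),
]
five_minute = aggregate(one_minute)
```

The same function could be applied to fifteen or sixty 1-minute candles to build the "15m" and "1h" windows, provided the input covers exactly one window.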
Note
Binance WebSocket streams for public market data are free and do not require API keys. Rate limits apply to the API calls made during warmup.
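The effect of emit_only_closed_candles can be pictured with the sketch below. It assumes Binance's kline payload, in which the k object carries an x field that becomes true once the candle has closed. The is_closed and closed_only helpers are hypothetical and are not part of StreamForge.

```python
from typing import Any, Dict, Iterable, Iterator

Message = Dict[str, Any]


def is_closed(message: Message) -> bool:
    """Return True when a Binance kline message describes a finished candle."""
    return bool(message.get("k", {}).get("x", False))


def closed_only(messages: Iterable[Message]) -> Iterator[Message]:
    """Yield only messages whose candle has closed, dropping in-progress updates."""
    return (m for m in messages if is_closed(m))


# Two updates for the same candle (open, then closed) and one for the next candle.
updates = [
    {"e": "kline", "s": "BTCUSDT", "k": {"t": 0, "c": "100.2", "x": False}},
    {"e": "kline", "s": "BTCUSDT", "k": {"t": 0, "c": "100.5", "x": True}},
    {"e": "kline", "s": "BTCUSDT", "k": {"t": 60000, "c": "100.7", "x": False}},
]
closed = list(closed_only(updates))
```

Only the second update survives the filter, since it is the only one flagged as closed.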
KrakenRunner
KrakenRunner(stream_input: DataInput, websocket_client=KrakenWS, processor_class=KrakenProcessor, source='Kraken', active_warmup=True, emit_warmup=False, verbose=False)
Bases: Runner
Runner for Kraken exchange data ingestion.
KrakenRunner provides an interface for streaming real-time OHLC data from Kraken exchange, with support for timeframe aggregation and multiple output formats.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream_input | DataInput | DataInput configuration specifying what to stream | required |
| websocket_client | | WebSocket handler class | KrakenWS |
| processor_class | | Data processor class | KrakenProcessor |
| source | | Exchange name | 'Kraken' |
| active_warmup | | Whether to fetch historical data on startup | True |
| emit_warmup | | Whether to emit warmup data to outputs | False |
| verbose | | Enable verbose logging | False |
Examples:
>>> import asyncio
>>> from streamforge.ingestion.kraken.runner import KrakenRunner
>>> from streamforge.base.stream_input import DataInput
>>>
>>> async def main():
...     stream = DataInput(
...         type="ohlc",
...         symbols=["BTC/USD"],
...         timeframe="1"  # Kraken uses minutes as integers
...     )
...     runner = KrakenRunner(stream_input=stream)
...     await runner.run()
>>>
>>> asyncio.run(main())
Note
Kraken uses different symbol formats (BTC/USD vs BTCUSDT) and timeframe representations (integer minutes vs strings like '1m').
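Because the note contrasts string timeframes such as '1m' with Kraken's minute counts, a small converter can make the mapping explicit. This is a sketch under assumptions: to_minutes is a hypothetical helper, not a StreamForge function, and it only handles the m, h, and d suffixes.

```python
# Minutes per unit suffix; other suffixes are deliberately unsupported here.
_UNIT_MINUTES = {"m": 1, "h": 60, "d": 1440}


def to_minutes(timeframe: str) -> int:
    """Convert a timeframe such as '5m' or '1h' into a whole number of minutes."""
    value, unit = timeframe[:-1], timeframe[-1:]
    if unit not in _UNIT_MINUTES or not value.isdigit():
        raise ValueError(f"unsupported timeframe: {timeframe!r}")
    return int(value) * _UNIT_MINUTES[unit]


# The Kraken example on this page passes the interval as a string, so convert back to str.
kraken_intervals = [str(to_minutes(tf)) for tf in ("1m", "15m", "1h", "1d")]
```

Whether a given minute count is accepted depends on the intervals Kraken supports, which this helper does not check.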
OKXRunner
OKXRunner(stream_input: DataInput, websocket_client=OkxWS, processor_class=OkxProcessor, source='OKX', active_warmup=True, emit_warmup=False, verbose=False)
Bases: Runner
Runner for OKX exchange data ingestion.
OKXRunner provides an interface for streaming real-time candlestick data from OKX exchange, with support for timeframe aggregation and multiple output formats.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream_input | DataInput | DataInput configuration specifying what to stream | required |
| websocket_client | | WebSocket handler class | OkxWS |
| processor_class | | Data processor class | OkxProcessor |
| source | | Exchange name | 'OKX' |
| active_warmup | | Whether to fetch historical data on startup | True |
| emit_warmup | | Whether to emit warmup data to outputs | False |
| verbose | | Enable verbose logging | False |
Examples:
>>> import asyncio
>>> from streamforge.ingestion.okx.runner import OKXRunner
>>> from streamforge.base.stream_input import DataInput
>>>
>>> async def main():
...     stream = DataInput(
...         type="candle",
...         symbols=["BTC-USDT"],
...         timeframe="1m"
...     )
...     runner = OKXRunner(stream_input=stream)
...     await runner.run()
>>>
>>> asyncio.run(main())
Note
OKX uses hyphenated symbol format (BTC-USDT) and supports various timeframe intervals for candlestick data.
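The three runners expect different symbol spellings. As a sketch, the hypothetical format_symbol helper below (not part of StreamForge) builds each spelling from a base and quote asset, using only the formats shown in the examples on this page and the default source names of the three runners.

```python
# Separator between base and quote asset, keyed by each runner's default source name.
_SEPARATORS = {"Binance": "", "Kraken": "/", "OKX": "-"}


def format_symbol(base: str, quote: str, source: str) -> str:
    """Spell a trading pair the way the given exchange's examples do."""
    if source not in _SEPARATORS:
        raise ValueError(f"unknown source: {source!r}")
    return f"{base.upper()}{_SEPARATORS[source]}{quote.upper()}"


binance_symbol = format_symbol("btc", "usdt", "Binance")
kraken_symbol = format_symbol("btc", "usd", "Kraken")
okx_symbol = format_symbol("btc", "usdt", "OKX")
```

The helper only changes the spelling. It does not check that the pair is actually listed on the exchange.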
Common Usage
Creating a Runner
import streamforge as sf

runner = sf.BinanceRunner(
    stream_input=sf.DataInput(
        type="kline",
        symbols=["BTCUSDT"],
        timeframe="1m"
    ),
    active_warmup=False,
    emit_warmup=False
)
Registering Emitters
runner.register_emitter(sf.Logger())
# csv_emitter and postgres_emitter are emitter instances configured elsewhere
runner.register_emitter(csv_emitter)
runner.register_emitter(postgres_emitter)
Running
# Continuous streaming
await runner.run()
# Manual iteration
async for data in runner.stream():
    print(data)
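Manual iteration is useful when the consumer should stop on its own condition. The sketch below shows the pattern with a stand-in async generator, fake_stream, in place of runner.stream(). Both fake_stream and take are hypothetical names, and the shape of the yielded items is an assumption.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def fake_stream() -> AsyncIterator[Dict[str, Any]]:
    """Stand-in for runner.stream(): yields an endless series of candle-like dicts."""
    n = 0
    while True:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield {"symbol": "BTCUSDT", "sequence": n}
        n += 1


async def take(stream: AsyncIterator[Dict[str, Any]], limit: int) -> List[Dict[str, Any]]:
    """Collect at most `limit` items, then stop iterating."""
    items: List[Dict[str, Any]] = []
    if limit <= 0:
        return items
    async for item in stream:
        items.append(item)
        if len(items) >= limit:
            break  # leave the loop; the stream is not consumed any further
    return items


collected = asyncio.run(take(fake_stream(), 3))
```

With a real runner, the same loop body could apply any stop condition, such as a deadline or a price threshold, instead of a fixed item count.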