Runners API

Auto-generated API documentation for StreamForge runners.


BinanceRunner

BinanceRunner(stream_input: DataInput, websocket_client=BinanceWS, processor_class=BinanceProcessor, source='Binance', active_warmup=True, emit_warmup=False, emit_only_closed_candles=True, verbose=False)

Bases: Runner

Runner for Binance exchange data ingestion.

BinanceRunner provides a simple interface for streaming real-time market data from Binance, including kline/candlestick data. It supports aggregation into multiple timeframes and several output formats.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stream_input | DataInput | DataInput configuration specifying what to stream | required |
| websocket_client | | WebSocket handler class | BinanceWS |
| processor_class | | Data processor class | BinanceProcessor |
| source | | Exchange name | 'Binance' |
| active_warmup | | Whether to fetch historical data on startup | True |
| emit_warmup | | Whether to emit warmup data to outputs | False |
| emit_only_closed_candles | | Only emit completed candles | True |
| verbose | | Enable verbose logging | False |
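
To illustrate what "only emit completed candles" means in practice: Binance's kline stream pushes repeated updates for the candle still in progress and marks the final update with the flag `x` set to true inside the `k` object. The sketch below filters raw payloads on that flag. It is a standalone illustration, not StreamForge's implementation; the helper name `closed_candles` and the sample payloads are hypothetical.

```python
from typing import Iterable, Iterator


def closed_candles(messages: Iterable[dict]) -> Iterator[dict]:
    """Yield only the kline objects whose candle is marked closed."""
    for message in messages:
        kline = message.get("k", {})
        # "x" is Binance's "is this kline closed?" flag.
        if kline.get("x"):
            yield kline


# Hand-written sample updates for a single 1m candle (not real market data).
updates = [
    {"e": "kline", "s": "BTCUSDT", "k": {"t": 0, "c": "100.0", "x": False}},
    {"e": "kline", "s": "BTCUSDT", "k": {"t": 0, "c": "101.5", "x": False}},
    {"e": "kline", "s": "BTCUSDT", "k": {"t": 0, "c": "101.0", "x": True}},
]

final = list(closed_candles(updates))
print(final)
```

With a filter like this, downstream outputs see one record per candle instead of every intermediate update.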

Examples:

Basic usage:

>>> import asyncio
>>> from streamforge.ingestion.binance.runner import BinanceRunner
>>> from streamforge.base.stream_input import DataInput
>>> from streamforge.base.emitters import CSVEmitter
>>> 
>>> async def main():
...     # Configure stream
...     stream = DataInput(
...         type="kline",
...         symbols=["BTCUSDT"],
...         timeframe="1m"
...     )
...     
...     # Create runner
...     runner = BinanceRunner(stream_input=stream)
...     
...     # Add CSV output
...     runner.register_emitter(CSVEmitter(file_path="btc_1m.csv"))
...     
...     # Start streaming
...     await runner.run()
>>> 
>>> asyncio.run(main())

With aggregation:

>>> async def main():
...     stream = DataInput(
...         type="kline",
...         symbols=["BTCUSDT", "ETHUSDT"],
...         timeframe="1m",
...         aggregate_list=["5m", "15m", "1h"]
...     )
...     runner = BinanceRunner(stream_input=stream, active_warmup=True)
...     await runner.run()
>>> 
>>> asyncio.run(main())

Note

Binance WebSocket streams for public market data are free and do not require API keys. Rate limits still apply to the API calls made during warmup.
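
The aggregation example above asks for 5m, 15m, and 1h candles built from a 1m stream. As a rough sketch of the usual OHLCV roll-up rule (first open, highest high, lowest low, last close, summed volume), the code below combines five 1m candles into one 5m candle. The `Candle` class and `aggregate` function are hypothetical names for illustration; StreamForge's own aggregation logic is not shown in this reference and may differ.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class Candle:
    open_time: int  # candle start, in minutes from an arbitrary origin
    open: float
    high: float
    low: float
    close: float
    volume: float


def aggregate(candles: Sequence[Candle]) -> Candle:
    """Roll consecutive, time-ordered candles up into one larger candle."""
    if not candles:
        raise ValueError("need at least one candle to aggregate")
    return Candle(
        open_time=candles[0].open_time,         # start of the first candle
        open=candles[0].open,                   # first open
        high=max(c.high for c in candles),      # highest high
        low=min(c.low for c in candles),        # lowest low
        close=candles[-1].close,                # last close
        volume=sum(c.volume for c in candles),  # total volume
    )


# Made-up 1m candles, used only to exercise the function.
one_minute = [
    Candle(0, 100.0, 101.0, 99.5, 100.5, 2.0),
    Candle(1, 100.5, 102.0, 100.0, 101.5, 1.5),
    Candle(2, 101.5, 101.75, 100.75, 101.0, 3.0),
    Candle(3, 101.0, 103.0, 100.5, 102.5, 2.5),
    Candle(4, 102.5, 102.75, 101.25, 102.0, 1.0),
]
five_minute = aggregate(one_minute)
print(five_minute)
```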


KrakenRunner

KrakenRunner(stream_input: DataInput, websocket_client=KrakenWS, processor_class=KrakenProcessor, source='Kraken', active_warmup=True, emit_warmup=False, verbose=False)

Bases: Runner

Runner for Kraken exchange data ingestion.

KrakenRunner provides an interface for streaming real-time OHLC data from Kraken exchange, with support for timeframe aggregation and multiple output formats.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stream_input | DataInput | DataInput configuration specifying what to stream | required |
| websocket_client | | WebSocket handler class | KrakenWS |
| processor_class | | Data processor class | KrakenProcessor |
| source | | Exchange name | 'Kraken' |
| active_warmup | | Whether to fetch historical data on startup | True |
| emit_warmup | | Whether to emit warmup data to outputs | False |
| verbose | | Enable verbose logging | False |

Examples:

>>> import asyncio
>>> from streamforge.ingestion.kraken.runner import KrakenRunner
>>> from streamforge.base.stream_input import DataInput
>>> 
>>> async def main():
...     stream = DataInput(
...         type="ohlc",
...         symbols=["BTC/USD"],
...         timeframe="1"  # Kraken uses minutes as integers
...     )
...     runner = KrakenRunner(stream_input=stream)
...     await runner.run()
>>> 
>>> asyncio.run(main())

Note

Kraken's symbol format and timeframe representation differ from Binance's: symbols are slash-separated (BTC/USD rather than BTCUSDT), and timeframes are given as a number of minutes rather than as strings like '1m'.
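
When the same configuration code targets several exchanges, a small helper can translate Binance-style timeframe labels into the minute counts used for Kraken. The sketch below is one possible approach; `to_minutes` is a hypothetical helper, not part of StreamForge. It returns the minutes as a string to match the `timeframe="1"` usage in the example above, and it does not check whether Kraken actually offers the resulting interval.

```python
# Minutes per unit suffix; only these four suffixes are handled in this sketch.
_UNIT_MINUTES = {"m": 1, "h": 60, "d": 1440, "w": 10080}


def to_minutes(timeframe: str) -> str:
    """Convert a label such as '15m' or '1h' to a minute count, as a string."""
    unit = timeframe[-1:]
    count = timeframe[:-1]
    if unit not in _UNIT_MINUTES or not count.isdecimal() or int(count) == 0:
        raise ValueError(f"unsupported timeframe: {timeframe!r}")
    return str(int(count) * _UNIT_MINUTES[unit])


for label in ("1m", "15m", "1h", "1d"):
    print(label, "->", to_minutes(label))
```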


OKXRunner

OKXRunner(stream_input: DataInput, websocket_client=OkxWS, processor_class=OkxProcessor, source='OKX', active_warmup=True, emit_warmup=False, verbose=False)

Bases: Runner

Runner for OKX exchange data ingestion.

OKXRunner provides an interface for streaming real-time candlestick data from OKX exchange, with support for timeframe aggregation and multiple output formats.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stream_input | DataInput | DataInput configuration specifying what to stream | required |
| websocket_client | | WebSocket handler class | OkxWS |
| processor_class | | Data processor class | OkxProcessor |
| source | | Exchange name | 'OKX' |
| active_warmup | | Whether to fetch historical data on startup | True |
| emit_warmup | | Whether to emit warmup data to outputs | False |
| verbose | | Enable verbose logging | False |

Examples:

>>> import asyncio
>>> from streamforge.ingestion.okx.runner import OKXRunner
>>> from streamforge.base.stream_input import DataInput
>>> 
>>> async def main():
...     stream = DataInput(
...         type="candle",
...         symbols=["BTC-USDT"],
...         timeframe="1m"
...     )
...     runner = OKXRunner(stream_input=stream)
...     await runner.run()
>>> 
>>> asyncio.run(main())

Note

OKX uses a hyphenated symbol format (BTC-USDT) and supports various timeframe intervals for candlestick data.
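
The three runners on this page each expect a different symbol spelling: BTCUSDT for Binance, BTC/USD for Kraken, and BTC-USDT for OKX. A minimal sketch of a formatter that builds the right string from a base and quote asset might look like the following. `format_symbol` is a hypothetical helper, not a StreamForge function, and it only handles the separator; it does not map between quote assets such as USD and USDT.

```python
# Separator placed between base and quote asset for each exchange.
_SEPARATORS = {"binance": "", "kraken": "/", "okx": "-"}


def format_symbol(base: str, quote: str, exchange: str) -> str:
    """Join base and quote assets using the exchange's separator."""
    key = exchange.lower()
    if key not in _SEPARATORS:
        raise ValueError(f"unknown exchange: {exchange!r}")
    return f"{base.upper()}{_SEPARATORS[key]}{quote.upper()}"


print(format_symbol("btc", "usdt", "Binance"))
print(format_symbol("btc", "usd", "Kraken"))
print(format_symbol("btc", "usdt", "OKX"))
```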


Common Usage

Creating a Runner

import streamforge as sf

runner = sf.BinanceRunner(
    stream_input=sf.DataInput(
        type="kline",
        symbols=["BTCUSDT"],
        timeframe="1m"
    ),
    active_warmup=False,
    emit_warmup=False
)

Registering Emitters

# csv_emitter and postgres_emitter are emitter instances created beforehand
runner.register_emitter(sf.Logger())
runner.register_emitter(csv_emitter)
runner.register_emitter(postgres_emitter)

Running

# Both forms must run inside a coroutine, e.g. an async main() started with asyncio.run()

# Continuous streaming
await runner.run()

# Manual iteration
async for data in runner.stream():
    print(data)
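
Manual iteration is useful when the caller wants to decide when to stop. The sketch below shows the general `async for` pattern with a stand-in generator, since running a real runner needs a network connection. `fake_stream` and `take` are hypothetical names; `fake_stream` only imitates the shape of `runner.stream()`, and the records it yields are made up.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream() -> AsyncIterator[dict]:
    """Stand-in for runner.stream(): yields candle-like dicts forever."""
    seq = 0
    while True:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield {"symbol": "BTCUSDT", "seq": seq}
        seq += 1


async def take(stream: AsyncIterator[dict], limit: int) -> list:
    """Collect at most `limit` items from the stream, then stop iterating."""
    items = []
    if limit <= 0:
        return items
    async for item in stream:
        items.append(item)
        if len(items) >= limit:
            break
    return items


first_three = asyncio.run(take(fake_stream(), 3))
print([item["seq"] for item in first_three])
```

In real use, the call would presumably be `take(runner.stream(), 3)` inside the application's own coroutine, assuming `runner.stream()` behaves as an async iterator as the snippet above suggests.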
