
StreamForge

Real-time cryptocurrency and financial data ingestion made simple


Get Started →
View Examples →


What is StreamForge?

StreamForge is a unified, async-first framework for ingesting real-time market data from cryptocurrency exchanges. Built with Python's asyncio, it offers high-performance data streaming, normalization, and multiple output formats.

Key Features

  • Real-time WebSocket Streaming - Live market data from multiple exchanges
  • Multiple Output Formats - CSV, PostgreSQL, Kafka
  • Multi-Exchange Support - Binance, Kraken, OKX with unified API
  • Timeframe Aggregation - Automatic aggregation to higher timeframes
  • Historical Backfilling - Load months of historical data effortlessly
  • Data Transformation - Built-in transformers for custom data processing
  • Stream Merging - Combine multiple exchanges into unified streams
  • Type-Safe - Full type hints and Pydantic validation

Quick Start

Install StreamForge in seconds:

pip install streamforge

Stream live Bitcoin data in just a few lines:

import asyncio
import streamforge as sf

async def main():
    # Configure what to stream
    stream = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")

    # Create runner and add logger
    runner = sf.BinanceRunner(stream_input=stream)
    runner.register_emitter(sf.Logger(prefix="Binance"))

    # Start streaming!
    await runner.run()

asyncio.run(main())

Output:

2025-10-14 16:21:32 - INFO - Aggregation Deactivated
2025-10-14 16:21:33 - INFO - Binance    | Subscribed Successful to params: {'method': 'SUBSCRIBE', 'params': ['btcusdt@kline_1m'], 'id': 999} | Websocket Input: DataInput(type='kline', symbols=['BTCUSDT'], timeframe='1m', aggregate_list=[]).
2025-10-14 16:21:33 - INFO - Binance    | Websocket Connection established successfully!
2025-10-14 16:22:00 - INFO - Binance    | Data Received: source='binance' symbol='BTCUSDT' timeframe='1m' open_ts=1760469660 end_ts=1760469719 open=113329.98 high=113411.45 low=113329.98 close=113383.03 volume=11.95122 quote_volume=1355147.9103971 vwap=None n_trades=5228 is_closed=True
2025-10-14 16:22:00 - INFO - Binance | Received Data | source='binance' symbol='BTCUSDT' timeframe='1m' open_ts=1760469660 end_ts=1760469719 open=113329.98 high=113411.45 low=113329.98 close=113383.03 volume=11.95122 quote_volume=1355147.9103971 vwap=None n_trades=5228 is_closed=True

Learn More →


Supported Exchanges

  • Binance - World's largest cryptocurrency exchange

    • Kline/OHLC data
    • Real-time WebSocket
    • Historical backfilling

    Guide

  • Kraken - Trusted US-based exchange

    • OHLC data streams
    • WebSocket integration

    Guide

  • OKX - Leading global crypto exchange

    • Candlestick data
    • Real-time streaming
    • Historical support

    Guide
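
Because the API is unified, switching exchanges only changes the runner class. Below is a minimal sketch using the OKX runner (also used in the multi-exchange example further down), assuming the runner interface from the Quick Start carries over; symbol naming conventions differ by exchange, so check each guide for the expected format:

import asyncio
import streamforge as sf

async def main():
    # Same DataInput shape as the Quick Start; "BTCUSDT" is illustrative here,
    # since OKX symbol naming may differ from Binance's (see the OKX guide).
    stream = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")

    # Only the runner class changes between exchanges.
    runner = sf.OKXRunner(stream_input=stream)
    runner.register_emitter(sf.Logger(prefix="OKX"))
    await runner.run()

asyncio.run(main())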


Use Cases

Stream to Database

Continuously save market data to PostgreSQL:

emitter = (sf.PostgresEmitter(host="localhost", dbname="crypto")
    .set_model(KlineTable)
    .on_conflict(["source", "symbol", "timeframe", "open_ts"]))

runner = sf.BinanceRunner(stream_input=stream)
runner.register_emitter(emitter)
await runner.run()
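
KlineTable above is a table model you define yourself. Here is a minimal sketch in SQLAlchemy's declarative style, assuming set_model accepts such a model; the column names mirror the normalized kline fields shown in the Quick Start output, and the exact model requirements are in the PostgreSQL guide:

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

# Hypothetical table definition for illustration only. The composite key
# matches the on_conflict columns used above.
class KlineTable(Base):
    __tablename__ = "klines"

    source: Mapped[str] = mapped_column(primary_key=True)
    symbol: Mapped[str] = mapped_column(primary_key=True)
    timeframe: Mapped[str] = mapped_column(primary_key=True)
    open_ts: Mapped[int] = mapped_column(primary_key=True)
    end_ts: Mapped[int]
    open: Mapped[float]
    high: Mapped[float]
    low: Mapped[float]
    close: Mapped[float]
    volume: Mapped[float]
    quote_volume: Mapped[float]
    n_trades: Mapped[int]
    is_closed: Mapped[bool]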

PostgreSQL Guide →

Multi-Timeframe Aggregation

Stream 1-minute data and auto-aggregate to higher timeframes:

stream = sf.DataInput(
    type="kline",
    symbols=["BTCUSDT"],
    timeframe="1m",
    aggregate_list=["5m", "15m", "1h", "4h"]  # Auto-aggregate!
)

runner = sf.BinanceRunner(stream_input=stream, active_warmup=True)
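
To complete the example, register an emitter and run, just as in the Quick Start. A short sketch, assuming aggregated candles reach emitters through the pipeline shown in the Architecture section below:

runner.register_emitter(sf.Logger(prefix="Binance"))
await runner.run()  # closed candles for each timeframe in aggregate_list flow to the emitter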

Aggregation Guide →

Historical Backfilling

Load months of historical data and send it through any emitter (here, the PostgreSQL emitter from the example above):

backfiller = sf.BinanceBackfilling(
    symbol="BTCUSDT",
    timeframe="1m",
    from_date="2024-01-01",
    to_date="2025-01-01"
)

backfiller.register_emitter(postgres_emitter)
backfiller.run()

Backfilling Guide →

Multi-Exchange Comparison

Merge streams from multiple exchanges:

from streamforge.merge_stream import merge_streams

binance_runner = sf.BinanceRunner(stream_input=binance_stream)
okx_runner = sf.OKXRunner(stream_input=okx_stream)

async for data in merge_streams(binance_runner, okx_runner):
    print(f"{data.source} | {data.symbol} | ${data.close:,.2f}")
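
The binance_stream and okx_stream inputs are ordinary DataInput objects, defined exactly as in the Quick Start. A minimal sketch; the OKX-style symbol shown is illustrative, so confirm the expected format in each exchange guide:

binance_stream = sf.DataInput(type="kline", symbols=["BTCUSDT"], timeframe="1m")
okx_stream = sf.DataInput(type="kline", symbols=["BTC-USDT"], timeframe="1m")  # OKX-style symbol (illustrative)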

Multi-Exchange Guide →


Architecture

StreamForge uses a simple, composable architecture:

graph LR
    A[Exchange WebSocket] --> B[Runner]
    B --> C[Normalizer]
    C --> D[Processor]
    D --> E[Aggregator]
    E --> F[Transformer]
    F --> G1[CSV Emitter]
    F --> G2[PostgreSQL Emitter]
    F --> G3[Kafka Emitter]
    F --> G4[Custom Emitter]

  1. Runner - Manages WebSocket connections and coordinates data flow
  2. Normalizer - Standardizes data format across exchanges
  3. Processor - Buffers and processes incoming data
  4. Aggregator - Aggregates to higher timeframes (optional)
  5. Transformer - Applies custom transformations (optional)
  6. Emitter - Outputs data to your destination(s)
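
Emitters are the extension point most users touch first. As an illustration only, a custom emitter might look roughly like the sketch below; the sf.BaseEmitter name and the emit() signature are assumptions rather than the library's confirmed interface, so see the Core Concepts guide for the real one:

# Hypothetical custom emitter; sf.BaseEmitter and emit() are assumed names
# for illustration only.
class AlertEmitter(sf.BaseEmitter):
    async def emit(self, candle):
        # Field names (is_closed, close, symbol) follow the normalized
        # kline shown in the Quick Start output.
        if candle.is_closed and candle.close > 115_000:
            print(f"{candle.symbol} closed above 115k at {candle.close}")

runner.register_emitter(AlertEmitter())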

Core Concepts →


Why StreamForge?

Simple & Intuitive

Clean, Pythonic API that's easy to learn. Get started in minutes, not hours.

Production Ready

Async-first architecture handles high-frequency data streams efficiently.

Extensible

Create custom emitters, transformers, and processors. Built for customization.

Type Safe

Full type hints and Pydantic validation catch errors before runtime.



Requirements

  • Python 3.8+
  • asyncio support
  • Modern Python async libraries (aiohttp, websockets)

Ready to start? Install StreamForge →