Emitters API

Auto-generated API documentation for StreamForge emitters.


Logger

Logger(prefix='')

Bases: DataEmitter


CSVEmitter

CSVEmitter(source: str, symbol: str, timeframe: str, name: str = EMITTER, file_path: Optional[str] = None, transformer_function: Optional[Callable[[Dict[str, Any]], dict]] = None)

Bases: DataEmitter
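
The transformer_function hook takes the record as a dict and returns the dict that is written to the file. A minimal sketch, assuming the same connect/emit/close lifecycle as the other emitters; the flatten_kline helper and its field names are illustrative:

def flatten_kline(record: dict) -> dict:
    # Hypothetical transformer: keep only the columns we want in the CSV.
    return {"open_ts": record["open_ts"], "close": record["close"]}

csv_emitter = sf.CSVEmitter(
    source="Binance",
    symbol="BTCUSDT",
    timeframe="1m",
    file_path="btc_1m.csv",
    transformer_function=flatten_kline,
)
runner.register_emitter(csv_emitter)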


PostgresEmitter

PostgresEmitter(name: str = EMITTER, url: Optional[str] = None, host: Optional[str] = None, dbname: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None, port: int = 5432, upsert: bool = False, index_elements: Optional[list[str]] = None, transformer: Optional[Callable[[Dict[str, Any]], dict]] = None)

Bases: DataEmitter

set_model

set_model(model: type[Base], inplace=False)

on_conflict

on_conflict(index_elements: list[str], inplace=False)

set_transformer

set_transformer(transformer_function: Callable[[Dict[str, Any]], dict], inplace=False)

connect async

connect()

Initializes the SQLAlchemy async engine and sessionmaker.

emit async

emit(data: BaseModel)

close async

close()

Disposes the engine, closing the connection pool.
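
Putting the pieces together: a minimal sketch of the chained configuration and the async lifecycle. The connection details, KlineTable model, and kline record are illustrative; emit expects a Pydantic BaseModel instance.

postgres = (sf.PostgresEmitter(host="localhost", dbname="crypto", user="app", password="secret")
    .set_model(KlineTable)                                        # SQLAlchemy model rows are written to
    .on_conflict(["source", "symbol", "timeframe", "open_ts"]))   # upsert key

await postgres.connect()    # create the async engine and sessionmaker
await postgres.emit(kline)  # kline: BaseModel
await postgres.close()      # dispose the engine and its connection pool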


KafkaEmitter

KafkaEmitter(topic: str, bootstrap_servers: str = 'localhost:9092', key_serializer: Optional[Callable] = None, value_serializer: Optional[Callable] = None, name: str = EMITTER)

Bases: DataEmitter

set_model

set_model(model: Type[BaseModel])

Set the canonical Pydantic model for this emitter.

register_map

register_map(columns_map: Dict[str, str])

Optional field mapping for output.

connect async

connect()

Initialize the Kafka producer.

emit async

emit(data: BaseModel, key: Optional[str] = None)

Send a single message to Kafka.

close async

close()

Close the Kafka producer.
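
A minimal end-to-end sketch. The topic name, Kline model, field mapping, and JSON value_serializer are illustrative assumptions, not part of the generated reference:

import json

kafka = sf.KafkaEmitter(
    topic="crypto.klines",
    bootstrap_servers="localhost:9092",
    value_serializer=lambda value: json.dumps(value).encode("utf-8"),
)
kafka.set_model(Kline)                              # canonical Pydantic model
kafka.register_map({"open_ts": "t", "close": "c"})  # optional output field mapping

await kafka.connect()                   # initialize the producer
await kafka.emit(kline, key="BTCUSDT")  # send one message; key is optional
await kafka.close()                     # close the producer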


DataEmitter (Base Class)

Bases: ABC

Abstract base class for data output emitters.

DataEmitter defines the interface for outputting processed market data to various destinations. Implementations include CSV files, PostgreSQL databases, Kafka streams, and logging outputs.

Attributes:

EMITTER_TYPE: Category of emitter (e.g., 'database', 'file writer', 'stream')

EMITTER: Specific emitter name (e.g., 'postgresql', 'csv', 'kafka')

DATA_MODEL: Optional data model for structured outputs

Examples:

>>> # Use concrete implementations
>>> emitter = CSVEmitter(source="Binance", symbol="BTCUSDT", timeframe="1m", file_path="btc_data.csv")
>>> await emitter.connect()
>>> await emitter.emit(kline_data)
>>> await emitter.close()
Note

Subclasses must implement all abstract methods: register_map, set_model, emit, connect, and close.
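
A minimal subclass skeleton, sketched under the assumption that connect, emit, and close are async (as on the concrete emitters); the StdoutEmitter name and its behaviour are illustrative:

class StdoutEmitter(DataEmitter):
    EMITTER_TYPE = "stream"
    EMITTER = "stdout"

    def set_model(self, model):
        self._model = model
        return self

    def register_map(self, columns_map):
        self._columns_map = columns_map
        return self

    async def connect(self):
        pass  # nothing to open for stdout

    async def emit(self, data):
        print(data)  # data is a Pydantic BaseModel

    async def close(self):
        pass  # nothing to release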


Common Usage

Logger

logger = sf.Logger(prefix="Binance")
runner.register_emitter(logger)

CSV

csv = sf.CSVEmitter(
    source="Binance",
    symbol="BTCUSDT",
    timeframe="1m",
    file_path="data.csv"
)
runner.register_emitter(csv)

PostgreSQL

postgres = (sf.PostgresEmitter(host="localhost", dbname="crypto")
    .set_model(KlineTable)
    .on_conflict(["source", "symbol", "timeframe", "open_ts"]))
runner.register_emitter(postgres)

Kafka

kafka = sf.KafkaEmitter(
    bootstrap_servers="localhost:9092",
    topic="crypto"
)
runner.register_emitter(kafka)
