Emitters API¶
Auto-generated API documentation for StreamForge emitters.
Logger¶
CSVEmitter¶
CSVEmitter(source: str, symbol: str, timeframe: str, name: str = EMITTER, file_path: Optional[str] = None, transformer_function: Callable[[Dict[str, Any]], dict] = None)
Bases: DataEmitter
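The `transformer_function` parameter is typed as `Callable[[Dict[str, Any]], dict]`, so any function that takes one record dictionary and returns a dictionary should fit. The sketch below shows one such function. The field names (`symbol`, `open_ts`, `close`, `volume`) and the function name `rename_and_round` are illustrative assumptions, not part of the StreamForge API.

```python
from typing import Any, Dict


def rename_and_round(record: Dict[str, Any]) -> dict:
    """Hypothetical transformer: keep three fields, rename one, round the price."""
    return {
        "symbol": record["symbol"],
        "open_time": record["open_ts"],              # renamed field (assumed name)
        "close": round(float(record["close"]), 2),   # string price -> rounded float
    }


# Hypothetical input record; real field names depend on the data model in use.
sample = {
    "symbol": "BTCUSDT",
    "open_ts": 1700000000000,
    "close": "43250.12789",
    "volume": "12.5",
}
transformed = rename_and_round(sample)
print(transformed)
```

Based on the signature above, a function like this would presumably be passed as `transformer_function=rename_and_round` when constructing the emitter.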
PostgresEmitter¶
PostgresEmitter(name: str = EMITTER, url: Optional[str] = None, host: Optional[str] = None, dbname: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None, port: int = 5432, upsert: bool = False, index_elements: Optional[list[str]] = None, transformer: Callable[[Dict[str, Any]], dict] = None)
Bases: DataEmitter
set_transformer¶
KafkaEmitter¶
KafkaEmitter(topic: str, bootstrap_servers: str = 'localhost:9092', key_serializer: Optional[callable] = None, value_serializer: Optional[callable] = None, name: str = EMITTER)
Bases: DataEmitter
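The `key_serializer` and `value_serializer` parameters accept callables. A minimal sketch of two such callables follows, assuming the convention used by common Python Kafka clients: each serializer receives a Python object and returns `bytes`. The names `serialize_key` and `serialize_value` and the payload fields are hypothetical. What StreamForge actually passes to them is not confirmed by this page.

```python
import json
from typing import Any


def serialize_key(key: str) -> bytes:
    """Encode a string key (for example, a symbol) as UTF-8 bytes."""
    return key.encode("utf-8")


def serialize_value(value: Any) -> bytes:
    """Encode a JSON-compatible value as compact, key-sorted UTF-8 JSON."""
    return json.dumps(value, separators=(",", ":"), sort_keys=True).encode("utf-8")


# Hypothetical payload used only to exercise the serializers.
payload = {"symbol": "BTCUSDT", "close": 43250.13}
encoded_key = serialize_key("BTCUSDT")
encoded_value = serialize_value(payload)
print(encoded_key, encoded_value)
```

Going by the constructor signature, these would be supplied as `key_serializer=serialize_key` and `value_serializer=serialize_value`. Sorting keys keeps the encoded message byte-for-byte stable for identical records, which can help with deduplication or testing.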
DataEmitter (Base Class)¶
Bases: ABC
Abstract base class for data output emitters.
DataEmitter defines the interface for outputting processed market data to various destinations. Implementations include CSV files, PostgreSQL databases, Kafka streams, and logging outputs.
Attributes:
| Name | Type | Description |
|---|---|---|
| EMITTER_TYPE | | Category of emitter (e.g., 'database', 'file writer', 'stream') |
| EMITTER | | Specific emitter name (e.g., 'postgresql', 'csv', 'kafka') |
| DATA_MODEL | | Optional data model for structured outputs |
Examples:
>>> # Use concrete implementations
>>> emitter = CSVEmitter(source="Binance", symbol="BTCUSDT", timeframe="1m", file_path="btc_data.csv")
>>> await emitter.connect()
>>> await emitter.emit(kline_data)
>>> await emitter.close()
Note
Subclasses must implement all abstract methods: register_map, set_model, emit, connect, and close.
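To illustrate what implementing the five abstract methods might look like, here is a self-contained sketch. It does not import StreamForge. `DataEmitterSketch` is a hypothetical stand-in for `DataEmitter`, and all method signatures are assumptions: `connect`, `emit`, and `close` are async to match the example above, while `register_map` and `set_model` are assumed to be synchronous. The real base class may differ.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class DataEmitterSketch(ABC):
    """Hypothetical stand-in for DataEmitter; real signatures may differ."""

    EMITTER_TYPE = "unknown"
    EMITTER = "unknown"
    DATA_MODEL = None

    @abstractmethod
    def register_map(self, columns_map: Dict[str, str]) -> None: ...

    @abstractmethod
    def set_model(self, model: Any) -> "DataEmitterSketch": ...

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def emit(self, data: Dict[str, Any]) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...


class InMemoryEmitter(DataEmitterSketch):
    """Illustrative emitter that collects records in a list."""

    EMITTER_TYPE = "memory"
    EMITTER = "in_memory"

    def __init__(self) -> None:
        self.records: List[Dict[str, Any]] = []
        self.columns_map: Dict[str, str] = {}
        self.connected = False

    def register_map(self, columns_map: Dict[str, str]) -> None:
        self.columns_map = dict(columns_map)

    def set_model(self, model: Any) -> "InMemoryEmitter":
        self.DATA_MODEL = model
        return self  # returning self allows method chaining

    async def connect(self) -> None:
        self.connected = True

    async def emit(self, data: Dict[str, Any]) -> None:
        if not self.connected:
            raise RuntimeError("connect() must be called before emit()")
        # Rename keys according to the registered map; unmapped keys pass through.
        self.records.append({self.columns_map.get(k, k): v for k, v in data.items()})

    async def close(self) -> None:
        self.connected = False


async def main() -> List[Dict[str, Any]]:
    emitter = InMemoryEmitter()
    emitter.register_map({"open_ts": "open_time"})
    await emitter.connect()
    await emitter.emit({"symbol": "BTCUSDT", "open_ts": 1700000000000})
    await emitter.close()
    return emitter.records


records = asyncio.run(main())
print(records)
```

Because every abstract method is overridden, `InMemoryEmitter` can be instantiated. Omitting any one of them would make instantiation raise `TypeError`, which is how `ABC` enforces the requirement stated in the note.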
Common Usage¶
Logger¶
CSV¶
csv = sf.CSVEmitter(
    source="Binance",
    symbol="BTCUSDT",
    timeframe="1m",
    file_path="data.csv",
)
runner.register_emitter(csv)
PostgreSQL¶
postgres = (
    sf.PostgresEmitter(host="localhost", dbname="crypto")
    .set_model(KlineTable)
    .on_conflict(["source", "symbol", "timeframe", "open_ts"])
)
runner.register_emitter(postgres)
Kafka¶
kafka = sf.KafkaEmitter(
    bootstrap_servers="localhost:9092",
    topic="crypto",
)
runner.register_emitter(kafka)