API Reference
FluxApp
- class fluxgraph.FluxApp(
    title: str = 'FluxGraph API',
    description: str = 'Enterprise AI agent orchestration framework v3.2 - 100% Complete',
    version: str = '3.2.0',
    memory_store: Memory | None = None,
    rag_connector: UniversalRAG | None = None,
    auto_init_rag: bool = True,
    enable_analytics: bool = True,
    enable_advanced_features: bool = True,
    enable_workflows: bool = True,
    enable_advanced_memory: bool = True,
    enable_agent_cache: bool = True,
    cache_strategy: str = 'hybrid',
    enable_enhanced_memory: bool = False,
    enable_connectors: bool = False,
    enable_visual_workflows: bool = True,
    database_url: str | None = None,
    enable_chains: bool = True,
    enable_tracing: bool = True,
    enable_batch_optimization: bool = True,
    enable_streaming_optimization: bool = True,
    enable_langserve_api: bool = True,
    tracing_export_path: str = './traces',
    tracing_project_name: str | None = None,
    enable_security: bool = True,
    enable_audit_logging: bool = True,
    enable_pii_detection: bool = True,
    enable_prompt_shield: bool = True,
    enable_rbac: bool = False,
    enable_orchestration: bool = True,
    enable_handoffs: bool = True,
    enable_hitl: bool = True,
    enable_task_adherence: bool = True,
    log_level: str = 'INFO',
    cors_origins: List[str] = ['*'],
  )
The FluxGraph v3.2 enterprise application class, with integrated checkpointing, logging, and RAG pipelines.
- agent(name: str | None = None, track_performance: bool = True)
Decorator that registers an agent.
- chain(name: str | None = None, description: str | None = None)
Decorator that registers a chain.
- register_chain(name: str, chain: Runnable, description: str | None = None)
Registers a chain object (a Runnable) under the given name.
Agent Decorator
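# Register an async function as an agent; `app` is a FluxApp instance.
@app.agent(name="my_agent")
async def agent_function(**kwargs):
    pass  # agent logic goes here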
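To make the registration pattern concrete, here is a minimal, self-contained sketch of how a decorator with an optional name can record async functions in a registry. It is not FluxGraph's implementation: `ToyApp`, `echo_agent`, and `default_named` are hypothetical names, the fallback to the function name when `name` is omitted is an assumption, and the `track_performance` argument is left out entirely.

```python
import asyncio


class ToyApp:
    """Hypothetical stand-in for illustration; not the FluxApp implementation."""

    def __init__(self):
        self.agents = {}  # maps agent name -> registered coroutine function

    def agent(self, name=None):
        def decorator(func):
            # Assumption: when no name is given, fall back to the function name.
            self.agents[name or func.__name__] = func
            return func  # hand the function back unchanged so it stays callable

        return decorator


app = ToyApp()


@app.agent(name="echo")
async def echo_agent(**kwargs):
    # Echo the incoming message back in a response-shaped dict.
    return {"response": kwargs.get("message", "")}


@app.agent()
async def default_named(**kwargs):
    return {"response": "ok"}


# Look up a registered agent by name and run it once.
result = asyncio.run(app.agents["echo"](message="hello"))
```

Because the decorator returns the original function, a registered agent can still be awaited directly, which can be convenient in unit tests.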
Endpoints
Execute Agent
POST /ask/{agent_name}
Content-Type: application/json

{
  "message": "User message",
  "session_id": "optional_session_id"
}
Response:
{
  "response": "Agent response",
  "intent": "detected_intent",
  "action": "respond_to_user"
}
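The request and response shapes above can be exercised from a small client. The sketch below uses only the Python standard library. The helper names `build_ask_request` and `ask_agent` are hypothetical, the base URL is whatever host and port the application happens to be served on, and treating `session_id` as optional is inferred from its placeholder value rather than stated by the reference.

```python
import json
import urllib.parse
import urllib.request


def build_ask_request(base_url, agent_name, message, session_id=None):
    """Build (but do not send) a POST /ask/{agent_name} request."""
    payload = {"message": message}
    if session_id is not None:
        # Assumption: session_id is optional, so it is only sent when provided.
        payload["session_id"] = session_id
    # Percent-encode the agent name so it is safe as a single path segment.
    agent_path = urllib.parse.quote(agent_name, safe="")
    url = f"{base_url.rstrip('/')}/ask/{agent_path}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask_agent(base_url, agent_name, message, session_id=None, timeout=30.0):
    """Send the request and decode the JSON response body into a dict."""
    request = build_ask_request(base_url, agent_name, message, session_id)
    with urllib.request.urlopen(request, timeout=timeout) as reply:
        return json.loads(reply.read().decode("utf-8"))
```

Against a running server, a call such as `ask_agent("http://localhost:8000", "my_agent", "Hello")` would be expected to return a dict with the `response`, `intent`, and `action` keys shown above. The address and agent name here are placeholders.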