API Reference

FluxApp

class fluxgraph.FluxApp(title: str = 'FluxGraph API', description: str = 'Enterprise AI agent orchestration framework v3.2 - 100% Complete', version: str = '3.2.0', memory_store: Memory | None = None, rag_connector: UniversalRAG | None = None, auto_init_rag: bool = True, enable_analytics: bool = True, enable_advanced_features: bool = True, enable_workflows: bool = True, enable_advanced_memory: bool = True, enable_agent_cache: bool = True, cache_strategy: str = 'hybrid', enable_enhanced_memory: bool = False, enable_connectors: bool = False, enable_visual_workflows: bool = True, database_url: str | None = None, enable_chains: bool = True, enable_tracing: bool = True, enable_batch_optimization: bool = True, enable_streaming_optimization: bool = True, enable_langserve_api: bool = True, tracing_export_path: str = './traces', tracing_project_name: str | None = None, enable_security: bool = True, enable_audit_logging: bool = True, enable_pii_detection: bool = True, enable_prompt_shield: bool = True, enable_rbac: bool = False, enable_orchestration: bool = True, enable_handoffs: bool = True, enable_hitl: bool = True, enable_task_adherence: bool = True, log_level: str = 'INFO', cors_origins: List[str] = ['*'])

The FluxGraph v3.2 enterprise application class, with integrated checkpointing, logging, and RAG pipelines.

async add_connector(name: str, connector_type: str, config: Dict)

agent(name: str | None = None, track_performance: bool = True)

Decorator that registers an agent.

chain(name: str | None = None, description: str | None = None)

Decorator that registers a chain.

register(name: str, agent: Any)

Register an agent.

register_chain(name: str, chain: Runnable, description: str | None = None)

Register a chain.

run(host: str = '127.0.0.1', port: int = 8000, reload: bool = False, **kwargs)

Start the FluxGraph API server.

tool(name: str | None = None)

Decorator that registers a tool.

Agent Decorator

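from fluxgraph import FluxApp
app = FluxApp()

@app.agent(name="my_agent")  # name is optional (str | None); "my_agent" is a placeholder
async def agent_function(**kwargs):
    pass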
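
To illustrate how a decorator such as agent() can relate to register(name, agent), here is a minimal sketch of the general decorator-registration pattern. It does not use FluxGraph itself: MiniApp, its agents dictionary, echo_agent, and default_named are hypothetical stand-ins, and the fallback to the function's own name when name is None is an assumption, not documented FluxGraph behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

AgentFn = Callable[..., Awaitable[Any]]


class MiniApp:
    """Hypothetical stand-in for FluxApp; not the library's implementation."""

    def __init__(self) -> None:
        self.agents: Dict[str, AgentFn] = {}

    def register(self, name: str, agent: AgentFn) -> None:
        # Mirrors the documented register(name, agent) signature.
        self.agents[name] = agent

    def agent(self, name: Optional[str] = None) -> Callable[[AgentFn], AgentFn]:
        # Assumption: when name is None, the function's own name is used.
        def decorator(fn: AgentFn) -> AgentFn:
            self.register(name or fn.__name__, fn)
            return fn  # return the function unchanged so it stays callable

        return decorator


app = MiniApp()


@app.agent(name="echo")
async def echo_agent(**kwargs: Any) -> Dict[str, Any]:
    # Echo the incoming message back in a "response" field.
    return {"response": kwargs.get("message", "")}


@app.agent()
async def default_named(**kwargs: Any) -> Dict[str, Any]:
    return {"response": "ok"}


# Look up a registered agent by name and run it.
result = asyncio.run(app.agents["echo"](message="hello"))
```

In this sketch the decorator form and an explicit register() call end up in the same registry, so either style could be used to add an agent.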

Endpoints

Execute Agent

POST /ask/{agent_name}
Content-Type: application/json

{
  "message": "User message",
  "session_id": "optional_session_id"
}

Response:

{
  "response": "Agent response",
  "intent": "detected_intent",
  "action": "respond_to_user"
}
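
The request above could be issued from Python roughly as follows. This is a sketch using only the standard library; the helper names build_ask_request and ask are hypothetical, the agent name "echo" and session ID "s1" are placeholders, and the base URL assumes the server was started with the default host and port shown in run().

```python
import json
import urllib.request
from typing import Any, Dict, Optional
from urllib.parse import quote


def build_ask_request(
    base_url: str,
    agent_name: str,
    message: str,
    session_id: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST /ask/{agent_name} request."""
    payload: Dict[str, Any] = {"message": message}
    if session_id is not None:
        # session_id is optional, so it is only sent when provided.
        payload["session_id"] = session_id
    url = f"{base_url.rstrip('/')}/ask/{quote(agent_name, safe='')}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(
    base_url: str,
    agent_name: str,
    message: str,
    session_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Send the request and decode the JSON response body."""
    request = build_ask_request(base_url, agent_name, message, session_id)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))


# Building the request needs no running server; calling ask() does.
request = build_ask_request("http://127.0.0.1:8000", "echo", "Hello", session_id="s1")
```

Calling ask("http://127.0.0.1:8000", "echo", "Hello") against a running server would be expected to return a dictionary with the response, intent, and action fields shown above.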

Advanced Memory