import json
import os
from fastapi import FastAPI, HTTPException, Request
app = FastAPI()
def verify_memoryos_webhook(body_bytes: bytes, signature_header: str, webhook_secret: str) -> bool:
import hashlib
import hmac
expected = hmac.new(
webhook_secret.encode("utf-8"),
body_bytes,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected, signature_header)
@app.post("/webhooks/memoryos")
async def memoryos_webhook(request: Request) -> dict:
raw_body = await request.body()
signature = request.headers.get("X-MemoryOS-Signature")
if not signature:
raise HTTPException(status_code=401, detail="Missing signature")
webhook_secret = os.environ["MEMORYOS_WEBHOOK_SECRET"]
if not verify_memoryos_webhook(raw_body, signature, webhook_secret):
raise HTTPException(status_code=401, detail="Invalid signature")
payload = json.loads(raw_body.decode("utf-8"))
event = payload["event"]
if event == "quota.warning":
print("Quota warning received", payload["data"])
elif event == "processing.delayed":
print("Processing delay received", payload["data"])
return {"received": True}