from enum import Enum import json from django.db import models from django.contrib.auth.models import User from django.conf import settings import requests class GotifyUser(models.Model): user = models.OneToOneField(User, on_delete=models.CASCADE, primary_key=True) id = models.PositiveIntegerField(verbose_name="Gotify User ID") class GotifyMessageType(Enum): plain = "text/plain" markdown = "text/markdown" class GotifyMessage(): type: GotifyMessageType message: str title: str | None priority: int url: str | None def __init__(self, message: str, title: str | None = None, priority: int = 5, url: str | None = None, type: str = 'text/plain'): self.message = message self.title = title if not 0 <= priority <= 10: raise ValueError(f"Priority must be 0 to 10.") self.priority = priority self.url = url self.type = GotifyMessageType(type) def as_dict(self) -> dict: obj = { "message": self.message, "priority": self.priority, "extras": { "client::display": { "contentType": self.type.value } } } if self.title: obj["title"] = self.title if self.url: obj["extras"]["client::notification"] = { "click": { "url": self.url } } return obj class GotifyApplication(models.Model): user = models.OneToOneField(GotifyUser, on_delete=models.CASCADE, primary_key=True) id = models.PositiveIntegerField(verbose_name="Gotify Application ID") token = models.CharField(max_length=256, verbose_name="Gotify Application Token") def send_message(self, message: GotifyMessage): endpoint_url = f"http://{settings.GOTIFY_CONFIG['HOST']}/message" headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } response = requests.post( url=endpoint_url, headers=headers, data=json.dumps(message.as_dict()) ) if response is not None: response.raise_for_status()