from typing import Type from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper from maubot import Plugin, MessageEvent from maubot.handlers import command from datetime import datetime import aiohttp import asyncio import json import base64 class Config(BaseProxyConfig): def do_update(self, helper: ConfigUpdateHelper) -> None: helper.copy("mail") helper.copy("password") class Tcl(Plugin): async def start(self) -> None: self.config.load_and_update() @command.new(name="tcl", help="") @command.argument("pattern", pass_raw=True, required=True) async def tclInfos(self, evt: MessageEvent, pattern: str) -> None: await evt.mark_read() if not pattern: await evt.respond( """ !tcl alerte ligne -> Retourne les alertes TCL sur la ligne donnée """ ) else: self.log.info("Commande : " + pattern) if pattern.__contains__("alerte"): chunks = pattern.split(None, 1) if len(chunks) != 2: await evt.respond( """ !tcl alerte ligne -> Retourne les alertes TCL sur la ligne donnée """ ) url = 'https://download.data.grandlyon.com/ws/rdata/tcl_sytral.tclalertetrafic_2/all.json?maxfeatures=200&start=1' strToEncode = self.config["mail"] + ':' + self.config["password"] base64string = base64.b64encode(strToEncode.encode('utf-8')) resp = await self.http.get(url, headers={'Authorization' : 'Basic ' + base64string.decode('utf-8')}) ans = await resp.text() try: resp.raise_for_status() except aiohttp.ClientError as Error: await evt.respond(str(Error)) return objet = json.loads(ans) try: values = objet["values"] alerte=False for value in values: if(value["ligne_cli"] == chunks[1]): alerte=True debut = datetime.fromisoformat(value["debut"]) fin = datetime.fromisoformat(value["fin"]) respText = "

" + value["type"] + "

" + value["titre"] + "

" \ + value["message"] + "

" + debut.strftime("%d/%m/%Y à %H:%M") + " au " \ + fin.strftime("%d/%m/%Y à %H:%M") + "

" await evt.respond(respText, allow_html=True) if(alerte==False): await evt.respond("

Pas d'alerte sur cette ligne

", allow_html=True) except KeyError: await evt.respond("Erreur d'analyse JSON ") @classmethod def get_config_class(cls) -> Type[BaseProxyConfig]: return Config