299 lines
13 KiB
Python
299 lines
13 KiB
Python
from typing import Type
|
||
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
|
||
from maubot import Plugin, MessageEvent
|
||
from maubot.handlers import command
|
||
from datetime import datetime
|
||
import aiohttp
|
||
import asyncio
|
||
import json
|
||
import base64
|
||
import difflib
|
||
import re
|
||
|
||
|
||
class Terminus:
|
||
def __init__(self, name):
|
||
self.name = name
|
||
self.horaires = []
|
||
self.estimated = False
|
||
|
||
def add_horaire(self, horaire, est):
|
||
if self.estimated == False:
|
||
if est == "E":
|
||
self.horaires = []
|
||
self.horaires.append(horaire)
|
||
self.estimated = True
|
||
else:
|
||
self.horaires.append(horaire)
|
||
else:
|
||
if est == "E":
|
||
self.horaires.append(horaire)
|
||
|
||
def get_horaires(self):
|
||
return self.sort_horaires()
|
||
|
||
def get_name(self):
|
||
return self.name
|
||
|
||
def sort_horaires(self):
|
||
inth = []
|
||
for h in self.horaires:
|
||
if h == "Proche":
|
||
inth.append(0)
|
||
else :
|
||
inth.append(int(h[:-4]))
|
||
inth.sort()
|
||
return inth
|
||
|
||
def get_estimated(self):
|
||
return self.estimated
|
||
|
||
class Ligne:
|
||
def __init__(self, name):
|
||
self.name = name
|
||
self.terminus = []
|
||
|
||
def add_terminus(self, newterminus):
|
||
if newterminus in self.terminus:
|
||
return
|
||
self.terminus.append(newterminus)
|
||
|
||
def get_terminus(self, term):
|
||
for t in self.terminus:
|
||
if t.get_name() == term:
|
||
return t
|
||
return None
|
||
|
||
def get_all_terminus(self):
|
||
return self.terminus
|
||
|
||
def get_name(self):
|
||
return self.name
|
||
|
||
|
||
class Config(BaseProxyConfig):
|
||
def do_update(self, helper: ConfigUpdateHelper) -> None:
|
||
helper.copy("mail")
|
||
helper.copy("password")
|
||
|
||
|
||
class Tcl(Plugin):
|
||
async def start(self) -> None:
|
||
self.config.load_and_update()
|
||
|
||
def help_msg(self):
|
||
return(
|
||
"""
|
||
!tcl alerte ligne -> Retourne les alertes TCL sur la ligne donnée
|
||
ligne peut être :
|
||
- Metros : A, B, C, D
|
||
- Trams : T1, T2 ...
|
||
- Renforcés : C1, C2 ...
|
||
- Bus : 1, 2 ...
|
||
- Funis : F1, F2
|
||
ATTENTION : Le bot ne vérifie pas que la ligne existe
|
||
!tcl horaires arrêt (-- ligne) -> Retourne les horaires à l'arrêt (et filtre par ligne)
|
||
La réponse indique si les horaires sont théoriques (T) ou estimés (E)
|
||
Les horaires estimés sont préférés, si pas dispo, les horaires théoriques
|
||
sont affichés
|
||
Optionnellement le paramètre "-- ligne" permet de filtrer l'affichage pour ne garder que la ligne souhaitée
|
||
"""
|
||
)
|
||
|
||
@command.new(name="tcl", help="Permet d'avoir des informations sur les TCL")
|
||
@command.argument("pattern", pass_raw=True, required=True)
|
||
async def tclInfos(self, evt: MessageEvent, pattern: str) -> None:
|
||
await evt.mark_read()
|
||
strToEncode = self.config["mail"] + ':' + self.config["password"]
|
||
base64string = base64.b64encode(strToEncode.encode('utf-8'))
|
||
if not pattern:
|
||
await evt.respond(self.help_msg())
|
||
else:
|
||
self.log.info("Commande : " + pattern)
|
||
if pattern.__contains__("alerte"):
|
||
chunks = pattern.split(None, 1)
|
||
if len(chunks) != 2:
|
||
await evt.respond(
|
||
"""
|
||
!tcl alerte ligne -> Retourne les alertes TCL sur la ligne donnée
|
||
ligne peut être :
|
||
- Metros : A, B, C, D
|
||
- Trams : T1, T2 ...
|
||
- Renforcés : C1, C2 ...
|
||
- Bus : 1, 2 ...
|
||
- Funis : F1, F2
|
||
ATTENTTION : Le bot ne vérifie pas que la ligne existe
|
||
"""
|
||
)
|
||
url = 'https://download.data.grandlyon.com/ws/rdata/tcl_sytral.tclalertetrafic_2/all.json?maxfeatures=300&start=1'
|
||
resp = await self.http.get(url, headers={'Authorization' : 'Basic ' + base64string.decode('utf-8')})
|
||
ans = await resp.text()
|
||
try:
|
||
resp.raise_for_status()
|
||
except aiohttp.ClientError as Error:
|
||
await evt.respond(str(Error))
|
||
return
|
||
objet = json.loads(ans)
|
||
try:
|
||
values = objet["values"]
|
||
alerte=False
|
||
if(len(objet["values"]) == 0):
|
||
await evt.respond("<h3>Erreur, aucune réponse</h3>", allow_html=True)
|
||
else:
|
||
msg = []
|
||
for value in values:
|
||
if(value["ligne_cli"] == chunks[1].upper()):
|
||
alerte=True
|
||
#Il peut y avoir 100 fois le même message...
|
||
if value["message"] not in msg:
|
||
msg.append(value["message"])
|
||
debut = datetime.fromisoformat(value["debut"])
|
||
fin = datetime.fromisoformat(value["fin"])
|
||
newMsg = value["message"].replace('\t', '<br>')
|
||
newMsg = re.sub(r"(?<![ '-])[A-Z]", r"<br>\g<0>", newMsg)
|
||
newMsg = newMsg.replace('<br> <br>', '<br>')
|
||
self.log.info(newMsg)
|
||
respText = "<h3>" + value["type"] + "</h3><p><strong>" + value["titre"] + "</strong></p><p>" \
|
||
+ newMsg + "</p><strong>" + debut.strftime("%d/%m/%Y à %H:%M") + " au " \
|
||
+ fin.strftime("%d/%m/%Y à %H:%M") + "</strong></p>"
|
||
await evt.respond(respText, allow_html=True)
|
||
if(alerte==False):
|
||
await evt.respond("<h3>Pas d'alerte sur cette ligne</h3>", allow_html=True)
|
||
except KeyError:
|
||
await evt.respond("Erreur d'analyse JSON ")
|
||
elif pattern.__contains__("horaire"):
|
||
chunks = pattern.split(None, 1)
|
||
if len(chunks) != 2:
|
||
await evt.respond(
|
||
"""
|
||
!tcl horaires arrêt (-- ligne) -> Retourne les horaires à l'arrêt (et filtre par ligne)
|
||
La réponse indique si les horaires sont théoriques (T) ou estimés (E)
|
||
Les horaires estimés sont préférés, si pas dispo, les horaires théoriques
|
||
sont affichés
|
||
Optionnellement le paramètre "-- ligne" permet de filtrer l'affichage pour ne garder que la ligne souhaitée
|
||
"""
|
||
)
|
||
return
|
||
with open("./tcl/tcl_sytral.tclarret.json") as file:
|
||
objet = json.load(file)
|
||
values = objet["values"]
|
||
arrets = []
|
||
for value in values:
|
||
arrets.append(value["nom"].lower())
|
||
|
||
subCmd = chunks[1].split("--",1)
|
||
arret = difflib.get_close_matches(subCmd[0].lower(),arrets,n=1,cutoff=0.8)
|
||
if len(arret) == 0:
|
||
await evt.respond("Arrêt non trouvé")
|
||
return
|
||
Ids =[]
|
||
nomArret = ""
|
||
for value in values:
|
||
if value["nom"].lower() == arret[0]:
|
||
nomArret = value["nom"]
|
||
Ids.append(value["id"])
|
||
|
||
|
||
url = 'https://download.data.grandlyon.com/ws/rdata/tcl_sytral.tclpassagearret/all.json?maxfeatures=30000&start=1'
|
||
respText = "### Prochains départs " + nomArret + " :\n"
|
||
self.log.info(respText)
|
||
lines = []
|
||
while len(url) > 0:
|
||
resp = await self.http.get(url, headers={'Authorization' : 'Basic ' + base64string.decode('utf-8')})
|
||
ans = await resp.text()
|
||
try:
|
||
resp.raise_for_status()
|
||
except aiohttp.ClientError as Error:
|
||
await evt.respond(str(Error))
|
||
return
|
||
objet = json.loads(ans)
|
||
try:
|
||
url = objet["next"]
|
||
except KeyError:
|
||
url = ""
|
||
values = objet["values"]
|
||
|
||
for value in values:
|
||
if value["id"] in Ids:
|
||
# Les metros ont leur propre code
|
||
if value["coursetheorique"].split('-')[0] == "301A" :
|
||
transport = "A"
|
||
elif value["coursetheorique"].split('-')[0] == "303" :
|
||
transport = "C"
|
||
elif value["coursetheorique"].split('-')[0] == "325" :
|
||
transport = "F1"
|
||
elif value["coursetheorique"].split('-')[0] == "326" :
|
||
transport = "F2"
|
||
elif value["coursetheorique"].split('-')[0] == "304" :
|
||
transport = "D"
|
||
elif value["coursetheorique"].split('-')[0] == "302A" :
|
||
transport = "B"
|
||
else:
|
||
l = value["coursetheorique"].split('-')[0]
|
||
if l[-1:].isdigit():
|
||
transport = l
|
||
else:
|
||
transport = value["coursetheorique"].split('-')[0][:-1]
|
||
if value["delaipassage"].find('min') < 0 :
|
||
continue
|
||
found = False
|
||
for line in lines:
|
||
if line.get_name() == transport:
|
||
term = line.get_terminus(value["direction"])
|
||
if term is None:
|
||
newterm = Terminus(value["direction"])
|
||
newterm.add_horaire(value["delaipassage"], value["type"])
|
||
line.add_terminus(newterm)
|
||
else:
|
||
term.add_horaire(value["delaipassage"], value["type"])
|
||
found = True
|
||
if not found :
|
||
newLine = Ligne(transport)
|
||
newterm = Terminus(value["direction"])
|
||
newterm.add_horaire(value["delaipassage"], value["type"])
|
||
newLine.add_terminus(newterm)
|
||
lines.append(newLine)
|
||
for line in lines:
|
||
if len(subCmd) == 2:
|
||
if line.get_name() != "".join(subCmd[1].split()).upper():
|
||
continue
|
||
self.log.info("Ligne " + line.get_name())
|
||
respText += "#### Ligne " + line.get_name() + "\n"
|
||
terms = line.get_all_terminus()
|
||
for t in terms:
|
||
self.log.info(t.get_name())
|
||
if t.get_name() == nomArret:
|
||
continue
|
||
if t.get_name() is not None:
|
||
respText += "***Direction " + t.get_name()
|
||
else:
|
||
respText += "***Direction ?"
|
||
|
||
if t.get_estimated() == True:
|
||
respText += " (E)***\n"
|
||
else:
|
||
respText += " (T)***\n"
|
||
hs = t.get_horaires()
|
||
hs.sort()
|
||
for idx, h in enumerate(hs):
|
||
self.log.info(h)
|
||
if h == 0:
|
||
respText += "- Proche\n"
|
||
else:
|
||
respText += "- " + str(h) + " min\n"
|
||
if idx == 2: ##On limite à 3 items
|
||
break
|
||
respText += "\n"
|
||
await evt.respond(respText)
|
||
|
||
else:
|
||
await evt.respond(self.help_msg())
|
||
|
||
|
||
@classmethod
|
||
def get_config_class(cls) -> Type[BaseProxyConfig]:
|
||
return Config
|
||
|
||
|
||
|