Commit 9dec16f1 authored by Benjamin Graillot's avatar Benjamin Graillot

[publinet] nouvelle source

parent ad4e1124
......@@ -9,12 +9,13 @@ import wiki
import news
import phabricator
import prometheus
import publinet
import ml
import gitlabissue
MAIL_SOURCES = [wiki.Wiki(), phabricator.Phabricator(), ml.Ml(), github.Github()]
SOURCES = [source.MailSources(MAIL_SOURCES), news.News(), prometheus.Prometheus(), gitlabissue.GitlabIssue()]
SOURCES = [source.MailSources(MAIL_SOURCES), news.News(), prometheus.Prometheus(), publinet.Publinet(), gitlabissue.GitlabIssue()]
if __name__ == "__main__":
with open(config.socket_path, 'w') as fifo:
......
import datetime
import re
import socket
import sys
import time
import urllib3
from lxml import etree
import source
def trim(st):
return re.sub('[\t\n\r]', '', st)
def state_selector(cols):
return cols[1].text if cols[1].text != '\xa0' else cols[2].text
def url_selector(cols):
base_url = 'http://publinetce2.education.fr'
return base_url + cols[4].findall('a')[0].attrib['href']
class Publinet(source.Source):
def __init__(self):
self.get_page()
self.dic_url = self.gen_dic(url_selector)
self.dic_state = self.gen_dic(state_selector)
source.Source.__init__(self, 'publinet', 'publinetce2.education.fr')
def get_page(self):
self.table = None
url = 'http://publinetce2.education.fr/publinet/Servlet/PublinetServlet?_concours=EAE&_page=LISTE_SECTION&_sort=ALPHA'
http = urllib3.PoolManager()
try:
text = http.request('GET', url).data.decode('ISO-8859-1')
except urllib3.exceptions.MaxRetryError:
return False
self.table = etree.HTML(text).find('body/div/div/table').findall('tr')
return True
def gen_dic(self,selector) :
if self.table is None :
return None
dic = {}
for row in self.table:
cols = row.findall('td')
if len(cols) > 0:
state = selector(cols)
if len(cols[0].getchildren()) > 0:
dic[trim(cols[0].getchildren()[0].text)] = trim(state)
else:
dic[trim(cols[0].text)] = trim(state)
return dic
def change(self):
self.get_page()
if self.table is None or self.dic_state is None:
self.dic_state = self.gen_dic(state_selector)
return []
response = []
dic = self.gen_dic(state_selector)
self.dic_url = self.gen_dic(url_selector)
for key in dic.keys():
if dic[key] != self.dic_state[key]:
response.append((
"Les resultats d'\x0303{}\x03 sont sortis pour \x0310{}\x03 : {}".format(dic[key], key, self.dic_url[key]),
key
))
self.dic_state = dic
return response
def loop(self, *args, **kwargs):
while True:
for response in self.change():
time.sleep(1.7)
self.push_update(response[1], response[0])
time.sleep(90)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment