asterisk.py 16.9 KB
Newer Older
1
# -*- mode: python; coding: utf-8 -
2 3 4 5 6 7 8 9
# liste d'event http://www.voip-info.org/wiki/view/asterisk+manager+events
# liste d'action http://www.voip-info.org/wiki/view/Asterisk+manager+API
import sys
import time
import syslog
import socket
import base64
import psycopg2
10
import threading
11
import psycopg2.extras
12

Daniel Stan's avatar
Daniel Stan committed
13 14
if '/usr/scripts' not in sys.path:
    sys.path.append('/usr/scripts')
15
import lc_ldap.shortcuts
16 17 18
import lc_ldap.objets

import gestion.secrets_new as secrets
19

20

21 22
class NullRecv(EnvironmentError):
    pass
23 24


25
class AsteriskError(ValueError):
26
    def __init__(self, message, action, params):
27 28 29
        self.message = message
        self.action = action
        self.params = params
30 31 32

    def __str__(self):
        return '%s, Action:%s, params:%s' % (self.message, self.action, self.params)
33

34

35
class Profile(object):
Valentin Samir's avatar
Valentin Samir committed
36
    def __init__(self, sql_params=None, database=None):
37
        self.sql_params = sql_params
38 39
        self.database = database

40 41 42
    def update_pin(self, num, pin):
        conn = psycopg2.connect(self.sql_params)
        cur = conn.cursor()
43 44 45 46
        cur.execute(
            "UPDATE %s SET voicemail_password=%%s WHERE num=%%s" % self.database,
            (pin, num)
        )
47 48 49
        conn.commit()
        cur.close()
        conn.close()
Valentin Samir's avatar
Valentin Samir committed
50

51
    def right_to_nums(self, right):
52 53 54
        conn = lc_ldap.shortcuts.lc_ldap_readonly()
        ret = conn.search(u"(&(droits=%s)(!(chbre=EXT)))" % right)
        return ["1%04d" % adh['aid'][0].value for adh in ret]
55

Valentin Samir's avatar
Valentin Samir committed
56 57
    def alias_to_num(self, alias):
        try:
58 59 60 61 62 63 64
            conn = lc_ldap.shortcuts.lc_ldap_readonly()
            ret = conn.search(
                (
                    u"(|(uid=%(alias)s)(mailAlias=%(alias)s@crans.org)"
                    + "(canonicalAlias=%(alias)s@crans.org))"
                ) % {'alias': alias}
            )
Valentin Samir's avatar
Valentin Samir committed
65
            if len(ret) == 1:
66
                return "1%04d" % ret[0]['aid'][0].value
Valentin Samir's avatar
Valentin Samir committed
67 68 69 70
            else:
                return "NONE"
        except:
            return "NONE"
71

72 73 74 75 76 77 78 79
    def num_to_callerid(self, num):
        try:
            conn = psycopg2.connect(self.sql_params)
            cur = conn.cursor()
            cur.execute("SELECT caller_id from %s WHERE num=%%s" % self.database, (num,))
            caller_id = cur.fetchall()[0][0]
            cur.close()
            conn.close()
80

81
            if caller_id == 'full_name' or caller_id == 'both':
82 83 84 85
                conn = lc_ldap.shortcuts.lc_ldap_readonly()
                aid = int(num[1:])
                adh = conn.search(u'aid=%s' % aid)[0]
                return '%s %s' % (adh['prenom'][0], adh['nom'][0])
86 87 88 89 90
            else:
                return num
        except:
            return num

91

92
class Sms(object):
93 94 95
    def __init__(self, sql_params=None, database=None):
        self.sql_params = sql_params
        self.database = database
96

97 98 99 100 101 102 103 104 105
    def sms_daemon(self, server, port, user, password, timeout=360):
        manager = Manager(
            user,
            password,
            server=server,
            event=True,
            auto_connect=False,
            timeout=timeout
        )
106 107 108 109 110 111
        manager.register_events_callback('PeerStatus', self._send_sms)
        while True:
            manager.connect()
            try:
                while True:
                    manager.process_events()
112
            except (socket.error, NullRecv):
113
                pass
114

115
    def sms_delay(self, src, dst, body, user, body_type='str'):
116
        if body_type not in ["str", "base64"]:
117
            raise EnvironmentError("body_type sould be 'str' ou 'base64' not %r" % body_type)
118 119
        conn = psycopg2.connect(self.sql_params)
        cur = conn.cursor()
120 121 122 123 124
        cur.execute(
            'INSERT INTO %s (date, "from", "to", body, "user") VALUES (NOW(), %%s, %%s, %%s, %%s)' %
            self.database,
            (src, dst, base64.b64encode(body).strip() if body_type == 'str' else body, user)
        )
125 126 127
        conn.commit()
        cur.close()
        conn.close()
128

129
    def _send_sms(self, manager, params):
130
        if params['PeerStatus'] in ['Reachable', 'Registered']:
131
            num = params['Peer'].split('/')[1]
132 133
            conn = psycopg2.connect(self.sql_params)
            cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
134 135 136 137
            cur.execute(
                'SELECT * FROM %s WHERE "user"=%%s ORDER BY "date" ASC' % self.database,
                (num,)
            )
138
            for sms in cur.fetchall():
139 140
                try:
                    manager.messageSend(sms['from'], sms['to'], sms['body'], body_type='base64')
141 142 143
                    syslog.syslog(
                        "Message from %s successfully delivered to %s" % (sms['from'], sms['to'])
                    )
144 145
                    cur.execute('DELETE FROM %s WHERE id=%%s' % self.database, (sms['id'],))
                    conn.commit()
146 147 148 149
                except AsteriskError:
                    syslog.syslog(
                        "Message from %s to %s : %s" % (sms['from'], sms['to'], params['Message'])
                    )
150 151
            cur.close()
            conn.close()
152

153 154 155 156
    def send(self, dst, msg, src=None):

        if isinstance(src, lc_ldap.objets.proprio):
            # rajouter @crans.org ne semble pas marcher, pourquoi ?
157
            num = "1%04d" % src['aid'][0].value
158 159 160 161 162 163 164 165
            profile_manager = Profile(self.sql_params, "voip_profile")
            callerid = profile_manager.num_to_callerid(num)
            caller = '"%s" <sip:%s@crans.org>' % (callerid, num)
        elif src:
            caller = src
        else:
            caller = "ServiceCenter"

166 167 168 169
        if isinstance(dst, lc_ldap.objets.proprio):
            to = "sip:1%04d" % dst['aid'][0].value
        else:
            to = dst
170 171 172 173

        ast_manager = Manager('sms', secrets.get('asterisk_sms_passwd'))

        try:
174
            ast_manager.messageSend(caller, to, msg)
175 176
        except AsteriskError as error:
            if error.message == "Message failed to send.":
177 178 179 180 181 182 183
                self.sms_delay(
                    error.params['from'],
                    error.params['to'],
                    error.params['base64body'],
                    error.params['to'].split(':', 1)[1],
                    body_type='base64'
                )
184 185 186 187
            else:
                raise


188 189 190
class History(object):
    def __init__(self, sql_params, database, quota_limit):
        self.sql_params = sql_params
191
        self.database = database
192 193 194 195 196
        self.quota_limit = quota_limit

    def add(self, id, src, dst):
        conn = psycopg2.connect(self.sql_params)
        cur = conn.cursor()
197 198 199 200
        cur.execute(
            "INSERT INTO %s (uniq_id,src,dst) VALUES (%%s, %%s, %%s)" % self.database,
            (id, src, dst)
        )
201 202 203
        conn.commit()
        cur.close()
        conn.close()
204

205 206 207 208 209 210 211
    def delete(self, id):
        conn = psycopg2.connect(self.sql_params)
        cur = conn.cursor()
        cur.execute("DELETE FROM %s WHERE uniq_id=%%s" % self.database, (id,))
        conn.commit()
        cur.close()
        conn.close()
212

213 214 215
    def update(self, id, duration):
        conn = psycopg2.connect(self.sql_params)
        cur = conn.cursor()
216 217 218
        cur.execute(
            "UPDATE %s SET duration=%%s WHERE uniq_id=%%s" % self.database, (int(duration), id)
        )
219 220 221
        conn.commit()
        cur.close()
        conn.close()
222

223 224 225 226 227
    def quota(self, number):
        allowed = False
        try:
            conn = psycopg2.connect(self.sql_params)
            cur = conn.cursor()
228 229 230 231 232 233
            cur.execute(
                "SELECT count(DISTINCT dst) FROM %s WHERE date>='%s' AND dst LIKE '+%%'" % (
                    self.database,
                    time.strftime('%Y-%m-01')
                )
            )
234 235
            outgoing_call_num = cur.fetchall()[0][0]
            if outgoing_call_num >= self.quota_limit:
236 237 238 239 240 241 242
                cur.execute(
                    "SELECT count(dst)>0 FROM %s WHERE date>'%s' AND dst=%%s" % (
                        self.database,
                        time.strftime('%Y-%m-01')
                    ),
                    (number,)
                )
243 244 245 246 247 248 249
                allowed = cur.fetchall()[0][0]
            else:
                allowed = True
            cur.close()
            conn.close()
        except:
            pass
250 251
        sys.stdout.write('ALLOWED' if allowed else 'DENY')

252 253

class Manager(object):
254 255

    def __init__(
256
        self, username, password, timeout=10, server='asterisk.adm.crans.org',
257 258 259
        port=5038, debug=False, event=False, auto_connect=True, agi=None,
        wait_fullybooted=True
    ):
260 261 262 263 264 265
        self.timeout = timeout
        self.server = server
        self.port = port
        self.socket = None
        self.debug = debug
        self.event = event
266 267 268
        self._pending_action = []
        self._response = {}
        self._event = []
269 270
        self._event_callback = {}
        self._toread = ""
271
        self._agi = agi
272
        self.fullybooted = False
273
        self.wait_fullybooted = wait_fullybooted
274

275 276
        self.username = username
        self.password = password
277 278 279

        if self.wait_fullybooted:
            self.register_events_callback('FullyBooted', self._FullyBooted)
280 281
        if auto_connect:
            self.connect()
282

283 284 285 286 287
    def agi(self, *args, **kwargs):
        if not self._agi:
            self._agi = AGI(*args, manager=self, **kwargs)
        return self._agi

288
    def _FullyBooted(self, manager, params):
289
        manager.fullybooted = True
290

291 292 293 294 295 296 297 298 299 300
    def _recv(self):
        data = self.socket.recv(1024)
        if len(data) == 0:
            raise NullRecv("Got null response")
        self._toread += data
        self._toread = self._toread.split('\r\n\r\n')
        for msg in self._toread[:-1]:
            if self.debug:
                print msg + "\n"
            self._parse_msg(msg)
301 302
        self._toread = self._toread[-1]

303 304 305
    def _send(self, str):
        if self.debug:
            print str
306 307
        self.socket.send('%s\r\n' % (str))

308 309 310
    def _parse_msg(self, msg):
        msg = msg.strip().split('\r\n')
        type, value = msg[0].split(': ', 1)
311 312 313 314
        params = dict([
            line.split(': ', 1) if len(line.split(': ', 1)) == 2 else (line.split(': ', 1)[0], '')
            for line in msg[1:]
        ])
315 316
        handler = getattr(self, "_do_"+type, None)
        handler(value, params)
317

318 319 320 321
    def _do_Response(self, status, params):
        id = params['ActionID']
        del(params['ActionID'])
        self._response[id] = (status, params)
322

323 324 325 326
    def _do_Event(self, type, params):
        self._event.append((type, params))

    def _gen_actionID(self):
327
        id = time.time()
328
        while id in self._pending_action:
329
            id = id+1
330
        return str(id)
331

332
    def _action(self, name, params):
333
        self._send('ACTION: %s' % name.upper())
334
        for (key, value) in params.items():
335 336 337
            self._send('%s: %s' % (key.upper(), value))
        id = self._gen_actionID()
        self._send('ActionID: %s' % id)
338
        self._pending_action.append(id)
339 340 341
        self._send('')

        while id not in self._response.keys():
342
            self._recv()
343

344 345 346 347
        response = self._response[id]
        del(self._response[id])
        self._pending_action.remove(id)
        if response[0] == 'Error':
348
            raise AsteriskError(response[1]['Message'], name, params)
349
        return response
350

351 352
    def action(self, name, **params):
        return self._action(name, params)
353

354
    def connect(self):
355
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
356
        sock.settimeout(self.timeout)
357 358
        sock.connect((self.server, self.port))
        self.socket = sock
359 360
        msg = self.socket.recv(1024).strip().split('\r\n', 1)
        self.version = msg[0]
361
        if len(msg) > 1:
362 363
            self._toread += msg[1]
        self.login()
364 365 366
        if self.wait_fullybooted:
            while not self.fullybooted:
                self.process_events()
367
        self.events(self.event)
368

369 370
    def register_events_callback(self, event, func):
        self._event_callback[event] = self._event_callback.get(event, []) + [func]
371

372 373
    def process_events(self):
        if not self._event:
374 375 376 377
            try:
                self._recv()
            except socket.timeout:
                pass
378 379 380 381
        for event in self._event:
            (type, params) = event
            for func in self._event_callback.get(type, []):
                func(self, params)
382 383
        self._event = []

384 385
    def login(self, username=None, secret=None):
        """Login Manager"""
386 387 388 389
        if username is None:
            username = self.username
        if secret is None:
            secret = self.password
390
        return self.action('login', username=username, secret=secret)
391

392 393
    def logoff(self):
        """ Logoff Manager"""
394 395 396
        response = self.action('logoff')
        if response[0] == 'Goodbye':
            self.fullybooted = False
397

398 399
    def events(self, param):
        """Control Event Flow
400 401
        params should be a boolean or a list among system,call,log,verbose,command,agent,user
        to select which flags events should have to be sent."""
402 403 404 405 406 407 408
        if isinstance(param, list):
            eventmask = ','.join(param)
        elif isinstance(param, bool):
            eventmask = 'on' if param else 'off'
        else:
            raise EnvironmentError("%r should be a list or a bool" % param)
        return self.action('events',  eventmask=eventmask)
409

410 411 412 413
    def hangup(self, channel):
        """Hangup a channel"""
        return self.action('hangup', channel=channel)

414 415 416 417
    def reload(self, module):
        """Synopsis: Send a reload event
            Privilege: system,config,all"""
        return self.action('reload', module=module)
418

419
    def messageSend(self, src, dst, body, body_type="str"):
420
        if body_type not in ["str", "base64"]:
421 422
            raise EnvironmentError("body_type sould be 'str' ou 'base64' not %r" % body_type)
        if body_type == "str":
423
            body = base64.b64encode(body).strip()
424
        return self._action('messageSend', {'to': dst, 'from': src, 'base64body': body})
425 426 427 428 429 430


class AGI(object):
    """voir http://www.voip-info.org/wiki/view/Asterisk+AGI"""
    def __init__(self, read=sys.stdin, write=sys.stdout, manager=None, **params):
        self.debug = True
431 432 433 434 435
        self.read = read
        self.write = write
        self.params = params
        self._manager = manager
        self._lock = False
436 437 438 439 440
        self._locklock = threading.Lock()

        self._read_params()

    def manager(self, *args, **kwargs):
441 442 443
        if not self._manager:
            self._manager = Manager(*args, agi=self, **kwargs)
        return self._manager
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461

    def __getitem__(self, key):
        try:
            return self.params[key]
        except KeyError:
            (result, data) = self.command("GET VARIABLE", key)
            if int(result) != 1:
                raise KeyError(key)
            return data[1:-2]

    def __setitem__(self, key, value):
        if key in self.params.keys():
            raise ValueError("ReadOnly value")
        else:
            (result, data) = self.command("SET VARIABLE", key, value)
            return result

    def _read_params(self):
462
        line = self.read.readline()
463 464 465
        while line.strip():
            syslog.syslog(line)
            if line.startswith('agi_'):
466 467
                (key, data) = line[4:].split(':', 1)
                self.params[key.strip()] = data.strip()
468 469 470 471 472
            line = self.read.readline()

    def command(self, name, *params):
        with self._locklock:
            if self._lock:
473 474 475 476 477
                raise AsteriskError(
                    "Cannot lauch AGI command %s, an other command is processing" % name,
                    name,
                    params
                )
478 479
            else:
                self._lock = True
480
        cmd = ' '.join([name] + ["%s" % p for p in params])
481 482 483 484
        if self.debug:
            syslog.syslog("%s\n" % cmd)
        self.write.write("%s\n" % cmd)
        self.write.flush()
485
        line = self.read.readline()
486 487
        if self.debug:
            syslog.syslog(line)
488 489 490
        lines = [line]
        code = int(line[0:3])
        type = line[3]
491 492
        if type == '-':
            while not "%s End of " % code not in line:
493
                line = self.read.readline()
494 495 496
                if self.debug:
                    syslog.syslog("%s\n" % cmd)
                lines.append(line)
497
        self._lock = False
498 499 500 501 502 503 504
        if code != 200:
            raise AsteriskError((code, '\n'.join(lines)), name, params)

        try:
            (result, data) = lines[0][4:].split(' ', 1)
        except ValueError:
            result = lines[0][4:]
505 506 507 508
            data = ""
        result = result.split('=', 1)[1].strip()
        return (int(result), data)

509 510
    def hangup(self):
        self.command("hangup")
511

512 513
    def set_callerid(self, callerid):
        self.command("set_callerid", callerid)
514

515 516
    def noop(self, str):
        self.command("noop", str)
517

518 519
    def launch_app(self, app, *params):
        self.command("exec", app, *params)
520

521 522
    def dial(self, to):
        self.launch_app("dial", to)
523

524 525
    def answer(self):
        self.launch_app("Answer")
526

527 528 529 530
    def goto(self, arg):
        self.launch_app("goto", arg)


531
# TODO
532
class FastAGI(object):
533
    def __init__(self, bind, port, *args, **kwargs):
534
        pass