Pberndt V4

Direkt zum Inhalt springen


Quellcode phone.py

Beschreibung

Ein Benachrichtigungsdienst für Anrufe, die über die Fritz!Box laufen. Reguliert auch automatisch die Lautstärke am PC. Einfach erweiterbar.

Sourcecode

#!/usr/bin/python
# vim:fileencoding=utf-8
#
# Hacked by Phillip Berndt, www.pberndt.com
#
# Copyrighted, but you may use this program under the terms of the GNU GENERAL
# PUBLIC LICENSE v3. If you are not able to retrieve a copy of this license
# from www.gnu.org or you are uncertain what GPL I am referring to, you may NOT
# use, modify, redistribute, etc. this program. Enjoy.
#
#
import gtk
import glib
import sys
import os
import urllib
import fcntl
import re
import pynotify
import atexit
import signal
from socket import socket
from getopt import getopt
from optparse import OptionParser



## PHONE NUMBER LOOKUP HANDLERS ### {{{
lookup_services = {}
def lookup_service(name):
    def g(f):
        lookup_services[name] = f
        return f
    return g

# Phone number lookup (atm from german „dasoertliche“)
@lookup_service("Das Örtliche")
def phoneLookup_dasOertliche(number):
    try:
        phonebook = dict(( x.strip().split(" ", 1) for x in open(os.path.expanduser("~/.phonebook"), "r").readlines() ))
    except:
        phonebook = { }

    if number in phonebook:
        return phonebook[number]
    answer = urllib.urlopen(("http://www2.dasoertliche.de/Controller?action=43&buc=&context=4&"
        "form_name=search_inv&image.x=73&image.y=3&la=de&page=5&ph=%s" % str(number))).read()
    name = re.search('na: "([^"]+)"', answer)
    if name:
        ret = "%s (%s)" % (name.group(1).decode("iso-8859-15"), number)
    else:
        ret = number
   
    phonebookFile = open(os.path.expanduser("~/.phonebook"), "a")
    fcntl.flock(phonebookFile, fcntl.LOCK_EX)
    phonebookFile.write("%s %s\n" % (number, ret))
    phonebookFile.close()

    return ret

# Don't try to resolve numbers
@lookup_service("None")
def phoneLookup_none(number):
    return number
# }}}
# Parse options {{{
parser = OptionParser(description="React on fritz.box'es phone information")
parser.add_option("-s", "--server", dest="server",
                  help="Use this IP for fritz.box connection",
          default="fritz.box",
          metavar="IP")
parser.add_option("-a", "--no-alsa-volume", dest="alsa",
                  help="Don't use alsa to adjust volume",
          default=True,
          action="store_false")
parser.add_option("-r", "--no-rhythmbox", dest="rhythmbox",
                  help="Don't pause rhythmbox upon calls",
          default=True,
          action="store_false")
parser.add_option("-n", "--no-notifications", dest="notify",
                  help="Don't notify user upon incoming calls",
          default=True,
          action="store_false")
parser.add_option("-d", "--debug",
                  action="store_true", dest="debug", default=False,
                  help="Debug mode")
parser.add_option("-l", "--lookup-service", dest="lookup",
                  help="Lookup service to use",
          default="Das Örtliche")
(options, args) = parser.parse_args()

# Lookup service
if options.lookup not in lookup_services:
    print "Available lookup services:"
    for service in lookup_services:
        print " - " + service
    print
    sys.exit(1)
phone_lookup = lookup_services[options.lookup]

# Debug output
if options.debug:
    def debug(arg):
        print arg
else:
    def debug(arg):
        pass
# }}}
# Fork into background {{{
if not options.debug:
    if os.fork() != 0:
        os._exit(0)
    os.setsid()
    if os.fork() != 0:
        os._exit(0)
    os.chdir("/")
# }}}
# Setup Exit-Handler {{{
def exit_handler(*params):
    sys.exit(0)
for sig in (signal.SIGHUP, signal.SIGINT, signal.SIGQUIT):
    signal.signal(sig, exit_handler)
# }}}
# Create a repeatative server connection code {{{
information_handlers = []
def information_handler(fn):
    global information_handlers
    information_handlers += [ fn ]
    return fn
def connection_failed(fd, condition):
    debug("Connection lost. Initiating timeout for reconnection")
    glib.timeout_add_seconds(10, reestablish_connection)
    fd.close()
    return False
def information_received(fd, condition):
    if type(fd) is file:
        data = fd.readline()
    else:
        data = fd.makefile().readline().decode("iso-8859-15")
    if not data:
        connection_failed(fd, condition)
        return False
    data = data.strip()
    split = data.split(";")
    if len(split) < 2: return True
    date, ctype = split[:2]
    rest = data.split(";")[2:]
    debug("Received " + data)
    for handler in information_handlers:
        try:
            handler(date, ctype, rest)
        except:
            debug("Error: " + str(sys.exc_info()[1]))
    return True
def reestablish_connection():
    try:
        server_socket = socket()
        server_socket.connect((options.server, 1012))
    except:
        debug("Failed to connect to " + options.server)
        return True
    debug("Connected to " + options.server)
    glib.io_add_watch(server_socket, glib.IO_IN, information_received)
    glib.io_add_watch(server_socket, glib.IO_ERR | glib.IO_HUP, connection_failed)
    return False
debug("Creating connection handler")
if reestablish_connection():
    glib.timeout_add_seconds(10, reestablish_connection)
if options.debug:
    glib.io_add_watch(sys.stdin, glib.IO_IN, information_received)
# }}}

########################################

# Setup notifications for incoming calls {{{
if options.notify:
    pynotify.init("phone")
    current_notification = False
    current_number = False
    calls_in_absense = { "active": False, "text": "" }
    def setup_call_when_absent(date, number):
        if not calls_in_absense["active"]:
            calls_in_absense["active"] = True
            displayer = gtk.StatusIcon()
            displayer.set_from_icon_name("call-stop")
            displayer.set_blinking(True)
            displayer.set_tooltip("Calls in absense")
            displayer.set_visible(True)
            calls_in_absense["displayer"] = displayer
            def show_data(widget):
                calls_in_absense["displayer"].set_visible(False)
                del calls_in_absense["displayer"]
                wnd = gtk.Window()
                txt = gtk.TextView()
                txt.get_buffer().set_text(calls_in_absense["text"])
                calls_in_absense["text"] = ""
                calls_in_absense["active"] = False
                wnd.add(txt)
                wnd.set_title("Calls in absense")
                wnd.set_size_request(800, 600)
                wnd.show_all()
            displayer.connect("activate", show_data)
        calls_in_absense["text"] += date + ": Call from " + number + "\n"
    @information_handler
    def incoming_handler(date, type, rest):
        global current_notification, current_number
        if type == "RING":
            incoming = rest[1]
            if incoming:
                number = phone_lookup(incoming)
            else:
                number = incoming
            current_notification = pynotify.Notification("Telephone", "Incoming call from " + number, "call-start")
            current_notification.set_timeout(60000)
            current_notification.show()
            current_number = number
        elif type == "CALL":
            outbound = rest[3]
            if outbound:
                number = phone_lookup(outbound)
            else:
                number = outbound
            current_notification = pynotify.Notification("Telephone", "Outgoing call to " + number, "call-start")
            current_notification.set_timeout(60000)
            current_notification.show()
            current_number = number
        elif type == "CONNECT":
            if current_notification:
                current_notification.close()
                current_notification = False
                current_number = False
        elif type == "DISCONNECT":
            if current_notification:
                current_notification.close()
                current_notification = False
                setup_call_when_absent(date, current_number)
# }}}

# Setup volume control while phoning {{{
if options.alsa:
    try:
        import alsaaudio
        import dbus
        hasAlsa = True
    except:
        debug("Don't setting up ALSA volume control, alsaaudio module not present")
        hasAlsa = False
    if hasAlsa:
        alsa_volume = False
        @atexit.register
        def volume_exit_handler():
            global alsa_volume
            if avg(alsaaudio.Mixer().getvolume()) < 15 and alsa_volume != False:
                debug("Volume control: Resetting volume to " + str(alsa_volume))
                alsaaudio.Mixer().setvolume(int(alsa_volume))
                alsa_volume = False
        avg = lambda l: sum(l) * 1.0 / len(l)
        @information_handler
        def volume_handler(date, type, rest):
            global alsa_volume
            if type == "CONNECT":
                vol = avg(alsaaudio.Mixer().getvolume())
                debug("Volume control: Old volume was " + str(vol))
                if vol < 10:
                    return
                alsa_volume = vol
                alsaaudio.Mixer().setvolume(10)           
            elif type == "DISCONNECT":
                if avg(alsaaudio.Mixer().getvolume()) < 15 and alsa_volume != False:
                    debug("Volume control: Resetting volume to " + str(alsa_volume))
                    alsaaudio.Mixer().setvolume(int(alsa_volume))
                    alsa_volume = False
# }}}

# Setup rhythmbox control while phoning {{{
if options.rhythmbox:
    import dbus
    sbus = dbus.SessionBus()
    was_playing = False
    @atexit.register
    def rhythmbox_exit_handler():
        if was_playing:
            obj = sbus.get_object('org.gnome.Rhythmbox', '/org/gnome/Rhythmbox/Player')
            if not obj.getPlaying():
                debug("Resuming Rhythmbox")
                try: obj.playPause(False)
                except: pass
    @information_handler
    def rhythmbox_handler(date, type, rest):
        global was_playing
        if type == "CONNECT":
            if os.system("pgrep rhythmbox >/dev/null") == 0:
                obj = sbus.get_object('org.gnome.Rhythmbox', '/org/gnome/Rhythmbox/Player')
                was_playing = obj.getPlaying()
                if was_playing:
                    debug("Stopping Rhythmbox")
                    obj.playPause(False)
        elif type == "DISCONNECT":
            if was_playing:
                obj = sbus.get_object('org.gnome.Rhythmbox', '/org/gnome/Rhythmbox/Player')
                if not obj.getPlaying():
                    debug("Resuming Rhythmbox")
                    try: obj.playPause(False)
                    except: pass
# }}}

gtk.main()

Download

Dateiname
phone.py
Größe
8.97kb