This is a simple Python script that connects to a VDR server, which must be configured to accept SVDRP connections at port 2001.
It retrieves the EPG listing of all future programs, finds the ones whose title matches one of your favorites, and generates a timer for them. The script can be run e.g. once per day to add new programs.
Disclaimer: This script could mess up your other VDR timers, so use at your own risk.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Autorecord favorite VDR programs via SVDRP.
# Copyright (C) Kenneth Falck 2011.
# Distribution allowed under the Simplified BSD License.
import telnetlib
from datetime import datetime, timedelta
import re
import sys
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# VDR server: host and SVDRP port the client connects to.
VDR_HOST = '127.0.0.1'
VDR_PORT = 2001

# Only consider these channels (VDR channel numbers as reported by LSTC).
VDR_CHANNELS = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

# Extra time after end, in seconds; added to the program duration when the
# timer's stop time is computed.
VDR_EXTRATIME = 60 * 10

# Autorecord these regexps.  Each pattern is applied with ``match()`` to the
# lower-cased program title, so patterns must be written in lower case and
# are anchored at the start of the title.
VDR_FAVORITES = [re.compile(r) for r in (
    r'star trek',
    r'james bond',
)]

# Transliterate titles to filenames: (source, replacement) pairs applied in
# order before the title is reduced to filename-safe characters.
VDR_TRANSLITERATE = (
    ('ä', 'a'),
    ('ö', 'o'),
    ('å', 'a'),
    ('Ä', 'A'),
    ('Ö', 'O'),
    ('Å', 'A'),
    (':', '|'),
)
class SVDRPClient(object):
    """Small SVDRP client built on ``telnetlib``.

    Replies are represented as ``(code, more, text)`` tuples, where ``code``
    is the three-digit reply code, ``more`` is True when the line is followed
    by further lines of the same reply (``-`` separator), and ``text`` is the
    remainder of the line.
    """

    def __init__(self, host, port):
        """Connect to the server at *host*:*port* and consume its greeting."""
        self.telnet = telnetlib.Telnet()
        self.telnet.open(host, port)
        self.read_response()

    def close(self):
        """Close the underlying telnet connection."""
        self.telnet.close()

    def send_command(self, line):
        """Send one command *line*, terminated with CRLF."""
        data = line + '\r\n'
        # telnetlib wants bytes on Python 3; on Python 2 ``str`` already is
        # ``bytes`` and is passed through untouched.
        # NOTE(review): assumes the server uses UTF-8, matching this file's
        # declared source encoding -- confirm for non-UTF-8 VDR setups.
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        self.telnet.write(data)

    def read_line(self):
        """Read one reply line (10 second timeout).

        Returns a ``(code, more, text)`` tuple, or None when fewer than four
        characters were received (timeout, blank line or closed connection).
        """
        raw = self.telnet.read_until(b'\n', 10)
        if not isinstance(raw, str):
            # Python 3: decode once at the I/O boundary.
            raw = raw.decode('utf-8', 'replace')
        line = raw.replace('\n', '').replace('\r', '')
        if len(line) < 4:
            return None
        return int(line[0:3]), line[3] == '-', line[4:]

    def read_response(self):
        """Read a complete reply and return its lines as a list of tuples.

        Reading stops after the first line without the continuation marker,
        or as soon as ``read_line`` yields None.
        """
        response = []
        line = self.read_line()
        while line:
            response.append(line)
            if not line[1]:
                break
            line = self.read_line()
        return response

    def parse_channel(self, line):
        """250-1 TV1;YLE:274000:C0M128:C:6900:512:650=fin:2321:0:17:42249:1:0"""
        num, rest = line[2].split(' ', 1)
        fields = rest.split(':')
        # The first field is "name;provider"; keep only the name.
        name = fields[0].split(';', 1)[0]
        return {
            'num': int(num),
            'name': name,
            'fields': fields[1:],
        }

    def read_channels_response(self):
        """Read an LSTC reply and return the channels listed in VDR_CHANNELS.

        Lines whose reply code is not 250 (error replies) are skipped instead
        of being fed to the channel parser.
        """
        channels = []
        for line in self.read_response():
            if line[0] != 250:
                continue
            channel = self.parse_channel(line)
            if channel['num'] in VDR_CHANNELS:
                channels.append(channel)
        return channels

    def get_channels(self):
        """Request the channel list and return the parsed, filtered result."""
        self.send_command('LSTC')
        return self.read_channels_response()

    def read_programs_response(self):
        """Read an LSTE reply and return a list of program dicts.

        Each dict has ``id``, ``start`` (local time, truncated to the minute)
        and ``duration`` (seconds), plus ``title`` and ``description`` when
        the corresponding lines are present.
        """
        programs = []
        program = {}
        for rline in self.read_response():
            line = rline[2]
            # Only continuation lines carry EPG records; the final line of
            # the reply is a plain status message.
            if not rline[1] or not line:
                continue
            tag = line[0]
            if tag == 'E':
                # start new program: 215-E 3419 1295135100 3600 61 12
                info = line[2:].split(' ')
                start = datetime.fromtimestamp(int(info[1]))
                program = {
                    'id': int(info[0]),
                    # Timers have minute resolution, so drop the seconds to
                    # make start times comparable with parsed timers.
                    'start': start.replace(second=0, microsecond=0),
                    'duration': int(info[2]),
                }
            elif tag == 'e':
                # end program
                programs.append(program)
            elif tag == 'T':
                # title
                program['title'] = line[2:]
            elif tag == 'D':
                # description
                program['description'] = line[2:]
        return programs

    def get_programs(self, channel_num):
        """Request the EPG of channel *channel_num* and return its programs."""
        self.send_command('LSTE %d' % channel_num)
        return self.read_programs_response()

    def parse_timer(self, line):
        """250 1 1:5:2011-01-08:2050:2320:99:99:Salainen agentti 007 ja Tohtori No (K15):<vdradmin-am><bstart>10</bstart><bstop>10</bstop></vdradmin-am>"""
        num, rest = line[2].split(' ', 1)
        # Split into at most nine fields so that colons inside the trailing
        # extra/aux field do not truncate it.
        fields = rest.split(':', 8)
        # NOTE(review): only single-shot timers with a YYYY-MM-DD day field
        # are understood; any other day format raises ValueError here.
        date = datetime.strptime(fields[2], '%Y-%m-%d')
        start = datetime.strptime(fields[2] + ' ' + fields[3] + '00', '%Y-%m-%d %H%M%S')
        stop = datetime.strptime(fields[2] + ' ' + fields[4] + '00', '%Y-%m-%d %H%M%S')
        if stop < start:
            # The recording runs past midnight.
            stop += timedelta(days=1)
        return {
            'num': int(num),
            'status': int(fields[0]),
            'channel': int(fields[1]),
            'date': date,
            'start': start,
            'stop': stop,
            'priority': int(fields[5]),
            'lifetime': int(fields[6]),
            'filename': fields[7],
            'extra': fields[8],
        }

    def read_timers_response(self):
        """Read an LSTT reply and return the parsed timers.

        Lines whose reply code is not 250 (for example the error reply sent
        when no timers exist) are skipped, so the result is then empty.
        """
        return [self.parse_timer(line)
                for line in self.read_response()
                if line[0] == 250]

    def get_timers(self):
        """Request the timer list and return the parsed result."""
        self.send_command('LSTT')
        return self.read_timers_response()

    def timer_matches(self, timer, channel, program, verbose=False):
        """Return True if *timer* is on *channel* and starts with *program*."""
        matched = (timer['channel'] == channel['num']
                   and timer['start'] == program['start'])
        if not matched and verbose:
            # Single pre-formatted string: same output on Python 2 and 3.
            print('(Not matched: %s %s %s )'
                  % (program['title'], timer['start'], program['start']))
        return matched

    def find_timer(self, timers, channel, program, verbose=False):
        """Return the first timer in *timers* matching *program*, or None."""
        for timer in timers:
            if self.timer_matches(timer, channel, program, verbose=verbose):
                return timer
        return None

    def cleanup_title(self, title):
        """Reduce *title* to characters that are safe in a timer filename."""
        for src, dst in VDR_TRANSLITERATE:
            title = title.replace(src, dst)
        # NOTE(review): this also strips '|', so the ':' -> '|' entry in
        # VDR_TRANSLITERATE ends up removing colons entirely.
        return re.sub(r'[^a-zA-Z0-9 _-]', '', title)

    def del_timer(self, timer_num):
        """Delete timer number *timer_num* and discard the reply."""
        self.send_command('DELT %d' % timer_num)
        self.read_response()

    def add_timer(self, channel, program):
        """Create an active timer for *program* on *channel*.

        The stop time is the program start plus its duration plus
        VDR_EXTRATIME seconds; priority and lifetime are both 99.
        """
        start = program['start']
        stop = start + timedelta(seconds=program['duration'] + VDR_EXTRATIME)
        self.send_command('NEWT 1:%d:%s:%s:%s:99:99:%s:' % (
            channel['num'],
            start.strftime('%Y-%m-%d'),
            start.strftime('%H%M'),
            stop.strftime('%H%M'),
            self.cleanup_title(program['title']),
        ))
        self.read_response()
# Connect, fetch the existing timers, then walk the EPG of every configured
# channel and create a timer for each upcoming favorite that has none yet.
client = SVDRPClient(VDR_HOST, VDR_PORT)
try:
    timers = client.get_timers()
    # One reference time for the whole run keeps the "future" test consistent.
    now = datetime.now()
    for channel in client.get_channels():
        for program in client.get_programs(channel['num']):
            if program['start'] <= now:
                continue
            # A program without a title line can never match a favorite.
            title = program.get('title', '').lower()
            if not any(favorite.match(title) for favorite in VDR_FAVORITES):
                continue
            if not client.find_timer(timers, channel, program):
                client.add_timer(channel, program)
finally:
    # Always release the connection, even when a command or parse step fails.
    client.close()