# Python interface to RSD.
#
# author: Anze Vavpetic <anze.vavpetic@ijs.si>, 2012
#
import os.path
import shutil
import logging
import re
import tempfile
from stat import S_IREAD, S_IEXEC
from subprocess import PIPE
# SafePopen comes from the ``security`` module one package level up.  Prefer
# the package-relative import; when this file is loaded outside of its package
# the relative import fails, so fall back to appending the parent directory to
# sys.path and importing ``security`` as a top-level module.
try:
    from ..security import SafePopen
except (ImportError, ValueError, SystemError):
    # A failed relative import surfaces as one of these exception types,
    # depending on the Python version.  Anything else (e.g. a bug inside
    # ``security`` itself, or KeyboardInterrupt) is deliberately not masked.
    import os
    import sys
    parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.append(parent_dir)
    from security import SafePopen
# Default verbosity switch: True selects DEBUG, False selects INFO.
DEBUG = True

# Module-wide logger; every record is written through a single stream handler
# using the "<logger name> <level>: <message>" layout.
logger = logging.getLogger("RSD [Python]")
if DEBUG:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)

formatter = logging.Formatter("%(name)s %(levelname)s: %(message)s")
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
class RSD(object):
    '''
    RSD python wrapper.

    An instance owns a temporary working directory into which the RSD prolog
    sources, the input files and three generated driver scripts are written.
    The scripts are executed with the ``yap`` prolog interpreter.
    '''
    # Directory containing this module (and the RSD prolog sources).
    THIS_DIR = os.path.dirname(__file__) or '.'
    # Prolog sources copied into the working directory on construction.
    RSD_FILES = ['featurize.pl', 'process.pl', 'rules.pl']

    # Generated scripts filenames
    CONSTRUCT = '_construct.pl'
    SAVE = '_save.pl'
    SUBGROUPS = '_subgroups.pl'
    # Execution order matters: construct features, save them, find subgroups.
    SCRIPTS = [CONSTRUCT, SAVE, SUBGROUPS]

    # NOTE(review): not read by any method of this class; presumably default
    # values consulted by callers -- TODO confirm.
    ESSENTIAL_PARAMS = {
        'clauselength': 8,
        'depth': 4,
        'negation': 'none',
        'min_coverage': 1,
        'filtering': 'true'
    }

    def __init__(self, verbosity=logging.NOTSET):
        """
        Creates an RSD object.

        A fresh temporary directory is created and the RSD prolog sources
        are copied into it.

        :param verbosity: Can be DEBUG, INFO or NOTSET (default).
                          This controls the verbosity of the output. Note that
                          it is applied to the module-level logger.
        """
        self.tmpdir = tempfile.mkdtemp()
        self.settings = dict()
        logger.setLevel(verbosity)

        # Copy needed files to tmp dir
        for fn in RSD.RSD_FILES:
            shutil.copy(os.path.join(RSD.THIS_DIR, fn), self.tmpdir)

    def set(self, name, value):
        """
        Sets the value of setting 'name' to 'value'.

        :param name: Name of the setting
        :param value: Value of the setting
        """
        self.settings[name] = value

    def settingsAsFacts(self, settings):
        """
        Parses a string of settings and stores each of them via :meth:`set`.

        Whitespace around names and values is ignored; values are stored as
        strings. Text that does not match the expected form is skipped.

        :param settings: String of settings in the form:
                         ``set(name1, val1), set(name2, val2)...``
        """
        # Raw string so the escapes reach the regex engine untouched; both
        # groups accept identifier-like tokens (letters, digits, underscore).
        pattern = re.compile(
            r'set\(\s*([a-zA-Z0-9_]+)\s*,\s*([a-zA-Z0-9_]+)\s*\)')
        for name, val in pattern.findall(settings):
            self.set(name, val)

    def induce(self, b, filestem='default',
               examples=None,
               pos=None,
               neg=None,
               cn2sd=True,
               printOutput=False):
        """
        Generate features and find subgroups.

        On success the temporary working directory is removed.

        :param b: String with background knowledge.
        :param filestem: The base name of this experiment.
        :param examples: Classified examples; can be used instead of separate pos / neg files below.
        :param pos: String of positive examples.
        :param neg: String of negative examples.
        :param cn2sd: Find subgroups after feature construction?
        :param printOutput: If False (default) the output of the yap
                            processes is redirected to a temporary file;
                            otherwise no redirection is set up.

        :return: a tuple ``(features, weka, rules)``, where:
                 - features is a set of prolog clauses of generated features,
                 - weka is the propositional form of the input data,
                 - rules is a set of generated cn2sd subgroup descriptions;
                   this will be an empty string if cn2sd is set to False.
        :rtype: tuple
        :raises RuntimeError: if an ``OSError`` occurs while running the
                              scripts or collecting their results.
        """
        # Write the inputs
        self.__prepare(filestem, b, examples=examples, pos=pos, neg=neg)

        # Write scripts
        self.__scripts(filestem)

        dumpFile = None if printOutput else tempfile.TemporaryFile()

        # Run the script
        logger.info("Running RSD...")
        try:
            for script in RSD.SCRIPTS:
                # Skip subgroup discovery part?
                if script == RSD.SUBGROUPS and not cn2sd:
                    continue
                p = SafePopen(['yap', '-s50000', '-h200000', '-L', script],
                              cwd=self.tmpdir,
                              stdout=dumpFile,
                              stderr=dumpFile).safe_run()
                # The streams are not piped, so -- assuming SafePopen mirrors
                # subprocess.Popen -- this mainly waits for the process.
                stdout_str, stderr_str = p.communicate()
                logger.debug(stdout_str)
                logger.debug(stderr_str)
            logger.info("Done.")

            # Return the rules written in the output file.
            features = self.__read_output(filestem + '_frs.pl')
            weka = self.__read_output(filestem + '.arff')
            rules = self.__read_output(filestem + '.rules') if cn2sd else ''

            self.__cleanup()
            return (features, weka, rules)
        except OSError:
            raise RuntimeError("Yap compiler could not be loaded! (see http://www.dcc.fc.up.pt/~vsc/Yap/).")
        finally:
            # Release the scratch file regardless of the outcome.
            if dumpFile is not None:
                dumpFile.close()

    def __read_output(self, name):
        """
        Returns the contents of file 'name' in the working directory.
        """
        with open(os.path.join(self.tmpdir, name)) as handle:
            return handle.read()

    def __write_input(self, name, contents):
        """
        Writes the string 'contents' to file 'name' in the working directory.
        """
        with open(os.path.join(self.tmpdir, name), 'w') as handle:
            handle.write(contents)

    def __prepare(self, filestem, b, examples=None, pos=None, neg=None):
        """
        Prepares the needed files.

        Writes either ``<filestem>.pl`` (classified examples) or the pair
        ``<filestem>.f`` / ``<filestem>.n`` (positive / negative examples),
        followed by ``<filestem>.b`` holding the current settings as
        ``:- set(name,value).`` directives and the background knowledge.

        :raises Exception: if neither 'examples' nor both 'pos' and 'neg'
                           are provided (non-empty).
        """
        if examples:
            self.__write_input(filestem + '.pl', examples)
        elif pos and neg:
            self.__write_input(filestem + '.f', pos)
            self.__write_input(filestem + '.n', neg)
        else:
            raise Exception('You need to provide either a single file of classified examples or '
                            'two files, positive and negative examples.')

        with open(os.path.join(self.tmpdir, filestem + '.b'), 'w') as bFile:
            # Write settings.
            for setting, val in self.settings.items():
                bFile.write(':- set(%s,%s).\n' % (setting, val))
            bFile.write(b)

    def __cleanup(self):
        """
        Cleans up all the temporary files.

        Removal is best-effort: a failure is logged, not raised.
        """
        try:
            shutil.rmtree(self.tmpdir)
        except Exception:
            logger.info('Problem removing temporary files. The files are probably in use.')

    def __write_script(self, name, lines):
        """
        Writes 'lines' (one per line) to script 'name' in the working
        directory and permits the owner to read and execute it.
        """
        path = os.path.join(self.tmpdir, name)
        with open(path, 'w') as script:
            script.writelines(line + '\n' for line in lines)
        # Permit the owner to execute and read this script
        os.chmod(path, S_IREAD | S_IEXEC)

    def __scripts(self, filestem):
        """
        Generates the required scripts.

        Each script declares ``main`` as its initialization goal, consults
        one of the RSD sources and runs its goals on 'filestem'.
        """
        #
        # 'Construction' script
        #
        self.__write_script(RSD.CONSTRUCT, [
            ':- initialization(main).',
            'main :-',
            '[featurize],',
            'r(%s),' % filestem,
            'w.',
        ])

        #
        # 'Saving' script
        #
        self.__write_script(RSD.SAVE, [
            ':- initialization(main).',
            'main :-',
            '[process],',
            'r(%s),' % filestem,
            'w,',
            'w(weka, %s),' % filestem,
            'w(rsd, %s).' % filestem,
        ])

        #
        # 'Subgroups' script
        #
        self.__write_script(RSD.SUBGROUPS, [
            ':- initialization(main).',
            'main :-',
            '[rules],',
            'r(%s),' % filestem,
            'i,',
            'w.',
        ])