#
# Python interface to Aleph.
#
# author: Anze Vavpetic <anze.vavpetic@ijs.si>, 2011
#
import json
import logging
import os
import os.path
import re
import shutil
import tempfile
from stat import S_IREAD, S_IEXEC
from StringIO import StringIO
from subprocess import PIPE

if __name__ != '__main__':
    from ..security import SafePopen
else:
    # Run as a script: make the parent directory importable first.
    import os
    parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    os.sys.path.append(parent_dir)
    from security import SafePopen
# Flip to True to make the module logger emit debug-level messages by default.
DEBUG = False

# Module-wide logger; every record is rendered as "<name> <level>: <message>"
# and sent to a stream handler.
logger = logging.getLogger("Aleph [Python]")
if DEBUG:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)

formatter = logging.Formatter("%(name)s %(levelname)s: %(message)s")
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
class Aleph(object):
    """
    Aleph python wrapper.

    Each instance owns a private temporary directory into which the Aleph
    sources, the input files and a generated driver script are written.
    :meth:`induce` runs that script with the ``yap`` interpreter and removes
    the directory afterwards, so an instance serves a single ``induce`` call.
    """
    # The aleph source file is presumed to be in the same dir as this file.
    THIS_DIR = os.path.dirname(__file__) or '.'
    ALEPH_FN = 'aleph.pl'
    FEATURES_FN = 'features.pl'
    # Suffixes appended to the file stem to name the files the script writes.
    RULES_SUFFIX = 'Rules'
    FEATURES_SUFFIX = 'Features'
    PROP_DATASET_SUFFIX = 'Propositional'
    # Name of the generated driver script (created inside the temp dir).
    SCRIPT = 'run_aleph.pl'
    # Setting names with values; not referenced by the code in this class.
    ESSENTIAL_PARAMS = {
        'depth': 10,
        'evalfn': 'coverage',
        'i': 2,
        'language': 'inf',
        'm': 0.0,
        'max_features': 'inf',
        'minpos': 1,
        'noise': 0
    }
    # Matches one ``set(name, value)`` term; whitespace around the two
    # arguments is tolerated.
    _SETTING_RE = re.compile(
        r'set\(\s*([a-zA-Z0-9_]+)\s*,\s*([a-zA-Z0-9_]+)\s*\)')

    def __init__(self, verbosity=logging.NOTSET):
        """
        Creates an Aleph object.

        Creates the temporary working directory and copies the Aleph and
        features Prolog sources (expected next to this file) into it.

        :param verbosity: Can be DEBUG, INFO or NOTSET (default).
            This controls the verbosity of the output. Note that it is
            applied to the shared module-level logger.
        """
        self.tmpdir = tempfile.mkdtemp()
        self.aleph_script = os.path.join(self.tmpdir, Aleph.ALEPH_FN)
        self.postGoal = None
        self.postScript = None
        # Dictionary of non-default settings
        self.settings = dict()
        logger.setLevel(verbosity)
        for source_fn in (Aleph.ALEPH_FN, Aleph.FEATURES_FN):
            shutil.copy(os.path.join(Aleph.THIS_DIR, source_fn), self.tmpdir)

    def set(self, name, value):
        """
        Sets the value of setting 'name' to 'value'.

        :param name: Name of the setting
        :param value: Value of the setting
        """
        self.settings[name] = value

    def settingsAsFacts(self, settings):
        """
        Parses a string of settings and stores each one via :meth:`set`.

        :param settings: String of settings in the form:
            ``set(name1, val1), set(name2, val2)...``
            Names and values may contain letters, digits and underscores;
            values are stored as strings.
        """
        for name, val in Aleph._SETTING_RE.findall(settings):
            self.set(name, val)

    def setPostScript(self, goal, script):
        """
        After learning call the given script using 'goal'.

        :param goal: goal name
        :param script: prolog script to call
        """
        self.postGoal = goal
        self.postScript = script

    def induce(self, mode, pos, neg, b, filestem='default', printOutput=False):
        """
        Induce a theory or features in 'mode'.

        :param mode: In which mode to induce rules/features.
        :param pos: String of positive examples.
        :param neg: String of negative examples.
        :param b: String of background knowledge.
        :param filestem: The base name of this experiment.
        :param printOutput: If False (default) the output of the yap process
            is redirected to a throw-away temporary file; if True the process
            inherits this process' stdout/stderr.
        :return: A pair ``(result, features)``. ``result`` is the theory as a
            string, or an arff dataset in ``induce_features`` mode.
            ``features`` is the raw contents of the features file in
            ``induce_features`` mode and ``None`` otherwise.
        :rtype: tuple
        """
        try:
            # Write the inputs to appropriate files.
            self.__prepare(filestem, pos, neg, b)
            # Make a script to run aleph (with appropriate settings).
            self.__script(mode, filestem)
            logger.info("Running aleph...")
            dumpFile = None if printOutput else tempfile.TemporaryFile()
            try:
                # Run the aleph script.
                # NOTE(review): safe_run() is assumed to return a Popen-like
                # object; only communicate() is used on it here.
                p = SafePopen(['yap', '-s50000', '-h200000', '-L', Aleph.SCRIPT],
                              cwd=self.tmpdir,
                              stdout=dumpFile,
                              stderr=dumpFile
                              ).safe_run()
                # Called for its blocking effect; the output is not captured.
                p.communicate()
            finally:
                if dumpFile is not None:
                    dumpFile.close()
            logger.info("Done.")
            features = None
            if mode != 'induce_features':
                # Return the rules written in the output file.
                result = self.__read_output(filestem + Aleph.RULES_SUFFIX)
            else:
                features = self.__read_output(filestem + Aleph.FEATURES_SUFFIX)
                pl_dataset = self.__read_output(
                    filestem + Aleph.PROP_DATASET_SUFFIX)
                result = self.__to_arff(features, pl_dataset, filestem)
        finally:
            # Cleanup, also when the run or the parsing failed, so that the
            # temporary directory is never left behind.
            self.__cleanup()
        return (result, features)

    def __read_output(self, filename):
        """
        Returns the contents of 'filename' inside the temporary directory.
        """
        with open(os.path.join(self.tmpdir, filename)) as handle:
            return handle.read()

    def __prepare(self, filestem, pos, neg, b):
        """
        Prepares the needed files: ``<filestem>.f`` (positive examples),
        ``<filestem>.n`` (negative examples) and ``<filestem>.b``
        (background knowledge), all inside the temporary directory.
        """
        for extension, contents in (('f', pos), ('n', neg), ('b', b)):
            path = os.path.join(self.tmpdir, '%s.%s' % (filestem, extension))
            with open(path, 'w') as handle:
                handle.write(contents)

    def __cleanup(self):
        """
        Cleans up all the temporary files (best effort; failures are logged).
        """
        try:
            shutil.rmtree(self.tmpdir)
        except OSError:
            logger.info('Problem removing temporary files. '
                        'The files are probably in use.')

    def __script(self, mode, filestem):
        """
        Makes the script file to be run by yap.

        The script consults aleph, reads the input files, applies all the
        non-default settings, runs 'mode' and saves the output under names
        derived from 'filestem'. If a post script was set, its goal is
        appended to the main clause and the script text follows it.
        """
        lines = [
            ":- initialization(run_aleph).",
            "run_aleph :- ",
            "consult(aleph),",
            "read_all('%s')," % filestem,
        ]
        # Cat all the non-default settings
        for setting, value in self.settings.items():
            lines.append("set(%s, %s)," % (setting, str(value)))
        lines.append("%s," % mode)
        # The clause continues with the post goal when a post script is set.
        eof = ',' if self.postScript else '.'
        # File names are written as quoted atoms so that a file stem starting
        # with an uppercase letter is not read as a Prolog variable.
        if mode == 'induce_features':
            lines.append("consult(features),")
            features_fn = filestem + Aleph.FEATURES_SUFFIX
            dataset_fn = filestem + Aleph.PROP_DATASET_SUFFIX
            lines.append("save_features('%s')," % features_fn)
            lines.append("save_dataset('%s')%s" % (dataset_fn, eof))
        else:
            rules_fn = filestem + Aleph.RULES_SUFFIX
            lines.append("write_rules('%s')%s" % (rules_fn, eof))
        if self.postScript:
            lines.append(self.postGoal + ".")
            lines.append(self.postScript)

        scriptPath = os.path.join(self.tmpdir, Aleph.SCRIPT)
        with open(scriptPath, 'w') as script:
            script.write('\n'.join(lines) + '\n')
        # Permit the owner to execute and read this script
        os.chmod(scriptPath, S_IREAD | S_IEXEC)

    def __to_arff(self, features, pl_dataset, filestem):
        """
        Converts the saved features and propositional dataset to arff.

        :param features: Text containing ``feature(Id,(Body)).`` terms.
        :param pl_dataset: Text containing ``example(Id,[F1,F2,...],Class).``
            terms, where the list holds integer feature ids.
        :param filestem: Used as the relation name.
        :return: The arff document as a string.
        """
        arff = StringIO()

        def cat(line):
            arff.write(line + '\n')

        cat('@RELATION "%s"' % filestem)
        parsed = re.findall(r"feature\((\d+),\((.*)\)\).", features)
        # Order the attributes numerically (not as strings, where '10' < '2')
        # so that they line up with the numerically ordered data columns.
        for fid, feature in sorted(parsed, key=lambda e: int(e[0])):
            cat('%% f%s: %s' % (fid, feature))
            cat('@ATTRIBUTE f%s {+,-}' % fid)
        # Class attribute
        class_id = len(parsed)
        cat('@ATTRIBUTE class {negative,positive}')
        cat('@DATA')
        examples = re.findall(r"example\((\w+),(\[[\d,]*\]),(\w+)\)\.", pl_dataset)
        for _, covered, cls in examples:
            # Parse the id list once per example instead of once per column.
            present = set(json.loads(covered))
            # NOTE(review): columns are generated for ids 0..class_id-1;
            # confirm this matches the feature numbering in the saved files.
            vals = ['+' if i in present else '-' for i in range(class_id)]
            vals.append(cls)
            cat(','.join(vals))
        return arff.getvalue()