Source code for websnort.runner
# Websnort - Web service for analysing pcap files with snort
# Copyright (C) 2013-2015 Steve Henderson
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from datetime import datetime
from multiprocessing.pool import ThreadPool
from websnort.plugins import registry
from websnort.config import Config
STATUS_SUCCESS = "Success"
STATUS_FAILED = "Failed"
MAX_THREADS = 3
def duration(start, end=None):
"""
Returns duration in seconds since supplied time.
Note: time_delta.total_seconds() only available in python 2.7+
:param start: datetime object
:param end: Optional end datetime, None = now
:returns: Seconds as decimal since start
"""
if not end:
end = datetime.now()
td = end - start
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 1000000) \
/ 1000000.0
[docs]def is_pcap(pcap):
"""
Simple test for pcap magic bytes in supplied file.
:param pcap: File path to Pcap file to check
:returns: True if content is pcap (magic bytes present), otherwise False.
"""
with open(pcap, 'rb') as tmp:
header = tmp.read(4)
# check for both big/little endian
if header == b"\xa1\xb2\xc3\xd4" or \
header == b"\xd4\xc3\xb2\xa1":
return True
return False
def _run_ids(runner, pcap):
"""
Runs the specified IDS runner.
:param runner: Runner instance to use
:param pcap: File path to pcap for analysis
:returns: dict of run metadata/alerts
"""
run = {'name': runner.conf.get('name'),
'module': runner.conf.get('module'),
'ruleset': runner.conf.get('ruleset', 'default'),
'status': STATUS_FAILED,
}
try:
run_start = datetime.now()
version, alerts = runner.run(pcap)
run['version'] = version or 'Unknown'
run['status'] = STATUS_SUCCESS
run['alerts'] = alerts
except Exception as ex:
run['error'] = str(ex)
finally:
run['duration'] = duration(run_start)
return run
[docs]def run(pcap):
"""
Runs all configured IDS instances against the supplied pcap.
:param pcap: File path to pcap file to analyse
:returns: Dict with details and results of run/s
"""
start = datetime.now()
errors = []
status = STATUS_FAILED
analyses = []
pool = ThreadPool(MAX_THREADS)
try:
if not is_pcap(pcap):
raise Exception("Not a valid pcap file")
runners = []
for conf in Config().modules.values():
runner = registry.get(conf['module'])
if not runner:
raise Exception("No module named: '{0}' found registered"
.format(conf['module']))
runners.append(runner(conf))
# launch via worker pool
analyses = [ pool.apply_async(_run_ids, (runner, pcap)) for runner in runners ]
analyses = [ x.get() for x in analyses ]
# were all runs successful?
if all([ x['status'] == STATUS_SUCCESS for x in analyses ]):
status = STATUS_SUCCESS
# propagate any errors to the main list
for run in [ x for x in analyses if x['status'] != STATUS_SUCCESS ]:
errors.append("Failed to run {0}: {1}".format(run['name'], run['error']))
except Exception as ex:
errors.append(str(ex))
return {'start': start,
'duration': duration(start),
'status': status,
'analyses': analyses,
'errors': errors,
}