#!/usr/bin/python3
SVER = '3.0.1'
##############################################################################
# pat - SCA Pattern Checker Tool
# Copyright (C) 2023-2024 SUSE LLC
#
# Description:  Runs a pattern or patterns against supportconfig directories to
#               check the pattern's output based on the supportconfig directory.
# Modified:     2024 Feb 03
#
##############################################################################
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; version 2 of the License.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, see <http://www.gnu.org/licenses/>.
#
#  Authors/Contributors:
#     Jason Record <jason.record@suse.com>
#
##############################################################################

import sys
import os
import re
import json
import getopt
import signal
import subprocess
import patdevel as pd
import configparser
from datetime import timedelta
from timeit import default_timer as timer

##############################################################################
# Global Options
##############################################################################

# Recursion switches, both enabled together by the -r/--recurse option
recurse_archives = False
recurse_patterns = False

# Work lists filled in by main()
pattern_list = []
archive_list = []

# Paths used by the tool; filled in from the config file and the command line
usepath = {
    'archives': '',
    'patterns': '',
    'scalib': '',
    'log_dir': '',
    'log_file': 'pattern-check.log',
}

# Maps a pattern's overall/severity value (as a string) to its counter name in c_
overall_dict = {
    "-2": "Temporary",
    "-1": "Partial",
    "0": "Success",
    "1": "Recommend",
    "2": "Promotion",
    "3": "Warning",
    "4": "Critical",
    "5": "Error",
    "6": "Ignore",
}

# Run counters and the pattern/archive currently being processed
c_ = {
    'current': 0,
    'total': 0,
    'total_pat': 0,
    'total_fails': 0,
    'total_skipped': 0,
    'total_arch': 0,
    "active_pattern": '',
    "active_archive": '',
    'checks_skipped': 0,
    'Fatal': 0,
    'Temporary': 0,
    'Partial': 0,
    'Success': 0,
    'Recommend': 0,
    'Promotion': 0,
    'Warning': 0,
    'Critical': 0,
    'Error': 0,
    'Ignore': 0,
}

# Validation/execution error flags; 'any' is set whenever one of the others is
meta_error = dict.fromkeys(
    ('any', 'mode', 'bin', 'out', 'out1', 'out2', 'hpl', 'dos', 'notsca'),
    False,
)

# Failure bookkeeping reported by show_summary()
invalid_patterns = []
invalid_pattern_shortlist = {}
failed_pattern_shortlist = {}
skipped_pattern_shortlist = {}

elapsed = -1    # replaced by the formatted runtime at the end of main()
title_string = "SCA Pattern Checker"
gen_value = -1  # pattern generation (1 or 2) detected by validate_pattern()


##############################################################################
# Functions
##############################################################################

def usage():
    "Prints the command line help text to standard output"
    option_row = "  {:33s} {}".format
    # (option column, description column); an empty option continues the previous row
    option_rows = (
        ("-h, --help", "Display this help"),
        ("-a <path>, --archives <path>", "Root directory for supportconfig archives to be used for testing"),
        ("-r, --recurse", "Validate patterns and archives recursively found in the directory structures"),
        ('-l <level>, --log_level <level>', "Set log level, default: Minimal"),
        ('', "0 Quiet, 1 Minimal, 2 Normal, 3 Verbose, 4 Debug"),
    )
    help_lines = [
        "Usage: pat [options] <path_to_pattern|directory>",
        "",
        "Description:",
        "  Runs the selected pattern(s) against the selected supportconfig archive(s).",
        "",
        "  Documentation: /usr/share/doc/packages/sca-patterns-devel/index.html",
        "",
        "Options:",
    ]
    help_lines.extend(option_row(flag, text) for flag, text in option_rows)
    # The trailing empty entry yields the final blank line
    help_lines.append("")
    print("\n".join(help_lines))


def signal_handler(sig, frame):
    "Signal handler: announces the abort, prints the summary gathered so far and exits with status 1"
    abort_notice = "\n\nAborting...\n"
    print(abort_notice)
    show_summary()
    raise SystemExit(1)


def show_summary():
    """Print the end-of-run summary through the module-level msg object.

    Shows the runtime, the directories used, and the pattern/check counters
    from c_. When the run was interrupted, the pattern and archive that were
    active are shown as well. If any pattern was recorded as invalid, a short
    list of those patterns is printed, followed by one ready-to-run pat
    command per failed pattern/archive pair for re-checking it verbosely.
    """
    def heading(text):
        # Blank line, heading text and a separator (only at Minimal level or above)
        msg.min()
        msg.min(text)
        if msg.get_level() >= msg.LOG_MIN:
            pd.separator_line('-')

    heading("Summary")
    # (display function, label, value) in the order they are shown.
    # elapsed is formatted like H:MM:SS.ffffff; only the part before the '.' is kept.
    rows = (
        (msg.normal, "Elapsed Runtime", str(elapsed).split('.')[0]),
        (msg.normal, "Pattern Directory", usepath['patterns']),
        (msg.normal, "Archive Directory", usepath['archives']),
        (msg.normal, "SCA Library Directory", usepath['scalib']),
        (msg.min, "Patterns Checked", str(c_['total_pat'])),
        (msg.min, "Patterns Failed", str(c_['total_fails'])),
        (msg.min, "Patterns Skipped", str(c_['total_skipped'])),
        (msg.min, "Archives Used", str(c_['total_arch'])),
        (msg.normal, "Total Checks", str(c_['total'])),
        (msg.min, "Checks Skipped", str(c_['checks_skipped'])),
        (msg.normal, "Fatal Checks", str(c_['Fatal'])),
        (msg.normal, "Errors", str(c_['Error'])),
        (msg.normal, "Ignored", str(c_['Ignore'])),
        (msg.normal, "Critical", str(c_['Critical'])),
        (msg.normal, "Warning", str(c_['Warning'])),
        (msg.normal, "Proactive", str(c_['Promotion'])),
        (msg.normal, "Recommended", str(c_['Recommend'])),
        (msg.normal, "Success", str(c_['Success'])),
    )
    for emit, label, value in rows:
        emit(label, value)

    # main() clears these when the run completes, so they only show up on abort
    if c_['active_pattern']:
        msg.min("Active Pattern", c_['active_pattern'])
    if c_['active_archive']:
        msg.min("Active Archive", c_['active_archive'])

    if invalid_pattern_shortlist:
        heading("Fatal Pattern Short List")
        for pattern in invalid_pattern_shortlist:
            msg.min(pattern)

        heading("Fatal Pattern Details")
        if invalid_patterns:
            # Each entry is [pattern, archive]; print a command that re-runs just that pair
            for invalid_pattern in invalid_patterns:
                msg.min("clear; pat -a " + invalid_pattern[1] + " " + invalid_pattern[0] + " -l3")
        else:
            msg.min("None")
    msg.min()


def set_environment():
    "Exports the SCA library locations for python, perl and bash patterns into the environment"
    lib_root = usepath['scalib']
    library_paths = (
        ('PYTHONPATH', '/python'),
        ('PERL5LIB', '/perl'),
        ('BASHLIB', '/bash'),
    )
    for variable, subdir in library_paths:
        os.environ[variable] = lib_root + subdir


def get_archive_list(this_path, recurse=None):
    """Return the supportconfig archive directories found at or below this_path.

    If this_path itself contains basic-environment.txt it is treated as an
    extracted supportconfig and returned as the only entry. Otherwise its
    sub-directories whose names start with scc_ or nts_ (case-insensitive)
    are returned; with recursion enabled, matching directories at any depth
    are included.

    Parameters:
        this_path: directory to inspect.
        recurse:   True/False to force recursion on or off; None (default)
                   uses the module-level recurse_archives setting.

    Returns:
        A list of directory paths without duplicates, in discovery order.
    """
    if recurse is None:
        recurse = recurse_archives

    if os.path.exists(this_path + "/basic-environment.txt"):
        return [this_path]

    scadir = re.compile("^scc_|^nts_", re.IGNORECASE)
    # The context manager closes the directory iterator promptly
    with os.scandir(this_path) as entries:
        candidates = [entry.path for entry in entries if entry.is_dir()]

    if recurse:
        # Iterate over a copy because candidates grows while descending
        for dirname in list(candidates):
            candidates.extend(get_archive_list(dirname, recurse))

    matches = [folder for folder in candidates if scadir.search(os.path.basename(folder))]
    # A directory that is itself an archive is seen twice when recursing: once as a
    # child of this_path and once returned by the recursive call. Drop the repeats
    # (keeping first-seen order) so each archive is checked only once.
    return list(dict.fromkeys(matches))


def prepare_archives():
    "Resolves the configured archive directory and returns the archives found in it; exits with status 5 if the path is not a directory"
    archive_root = usepath['archives']

    # Guard: anything other than a directory is a usage error
    if not os.path.isdir(archive_root):
        print("Error: Invalid archive path - " + archive_root + "\n")
        usage()
        sys.exit(5)

    usepath['archives'] = os.path.abspath(archive_root)
    label = "Recursively Loading Archives" if recurse_archives else "Loading Archives"
    msg.normal(label, usepath['archives'])
    return get_archive_list(usepath['archives'])


def prepare_patterns():
    "Resolves the pattern path given on the command line and returns the list of pattern files to check; exits with status 5 if the path is neither a directory nor a file"
    given_path = usepath['patterns']

    # A directory: let patdevel collect the patterns it contains
    if os.path.isdir(given_path):
        usepath['patterns'] = os.path.abspath(given_path)
        label = "Recursively Processing Dir" if recurse_patterns else "Processing Dir"
        msg.normal(label, usepath['patterns'])
        return pd.get_pattern_list(usepath['patterns'], _recurse=recurse_patterns)

    # A single file: it is the only pattern
    if os.path.isfile(given_path):
        usepath['patterns'] = os.path.abspath(given_path)
        msg.min("Processing File", usepath['patterns'])
        return [usepath['patterns']]

    print("Error: Invalid pattern file or path - " + given_path + "\n")
    usage()
    sys.exit(5)


def validate_pattern(this_pattern):
    """Check a pattern file's metadata and record problems in meta_error.

    The checks, each setting meta_error['any'] plus its own flag on failure:
      mode   - the file is not executable
      dos    - the file contains CR/LF line endings
      notsca - no generation 1 or generation 2 SCA marker was found
      hpl    - the hash pling is missing, its interpreter path does not exist,
               or the path does not end in python3 or perl

    On success the module-level gen_value is set to 1 or 2 according to the
    marker found. When the file is not an SCA pattern the hash pling check is
    skipped.

    Parameters:
        this_pattern: path to the pattern file.
    """
    global gen_value
    # Raw strings keep the regex escapes (\( and \@) from being read as string escapes
    hashpling = re.compile(r'^#!/')
    validhpls = re.compile(r'python3$|perl$')
    scapattern_gen1 = re.compile(r'^Core.init\(META_CLASS|^\@PATTERN_RESULTS = \(', re.IGNORECASE)
    scapattern_gen2 = re.compile(r'SCAPatternGen2\(')

    # Validate pattern mode
    if not os.access(this_pattern, os.X_OK):
        meta_error['any'] = True
        meta_error['mode'] = True

    # Validate UNIX text files; read as bytes so the line endings are seen untranslated
    with open(this_pattern, 'rb') as f:
        bindata = f.read()

    if b'\x0d\x0a' in bindata: # Look for \r\n
        meta_error['any'] = True
        meta_error['dos'] = True

    # Validate pattern hash pling and SCA pattern
    hpl = ''
    scafile = False
    for binline in bindata.splitlines():
        # Patterns may contain non-ASCII text (names, quoted output); the markers
        # searched for are plain ASCII, so undecodable bytes are simply replaced
        # instead of aborting the whole run with UnicodeDecodeError.
        line = binline.decode('utf-8', errors='replace')
        if hashpling.search(line):
            hpl = line[2:].split()[0] #drop the #!, keep the path
        if scapattern_gen1.search(line):
            gen_value = 1
            scafile = True
            break
        if scapattern_gen2.search(line):
            gen_value = 2
            scafile = True
            break

    if not scafile:
        meta_error['any'] = True
        meta_error['notsca'] = True
        return

    # The interpreter must be present on this system and be python3 or perl
    hplvalid = bool(hpl) and os.path.exists(hpl) and bool(validhpls.search(hpl))
    if not hplvalid:
        meta_error['any'] = True
        meta_error['hpl'] = True


def show_meta_errors():
    "Prints one explanatory line for every error flag currently set in meta_error"
    if not meta_error['any']:
        return

    # (flag, explanation) in display order
    explanations = (
        ('dos', "  + Detected DOS file format, use dos2unix to convert"),
        ('mode', "  + Missing execute permission"),
        ('hpl', "  + Missing or invalid hash pling"),
        ('out', "  + Invalid pattern output string, review Pattern Requirements"),
        ('out1', "  + Invalid generation 1 pattern output string, review Pattern Requirements"),
        ('out2', "  + Invalid generation 2 JSON output string, review Pattern Requirements"),
        ('bin', "  + Pattern execution error, pattern returned non-zero"),
        ('notsca', "  + Not an SCA Pattern"),
    )
    for flag, explanation in explanations:
        if meta_error[flag]:
            msg.normal(explanation)


def _reset_meta_errors():
    "Clears every meta_error flag so errors from a previous run cannot leak into the next one"
    for flag in meta_error:
        meta_error[flag] = False

def _flag_fatal(*flags):
    "Sets the given meta_error flags, counts the run as Fatal at most once and returns the 'Fatal' status"
    if not meta_error['any']:
        c_['Fatal'] += 1
    meta_error['any'] = True
    for flag in flags:
        meta_error[flag] = True
    return 'Fatal'

def _report_failure(this_archive, status, details=None):
    "Prints the archive, the meta errors, optional detail text and the status at Normal log level or above"
    if msg.get_level() < msg.LOG_NORMAL:
        return
    msg.normal("+ Archive Used", this_archive)
    show_meta_errors()
    if details is not None:
        msg.normal()
        for text in details:
            print(text)
    msg.normal("+ Status", status)
    pd.separator_line('-')
    print()

def _record_failure(this_pattern, this_archive):
    "Adds the pattern/archive pair to the failure lists shown in the summary"
    failed_pattern_shortlist[this_pattern] = True
    invalid_patterns.append([this_pattern, this_archive])
    invalid_pattern_shortlist[this_pattern] = True

def run_pattern(this_pattern, this_archive):
    """Validate one pattern, run it against one archive and tally the result.

    Steps:
      1. Clear meta_error and validate the pattern file. A file that is not
         an SCA pattern is counted as skipped and not executed.
      2. Execute the pattern: generation 2 as `pattern archive`, generation 1
         as `pattern -p archive`.
      3. Check the output. Generation 2 must print a JSON object containing
         every key in REQUIRED_JSON_KEYS with a severity known to
         overall_dict. Generation 1 must print a single line of '|' separated
         KEY=VALUE fields whose sixth field carries the overall value.
      4. Update the counters in c_ and, on any error, the failure lists.

    A run counts as Fatal at most once, however many errors it has.

    Parameters:
        this_pattern: path to the pattern file.
        this_archive: path to the supportconfig directory.

    Exits with status 5 if the pattern generation is neither 1 nor 2.
    """
    REQUIRED_JSON_KEYS = ['generation', 'class', 'category', 'component', 'id', 'primary_solution', 'severity', 'description', 'solution_links']
    IDX_OVERALL = 5
    IDX_LAST = -1
    valid_gen1_output = re.compile("^META_CLASS=.*|META_CATEGORY=.*|META_COMPONENT=.*|PATTERN_ID=.*|PRIMARY_LINK=META_LINK_.*|OVERALL=.*|OVERALL_INFO=.*|META_LINK_")

    msg.normal("Evaluating Pattern [{}/{}]".format(c_['current'], c_['total']), this_pattern)
    # meta_error is module-level state; without this reset the first failing
    # pattern would mark every later pattern as failed or skipped too.
    _reset_meta_errors()
    validate_pattern(this_pattern)

    if meta_error['notsca']:
        c_['checks_skipped'] += 1
        skipped_pattern_shortlist[this_pattern] = True
        invalid_pattern_shortlist[this_pattern] = True
        invalid_patterns.append([this_pattern, this_archive])
        _report_failure(this_archive, 'Skipped')
        return

    status = ''
    if meta_error['any']:
        # Validation problems (mode, DOS format, hash pling) already make this run Fatal
        failed_pattern_shortlist[this_pattern] = True
        status = 'Fatal'
        c_['Fatal'] += 1

    if gen_value == 2:
        command = [this_pattern, this_archive]
    elif gen_value == 1:
        command = [this_pattern, '-p', this_archive]
    else:
        print("Error: Invalid generation value {0}".format(gen_value))
        sys.exit(5)

    try:
        p = subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except Exception as e:
        # The pattern could not be started at all (e.g. not executable, bad interpreter)
        msg.debug('  <> subprocess.run Exception')
        status = _flag_fatal('bin')
        _report_failure(this_archive, status, [str(e) + "\n"])
        _record_failure(this_pattern, this_archive)
        return

    reported = False
    # A negative return code means the pattern was killed by a signal; that is a failure too
    if p.returncode != 0:
        msg.debug("  <> Non-Zero return code, p.returncode != 0")
        status = _flag_fatal('bin')
        _report_failure(this_archive, status, [p.stdout, p.stderr])
        reported = True
    elif gen_value == 2:
        try:
            json_object = json.loads(p.stdout)
        except ValueError:
            json_object = None
        this_sev = None
        if isinstance(json_object, dict) and all(key in json_object for key in REQUIRED_JSON_KEYS):
            this_sev = str(json_object['severity'])
        if this_sev in overall_dict:
            c_[overall_dict[this_sev]] += 1
            msg.normal("+ Result", str(json_object['description']))
            msg.verbose("+ Archive Used", this_archive)
            msg.verbose("+ Output", p.stdout)
            status = overall_dict[this_sev]
        else:
            # Not JSON, not a JSON object, a required key is missing, or unknown severity
            status = _flag_fatal('out2')
            _report_failure(this_archive, status, [p.stdout, p.stderr])
            reported = True
    elif valid_gen1_output.search(p.stdout):
        msg.debug('  <> Valid Execution, Output Found')
        output = p.stdout.rstrip('\n').split('|')
        overall = None
        # Generation 1 output must be exactly one line with the overall value in field six
        if len(p.stdout.splitlines()) == 1 and len(output) > IDX_OVERALL:
            overall = output[IDX_OVERALL].split("=")[IDX_LAST]
        if overall in overall_dict:
            c_[overall_dict[overall]] += 1
            status = overall_dict[overall]
            if msg.get_level() >= msg.LOG_VERBOSE:
                msg.verbose("+ Archive Used", this_archive)
                msg.verbose()
                msg.verbose(p.stdout)
                for this_out in output:
                    key, _, value = this_out.partition("=")
                    if key.startswith('OTHER_LINKS'):
                        # The value is itself a NAME=URL pair
                        nextkey, _, nextvalue = value.partition("=")
                        msg.verbose(key)
                        msg.verbose(nextkey, nextvalue)
                    else:
                        msg.verbose(key, value)
        else:
            status = _flag_fatal('out1')
            _report_failure(this_archive, status, [p.stdout, p.stderr])
            reported = True
    else:
        msg.debug('  <> Valid Execution, Output MISSING')
        status = _flag_fatal('out')
        _report_failure(this_archive, status, [p.stdout, p.stderr])
        reported = True

    if meta_error['any']:
        if not reported:
            # The pattern ran, but validation found problems; explain why it is listed as failed
            _report_failure(this_archive, 'Fatal')
        _record_failure(this_pattern, this_archive)
    elif msg.get_level() >= msg.LOG_VERBOSE:
        msg.normal("+ Status", status)
        pd.separator_line('-')
        print()
    elif msg.get_level() >= msg.LOG_NORMAL:
        msg.normal("+ Status", status)


##############################################################################
# Main
##############################################################################

def main(argv):
    """Main entry point: load the configuration, parse the options and run the checks.

    Reads defaults from pd.config_file, applies the command line options,
    collects the archives and patterns, runs every pattern against every
    archive and finally prints the summary.

    Parameters:
        argv: the full argument vector including the program name (sys.argv).

    Relies on the module-level config and msg objects created in the
    __main__ guard. Exits the process with status 0 (help), 1 (missing config
    file or missing pattern path), 2 (invalid option) or 5 (no archives found).
    """
    # NOTE(review): c and defpath are not defined anywhere in the visible source
    global SVER, pattern_list, archive_list, recurse_archives, recurse_patterns, c
    global invalid_patterns, invalid_pattern_shortlist, elapsed, defpath, usepath, meta_error, gen_value
    start = timer()
    
    # Defaults come from the config file; it is mandatory
    if( os.path.exists(pd.config_file) ):
        config.read(pd.config_file)
        # NOTE(review): the '/' argument presumably controls a trailing slash on
        # the returned path - confirm against patdevel.config_entry
        usepath['archives'] = pd.config_entry(config.get("Common", "sca_arch_dir"), '/')
        usepath['scalib'] = pd.config_entry(config.get("Common", "sca_lib_dir"), '/')
        usepath['log_dir'] = pd.config_entry(config.get("Security", "pat_logs"), '/')
        # The result is not read again in the visible source
        usepath['log_file'] = usepath['log_dir'] + usepath['log_file']
        config_log_level = pd.config_entry(config.get("Common", "log_level"))
        config_logging = msg.validate_level(config_log_level)
        if( config_logging >= msg.LOG_QUIET ):
            msg.set_level(config_logging)
        else:
            print("Warning: Invalid log level in config file, using instance default")
    else:
        pd.title(title_string, SVER)
        print("Error: File not found - " + pd.config_file + "\n")
        sys.exit(1)

    # Command line options override the config file values
    try:
        (optlist, args) = getopt.gnu_getopt(argv[1:], "ha:rl:", ["help", "archives=", "recurse", "log_level="])
    except getopt.GetoptError as exc:
        pd.title(title_string, SVER)
        print("Error:", exc, file=sys.stderr)
        sys.exit(2)
    for opt, arg in optlist:
        if opt in {"-h", "--help"}:
            pd.title(title_string, SVER)
            usage()
            sys.exit(0)
        elif opt in {"-a", "--archives"}:
            usepath['archives'] = arg
        elif opt in {"-r", "--recurse"}:
            # One switch enables recursion for both archives and patterns
            recurse_archives = True
            recurse_patterns = True
        elif opt in {"-l", "--log_level"}:
            user_logging = msg.validate_level(arg)
            if( user_logging >= msg.LOG_QUIET ):
                msg.set_level(user_logging)
            else:
                print("Warning: Invalid log level, using instance default")

    if( msg.get_level() > msg.LOG_QUIET ):
        pd.title(title_string, SVER)

    msg.normal("Log Level", msg.get_level_str())

    # Archives are resolved before the pattern argument is checked
    set_environment()
    archive_list = prepare_archives()
    c_['total_arch'] = len(archive_list)
    if c_['total_arch'] == 0:
        print("Error: No supportconfig archives found in {}\n".format(usepath['archives']))
        sys.exit(5)

    # Only the first positional argument is used as the pattern path
    if len(args) > 0:
        usepath['patterns'] = args[0]
    else:
        print("Error: Missing path to pattern file or directory, use . for current directory\n")
        usage()
        sys.exit(1)


    pattern_list = prepare_patterns()
    c_['total_pat'] = len(pattern_list)
    # Every pattern is run against every archive
    c_['total'] = c_['total_pat'] * c_['total_arch']
    msg.min()

    # The progress bar is only used at exactly the Minimal log level
    if( msg.get_level() == msg.LOG_MIN ):
        bar = pd.ProgressBar("Checking: ", c_['total'])

    for pattern in pattern_list:
        for archive in archive_list:
            c_['current'] += 1
            if( msg.get_level() == msg.LOG_MIN ):
                bar.inc_count()
            # Remembered so show_summary() can report them if the run is interrupted
            c_['active_pattern'] = pattern
            c_['active_archive'] = archive
            run_pattern(pattern, archive)
            if( msg.get_level() == msg.LOG_MIN ):
                bar.update()
    c_['total_fails'] = len(failed_pattern_shortlist)
    c_['total_skipped'] = len(skipped_pattern_shortlist)

    # The run completed, so nothing is active any more
    c_['active_pattern'] = ''
    c_['active_archive'] = ''

    if( msg.get_level() == msg.LOG_MIN ):
        bar.finish()

    end = timer()
    # Stored as text; show_summary() drops the fractional seconds
    elapsed = str(timedelta(seconds=end-start))
    msg.verbose()
    if c_['total_pat'] > 0:
        show_summary()
    else:
        print("No patterns found\n")
        
# Entry point
if __name__ == "__main__":
    # Ctrl-C prints the summary gathered so far before exiting
    signal.signal(signal.SIGINT, signal_handler)
    # config and msg are module-level objects used by main() and the helper functions
    config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
    msg = pd.DisplayMessages()
    main(sys.argv)
    # The exit status is the number of Fatal checks. Only the low 8 bits of an
    # exit status reach the parent process, so 256 failures would otherwise be
    # reported as 0 (success); cap the value at 255.
    sys.exit(min(c_['Fatal'], 255))


