Source code for interpreter.interpreter

import logging
import numpy as np
import pickle
import scipy.sparse as sp
import torch
import warnings
from collections       import Counter
from sklearn.neighbors import KDTree
from tqdm              import tqdm

from .cluster import Cluster
from .utils   import group_by, unique_2d, sp_unique

# Set logger
logger = logging.getLogger(__name__)

class Interpreter(object):
    def __init__(self, context_builder, features, eps=0.1, min_samples=5,
                 threshold=0.2):
        """Interpreter for a given ContextBuilder.

        Parameters
        ----------
        context_builder : ContextBuilder
            ContextBuilder to interpret.

        features : int
            Number of different possible security events.

        eps : float, default=0.1
            Epsilon used for determining maximum distance between clusters.

        min_samples : int, default=5
            Minimum number of required samples per cluster.

        threshold : float, default=0.2
            Minimum required confidence of ContextBuilder before using a
            context in training clusters.
        """
        # Initialise ContextBuilder
        self.context_builder = context_builder

        # Create cluster algorithm dbscan
        self.dbscan = Cluster(p=1)

        # Set parameters
        self.features    = features
        self.eps         = eps
        self.min_samples = min_samples
        self.threshold   = threshold

        # Store entries
        self.clusters = np.zeros(0)
        self.vectors  = np.zeros((0, self.features))
        self.events   = np.zeros(0)
        self.tree     = dict()
        self.labels   = dict()
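    # Usage sketch (hypothetical names; assumes `context_builder` is an
    # already trained ContextBuilder over 100 distinct event types):
    #
    #     interpreter = Interpreter(
    #         context_builder = context_builder,
    #         features        = 100,
    #     )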
    ########################################################################
    #                         Fit/Predict methods                          #
    ########################################################################
    def fit(self, X, y, scores,
            iterations = 100,
            batch_size = 1024,
            strategy   = "max",
            NO_SCORE   = -1,
            verbose    = False,
        ):
        """Fit the Interpreter by performing clustering and assigning scores.

        Fit function is a wrapper that calls the following methods:
            1. Interpreter.cluster
            2. Interpreter.score_clusters
            3. Interpreter.score

        Parameters
        ----------
        X : torch.Tensor of shape=(n_samples, seq_length)
            Input context to cluster.

        y : torch.Tensor of shape=(n_samples, 1)
            Events to cluster.

        scores : array-like of float, shape=(n_samples,)
            Scores for each sample in cluster.

        iterations : int, default=100
            Number of iterations for query.

        batch_size : int, default=1024
            Size of batch for query.

        strategy : string (max|min|avg), default=max
            Strategy to use for computing scores per cluster based on scores
            of individual events. Currently available options are:
            - max: Use maximum score of any individual event in a cluster.
            - min: Use minimum score of any individual event in a cluster.
            - avg: Use average score of any individual event in a cluster.

        NO_SCORE : float, default=-1
            Score to indicate that no score was given to a sample and that
            the value should be ignored for computing the cluster score.
            The NO_SCORE value will also be given to samples that do not
            belong to a cluster.

        verbose : boolean, default=False
            If True, prints achieved speedup of clustering algorithm.

        Returns
        -------
        self : self
            Returns self
        """
        # Call cluster method
        clusters = self.cluster(
            X          = X,
            y          = y,
            iterations = iterations,
            batch_size = batch_size,
            verbose    = verbose,
        )

        # Call score_clusters method to distribute individual scores over
        # clusters according to chosen strategy
        scores = self.score_clusters(
            scores   = scores,
            strategy = strategy,
            NO_SCORE = NO_SCORE,
        )

        # Set scores
        self.score(
            scores  = scores,
            verbose = verbose,
        )

        # Return self
        return self
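    # Example fit call (a minimal sketch; `context`, `events`, and `labels`
    # are hypothetical tensors/arrays matching the shapes documented above):
    #
    #     interpreter.fit(
    #         X        = context,   # shape (n_samples, seq_length)
    #         y        = events,    # shape (n_samples, 1)
    #         scores   = labels,    # one score per sample
    #         strategy = "max",     # cluster score = max of sample scores
    #     )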
    def predict(self, X, y, iterations=100, batch_size=1024, verbose=False):
        """Predict maliciousness of context samples.

        Parameters
        ----------
        X : torch.Tensor of shape=(n_samples, seq_length)
            Input context for which to predict maliciousness.

        y : torch.Tensor of shape=(n_samples, 1)
            Events for which to predict maliciousness.

        iterations : int, default=100
            Iterations used for optimization.

        batch_size : int, default=1024
            Batch size used for optimization.

        verbose : boolean, default=False
            If True, print progress.

        Returns
        -------
        result : np.array of shape=(n_samples,)
            Predicted maliciousness score.
            Positive scores are maliciousness scores.
            A score of 0 means we found a match that was not malicious.
            Special cases:

            * -1: Not confident enough for prediction
            * -2: Label not in training
            * -3: Closest cluster > epsilon
        """
        # Get unique samples
        X, y, inverse_result = unique_2d(X, y)

        ####################################################################
        #                          Compute vectors                         #
        ####################################################################

        # Compute vectors
        vectors, mask = self.attended_context(
            X          = X,
            y          = y,
            threshold  = self.threshold,
            iterations = iterations,
            batch_size = batch_size,
            verbose    = verbose,
        )

        # Initialise result (-4 is a placeholder; every masked sample is
        # assigned a value in the loop below)
        result = np.full(vectors.shape[0], -4, dtype=float)

        ####################################################################
        #                    Find closest known sequences                  #
        ####################################################################

        # Group sequences by individual events
        events = group_by(y[mask].squeeze(1).cpu().numpy())

        # Add verbosity, if necessary
        if verbose:
            events = tqdm(events, desc="Predicting ")

        # Loop over all events
        for event, indices in events:

            ############################################################
            #                   Case - unknown event                   #
            ############################################################

            # If event is not in training set, set to -2
            if event not in self.tree:
                result[indices] = -2
                continue

            ############################################################
            #                    Case - known event                    #
            ############################################################

            # Get vectors for given event
            vectors_ = vectors[indices]
            # Get unique vectors - optimizes computation time
            vectors_, inverse, _ = sp_unique(vectors_)

            # Get closest cluster
            distance, neighbours = self.tree[event].query(
                X               = vectors_.toarray(),
                return_distance = True,
                dualtree        = vectors_.shape[0] >= 1e3, # Optimization
            )

            # Get neighbour indices
            neighbours = self.tree[event].get_arrays()[1][neighbours][:, 0]

            # Compute neighbour scores
            scores = np.asarray([
                self.labels[event][neighbour]
                for neighbour in neighbours
            ])

            ############################################################
            #                Set result, based on epsilon              #
            ############################################################

            # Set resulting indices
            result[indices] = np.where(
                distance[:, 0] <= self.eps, # Within epsilon of a cluster?
                scores,                     # If so, assign cluster score
                -3,                         # Else, closest cluster > eps
            )[inverse]

        ####################################################################
        #                      Add non-confident events                    #
        ####################################################################

        # Initialise full result with -1 (not confident enough)
        result_ = np.full(X.shape[0], -1, dtype=float)
        # Fill in results for confident samples
        result_[mask.cpu().numpy()] = result
        result = result_

        # Return result
        return result[inverse_result.cpu().numpy()]
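    # Interpreting predict() output (a minimal sketch with hypothetical
    # variable names):
    #
    #     prediction = interpreter.predict(X=context, y=events)
    #     automated  = prediction >= 0   # matched a known, scored cluster
    #     manual     = prediction <  0   # -1/-2/-3: requires manual analysis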
    def fit_predict(self, X, y, scores,
            iterations = 100,
            batch_size = 1024,
            strategy   = "max",
            NO_SCORE   = -1,
            verbose    = False,
        ):
        """Fit Interpreter with samples and labels and return the predictions
        of the same samples after running them through the Interpreter.

        Parameters
        ----------
        X : torch.Tensor of shape=(n_samples, seq_length)
            Input context to cluster.

        y : torch.Tensor of shape=(n_samples, 1)
            Events to cluster.

        scores : array-like of float, shape=(n_samples,)
            Scores for each sample in cluster.

        iterations : int, default=100
            Number of iterations for query.

        batch_size : int, default=1024
            Size of batch for query.

        strategy : string (max|min|avg), default=max
            Strategy to use for computing scores per cluster based on scores
            of individual events. Currently available options are:
            - max: Use maximum score of any individual event in a cluster.
            - min: Use minimum score of any individual event in a cluster.
            - avg: Use average score of any individual event in a cluster.

        NO_SCORE : float, default=-1
            Score to indicate that no score was given to a sample and that
            the value should be ignored for computing the cluster score.
            The NO_SCORE value will also be given to samples that do not
            belong to a cluster.

        verbose : boolean, default=False
            If True, prints achieved speedup of clustering algorithm.

        Returns
        -------
        result : np.array of shape=(n_samples,)
            Predicted maliciousness score.
            Positive scores are maliciousness scores.
            A score of 0 means we found a match that was not malicious.
            Special cases:

            * -1: Not confident enough for prediction
            * -2: Label not in training
            * -3: Closest cluster > epsilon
        """
        # Run fit and predict sequentially, passing the given parameters
        # through to both calls
        return self.fit(
            X          = X,
            y          = y,
            scores     = scores,
            iterations = iterations,
            batch_size = batch_size,
            strategy   = strategy,
            NO_SCORE   = NO_SCORE,
            verbose    = verbose,
        ).predict(
            X          = X,
            y          = y,
            iterations = iterations,
            batch_size = batch_size,
            verbose    = verbose,
        )
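    # fit_predict() is equivalent to running the two calls below on the
    # same data (sketch):
    #
    #     interpreter.fit(X, y, scores=scores)
    #     prediction = interpreter.predict(X, y)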
    ########################################################################
    #                              Clustering                              #
    ########################################################################
    def cluster(self, X, y, iterations=100, batch_size=1024, verbose=False):
        """Cluster contexts in X for same output event y.

        Parameters
        ----------
        X : torch.Tensor of shape=(n_samples, seq_length)
            Input context to cluster.

        y : torch.Tensor of shape=(n_samples, 1)
            Events to cluster.

        iterations : int, default=100
            Number of iterations for query.

        batch_size : int, default=1024
            Size of batch for query.

        verbose : boolean, default=False
            If True, prints achieved speedup of clustering algorithm.

        Returns
        -------
        clusters : np.array of shape=(n_samples,)
            Clusters per input sample.
        """
        ####################################################################
        #                    Represent context as vector                   #
        ####################################################################

        # Get optimized vectors
        vectors, mask = self.attended_context(
            X          = X,
            y          = y,
            threshold  = self.threshold,
            iterations = iterations,
            batch_size = batch_size,
            verbose    = verbose,
        )

        ####################################################################
        #                      Group sequences by event                    #
        ####################################################################

        # Group sequences for clustering per event type
        indices_y = group_by(
            X   = y[mask].squeeze(1).cpu().numpy(),
            key = lambda x: x.data.tobytes(),
        )

        # Add verbosity if necessary
        if verbose:
            indices_y = tqdm(indices_y, desc="Clustering ")

        ####################################################################
        #                           Cluster events                         #
        ####################################################################

        # Initialise result for confident samples
        result = np.full(mask.sum(), -1, dtype=int)

        # Loop over each event
        for event, context_mask in indices_y:
            # Compute clusters per event
            clusters = self.dbscan.dbscan(
                X           = vectors[context_mask],
                eps         = self.eps,
                min_samples = self.min_samples,
                verbose     = False,
            )

            # Add offset to clusters to ensure unique identifiers per event
            clusters[clusters != -1] += max(0, result.max() + 1)

            # Set resulting clusters
            result[context_mask] = clusters

        ####################################################################
        #                     Add non-confident clusters                   #
        ####################################################################

        # Set clusters to -1 by default, i.e., if not confident
        clusters = np.full(mask.shape[0], -1, dtype=int)
        # Add confident clusters
        clusters[mask.cpu().numpy()] = result

        ####################################################################
        #                          Store in object                         #
        ####################################################################

        # Store clusters
        self.clusters = clusters
        # Store vectors
        self.vectors = vectors
        # Store events
        self.events = y.reshape(-1).cpu().numpy()

        # Return clusters
        return clusters
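    # Cluster identifiers are offset per event type, so clusters of
    # different events never share an id; samples without a cluster get -1
    # (sketch, assuming at least one cluster was found):
    #
    #     clusters   = interpreter.cluster(X=context, y=events)
    #     n_clusters = clusters.max() + 1        # ids run from 0..max
    #     n_noise    = (clusters == -1).sum()    # unclustered samples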
    ########################################################################
    #                            Manual scoring                            #
    ########################################################################
    def score(self, scores, verbose=False):
        """Assigns score to clustered samples.

        Parameters
        ----------
        scores : array-like of shape=(n_samples,)
            Scores of individual samples.

        verbose : boolean, default=False
            If True, print progress.

        Returns
        -------
        self : self
            Returns self
        """
        # Cast scores to numpy array
        scores = np.asarray(scores)

        ################################################################
        #                         Perform checks                       #
        ################################################################

        # Check if scores have same shape as clusters
        if scores.shape != self.clusters.shape:
            raise ValueError(
                "Shape of scores {} did not match shape of clusters {}".format(
                    scores.shape, self.clusters.shape,
                ))

        # Check that scores within each cluster are equal
        for cluster, indices in group_by(self.clusters):
            if np.unique(scores[indices]).shape[0] != 1:
                raise ValueError(
                    "Cluster {} contains different scores. Please use the "
                    "Interpreter.score_clusters function to assign the same "
                    "score to all samples in a cluster.".format(cluster)
                )

        ################################################################
        #                         Assign scores                        #
        ################################################################

        # Retrieve scores for clustered events only
        scores = scores[self.clusters != -1]
        # Compute clustered events
        clustered_events = group_by(self.events[self.clusters != -1])

        # If verbose, add printing
        if verbose:
            clustered_events = tqdm(clustered_events, desc="Scoring")

        # Loop over all clustered events
        for event, indices in clustered_events:
            # Get relevant vectors for given event
            vectors = self.vectors[indices]
            # Get unique vectors - optimizes computation time
            vectors, inverse, _ = sp_unique(vectors)

            # Compute KDTree for vectors
            self.tree[event] = KDTree(vectors.toarray(), p=1)

            # Compute scores for given tree indices
            self.labels[event] = dict()
            score = scores[indices]

            data, index_tree, _, _ = self.tree[event].get_arrays()
            _, index_vector = zip(*group_by(inverse))
            assert np.all(data == vectors.toarray())

            for index, mapping in zip(index_tree, index_vector):
                self.labels[event][index] = score[mapping].max()

        # Return self
        return self
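    # score() requires a single identical score per cluster; raw per-sample
    # scores should first be folded with score_clusters() (sketch):
    #
    #     cluster_scores = interpreter.score_clusters(scores, strategy="max")
    #     interpreter.score(cluster_scores)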
    def score_clusters(self, scores, strategy="max", NO_SCORE=-1):
        """Compute score per cluster based on individual scores and given
        strategy.

        Parameters
        ----------
        scores : array-like of float, shape=(n_samples,)
            Scores for each sample in cluster.

        strategy : string (max|min|avg), default=max
            Strategy to use for computing scores per cluster based on scores
            of individual events. Currently available options are:
            - max: Use maximum score of any individual event in a cluster.
            - min: Use minimum score of any individual event in a cluster.
            - avg: Use average score of any individual event in a cluster.

        NO_SCORE : float, default=-1
            Score to indicate that no score was given to a sample and that
            the value should be ignored for computing the cluster score.
            The NO_SCORE value will also be given to samples that do not
            belong to a cluster.

        Returns
        -------
        scores : np.array of shape=(n_samples,)
            Scores for individual sequences computed using clustering
            strategy. All datapoints within a cluster are guaranteed to have
            the same score.
        """
        # Cast scores to numpy array
        scores = np.asarray(scores)
        # Initialise result
        result = np.full(scores.shape[0], NO_SCORE, dtype=float)

        # Check if scores are same shape as clusters
        if scores.shape != self.clusters.shape:
            raise ValueError(
                "Scores and stored clusters should have the same shape, but "
                "instead we found '{}' scores and '{}' cluster entries".format(
                    scores.shape, self.clusters.shape,
                ))

        # Group by clusters
        for cluster, indices in group_by(self.clusters):
            # Skip "no cluster" cluster
            if cluster == -1:
                continue

            # Get relevant scores
            scores_ = scores[indices]
            scores_ = scores_[scores_ != NO_SCORE]

            # Raise error in case scores cannot be computed because of NO_SCORE
            if scores_.shape[0] == 0:
                raise ValueError(
                    "Cannot compute cluster score for cluster '{}'. All "
                    "sequences in this cluster have been assigned score "
                    "NO_SCORE == {}.".format(cluster, NO_SCORE)
                )

            # Apply strategy
            if strategy == "max":
                score = scores_.max()
            elif strategy == "min":
                score = scores_.min()
            elif strategy == "avg":
                score = scores_.mean()
            else:
                raise NotImplementedError(
                    "Unknown strategy: '{}'".format(strategy)
                )

            # Add score to result
            result[indices] = score

        # Return result
        return result
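    # Worked example: for a cluster with sample scores [0, 3, NO_SCORE] and
    # NO_SCORE == -1, the NO_SCORE entry is ignored, so the cluster score
    # becomes 3 for "max", 0 for "min", and 1.5 for "avg"; all three samples
    # then receive that score in the returned array.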
    ########################################################################
    #            Computing total attention per contextual event            #
    ########################################################################
    def vectorize(self, X, attention, size):
        """Compute the total attention for each event in the context. The
        resulting vector can be used to compare sequences.

        Parameters
        ----------
        X : torch.Tensor of shape=(n_samples, sequence_length)
            Context events to vectorize.

        attention : torch.Tensor of shape=(n_samples, sequence_length)
            Attention for each event.

        size : int
            Total number of possible events, determines the vector size.

        Returns
        -------
        result : scipy.sparse.csc_matrix of shape=(n_samples, size)
            Sparse vector representing each context.
        """
        # Initialise result
        result = sp.csc_matrix((X.shape[0], size))
        # Row indices, one per sample (renamed from `range` to avoid
        # shadowing the Python builtin)
        rows = np.arange(X.shape[0], dtype=int)

        # Create vectors: for each position in the context, add that
        # position's attention to the column of the observed event
        for i, events in enumerate(torch.unbind(X, dim=1)):
            result += sp.csc_matrix(
                (attention[:, i].detach().cpu().numpy(),
                 (rows, events.cpu().numpy())),
                shape=(X.shape[0], size),
            )

        # Return result
        return result
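    # Worked example (hypothetical values): for a single context
    # X = [[2, 5, 2]] with attention = [[0.5, 0.3, 0.2]] and size = 6,
    # attention mass is summed per event, yielding the sparse row
    # [0, 0, 0.7, 0, 0, 0.3]: event 2 receives 0.5 + 0.2 = 0.7 and
    # event 5 receives 0.3.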
    def attended_context(self, X, y,
            threshold  = 0.2,
            iterations = 100,
            batch_size = 1024,
            verbose    = False,
        ):
        """Get vectors representing context after the attention query.

        Parameters
        ----------
        X : torch.Tensor of shape=(n_samples, seq_length)
            Input context to cluster.

        y : torch.Tensor of shape=(n_samples, 1)
            Events to cluster.

        threshold : float, default=0.2
            Minimum confidence required for creating a vector representing
            the context.

        iterations : int, default=100
            Number of iterations for query.

        batch_size : int, default=1024
            Size of batch for query.

        verbose : boolean, default=False
            If True, prints achieved speedup of clustering algorithm.

        Returns
        -------
        vectors : scipy.sparse.csc_matrix of shape=(n_samples, dim_vector)
            Sparse vectors representing each context with a confidence
            >= threshold.

        mask : np.array of shape=(n_samples,)
            Boolean array of masked vectors. True where input has confidence
            >= threshold, False otherwise.
        """
        ####################################################################
        #                         Optimize attention                       #
        ####################################################################
        logger.info("attended_context: Optimize attention")

        # Get optimal confidence
        confidence, attention = self.attention_query(
            X          = X,
            y          = y,
            iterations = iterations,
            batch_size = batch_size,
            verbose    = verbose,
        )

        # Check where confidence is above threshold
        mask = confidence >= threshold

        logger.info("attended_context: Optimize attention finished")

        ####################################################################
        #              Create vectors (total attention per event)          #
        ####################################################################
        logger.info("attended_context: Create vectors")

        # Perform vectorization
        vectors = self.vectorize(
            X         = X[mask],
            attention = attention[mask],
            size      = self.features,
        )

        # Round attention to 4 decimal places (for quicker analysis)
        vectors = np.round(vectors, decimals=4)

        logger.info("attended_context: Create vectors finished")

        ####################################################################
        #                           Return result                          #
        ####################################################################

        # Return result
        return vectors, mask
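    # Note: `vectors` only contains rows for samples whose optimized
    # confidence reaches `threshold`; `mask` maps those rows back to the
    # original sample positions (sketch):
    #
    #     vectors, mask = interpreter.attended_context(X, y)
    #     assert vectors.shape[0] == int(mask.sum())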
    ########################################################################
    #                           Attention Query                            #
    ########################################################################
    def attention_query(self, X, y, iterations=100, batch_size=1024,
                        verbose=False):
        """Compute optimal attention for given context X.

        Parameters
        ----------
        X : array-like of type=int and shape=(n_samples, context_size)
            Input context of events, same as input to fit and predict.

        y : array-like of type=int and shape=(n_samples,)
            Observed event.

        iterations : int, default=100
            Number of iterations to perform for optimization of actual event.

        batch_size : int, default=1024
            Batch size of items to optimize.

        verbose : boolean, default=False
            If True, prints progress.

        Returns
        -------
        confidence : torch.Tensor of shape=(n_samples,)
            Resulting confidence levels in y.

        attention : torch.Tensor of shape=(n_samples, context_size)
            Optimal attention for predicting event y.
        """
        # Get unique values
        X, y, inverse = unique_2d(X, y)

        # Perform query
        confidence, attention, _ = self.context_builder.query(
            X          = X,
            y          = y,
            iterations = iterations,
            batch_size = batch_size,
            verbose    = verbose,
        )

        # Compute confidence of y
        confidence = confidence[torch.arange(y.shape[0]), y.squeeze(1)]

        # Return confidence and attention
        return confidence[inverse], attention[inverse]
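    # Sketch: the returned confidence can be used to filter low-confidence
    # samples directly, mirroring what attended_context() does above:
    #
    #     confidence, attention = interpreter.attention_query(X, y)
    #     confident = confidence >= interpreter.threshold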
    ########################################################################
    #                             I/O methods                              #
    ########################################################################

    def to_dict(self):
        """Return a pickle-compatible dictionary representation of the
        interpreter.

        Returns
        -------
        result : dict()
            Pickle-compatible dictionary representation of the (trained)
            interpreter.
        """
        logger.info("to_dict")

        return {
            # Object parameters
            'features'   : self.features,
            'eps'        : self.eps,
            'min_samples': self.min_samples,
            'threshold'  : self.threshold,

            # Stored entries
            'clusters': self.clusters,
            'vectors' : self.vectors,
            'events'  : self.events,

            # Trained features
            'tree'  : self.tree,
            'labels': self.labels,
        }

    @classmethod
    def from_dict(cls, dictionary, context_builder=None):
        """Load the interpreter from the given dictionary.

        Parameters
        ----------
        dictionary : dict()
            Dictionary containing state information of the interpreter to
            load.

        context_builder : ContextBuilder, optional
            If given, use the given ContextBuilder for loading the
            Interpreter.

        Returns
        -------
        interpreter : Interpreter
            Interpreter, constructed from dictionary.
        """
        logger.info("from_dict")

        # Set context_builder if given separately
        if context_builder is not None:
            dictionary['context_builder'] = context_builder

        # List of required features and their default values
        features = {
            # ContextBuilder
            'context_builder': None,

            # Interpreter parameters
            'features'   : 100,
            'eps'        : 0.1,
            'min_samples': 5,
            'threshold'  : 0.2,

            # Stored entries
            'clusters': np.zeros(0),
            'vectors' : np.zeros((0, 100)),
            'events'  : np.zeros(0),
            'tree'    : dict(),
            'labels'  : dict(),
        }

        # Throw warning if dictionary does not contain required values
        for feature, default in features.items():
            # Throw warning if feature not available
            if feature not in dictionary:
                # Throw warning
                warnings.warn(
                    "Loading interpreter from dictionary, required feature "
                    "'{}' not in dictionary. Defaulting to default '{}'"
                    .format(feature, default))

                # Set default value
                dictionary[feature] = default

        # Create new instance with given features
        result = cls(
            context_builder = dictionary.get('context_builder'),
            features        = dictionary.get('features'),
            eps             = dictionary.get('eps'),
            min_samples     = dictionary.get('min_samples'),
            threshold       = dictionary.get('threshold'),
        )

        # Restore stored entries and trained features
        result.clusters = dictionary.get('clusters')
        result.vectors  = dictionary.get('vectors')
        result.events   = dictionary.get('events')
        result.tree     = dictionary.get('tree')
        result.labels   = dictionary.get('labels')

        # Return result
        return result
    def save(self, outfile):
        """Save model to output file.

        Parameters
        ----------
        outfile : string
            File to output model.
        """
        logger.info("save to {}".format(outfile))

        # Save to output file
        with open(outfile, 'wb') as outfile:
            pickle.dump(self.to_dict(), outfile)
    @classmethod
    def load(cls, infile, context_builder=None):
        """Load model from input file.

        Parameters
        ----------
        infile : string
            File from which to load model.

        context_builder : ContextBuilder, optional
            If given, use the given ContextBuilder for loading the
            Interpreter.

        Returns
        -------
        interpreter : Interpreter
            Interpreter loaded from the given file.
        """
        logger.info("load from {}".format(infile))

        # Load data
        with open(infile, 'rb') as infile:
            return cls.from_dict(
                dictionary      = pickle.load(infile),
                context_builder = context_builder,
            )
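    # Persistence roundtrip (sketch; 'interpreter.save' is a hypothetical
    # file name). Note that to_dict() does not store the ContextBuilder, so
    # it must be supplied again when loading:
    #
    #     interpreter.save('interpreter.save')
    #     restored = Interpreter.load('interpreter.save', context_builder)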