# Imports
import logging
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm
# Custom imports
from deepcase.context_builder import ContextBuilder
from deepcase.interpreter import Interpreter
from deepcase.context_builder.utils import unique_2d
# Set logger
logger = logging.getLogger(__name__)
class DeepCASE(object):
    """Analyse security events with respect to contextual machine behaviour.

    DeepCASE combines a ContextBuilder (sequence model over event contexts)
    with an Interpreter (clustering and scoring on top of the trained
    ContextBuilder). This class is a thin convenience wrapper around both.
    """

    def __init__(self,
            features,
            max_length  = 10,
            hidden_size = 128,
            eps         = 0.1,
            min_samples = 5,
            threshold   = 0.2,
        ):
        """Analyse security events with respect to contextual machine behaviour.

        Note
        ----
        When an Interpreter is trained, it heavily depends on the
        ContextBuilder used during training. Therefore, we **strongly**
        suggest **not** to manually change the context_builder attribute,
        without retraining the interpreter of the DeepCASE object.

        Parameters
        ----------
        features : int
            Number of different possible security events.

        max_length : int, default=10
            Maximum length of context window as number of events.

        hidden_size : int, default=128
            Size of hidden layer in sequence to sequence prediction.
            This parameter determines the complexity of the model and its
            prediction power. However, high values will result in slower
            training and prediction times.

        eps : float, default=0.1
            Epsilon used for determining maximum distance between clusters.

        min_samples : int, default=5
            Minimum number of required samples per cluster.

        threshold : float, default=0.2
            Minimum required confidence in fingerprint before using it in
            training clusters.
        """
        # Initialise Context Builder from parameters.
        # Input and output size are both `features`: the model predicts the
        # next event from the same event vocabulary it reads.
        self.context_builder = ContextBuilder(
            input_size  = features,
            output_size = features,
            max_length  = max_length,
            hidden_size = hidden_size,
        )

        # Initialise Interpreter from parameters, bound to the builder above.
        self.interpreter = Interpreter(
            context_builder = self.context_builder,
            features        = features,
            eps             = eps,
            min_samples     = min_samples,
            threshold       = threshold,
        )
########################################################################
# Fit method #
########################################################################
[docs] def fit(self,
# Input data
X,
y,
scores,
# ContextBuilder-specific parameters
epochs = 10,
batch_size = 128,
learning_rate = 0.01,
optimizer = optim.SGD,
teach_ratio = 0.5,
# Interpreter-specific parameters
iterations = 100,
query_batch_size = 1024,
strategy = "max",
NO_SCORE = -1,
# Verbosity level
verbose = True,
):
"""Fit DeepCASE with given data.
This method is provided as a wrapper and is equivalent to calling:
- context_builder.fit() and
- interpreter.fit()
in the given order.
Parameters
----------
X : array-like of type=int and shape=(n_samples, context_size)
Input context to train with.
y : array-like of type=int and shape=(n_samples, n_future_events)
Sequences of target events.
scores : array-like of float, shape=(n_samples,)
Scores for each sample in cluster.
epochs : int, default=10
Number of epochs to train with.
batch_size : int, default=128
Batch size to use for training.
learning_rate : float, default=0.01
Learning rate to use for training.
optimizer : optim.Optimizer, default=torch.optim.SGD
Optimizer to use for training.
teach_ratio : float, default=0.5
Ratio of sequences to train including labels.
iterations : int, default=100
Number of iterations for query.
query_batch_size : int, default=1024
Size of batch for query.
strategy : string (max|min|avg), default=max
Strategy to use for computing scores per cluster based on scores
of individual events. Currently available options are:
- max: Use maximum score of any individual event in a cluster.
- min: Use minimum score of any individual event in a cluster.
- avg: Use average score of any individual event in a cluster.
NO_SCORE : float, default=-1
Score to indicate that no score was given to a sample and that
the value should be ignored for computing the cluster score.
The NO_SCORE value will also be given to samples that do not
belong to a cluster.
verbose : boolean, default=True
If True, prints progress.
Returns
-------
self : self
Returns self.
"""
# Fit the ContextBuilder
self.context_builder.fit(
X = X,
y = y,
epochs = epochs,
batch_size = batch_size,
learning_rate = learning_rate,
optimizer = optimizer,
teach_ratio = teach_ratio,
verbose = verbose,
)
# Fit the Interpreter
self.interpreter.fit(
X = X,
y = y,
scores = scores,
iterations = iterations,
batch_size = query_batch_size,
strategy = strategy,
NO_SCORE = NO_SCORE,
verbose = verbose,
)
# Returns self
return self
########################################################################
# Predict method #
########################################################################
[docs] def predict(self, X, y, iterations=100, batch_size=1024, verbose=False):
"""Predict maliciousness of context samples.
Parameters
----------
X : torch.Tensor of shape=(n_samples, seq_length)
Input context for which to predict maliciousness.
y : torch.Tensor of shape=(n_samples, 1)
Events for which to predict maliciousness.
iterations : int, default=100
Iterations used for optimization.
batch_size : int, default=1024
Batch size used for optimization.
verbose : boolean, default=False
If True, print progress.
Returns
-------
result : np.array of shape=(n_samples,)
Predicted maliciousness score.
Positive scores are maliciousness scores.
A score of 0 means we found a match that was not malicious.
Special cases:
* -1: Not confident enough for prediction
* -2: Label not in training
* -3: Closest cluster > epsilon
"""
# Return the prediction of the interpreter
return self.interpreter.predict(
X = X,
y = y,
iterations = iterations,
batch_size = batch_size,
verbose = verbose,
)
########################################################################
# Fit/predict methods #
########################################################################
[docs] def fit_predict(self,
# Input data
X,
y,
scores,
# ContextBuilder-specific parameters
epochs = 10,
batch_size = 128,
learning_rate = 0.01,
optimizer = optim.SGD,
teach_ratio = 0.5,
# Interpreter-specific parameters
iterations = 100,
query_batch_size = 1024,
strategy = "max",
NO_SCORE = -1,
# Verbosity level
verbose = True,
):
"""Fit DeepCASE with given data and predict that same data.
This method is provided as a wrapper and is equivalent to calling:
- self.fit() and
- self.predict()
in the given order.
Parameters
----------
X : array-like of type=int and shape=(n_samples, context_size)
Input context to train with.
y : array-like of type=int and shape=(n_samples, n_future_events)
Sequences of target events.
scores : array-like of float, shape=(n_samples,)
Scores for each sample in cluster.
epochs : int, default=10
Number of epochs to train with.
batch_size : int, default=128
Batch size to use for training.
learning_rate : float, default=0.01
Learning rate to use for training.
optimizer : optim.Optimizer, default=torch.optim.SGD
Optimizer to use for training.
teach_ratio : float, default=0.5
Ratio of sequences to train including labels.
iterations : int, default=100
Number of iterations for query.
query_batch_size : int, default=1024
Size of batch for query.
strategy : string (max|min|avg), default=max
Strategy to use for computing scores per cluster based on scores
of individual events. Currently available options are:
- max: Use maximum score of any individual event in a cluster.
- min: Use minimum score of any individual event in a cluster.
- avg: Use average score of any individual event in a cluster.
NO_SCORE : float, default=-1
Score to indicate that no score was given to a sample and that
the value should be ignored for computing the cluster score.
The NO_SCORE value will also be given to samples that do not
belong to a cluster.
verbose : boolean, default=True
If True, prints progress.
Returns
-------
result : np.array of shape=(n_samples,)
Predicted maliciousness score.
Positive scores are maliciousness scores.
A score of 0 means we found a match that was not malicious.
Special cases:
* -1: Not confident enough for prediction
* -2: Label not in training
* -3: Closest cluster > epsilon
"""
# Call fit and predict in sequence and return result
return self.fit(
X = X,
y = y,
scores = scores,
epochs = epochs,
batch_size = batch_size,
learning_rate = learning_rate,
optimizer = optimizer,
teach_ratio = teach_ratio,
iterations = iterations,
query_batch_size = query_batch_size,
strategy = strategy,
NO_SCORE = NO_SCORE,
verbose = verbose,
).predict(
X = X,
y = y,
iterations = iterations,
batch_size = query_batch_size,
verbose = verbose,
)
########################################################################
# Cast to device #
########################################################################
def to(self, device):
"""Cast DeepCASE to a specific device.
This method acts as a wrapper for the underlying context_builder.
Parameters
----------
device : string
String describing the device, e.g., "cpu", "cuda", or "cuda:0".
Returns
-------
self : self
Returns self
"""
# Cast ContextBuilder to device
self.context_builder = self.context_builder.to(device)
# Return self
return self
########################################################################
# I/O methods #
########################################################################
[docs] def save(self, outfile):
"""Save DeepCASE model to output file.
Parameters
----------
outfile : string
Path to output file in which to store DeepCASE model.
"""
# Save to output file
torch.save({
"context_builder": self.context_builder.state_dict(),
"interpreter" : self.interpreter .to_dict(),
}, outfile)
[docs] @classmethod
def load(cls, infile, device=None):
"""Load DeepCASE model from input file.
Parameters
----------
infile : string
Path to input file from which to load DeepCASE model.
device : string, optional
If given, cast DeepCASE automatically to device.
"""
# Load model
model = torch.load(infile, map_location=device)
# Extract ContextBuilder and Interpreter from loaded model
state_dict = model['context_builder']
interpreter = model['interpreter']
# Recreate ContextBuilder
input_size = state_dict.get('embedding.weight').shape[0]
output_size = state_dict.get('decoder_event.out.weight').shape[0]
hidden_size = state_dict.get('embedding.weight').shape[1]
num_layers = 1 # TODO
max_length = state_dict.get('decoder_attention.attn.weight').shape[0]
bidirectional = state_dict.get('decoder_attention.attn.weight').shape[1] // hidden_size != num_layers
LSTM = False # TODO
# Create ContextBuilder
context_builder = ContextBuilder(
input_size = input_size,
output_size = output_size,
hidden_size = hidden_size,
num_layers = num_layers,
max_length = max_length,
bidirectional = bidirectional,
LSTM = LSTM,
)
# Set trained parameters
context_builder.load_state_dict(state_dict)
# Recreate interpreter
interpreter = Interpreter.from_dict(
dictionary = interpreter,
context_builder = context_builder,
)
# Rebuild DeepCASE
result = cls(features = interpreter.features)
# Set loaded ContextBuilder and Interpreter
result.context_builder = context_builder
result.interpreter = interpreter
# Cast to device if necessary
if device is not None: result = result.to(device)
# Return loaded DeepCASE model
return result