Chapter 2.8: Decision Trees and Random Forest

The final methods we look at in this classification chapter, decision trees and random forests, remain popular in machine learning. We focus first on decision trees. We can visualise these models as a tree (in the graph-theory sense) in which each internal node splits into two children according to a decision rule. The basic algorithm follows four steps: 1. start at the root node with the full dataset; 2. split the dataset on the feature and threshold that give the best separation (compute the impurity of the node, compute the weighted impurity of the two children for every candidate split, and choose the split with the largest reduction in impurity); 3. recursively repeat on each child node; and 4. stop when all samples at a node belong to the same class or a stopping criterion is met (e.g. maximum depth or minimum number of samples). To choose the best split we need an impurity measure. Two common choices are the Gini index and entropy, the latter of which has previously been discussed. The Gini index is:

\(\text{Gini}=1-\sum_{i}p_{i}^{2}\)

And the entropy is:

\(H=-\sum_{i}p_{i}\log_{2}(p_{i})\)

Where \(p_{i}\) is the proportion of samples at the node that belong to class \(i\).
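
As a worked example of step 2, using made-up numbers rather than any dataset from this chapter, take a node with 10 samples, 5 of class A and 5 of class B. Its impurity is:

\(\text{Gini}_{\text{parent}}=1-(0.5^{2}+0.5^{2})=0.5,\qquad H_{\text{parent}}=-(0.5\log_{2}0.5+0.5\log_{2}0.5)=1\)

Suppose a candidate split sends 4 samples (all of class A) to the left child and 6 samples (1 of class A, 5 of class B) to the right child. Then:

\(\text{Gini}_{\text{left}}=0,\qquad \text{Gini}_{\text{right}}=1-\left(\tfrac{1}{6}\right)^{2}-\left(\tfrac{5}{6}\right)^{2}=\tfrac{10}{36}\approx 0.278\)

\(\text{Gain}=\text{Gini}_{\text{parent}}-\left(\tfrac{4}{10}\,\text{Gini}_{\text{left}}+\tfrac{6}{10}\,\text{Gini}_{\text{right}}\right)=0.5-\tfrac{1}{6}\approx 0.333\)

The split with the largest such gain over all candidate features and thresholds is the one that is kept.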


import math
from collections import Counter

import numpy as np

# ========= IMPURITY FUNCTIONS =========

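def gini(y):
    """Gini impurity of an array of non-negative integer class labels."""
    counts = np.bincount(y)
    probs = counts / len(y)
    return 1 - np.sum(probs ** 2)

def entropy(y):
    """Shannon entropy (in bits) of an array of non-negative integer class labels."""
    counts = np.bincount(y)
    probs = counts / len(y)
    # Skip classes with zero count: p * log2(p) is taken as 0 when p == 0
    return -np.sum([p * math.log2(p) for p in probs if p > 0])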

# ========= TREE NODE =========

class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature        # index of feature to split on
        self.threshold = threshold    # threshold value
        self.left = left              # left subtree
        self.right = right            # right subtree
        self.value = value            # value if it's a leaf node

    def is_leaf(self):
        return self.value is not None

# ========= DECISION TREE CLASSIFIER =========

class DecisionTreeClassifierScratch:
    def __init__(self, max_depth=5, criterion='gini', max_features=None):
        self.max_depth = max_depth
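        # Reject unknown names instead of silently falling back to entropy
        if criterion not in ('gini', 'entropy'):
            raise ValueError("criterion must be 'gini' or 'entropy'")
        self.criterion = gini if criterion == 'gini' else entropy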
        self.root = None
        self.max_features = max_features  # number of features to consider at each split

    def fit(self, X, y):
        self.n_features = X.shape[1]
        self.root = self._grow_tree(X, y, depth=0)

    def _grow_tree(self, X, y, depth):
        num_samples, num_features = X.shape
        num_labels = len(set(y))

        if depth >= self.max_depth or num_labels == 1 or num_samples == 0:
            return Node(value=self._majority_vote(y))

        best_feat, best_thresh = self._best_split(X, y)

        if best_feat is None:
            return Node(value=self._majority_vote(y))

        left_idx = X[:, best_feat] <= best_thresh
        right_idx = X[:, best_feat] > best_thresh

        left = self._grow_tree(X[left_idx], y[left_idx], depth + 1)
        right = self._grow_tree(X[right_idx], y[right_idx], depth + 1)

        return Node(feature=best_feat, threshold=best_thresh, left=left, right=right)

    def _best_split(self, X, y):
        best_gain = -1
        best_feat, best_thresh = None, None
        impurity_before = self.criterion(y)

        # Select subset of features randomly if max_features is set
        if self.max_features is None or self.max_features > self.n_features:
            features_indices = range(self.n_features)
        else:
            features_indices = np.random.choice(self.n_features, self.max_features, replace=False)

        for feature_index in features_indices:
            thresholds = np.unique(X[:, feature_index])
            for thresh in thresholds:
                left_idx = X[:, feature_index] <= thresh
                right_idx = X[:, feature_index] > thresh

                if len(y[left_idx]) == 0 or len(y[right_idx]) == 0:
                    continue

                impurity_left = self.criterion(y[left_idx])
                impurity_right = self.criterion(y[right_idx])
                p = len(y[left_idx]) / len(y)

                impurity_after = p * impurity_left + (1 - p) * impurity_right
                gain = impurity_before - impurity_after

                if gain > best_gain:
                    best_gain = gain
                    best_feat = feature_index
                    best_thresh = thresh

        return best_feat, best_thresh

    def _majority_vote(self, y):
        return Counter(y).most_common(1)[0][0]

    def _traverse_tree(self, x, node):
        if node.is_leaf():
            return node.value
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def predict(self, X):
        return np.array([self._traverse_tree(x, self.root) for x in X])
  

Training this model on the Iris dataset, we get an accuracy of 95.56% using Gini as the impurity measure and 91.11% using entropy.
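
The accuracy figures above depend on how the data are divided into training and test sets, which the text does not state. The following is a minimal sketch of one way such an evaluation could be set up, using only the standard library. The helper names `train_test_split_indices` and `accuracy`, the 30% hold-out fraction and the seed are all assumptions made for illustration. With 150 Iris samples a 30% hold-out gives 45 test samples, and 43/45 and 41/45 round to 95.56% and 91.11%, so the reported numbers are at least consistent with a split of that size.

```python
import random

def train_test_split_indices(n_samples, test_fraction=0.3, seed=0):
    """Shuffle the indices 0..n_samples-1 and cut them into train and test lists.

    test_fraction and seed are illustrative defaults, not values from the text.
    """
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)  # seeded so the split is reproducible
    n_test = int(round(n_samples * test_fraction))
    return indices[n_test:], indices[:n_test]

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)

train_idx, test_idx = train_test_split_indices(150, test_fraction=0.3, seed=0)
print(len(train_idx), len(test_idx))           # 105 45
print(accuracy([0, 1, 2, 2], [0, 1, 2, 1]))    # 0.75
```

With NumPy arrays `X` and `y`, the index lists can be used directly, e.g. `X[train_idx]` and `y[test_idx]`, before calling `fit` and `predict` on the classifier above.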

This is a single decision tree. We can build a random forest (currently a very popular model for its simplicity and interpretability) by training multiple decision trees, each on a different bootstrap sample of the dataset, and optionally restricting each split to a random subset of the features (the max_features argument). With the default max_features=None every split considers all features, so the model reduces to bagged decision trees. The final prediction is the majority vote over all the trees. This is our first ensemble learning method (a majority classification from several models). Implemented in code, this gives:


class RandomForestScratch:
    def __init__(self, n_estimators=10, max_depth=5, criterion='gini', max_features=None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.criterion = criterion
        self.max_features = max_features
        self.trees = []

    def fit(self, X, y):
        self.trees = []
        n_samples = X.shape[0]

        for _ in range(self.n_estimators):
            # Bootstrap sample (sample with replacement)
            indices = np.random.choice(n_samples, n_samples, replace=True)
            X_sample = X[indices]
            y_sample = y[indices]

            # Create and train tree
            tree = DecisionTreeClassifierScratch(
                max_depth=self.max_depth,
                criterion=self.criterion,
                max_features=self.max_features
            )
            tree.fit(X_sample, y_sample)
            self.trees.append(tree)

    def predict(self, X):
        # Collect predictions from all trees
        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        # Majority vote along axis 0 (over trees)
        y_pred = []

        for sample_preds in tree_preds.T:  # transpose so samples are rows
            vote = Counter(sample_preds).most_common(1)[0][0]
            y_pred.append(vote)

        return np.array(y_pred)
  

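Training this on the Iris dataset, we now get 100% accuracy. In this case a consensus vote from multiple models improves on the single tree, which is the idea behind ensemble methods. Because the bootstrap samples are drawn at random, the exact figure can vary between runs unless a random seed is fixed.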
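
One way to see why voting can help is an idealised calculation: if each of n classifiers is correct independently with probability p on a two-class problem, the probability that a strict majority is correct follows from the binomial distribution. The sketch below computes it. The function name `majority_vote_accuracy` is made up for illustration, and the independence and two-class assumptions do not hold exactly for the forest above, since trees grown on bootstrap samples of the same data are correlated and Iris has three classes. It should be read as intuition, not as a prediction of the forest's accuracy.

```python
from math import comb

def majority_vote_accuracy(p, n_voters):
    """Probability that a strict majority of n_voters independent binary
    classifiers, each correct with probability p, gives the right answer."""
    if n_voters % 2 == 0:
        raise ValueError("use an odd number of voters to avoid ties")
    k_min = n_voters // 2 + 1  # smallest number of correct votes that wins
    return sum(
        comb(n_voters, k) * p ** k * (1 - p) ** (n_voters - k)
        for k in range(k_min, n_voters + 1)
    )

# Each voter is right 70% of the time; the majority improves as voters are added
for n in (1, 5, 11, 51):
    print(n, round(majority_vote_accuracy(0.7, n), 4))
```

The same formula shows the limit of the idea: when p is below 0.5 the majority is wrong more often than a single voter, so an ensemble only helps if its members are individually better than chance.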