import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import (
    AutoModel, AutoConfig, AutoTokenizer,
    T5ForConditionalGeneration, T5Config,
    AutoModelForSequenceClassification
)
from torchcrf import CRF  # provided by the pytorch-crf package
import numpy as np

class BaseHateSpeechModel(nn.Module):
    """Base class for all hate speech detection models."""

    def __init__(self, model_name: str, num_labels: int = 3):
        super().__init__()
        self.num_labels = num_labels
        self.model_name = model_name

    def forward(self, input_ids, attention_mask, labels=None):
        raise NotImplementedError

class PhoBERTV2Model(BaseHateSpeechModel):
    """PhoBERT-v2 for hate speech detection."""

    def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        # ignore_mismatched_sizes is a model-loading argument, not a config one.
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # RoBERTa-style checkpoints loaded via AutoModel expose a pooled
        # representation of the first ([CLS]-equivalent) token.
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class BartPhoModel(BaseHateSpeechModel):
    """BARTpho for hate speech detection."""

    def __init__(self, model_name: str = "vinai/bartpho-syllable-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.d_model, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Mean-pool over real tokens only, so PAD positions do not skew the average.
        last_hidden_states = outputs.last_hidden_state
        mask = attention_mask.unsqueeze(-1).float()
        pooled_output = (last_hidden_states * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class ViSoBERTModel(BaseHateSpeechModel):
    """ViSoBERT for hate speech detection."""

    def __init__(self, model_name: str = "uitnlp/visobert", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Some checkpoints expose a pooler; otherwise fall back to masked mean pooling.
        if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            mask = attention_mask.unsqueeze(-1).float()
            pooled_output = (outputs.last_hidden_state * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class PhoBERTV1Model(BaseHateSpeechModel):
    """PhoBERT-v1 for hate speech detection."""

    def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Prefer the pooler output; otherwise fall back to masked mean pooling.
        if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            mask = attention_mask.unsqueeze(-1).float()
            pooled_output = (outputs.last_hidden_state * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class MBERTModel(BaseHateSpeechModel):
    """mBERT (bert-base-multilingual-cased) for hate speech detection."""

    def __init__(self, model_name: str = "bert-base-multilingual-cased", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Prefer the pooler output; otherwise fall back to masked mean pooling.
        if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            mask = attention_mask.unsqueeze(-1).float()
            pooled_output = (outputs.last_hidden_state * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class SPhoBERTModel(BaseHateSpeechModel):
    """SPhoBERT (syllable-level PhoBERT variant) for hate speech detection."""

    def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Prefer the pooler output; otherwise fall back to masked mean pooling.
        if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            mask = attention_mask.unsqueeze(-1).float()
            pooled_output = (outputs.last_hidden_state * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class ViHateT5Model(BaseHateSpeechModel):
    """ViHateT5 for hate speech detection."""

    def __init__(self, model_name: str = "VietAI/vit5-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = T5Config.from_pretrained(model_name)
        self.encoder = T5ForConditionalGeneration.from_pretrained(model_name, config=self.config)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.d_model, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        # Only the T5 encoder stack is used; the decoder is not needed for classification.
        outputs = self.encoder.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Mean-pool the encoder states over real tokens only.
        last_hidden_states = outputs.last_hidden_state
        mask = attention_mask.unsqueeze(-1).float()
        pooled_output = (last_hidden_states * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

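def _example_vihatet5_encoder_only():
    """Hedged sketch: load only the T5 encoder for classification.

    Assumption (not part of the original design): since ViHateT5Model never
    touches the decoder, transformers' T5EncoderModel could stand in for
    T5ForConditionalGeneration and avoid materializing the unused decoder
    weights. Calling this function downloads the checkpoint.
    """
    from transformers import T5EncoderModel
    encoder = T5EncoderModel.from_pretrained("VietAI/vit5-base")
    return encoder
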
class XLMRModel(BaseHateSpeechModel):
    """XLM-R (large) for hate speech detection."""

    def __init__(self, model_name: str = "xlm-roberta-large", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # XLM-RoBERTa is loaded with a pooling layer by default.
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class RoBERTaGRUModel(BaseHateSpeechModel):
    """RoBERTa + GRU hybrid model."""

    def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3, hidden_size: int = 256):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.gru = nn.GRU(
            input_size=self.config.hidden_size,
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True,
            dropout=0.1,
            bidirectional=True
        )
        self.dropout = nn.Dropout(0.1)
        # The bidirectional GRU doubles the feature dimension.
        self.classifier = nn.Linear(hidden_size * 2, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state

        # Re-encode the token-level transformer states with a bidirectional GRU.
        gru_output, _ = self.gru(hidden_states)

        # Mean-pool the GRU outputs over real tokens only.
        mask = attention_mask.unsqueeze(-1).float()
        pooled_output = (gru_output * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class TextCNNModel(BaseHateSpeechModel):
    """TextCNN for hate speech detection."""

    def __init__(self, vocab_size: int, embedding_dim: int = 128, num_labels: int = 3,
                 num_filters: int = 100, filter_sizes: list = None, dropout: float = 0.5):
        super().__init__("textcnn", num_labels)
        # Avoid a mutable default argument for the filter sizes.
        filter_sizes = filter_sizes if filter_sizes is not None else [3, 4, 5]
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (filter_size, embedding_dim))
            for filter_size in filter_sizes
        ])
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(num_filters * len(filter_sizes), num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        # (batch, seq_len) -> (batch, seq_len, embedding_dim)
        embedded = self.embedding(input_ids)

        # Add a channel dimension for Conv2d: (batch, 1, seq_len, embedding_dim)
        embedded = embedded.unsqueeze(1)

        # Convolve with each filter size and max-pool over time.
        conv_outputs = []
        for conv in self.convs:
            conv_out = F.relu(conv(embedded))                  # (batch, num_filters, seq_len - k + 1, 1)
            conv_out = conv_out.squeeze(3)                     # (batch, num_filters, seq_len - k + 1)
            pooled = F.max_pool1d(conv_out, conv_out.size(2))  # (batch, num_filters, 1)
            pooled = pooled.squeeze(2)                         # (batch, num_filters)
            conv_outputs.append(pooled)

        # Concatenate the pooled features from all filter sizes.
        concatenated = torch.cat(conv_outputs, dim=1)

        concatenated = self.dropout(concatenated)
        logits = self.classifier(concatenated)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class BiLSTMModel(BaseHateSpeechModel):
    """BiLSTM for hate speech detection."""

    def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256,
                 num_labels: int = 3, num_layers: int = 2, dropout: float = 0.5):
        super().__init__("bilstm", num_labels)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(hidden_size * 2, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        # (batch, seq_len) -> (batch, seq_len, embedding_dim)
        embedded = self.embedding(input_ids)

        # lstm_output: (batch, seq_len, hidden_size * 2)
        lstm_output, (hidden, cell) = self.lstm(embedded)

        # Mean-pool the LSTM outputs over real tokens only. An alternative is to
        # concatenate the final forward and backward hidden states instead.
        mask = attention_mask.unsqueeze(-1).float()
        pooled_output = (lstm_output * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}

class BiLSTMCRFModel(BaseHateSpeechModel):
    """BiLSTM + CRF for hate speech detection."""

    def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256,
                 num_labels: int = 3, num_layers: int = 2):
        super().__init__("bilstm-crf", num_labels)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.1,
            bidirectional=True
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(hidden_size * 2, num_labels)
        self.crf = CRF(num_labels, batch_first=True)

    def forward(self, input_ids, attention_mask, labels=None):
        # (batch, seq_len) -> (batch, seq_len, embedding_dim)
        embedded = self.embedding(input_ids)

        lstm_output, _ = self.lstm(embedded)
        lstm_output = self.dropout(lstm_output)

        # Per-token emission scores for the CRF: (batch, seq_len, num_labels)
        emissions = self.classifier(lstm_output)

        if labels is not None:
            # The CRF expects token-level tags; broadcast a sentence-level label
            # across the sequence so the model can train on this classification task.
            if labels.dim() == 1:
                labels = labels.unsqueeze(1).expand(-1, emissions.size(1)).contiguous()
            # The CRF returns a log-likelihood; negate it to obtain a loss.
            loss = -self.crf(emissions, labels, mask=attention_mask.bool())
            return {"loss": loss, "logits": emissions}
        else:
            # Viterbi decoding; returns one tag list per sequence.
            predictions = self.crf.decode(emissions, mask=attention_mask.bool())
            return {"loss": None, "logits": emissions, "predictions": predictions}

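def _example_bilstm_crf_decode():
    """Hedged sketch (toy sizes, random inputs) of the BiLSTM-CRF decode path.

    Shows that, without labels, the model returns `predictions` as a plain
    Python list of tag lists, each trimmed to its sequence's unpadded length.
    Note that torchcrf requires the first timestep of every mask row to be on,
    as is the case with right-padded batches.
    """
    model = BiLSTMCRFModel(vocab_size=100, embedding_dim=16, hidden_size=32, num_labels=3)
    input_ids = torch.randint(0, 100, (2, 5))
    attention_mask = torch.tensor([[1, 1, 1, 1, 0],
                                   [1, 1, 1, 0, 0]])
    output = model(input_ids, attention_mask)
    return output["predictions"]  # e.g. [[0, 2, 1, 1], [0, 0, 2]]
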
class EnsembleModel(BaseHateSpeechModel):
    """Ensemble model combining several deep learning models."""

    def __init__(self, models: list, num_labels: int = 3, weights: list = None):
        super().__init__("ensemble", num_labels)
        self.models = nn.ModuleList(models)
        self.num_models = len(models)
        weights = torch.tensor(weights if weights else [1.0] * self.num_models, dtype=torch.float32)
        # Register as a buffer so the weights follow the model across devices.
        self.register_buffer("weights", weights / weights.sum())

    def forward(self, input_ids, attention_mask, labels=None):
        all_logits = []
        total_loss = None

        for i, model in enumerate(self.models):
            model_output = model(input_ids, attention_mask, labels)
            all_logits.append(model_output["logits"])

            if model_output["loss"] is not None:
                weighted_loss = self.weights[i] * model_output["loss"]
                total_loss = weighted_loss if total_loss is None else total_loss + weighted_loss

        # Weighted average of the member logits.
        ensemble_logits = torch.zeros_like(all_logits[0])
        for i, logits in enumerate(all_logits):
            ensemble_logits += self.weights[i] * logits

        return {
            "loss": total_loss,
            "logits": ensemble_logits
        }

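def _example_ensemble():
    """Hedged sketch of a weighted two-member ensemble.

    Toy vocab-based members are used here so nothing is downloaded; in
    practice the members would typically be transformer models built via
    `get_model`, all sharing one tokenizer so they can consume the same
    `input_ids`.
    """
    members = [
        BiLSTMModel(vocab_size=100, embedding_dim=16, hidden_size=32),
        TextCNNModel(vocab_size=100, embedding_dim=16, num_filters=8),
    ]
    ensemble = EnsembleModel(members, num_labels=3, weights=[0.6, 0.4])
    input_ids = torch.randint(0, 100, (2, 10))
    attention_mask = torch.ones(2, 10, dtype=torch.long)
    return ensemble(input_ids, attention_mask)["logits"]  # shape (2, 3)
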
def get_model(model_name: str, num_labels: int = 3, **kwargs):
    """
    Factory function that builds a model from its name.

    Args:
        model_name: Model name ("phobert-v1", "phobert-v2", "bartpho", "visobert",
            "vihate-t5", "xlm-r", "mbert", "sphobert", "roberta-gru", "textcnn",
            "bilstm", "bilstm-crf", "ensemble")
        num_labels: Number of labels (3 for hate speech: CLEAN, OFFENSIVE, HATE)
        **kwargs: Extra keyword arguments forwarded to the model constructor

    Returns:
        Model instance
    """
    model_mapping = {
        "phobert-v1": PhoBERTV1Model,
        "phobert-v2": PhoBERTV2Model,
        "bartpho": BartPhoModel,
        "visobert": ViSoBERTModel,
        "vihate-t5": ViHateT5Model,
        "xlm-r": XLMRModel,
        "mbert": MBERTModel,
        "sphobert": SPhoBERTModel,
        "roberta-gru": RoBERTaGRUModel,
        "textcnn": TextCNNModel,
        "bilstm": BiLSTMModel,
        "bilstm-crf": BiLSTMCRFModel,
        "ensemble": EnsembleModel
    }

    if model_name not in model_mapping:
        raise ValueError(f"Unknown model: {model_name}. Available models: {list(model_mapping.keys())}")

    model_class = model_mapping[model_name]
    return model_class(num_labels=num_labels, **kwargs)
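
if __name__ == "__main__":
    # Hedged smoke test: uses the vocab-based TextCNN so no pretrained weights
    # are downloaded. The transformer variants additionally need a matching
    # tokenizer, e.g. AutoTokenizer.from_pretrained("vinai/phobert-base-v2").
    model = get_model("textcnn", num_labels=3, vocab_size=1000)
    input_ids = torch.randint(0, 1000, (4, 32))
    attention_mask = torch.ones(4, 32, dtype=torch.long)
    output = model(input_ids, attention_mask, labels=torch.tensor([0, 1, 2, 0]))
    print(output["loss"], output["logits"].shape)  # scalar loss, torch.Size([4, 3])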