import tensorflow as tf
from tensorflow.keras.layers import (  # type: ignore
    Input, Dense, GRU, LSTM, Bidirectional, MultiHeadAttention, BatchNormalization,
    Dropout, Concatenate, TimeDistributed, RepeatVector, Add, Lambda,
    LayerNormalization, GaussianNoise, Reshape
)
from tensorflow.keras.models import Model  # type: ignore
from tensorflow.keras.regularizers import l2  # type: ignore


# Custom layers are used instead of Lambda layers so the model can be
# serialized and reloaded through the Keras registry.
@tf.keras.utils.register_keras_serializable(package="Custom", name="ExpandDimension")
class ExpandDimension(tf.keras.layers.Layer):
    """Insert a new axis of size 1 at position 1 of the input tensor."""

    def call(self, inputs):
        return tf.expand_dims(inputs, axis=1)


@tf.keras.utils.register_keras_serializable(package="Custom", name="ConcatenateTimesteps")
class ConcatenateTimesteps(tf.keras.layers.Layer):
    """Concatenate a list of tensors along axis 1."""

    def call(self, inputs):
        return tf.concat(inputs, axis=1)


# Custom Transformer encoder layer.
@tf.keras.utils.register_keras_serializable(package="Custom", name="TransformerEncoder")
class TransformerEncoder(tf.keras.layers.Layer):
    """Self-attention block followed by a position-wise feed-forward network.

    The layer applies multi-head self-attention, dropout, a residual
    connection with layer normalization, then a two-layer feed-forward
    network with a second residual connection and layer normalization.

    Args:
        num_heads: Number of attention heads.
        embed_dim: Used both as ``key_dim`` of the attention layer and as the
            output width of the feed-forward network. Because of the residual
            additions it must equal the last dimension of the input.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate applied after attention and after the FFN.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    # Standard deviation of the Gaussian noise added to the attention output
    # while training.
    _ATTENTION_NOISE_STDDEV = 0.01

    def __init__(self, num_heads, embed_dim, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments so get_config() does not depend on
        # attribute names of the wrapped Keras layers.
        self.num_heads = num_heads
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.rate = rate

        # key_dim is deliberately set to embed_dim.
        self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [Dense(ff_dim, activation="relu"), Dense(embed_dim)]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def build(self, input_shape):
        """Build the sub-layers for self-attention over ``input_shape``."""
        # Self-attention: query and value share the input shape
        # (the key defaults to the value).
        self.attention.build(input_shape, input_shape)
        # Build the FFN and the normalization layers.
        self.ffn.build(input_shape)
        self.layernorm1.build(input_shape)
        self.layernorm2.build(input_shape)
        self.built = True

    def call(self, inputs, training=None):
        """Run the encoder block.

        Args:
            inputs: Tensor whose last dimension equals ``embed_dim``.
            training: Whether the layer runs in training mode. When left as
                ``None`` Keras supplies the value from the calling context.

        Returns:
            A tuple ``(output, attention_weights)``.
        """
        attn_output, attn_weights = self.attention(
            inputs, inputs, return_attention_scores=True
        )
        attn_output = self.dropout1(attn_output, training=training)
        if training:
            # Noise is a training-time regularizer only; adding it at
            # inference would make predictions non-deterministic.
            attn_output = attn_output + tf.random.normal(
                tf.shape(attn_output),
                mean=0.0,
                stddev=self._ATTENTION_NOISE_STDDEV,
                dtype=attn_output.dtype,
            )
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output), attn_weights

    def get_config(self):
        """Return the constructor arguments needed to re-create the layer."""
        config = super().get_config()
        config.update({
            "num_heads": self.num_heads,
            "embed_dim": self.embed_dim,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)


def build_model_1118(word2vec_embedding_dim, pos_tag_dim, entity_dim,
                     time_series_input_shape, sequence_length=30):
    """Build and compile the multi-input, multi-output forecasting model.

    Four static feature vectors (text, POS, entity, sentiment) are encoded by
    dense branches; four index series and one stock series are encoded by
    recurrent/attention branches. Everything is fused per time step, decoded
    by a GRU and projected into five heads, each of shape ``(3, 6)``.

    Args:
        word2vec_embedding_dim: Width of the ``text_input`` vector.
        pos_tag_dim: Width of the ``pos_input`` vector.
        entity_dim: Width of the ``entity_input`` vector.
        time_series_input_shape: Indexable shape; only element ``[1]`` is
            read and used as the per-time-step feature count.
        sequence_length: Number of time steps of every time-series input.
            Defaults to 30, the value previously hard-coded.

    Returns:
        A compiled ``tf.keras.Model`` with nine inputs and five outputs,
        using AdamW with a cosine-decayed learning rate and Huber loss.
    """
    feature_count = time_series_input_shape[1]

    def dense_branch(branch_input, units, prefix):
        """Dense -> BatchNorm -> Dropout branch for one static feature."""
        x = Dense(units, activation='relu', kernel_regularizer=l2(0.01),
                  name=f'{prefix}_dense')(branch_input)
        x = BatchNormalization(name=f'{prefix}_batch_norm')(x)
        return Dropout(0.3, name=f'{prefix}_dropout')(x)

    # 1. Text features
    text_input = Input(shape=(word2vec_embedding_dim,), name='text_input')
    text_output = dense_branch(text_input, 256, 'text')

    # 2. POS features
    pos_input = Input(shape=(pos_tag_dim,), name='pos_input')
    pos_output = dense_branch(pos_input, 64, 'pos')

    # 3. Named-entity features
    entity_input = Input(shape=(entity_dim,), name='entity_input')
    entity_output = dense_branch(entity_input, 64, 'entity')

    # 4. Sentiment features
    sentiment_input = Input(shape=(1,), name='sentiment_input')
    sentiment_output = dense_branch(sentiment_input, 256, 'sentiment')

    # 5. Time-series features (market indices)
    def process_index(index_input, index_name):
        """Encode one index series; returns (features, attention_weights)."""
        # First bidirectional LSTM: initial temporal feature extraction.
        x = Bidirectional(LSTM(256, return_sequences=True),
                          name=f'{index_name}_bidirectional_lstm_1')(index_input)
        # Second bidirectional LSTM: deeper temporal features (2 * 128 = 256
        # channels, matching embed_dim of the encoder below).
        x = Bidirectional(LSTM(128, return_sequences=True),
                          name=f'{index_name}_bidirectional_lstm_2')(x)
        # Transformer encoder: relations across time steps. `training` is not
        # pinned here so Keras switches dropout/noise off at inference.
        x, attn_weights = TransformerEncoder(num_heads=4, embed_dim=256, ff_dim=512)(x)
        # Project to a fixed width of 128.
        x = Dense(128, activation='relu', name=f'{index_name}_project')(x)
        x = BatchNormalization(name=f'{index_name}_batch_norm')(x)
        x = Dropout(0.3, name=f'{index_name}_dropout')(x)
        return x, attn_weights

    series_shape = (sequence_length, feature_count)
    index_inx_input = Input(shape=series_shape, name='index_us_stock_index_INX')
    index_dj_input = Input(shape=series_shape, name='index_us_stock_index_DJ')
    index_ixic_input = Input(shape=series_shape, name='index_us_stock_index_IXIC')
    index_ndx_input = Input(shape=series_shape, name='index_us_stock_index_NDX')

    index_inx_processed, _ = process_index(index_inx_input, 'index_inx')
    index_dj_processed, _ = process_index(index_dj_input, 'index_dj')
    index_ixic_processed, _ = process_index(index_ixic_input, 'index_ixic')
    index_ndx_processed, _ = process_index(index_ndx_input, 'index_ndx')

    # 6. Time-series features (individual stock)
    stock_input = Input(shape=series_shape, name='stock_input')
    stock_gru = Bidirectional(GRU(256, return_sequences=True),
                              name='stock_bidirectional_gru')(stock_input)
    stock_attention = MultiHeadAttention(num_heads=4, key_dim=64,
                                         name='stock_attention')(stock_gru, stock_gru)
    stock_dense = Dense(128, activation='relu', name='stock_dense')(stock_attention)
    stock_batch_norm = BatchNormalization(name='stock_batch_norm')(stock_dense)
    stock_processed = Dropout(0.3, name='stock_dropout')(stock_batch_norm)

    # 7. Static feature fusion (text and sentiment are scaled by 2)
    static_features = Concatenate(name='static_features_concatenate')([
        text_output * 2,
        pos_output,
        entity_output,
        sentiment_output * 2,
    ])

    # 8. Merge all time-series features
    combined_features = Concatenate(name='combined_features')([
        index_inx_processed,
        index_dj_processed,
        index_ixic_processed,
        index_ndx_processed,
        stock_processed,
    ])

    # 9. Repeat static features along time and join with the series features
    static_features_expanded = RepeatVector(
        sequence_length, name='static_features_expanded')(static_features)
    combined_with_static = Concatenate(name='combined_with_static')([
        combined_features,
        static_features_expanded,
    ])

    # 10. Decoder
    combined_dense = TimeDistributed(
        Dense(256, activation='relu', kernel_regularizer=l2(0.01)),
        name='combined_dense')(combined_with_static)
    combined_dropout = Dropout(0.3, name='combined_dropout')(combined_dense)
    decoder_gru = GRU(128, return_sequences=False, name='decoder_gru')(combined_dropout)
    decoder_gru = Dropout(0.2)(decoder_gru)
    decoder_gru = GaussianNoise(0.02)(decoder_gru)

    # Predict the next 3 time steps independently.
    future_day_1 = Dense(128, activation='relu', name='future_day_1')(decoder_gru)
    future_day_2 = Dense(128, activation='relu', name='future_day_2')(decoder_gru)
    future_day_3 = Dense(128, activation='relu', name='future_day_3')(decoder_gru)

    future_day_1_expanded = ExpandDimension(name='future_day_1_expanded')(future_day_1)
    future_day_2_expanded = ExpandDimension(name='future_day_2_expanded')(future_day_2)
    future_day_3_expanded = ExpandDimension(name='future_day_3_expanded')(future_day_3)

    future_reshaped = ConcatenateTimesteps(name='future_reshaped')(
        [future_day_1_expanded, future_day_2_expanded, future_day_3_expanded]
    )

    # Independent output head per index / stock.
    def create_output_layer(input_tensor, name):
        """Two time-distributed dense layers and a linear 6-unit projection."""
        x = TimeDistributed(Dense(64, activation='relu'), name=f'{name}_dense1')(input_tensor)
        x = TimeDistributed(Dense(32, activation='relu'), name=f'{name}_dense2')(x)
        x = Dense(6, activation='linear', name=f'{name}_final_output')(x)
        return x

    index_inx_output_final = create_output_layer(future_reshaped, 'index_inx')
    index_dj_output_final = create_output_layer(future_reshaped, 'index_dj')
    index_ixic_output_final = create_output_layer(future_reshaped, 'index_ixic')
    index_ndx_output_final = create_output_layer(future_reshaped, 'index_ndx')
    stock_output_final = create_output_layer(future_reshaped, 'stock')

    # NOTE(review): this head is not listed in the model outputs below, so it
    # is not part of the built model - confirm whether it should be wired in.
    news_sentiment_loss = Dense(1, activation='linear', name='news_sentiment_output')(text_output)

    # Assemble the model.
    model_outputs = [
        index_inx_output_final,
        index_dj_output_final,
        index_ixic_output_final,
        index_ndx_output_final,
        stock_output_final,
    ]
    model = Model(
        inputs=[
            text_input, pos_input, entity_input, sentiment_input,
            index_inx_input, index_dj_input, index_ixic_input, index_ndx_input,
            stock_input,
        ],
        outputs=model_outputs,
    )

    # Optimizer and learning-rate schedule.
    lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
        initial_learning_rate=0.0005,  # lowered initial learning rate
        decay_steps=10000,
        alpha=0.1,
    )
    optimizer = tf.keras.optimizers.AdamW(learning_rate=lr_schedule, weight_decay=0.01)

    model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.Huber(),
        # One [mae, mse] pair per output head.
        metrics=[['mae', 'mse']] * len(model_outputs),
    )

    return model