TensorFlow实现RNN循环神经网络

yipeiwu_com5年前Python基础

RNN(recurrent neural Network)循环神经网络

主要用于自然语言处理(nature language processing,NLP)

RNN主要用途是处理和预测序列数据

RNN广泛的用于 语音识别、语言模型、机器翻译

RNN的来源就是为了刻画一个序列当前的输出与之前的信息影响后面节点的输出

RNN 是包含循环的网络,允许信息的持久化

RNN会记忆之前的信息,并利用之前的信息影响后面节点的输出

RNN的隐藏层之间的节点是有相连的,隐藏层的输入不仅仅包括输入层的输出,还包括上一时刻隐藏层的输出。

RNN会对于每一个时刻的输入结合当前模型的状态给出一个输出

RNN理论上被看作同一个神经网络结构被无限复制的结果,目前RNN无法做到真正的无限循环,一般以循环体展开。

RNN图:


RNN最擅长的问题是与时间序列相关的

RNN对于一个序列数据,可以将序列上不同时刻的数据依次输入循环神经网络的输入层,而输出可以是对序列中下一个时刻的预测,也可以是对当前时刻信息的处理结果。

RNN 的关键点之一就是他们可以用来连接先前的信息到当前的任务上

展开后的RNN


循环体网络中的参数在不同的时刻也是共享的

RNN的状态是通过一个向量来表示,这个向量的维度也称为RNN隐藏层的大小

假如该向量为h,输入为x,激活函数为tanh,则有如图:


前向传播的计算过程:


理论上RNN可以支持任意长度的序列,但是如果序列太长会导致优化时实现梯度消失的问题,一般会设置最大长度,超长会对其截断。

 代码实现简单的RNN:

import numpy as np 
 
# 定义RNN的参数。 
X = [1,2] 
state = [0.0, 0.0] 
w_cell_state = np.asarray([[0.1, 0.2], [0.3, 0.4]]) 
w_cell_input = np.asarray([0.5, 0.6]) 
b_cell = np.asarray([0.1, -0.1]) 
w_output = np.asarray([[1.0], [2.0]]) 
b_output = 0.1 
 
# 执行前向传播过程。 
for i in range(len(X)): 
  before_activation = np.dot(state, w_cell_state) + X[i] * w_cell_input + b_cell 
  state = np.tanh(before_activation) 
  final_output = np.dot(state, w_output) + b_output 
  print ("before activation: ", before_activation) 
  print ("state: ", state) 
  print ("output: ", final_output) 

LSTM(long short-term memory)长短时记忆网络: 

LSTM解决了RNN不支持长期依赖的问题,使其大幅度提升记忆时长

RNN被成功应用的关键就是LSTM。

LSTM是一种拥有三个“门”结构的特殊网络结构。


粉色的圈代表 pointwise 的操作,诸如向量的和,而黄色的矩阵就是学习到的神经网络层。合在一起的线表示向量的连接,分开的线表示内容被复制,然后分发到不同的位置。

LSTM核心思想:

LSTM 的关键就是细胞状态,水平线在图上方贯穿运行。

细胞状态类似于传送带。直接在整个链上运行,只有一些少量的线性交互,信息在上面流传保持不变会很容易。


LSTM 有通过精心设计的称作为“门”的结构来去除或者增加信息到细胞状态的能力

“门”是一种让信息选择式通过的方法,包含一个sigmoid 神经网络层和一个 pointwise (按位做乘法)的操作。

之所以称之为“门”,因为使用Sigmoid 作为激活函数的层会输出 0 到 1 之间的数值,描述每个部分有多少信息量可以通过这个结构。

0 代表“不许任何量通过”,1 就指“允许任意量通过”!

LSTM的公式:


代码实现:

import tensorflow as tf 
 
 
# 定义一个LSTM结构 
lstm = rnn_cell.BasicLSTMCell(lstm_hidden_size) 
 
# 将LSTM中的状态初始化为全0数组,每次使用一个batch的训练样本 
state = lstm.zero_state(batch_size,tf.float32) 
 
# 定义损失函数 
loss = 0.0 
 
# 规定一个最大序列长度 
for i in range(num_steps): 
  # 复用之前定义的变量 
  if i > 0: 
    tf.get_variable_scope().reuse_variables() 
  # 将当前输入和前一时刻的状态传入定义的LSTM结构,得到输出和更新后的状态 
  lstm_output, state = lstm(current_input,state) 
   
  # 将当前时刻的LSTM结构的输出传入一个全连接层得到最后的输出。 
  final_output = fully_connectd(lstm_output) 
   
  # 计算当前时刻输出的损失 
  loss += calc_loss(final_output,expected_output) 

双向循环神经网络

经典的循环神经网络中的状态传输是从前往后单向的,然而当前时刻的输出不仅和之前的状态有关系,也和之后的状态相关。

双向循环神经网络能解决状态单向传输的问题

双向循环神经网络是由两个循环神经网络反向上下叠加在一起组成两个循环神经网络的状态共同决定输出

也就是时间t时的输出不仅仅取决于过去的记忆,也同样取决于后面发生的事情。


深层(双向)循环神经网络

深层循环神经网络似于双向循环神经网络,只不过是每个时长内都有多层

深层循环神经网络有更强的学习能力

深层循环神经网络在每个时刻上将循环体结构复制多次,和卷积神经网络类似,每一层循环体中的参数是一致的,不同层的参数可以不同


TensorFlow中使用MultiRNNCell实现深层循环神经网络中每一个时刻的前向传播过程。剩下的步骤和RNN的构建步骤相同。

RNN中的dropout

通过dropout方法可以上卷积神经网络更加健壮,类似的用在RNN上也能取得同样的效果。

类似卷积神经网络,RNN只在最后的全连接层使用dropout。

RNN一般只在不同层循环体结构中使用dropout,不在同层循环体使用。

同一时刻t中,不同循环体之间会使用dropout

在TensorFlow中,使用DropoutWrapper类实现dropout功能。

通过input_keep_prob参数控制输入的dropout概率

通过output_keep_prob参数控制输出的dropout概率

TensorFlow样例实现RNN语言模型

代码:

import numpy as np 
import tensorflow as tf 
import reader 
  
DATA_PATH = "../datasets/PTB/data" 
HIDDEN_SIZE = 200 # 隐藏层规模 
NUM_LAYERS = 2 # 深层RNN中的LSTM结构的层数 
VOCAB_SIZE = 10000 # 单词标识符个数 
 
LEARNING_RATE = 1.0 # 学习速率 
TRAIN_BATCH_SIZE = 20 # 训练数据大小 
TRAIN_NUM_STEP = 35 # 训练数据截断长度 
 
# 测试时不需要截断 
EVAL_BATCH_SIZE = 1 # 测试数据大小 
EVAL_NUM_STEP = 1 # 测试数据截断长度 
NUM_EPOCH = 2 # 使用训练数据轮数 
KEEP_PROB = 0.5 # 节点不被dropout 
MAX_GRAD_NORM = 5 # 控制梯度膨胀参数 
 
 
# 定义一个类来描述模型结构。 
class PTBModel (object): 
  def __init__(self, is_training, batch_size, num_steps): 
 
    self.batch_size = batch_size 
    self.num_steps = num_steps 
 
    # 定义输入层。 
    self.input_data = tf.placeholder (tf.int32, [batch_size, num_steps]) 
    self.targets = tf.placeholder (tf.int32, [batch_size, num_steps]) 
 
    # 定义使用LSTM结构及训练时使用dropout。 
    lstm_cell = tf.contrib.rnn.BasicLSTMCell (HIDDEN_SIZE) 
    if is_training: 
      lstm_cell = tf.contrib.rnn.DropoutWrapper (lstm_cell, output_keep_prob=KEEP_PROB) 
    cell = tf.contrib.rnn.MultiRNNCell ([lstm_cell] * NUM_LAYERS) 
 
    # 初始化最初的状态。 
    self.initial_state = cell.zero_state (batch_size, tf.float32) 
    embedding = tf.get_variable ("embedding", [VOCAB_SIZE, HIDDEN_SIZE]) 
 
    # 将原本单词ID转为单词向量。 
    inputs = tf.nn.embedding_lookup (embedding, self.input_data) 
 
    if is_training: 
      inputs = tf.nn.dropout (inputs, KEEP_PROB) 
 
    # 定义输出列表。 
    outputs = [] 
    state = self.initial_state 
    with tf.variable_scope ("RNN"): 
      for time_step in range (num_steps): 
        if time_step > 0: tf.get_variable_scope ().reuse_variables () 
        cell_output, state = cell (inputs[:, time_step, :], state) 
        outputs.append (cell_output) 
    output = tf.reshape (tf.concat (outputs, 1), [-1, HIDDEN_SIZE]) 
    weight = tf.get_variable ("weight", [HIDDEN_SIZE, VOCAB_SIZE]) 
    bias = tf.get_variable ("bias", [VOCAB_SIZE]) 
    logits = tf.matmul (output, weight) + bias 
 
    # 定义交叉熵损失函数和平均损失。 
    loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example ( 
      [logits], 
      [tf.reshape (self.targets, [-1])], 
      [tf.ones ([batch_size * num_steps], dtype=tf.float32)]) 
    self.cost = tf.reduce_sum (loss) / batch_size 
    self.final_state = state 
 
    # 只在训练模型时定义反向传播操作。 
    if not is_training: return 
    trainable_variables = tf.trainable_variables () 
 
    # 控制梯度大小,定义优化方法和训练步骤。 
    grads, _ = tf.clip_by_global_norm (tf.gradients (self.cost, trainable_variables), MAX_GRAD_NORM) 
    optimizer = tf.train.GradientDescentOptimizer (LEARNING_RATE) 
    self.train_op = optimizer.apply_gradients (zip (grads, trainable_variables)) 
 
 
 
# 使用给定的模型model在数据data上运行train_op并返回在全部数据上的perplexity值 
def run_epoch(session, model, data, train_op, output_log, epoch_size): 
  total_costs = 0.0 
  iters = 0 
  state = session.run(model.initial_state) 
 
  # 训练一个epoch。 
  for step in range(epoch_size): 
    x, y = session.run(data) 
    cost, state, _ = session.run([model.cost, model.final_state, train_op], 
                    {model.input_data: x, model.targets: y, model.initial_state: state}) 
    total_costs += cost 
    iters += model.num_steps 
 
    if output_log and step % 100 == 0: 
      print("After %d steps, perplexity is %.3f" % (step, np.exp(total_costs / iters))) 
  return np.exp(total_costs / iters) 
 
 
# 定义主函数并执行 
def main(): 
  train_data, valid_data, test_data, _ = reader.ptb_raw_data(DATA_PATH) 
 
  # 计算一个epoch需要训练的次数 
  train_data_len = len(train_data) 
  train_batch_len = train_data_len // TRAIN_BATCH_SIZE 
  train_epoch_size = (train_batch_len - 1) // TRAIN_NUM_STEP 
 
  valid_data_len = len(valid_data) 
  valid_batch_len = valid_data_len // EVAL_BATCH_SIZE 
  valid_epoch_size = (valid_batch_len - 1) // EVAL_NUM_STEP 
 
  test_data_len = len(test_data) 
  test_batch_len = test_data_len // EVAL_BATCH_SIZE 
  test_epoch_size = (test_batch_len - 1) // EVAL_NUM_STEP 
 
  initializer = tf.random_uniform_initializer(-0.05, 0.05) 
  with tf.variable_scope("language_model", reuse=None, initializer=initializer): 
    train_model = PTBModel(True, TRAIN_BATCH_SIZE, TRAIN_NUM_STEP) 
 
  with tf.variable_scope("language_model", reuse=True, initializer=initializer): 
    eval_model = PTBModel(False, EVAL_BATCH_SIZE, EVAL_NUM_STEP) 
 
  # 训练模型。 
  with tf.Session() as session: 
    tf.global_variables_initializer().run() 
 
    train_queue = reader.ptb_producer(train_data, train_model.batch_size, train_model.num_steps) 
    eval_queue = reader.ptb_producer(valid_data, eval_model.batch_size, eval_model.num_steps) 
    test_queue = reader.ptb_producer(test_data, eval_model.batch_size, eval_model.num_steps) 
 
    coord = tf.train.Coordinator() 
    threads = tf.train.start_queue_runners(sess=session, coord=coord) 
 
    for i in range(NUM_EPOCH): 
      print("In iteration: %d" % (i + 1)) 
      run_epoch(session, train_model, train_queue, train_model.train_op, True, train_epoch_size) 
 
      valid_perplexity = run_epoch(session, eval_model, eval_queue, tf.no_op(), False, valid_epoch_size) 
      print("Epoch: %d Validation Perplexity: %.3f" % (i + 1, valid_perplexity)) 
 
    test_perplexity = run_epoch(session, eval_model, test_queue, tf.no_op(), False, test_epoch_size) 
    print("Test Perplexity: %.3f" % test_perplexity) 
 
    coord.request_stop() 
    coord.join(threads) 
 
if __name__ == "__main__": 
  main() 

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

Python实现输出某区间范围内全部素数的方法

Python实现输出某区间范围内全部素数的方法

本文实例讲述了Python实现输出某区间范围内全部素数的方法。分享给大家供大家参考,具体如下: # -*- coding: utf-8 -*- # 简述:区间范围101-200 #...

网易2016研发工程师编程题 奖学金(python)

本文为大家分享了网易2016研发工程师编程题,供大家参考,具体内容如下 ''' [编程题] 奖学金 时间限制:1秒 空间限制:32768K 小v今年有n门课,每门都有考试,为了拿到奖学金...

Python中关于字符串对象的一些基础知识

Python的字符串被划分为不可变序列的类别,意味着这些字符串所包含的字符存在从左至右的位置顺序,并且它们不可以在本地进行修改。 基本操作 字符串可以通过+操作符进行合并,可以使用*运算...

Python(PyS60)实现简单语音整点报时

本文实例为大家分享了python语音整点报时的具体代码,供大家参考,具体内容如下 主要的技术特殊点在于PyS60的定时器最多只能定2147秒。在手机上直接写的。 import e...

Python将视频或者动态图gif逐帧保存为图片的方法

本文是基于opencv将视频和动态图gif保存为图像帧。可以根据输入视频格式的不同,修改第21行。        对动图的处理...