Cómo prefetch de datos utilizando una costumbre función de python en tensorflow


Estoy tratando de prefetch datos de entrenamiento para ocultar la latencia de E/S. Me gustaría escribir código Python personalizado que cargue datos desde el disco y preprocese los datos (por ejemplo, agregando una ventana de contexto). En otras palabras, un hilo hace preprocesamiento de datos y el otro hace entrenamiento. ¿Es esto posible en TensorFlow?

Actualización: Tengo un ejemplo de trabajo basado en el ejemplo de @mrry.

import numpy as np
import tensorflow as tf
import threading

BATCH_SIZE = 5
TRAINING_ITERS = 4100

feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])

sess = tf.Session()

def load_and_enqueue(sess, enqueue_op, coord):
  with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file:
    while not coord.should_stop():
      feature_array = np.fromfile(feature_file, np.float32, 128)
      if feature_array.shape[0] == 0:
        print('reach end of file, reset using seek(0,0)')
        feature_file.seek(0,0)
        label_file.seek(0,0)
        continue
      label_value = np.fromfile(label_file, np.float32, 128)

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord))
t.start()

for i in range(TRAINING_ITERS):
  sum = sess.run(c)
  print('train_iter='+str(i))
  print(sum)

coord.request_stop()
coord.join([t])
Author: read Read, 2016-01-04

2 answers

Este es un caso de uso común, y la mayoría de las implementaciones utilizan las colas de TensorFlow para desacoplar el código de preprocesamiento del código de entrenamiento. Hay un tutorial sobre cómo usar colas, pero los pasos principales son los siguientes:

  1. Defina una cola, q, que almacenará en búfer los datos preprocesados. TensorFlow soporta el simple tf.FIFOQueue que produce elementos en el orden en que fueron encolerizados, y los más avanzados tf.RandomShuffleQueue que produce elementos en un orden aleatorio. Un elemento de cola es una tupla de uno o más tensores (que pueden tener diferentes tipos y formas). Todas las colas admiten un solo elemento (enqueue, dequeue) y lote (enqueue_many, dequeue_many) operaciones, pero para usar las operaciones por lotes debe especificar las formas de cada tensor en un elemento de cola al construir la cola.

  2. Cree un subgrafo que enqueúe los elementos preprocesados en la cola. Una manera de hacer esto sería definir algunos tf.placeholder() ops para tensores correspondientes a un solo ejemplo de entrada, luego pasarlos a q.enqueue(). (Si su preprocesamiento produce un lote a la vez, debe usar q.enqueue_many() en su lugar.) También puede incluir TensorFlow ops en este subgrafo.

  3. Construya un subgrafo que realice entrenamiento. Esto se verá como un gráfico TensorFlow regular, pero obtendrá su entrada llamando q.dequeue_many(BATCH_SIZE).

  4. Comienza tu sesión.

  5. Crear uno o más hilos que ejecutan su lógica de preprocesamiento, luego ejecutan el enqueue op, alimentando los datos preprocesados. Usted puede encontrar el tf.train.Coordinator y tf.train.QueueRunner clases de utilidad útil para esto.

  6. Ejecute su gráfico de entrenamiento (optimizador, etc.) como normal.

EDIT: Aquí hay una función simple load_and_enqueue() y un fragmento de código para comenzar:

# Features are length-100 vectors of floats
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])

# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])

# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...

sess = tf.Session()

def load_and_enqueue():
  with open(...) as feature_file, open(...) as label_file:
    while True:
      feature_array = numpy.fromfile(feature_file, numpy.float32, 100)
      if not feature_array:
        return
      label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0]

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=load_and_enqueue)
t.start()

for _ in range(TRAINING_EPOCHS):
  sess.run(train_op)
 51
Author: mrry,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-01 16:18:13

En otras palabras, un hilo hace preprocesamiento de datos y el otro hace entrenamiento. ¿Es esto posible en TensorFlow?

Sí, lo es. la solución de mrry funciona, pero existe más simple.

Obteniendo datos

tf.py_func envuelve una función python y la usa como operador TensorFlow. Así que podemos cargar los datos en sess.run() cada vez. El problema con este enfoque es que los datos se cargan durante sess.run() a través del hilo principal.

Un mínimo ejemplo:

def get_numpy_tensor():
  return np.array([[1,2],[3,4]], dtype=np.float32)
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32)

Un ejemplo más complejo:

def get_numpy_tensors():
  # Load data from the disk into numpy arrays.
  input = np.array([[1,2],[3,4]], dtype=np.float32)
  target = np.int32(1)
  return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])

tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target

sess = tf.InteractiveSession()
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target])
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2

Prefetching de datos en otro hilo

Para poner en cola nuestros datos en otro hilo (para que sess.run() no tenga que esperar los datos), podemos usar tf.train.batch() en nuestros operadores de tf.py_func().

Un ejemplo mínimo:

tensor_shape = get_numpy_tensor().shape
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape])
# Run `tf.train.start_queue_runners()` once session is created.

Podemos omitir el argumento shapes si tensorflow_tensor tiene su forma especificada:

tensor_shape = get_numpy_tensor().shape
tensorflow_tensor.set_shape(tensor_shape)
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32)
# Run `tf.train.start_queue_runners()` once session is created.

Un ejemplo más complejo:

input_shape, target_shape = (2, 2), ()
def get_numpy_tensors():
  input = np.random.rand(*input_shape).astype(np.float32)
  target = np.random.randint(10, dtype=np.int32)
  print('f', end='')
  return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
batch_size = 2
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2)
# Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`.

tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets

sess = tf.InteractiveSession()
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it.
for _ in range(10):
  numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets])
  assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape)
  print('r', end='')

# Prints `fffffrrffrfrffrffrffrffrffrffrf`.

En caso de que get_numpy_tensor() devuelva un lote de tensores, entonces tf.train.batch(..., enqueue_many=True) ayudará.

 7
Author: AlexP,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-07-21 14:28:21