Cómo prefetch de datos utilizando una costumbre función de python en tensorflow
Estoy tratando de prefetch datos de entrenamiento para ocultar la latencia de E/S. Me gustaría escribir código Python personalizado que cargue datos desde el disco y preprocese los datos (por ejemplo, agregando una ventana de contexto). En otras palabras, un hilo hace preprocesamiento de datos y el otro hace entrenamiento. ¿Es esto posible en TensorFlow?
Actualización: Tengo un ejemplo de trabajo basado en el ejemplo de @mrry.
import numpy as np
import tensorflow as tf
import threading
BATCH_SIZE = 5
TRAINING_ITERS = 4100
feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])
q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])
label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])
sess = tf.Session()
def load_and_enqueue(sess, enqueue_op, coord):
with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file:
while not coord.should_stop():
feature_array = np.fromfile(feature_file, np.float32, 128)
if feature_array.shape[0] == 0:
print('reach end of file, reset using seek(0,0)')
feature_file.seek(0,0)
label_file.seek(0,0)
continue
label_value = np.fromfile(label_file, np.float32, 128)
sess.run(enqueue_op, feed_dict={feature_input: feature_array,
label_input: label_value})
coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord))
t.start()
for i in range(TRAINING_ITERS):
sum = sess.run(c)
print('train_iter='+str(i))
print(sum)
coord.request_stop()
coord.join([t])
2 answers
Este es un caso de uso común, y la mayoría de las implementaciones utilizan las colas de TensorFlow para desacoplar el código de preprocesamiento del código de entrenamiento. Hay un tutorial sobre cómo usar colas, pero los pasos principales son los siguientes:
Defina una cola,
q
, que almacenará en búfer los datos preprocesados. TensorFlow soporta el simpletf.FIFOQueue
que produce elementos en el orden en que fueron encolerizados, y los más avanzadostf.RandomShuffleQueue
que produce elementos en un orden aleatorio. Un elemento de cola es una tupla de uno o más tensores (que pueden tener diferentes tipos y formas). Todas las colas admiten un solo elemento (enqueue
,dequeue
) y lote (enqueue_many
,dequeue_many
) operaciones, pero para usar las operaciones por lotes debe especificar las formas de cada tensor en un elemento de cola al construir la cola.Cree un subgrafo que enqueúe los elementos preprocesados en la cola. Una manera de hacer esto sería definir algunos
tf.placeholder()
ops para tensores correspondientes a un solo ejemplo de entrada, luego pasarlos aq.enqueue()
. (Si su preprocesamiento produce un lote a la vez, debe usarq.enqueue_many()
en su lugar.) También puede incluir TensorFlow ops en este subgrafo.Construya un subgrafo que realice entrenamiento. Esto se verá como un gráfico TensorFlow regular, pero obtendrá su entrada llamando
q.dequeue_many(BATCH_SIZE)
.Comienza tu sesión.
Crear uno o más hilos que ejecutan su lógica de preprocesamiento, luego ejecutan el enqueue op, alimentando los datos preprocesados. Usted puede encontrar el
tf.train.Coordinator
ytf.train.QueueRunner
clases de utilidad útil para esto.Ejecute su gráfico de entrenamiento (optimizador, etc.) como normal.
EDIT: Aquí hay una función simple load_and_enqueue()
y un fragmento de código para comenzar:
# Features are length-100 vectors of floats
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])
# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])
q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])
# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])
feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...
sess = tf.Session()
def load_and_enqueue():
with open(...) as feature_file, open(...) as label_file:
while True:
feature_array = numpy.fromfile(feature_file, numpy.float32, 100)
if not feature_array:
return
label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0]
sess.run(enqueue_op, feed_dict={feature_input: feature_array,
label_input: label_value})
# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=load_and_enqueue)
t.start()
for _ in range(TRAINING_EPOCHS):
sess.run(train_op)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-01 16:18:13
En otras palabras, un hilo hace preprocesamiento de datos y el otro hace entrenamiento. ¿Es esto posible en TensorFlow?
Sí, lo es. la solución de mrry funciona, pero existe más simple.
Obteniendo datos
tf.py_func
envuelve una función python y la usa como operador TensorFlow. Así que podemos cargar los datos en sess.run()
cada vez. El problema con este enfoque es que los datos se cargan durante sess.run()
a través del hilo principal.
Un mínimo ejemplo:
def get_numpy_tensor():
return np.array([[1,2],[3,4]], dtype=np.float32)
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32)
Un ejemplo más complejo:
def get_numpy_tensors():
# Load data from the disk into numpy arrays.
input = np.array([[1,2],[3,4]], dtype=np.float32)
target = np.int32(1)
return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target
sess = tf.InteractiveSession()
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target])
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2
Prefetching de datos en otro hilo
Para poner en cola nuestros datos en otro hilo (para que sess.run()
no tenga que esperar los datos), podemos usar tf.train.batch()
en nuestros operadores de tf.py_func()
.
Un ejemplo mínimo:
tensor_shape = get_numpy_tensor().shape
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape])
# Run `tf.train.start_queue_runners()` once session is created.
Podemos omitir el argumento shapes
si tensorflow_tensor
tiene su forma especificada:
tensor_shape = get_numpy_tensor().shape
tensorflow_tensor.set_shape(tensor_shape)
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32)
# Run `tf.train.start_queue_runners()` once session is created.
Un ejemplo más complejo:
input_shape, target_shape = (2, 2), ()
def get_numpy_tensors():
input = np.random.rand(*input_shape).astype(np.float32)
target = np.random.randint(10, dtype=np.int32)
print('f', end='')
return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
batch_size = 2
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2)
# Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`.
tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets
sess = tf.InteractiveSession()
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it.
for _ in range(10):
numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets])
assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape)
print('r', end='')
# Prints `fffffrrffrfrffrffrffrffrffrffrf`.
En caso de que get_numpy_tensor()
devuelva un lote de tensores, entonces tf.train.batch(..., enqueue_many=True)
ayudará.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-07-21 14:28:21