¿Cómo * realmente * leer datos CSV en TensorFlow?


Soy relativamente nuevo en el mundo de TensorFlow, y bastante perplejo por cómo en realidad lea los datos CSV en un tensor de ejemplo/etiqueta utilizable en TensorFlow. El ejemplo del tutorial de TensorFlow sobre la lectura de datos CSV está bastante fragmentado y solo te ayuda a ser capaz de entrenar en datos CSV.

Aquí está mi código que he juntado, basado en ese tutorial CSV:

from __future__ import print_function
import tensorflow as tf

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

filename = "csv_test_data.csv"

# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)

# setup CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)

# turn features back into a tensor
features = tf.stack([col1,col2,col3,col4])

print("loading, " + str(file_length) + " line(s)\n")
with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, col5])
    print(example, label)

  coord.request_stop()
  coord.join(threads)
  print("\ndone loading")

Y aquí hay un breve ejemplo del archivo CSV Estoy cargando-datos bastante básicos - 4 columnas de características y 1 columna de etiquetas:

0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0

Todo lo que hace el código anterior es imprimir cada ejemplo del archivo CSV, uno por uno, que, aunque agradable, es bastante inútil para el entrenamiento.

Lo que estoy luchando aquí es cómo convertirías esos ejemplos individuales, cargados uno por uno, en un conjunto de datos de entrenamiento. Por ejemplo, aquí hay un cuaderno en el que estaba trabajando en el curso de Aprendizaje profundo de Udacity. Básicamente quiero tomar los datos CSV estoy de carga, y plop en algo como train_dataset y train_labels:

def reformat(dataset, labels):
  dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
  # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
  labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
  return dataset, labels
train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)

He intentado usar tf.train.shuffle_batch, así, pero inexplicablemente cuelga:

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, colRelevant])
    example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
    print(example, label)

Para resumir, aquí están mis preguntas: {[14]]}

  • ¿Qué me estoy perdiendo de este proceso?
    • Parece que hay una intuición clave que me falta sobre cómo construir correctamente una tubería de entrada.
  • hay una manera de evitar tener que saber la longitud del archivo CSV?
    • Se siente bastante poco elegante tener que saber el número de líneas que desea procesar (la for i in range(file_length) línea de código anterior)

Editar: Tan pronto como Yaroslav señaló que probablemente estaba mezclando partes imperativas y de construcción de gráficos aquí, comenzó a ser más claro. Pude reunir el siguiente código, que creo que está más cerca de lo que se haría normalmente cuando se entrena un modelo de CSV (excluyendo cualquier código de entrenamiento modelo):

from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

def read_from_csv(filename_queue):
  reader = tf.TextLineReader(skip_header_lines=1)
  _, csv_row = reader.read(filename_queue)
  record_defaults = [[0],[0],[0],[0],[0]]
  colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
  features = tf.stack([colHour,colQuarter,colAction,colUser])  
  label = tf.stack([colLabel])  
  return features, label

def input_pipeline(batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)  
  example, label = read_from_csv(filename_queue)
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)

with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  try:
    while not coord.should_stop():
      example_batch, label_batch = sess.run([examples, labels])
      print(example_batch)
  except tf.errors.OutOfRangeError:
    print('Done training, epoch reached')
  finally:
    coord.request_stop()

  coord.join(threads) 
Author: Aaron Lelevier, 2016-05-07

3 answers

Creo que estás mezclando partes imperativas y de construcción de gráficos aquí. La operación tf.train.shuffle_batch crea un nuevo nodo de cola, y se puede usar un solo nodo para procesar todo el conjunto de datos. Así que creo que estás colgando porque creaste un montón de shuffle_batch colas en tu bucle for y no iniciaste corredores de cola para ellos.

El uso normal de la canalización de entrada se ve así:

  1. Agregue nodos como shuffle_batch a la tubería de entrada
  2. (opcional, para evitar gráficos no intencionales modificación) finalizar gráfico

- - - fin de la construcción del gráfico, comienzo de la programación imperativa {

  1. tf.start_queue_runners
  2. while(True): session.run()

Para ser más escalable (para evitar Python GIL), puede generar todos sus datos utilizando la canalización TensorFlow. Sin embargo, si el rendimiento no es crítico, puede conectar una matriz numpy a una canalización de entrada utilizando slice_input_producer. Aquí hay un ejemplo con algunos nodos Print para ver lo que está sucediendo (los mensajes en Print van a stdout cuando el nodo se ejecuta)

tf.reset_default_graph()

num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples*num_features), (num_examples, num_features))
print data

(data_node,) = tf.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.get_default_graph().finalize()
tf.start_queue_runners()

try:
  while True:
    print sess.run(data_batch_debug)
except tf.errors.OutOfRangeError as e:
  print "No more inputs."

Deberías ver algo como esto

[[0 1]
 [2 3]
 [4 5]
 [6 7]
 [8 9]]
[[0 1]
 [2 3]]
[[4 5]
 [6 7]]
No more inputs.

Los números "8, 9" no llenaron el lote completo, por lo que no se produjeron. También tf.Print se imprimen en sys.stdout, por lo que aparecen por separado en la Terminal para mí.

PD: un mínimo de conexión batch a una cola inicializada manualmente está en github issue 2193

Además, para fines de depuración, es posible que desee establecer timeout en su sesión para que su notebook IPython no se cuelgue en la cola vacía dequeues. Utilizo esta función auxiliar para mis sesiones

def create_session():
  config = tf.ConfigProto(log_device_placement=True)
  config.gpu_options.per_process_gpu_memory_fraction=0.3 # don't hog all vRAM
  config.operation_timeout_in_ms=60000   # terminate on long hangs
  # create interactive session to register a default session
  sess = tf.InteractiveSession("", config=config)
  return sess

Notas de escalabilidad:

  1. tf.constant enlaza la copia de sus datos en el Gráfico. Hay un límite fundamental de 2 GB en el tamaño de la definición del gráfico, por lo que es un límite superior en el tamaño de los datos
  2. Podría sortear ese límite usando v=tf.Variable y guardando los datos allí ejecutando v.assign_op con un tf.placeholder en el lado derecho y alimentando el array numpy al marcador de posición (feed_dict)
  3. Que todavía crea dos copias de datos, por lo que para ahorrar memoria podría hacer su propia versión de slice_input_producer que opera en matrices numpy, y carga filas una a la vez usando feed_dict
 21
Author: Yaroslav Bulatov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-05-07 20:34:29

O puede probar esto, el código carga el conjunto de datos Iris en tensorflow utilizando pandas y numpy y se imprime una salida de una neurona simple en la sesión. Espero que ayude a una comprensión básica.... [No he añadido la forma de una etiqueta de decodificación caliente].

import tensorflow as tf 
import numpy
import pandas as pd
df=pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [0,1,2,3,4],skiprows = [0],header=None)
d = df.values
l = pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [5] ,header=None)
labels = l.values
data = numpy.float32(d)
labels = numpy.array(l,'str')
#print data, labels

#tensorflow
x = tf.placeholder(tf.float32,shape=(150,5))
x = data
w = tf.random_normal([100,150],mean=0.0, stddev=1.0, dtype=tf.float32)
y = tf.nn.softmax(tf.matmul(w,x))

with tf.Session() as sess:
    print sess.run(y)
 12
Author: Nagarjun Gururaj,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-06 20:54:32

Puede utilizar el último tf.api de datos:

dataset = tf.contrib.data.make_csv_dataset(filepath)
iterator = dataset.make_initializable_iterator()
columns = iterator.get_next()
with tf.Session() as sess:
   sess.run([iteator.initializer])
 0
Author: Adarsh Kumar,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-09-03 13:52:01