
reading from files

A diagram would make this much easier to follow; I have been too busy lately, so I will add one later.
import tensorflow as tf

# First there is a list of filenames to read.
# string_input_producer then puts that list of filenames into a filename queue.
# Sometimes the data set is too large and has to be split across several tfrecord
# files (see the TFRecord sketch after this block).
filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])
# Different file formats use different readers.
reader = tf.TextLineReader()
# reader.read() extracts a record from a file whose name is in the queue;
# once every record in that file has been read, the filename is dequeued (see ReaderBase).
# read() returns the next record.
key, value = reader.read(filename_queue)
# Decode the record. The decode op depends on the record format inside the file;
# the decoded columns are then assembled into whatever shape is needed.
# record_defaults also fixes the column types and the values used for missing fields.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
    value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])

with tf.Session() as sess:
    # Start populating the filename queue.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(1200):
        # Retrieve a single instance:
        example, label = sess.run([features, col5])
    coord.request_stop()
    coord.join(threads)
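
As mentioned in the comments above, a large data set is often split across several tfrecord files; the same pipeline then uses tf.TFRecordReader and tf.parse_single_example instead of tf.TextLineReader and tf.decode_csv. Below is a minimal sketch under that assumption; the filenames and the feature keys ('image_raw', 'label') are placeholders, not part of the original example.

# Minimal TFRecord variant of the pipeline above. The filenames and the
# feature keys ('image_raw', 'label') are placeholders; adapt them to how
# the records were actually written.
filename_queue = tf.train.string_input_producer(
    ["part0.tfrecord", "part1.tfrecord"])
reader = tf.TFRecordReader()                      # TFRecord files use their own reader
_, serialized_example = reader.read(filename_queue)
parsed = tf.parse_single_example(
    serialized_example,
    features={
        "image_raw": tf.FixedLenFeature([], tf.string),
        "label": tf.FixedLenFeature([], tf.int64),
    })
image = tf.decode_raw(parsed["image_raw"], tf.uint8)  # bytes -> uint8 tensor
label = tf.cast(parsed["label"], tf.int32)
# The rest (Session, Coordinator, start_queue_runners) is the same as above.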

Any discussion of queues has to mention the two classes that support multi-threaded, asynchronous input: tf.train.Coordinator and tf.train.QueueRunner;
  • tf.train.Coordinator: coordinates multiple threads so that they all stop together.
  • tf.train.QueueRunner: holds a list of enqueue ops and creates a thread for each of them, so that every op runs on its own thread.

Coordinator

Coordinator methods: should_stop, request_stop, join
import threading

# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
    while not coord.should_stop():  # should_stop returns True or False: whether this thread should stop
        ...do something...
        if ...some condition...:
            # When the condition occurs, one thread calls request_stop();
            # the other threads then exit because should_stop() starts returning True.
            coord.request_stop()

# Main thread: create a coordinator.
coord = tf.train.Coordinator()

# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in range(10)]

# Start the threads and wait for all of them to stop.
for t in threads:
    t.start()
coord.join(threads)
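
A detail the snippet above leaves out is what happens when a worker thread raises an exception. A common pattern, sketched below, is to report the exception to the coordinator with request_stop(e): every other thread then sees should_stop() return True, and coord.join() re-raises the exception in the main thread. The loop body here is only a placeholder.

# Sketch of exception handling with a Coordinator; the loop body is a placeholder.
def SafeLoop(coord):
    try:
        while not coord.should_stop():
            pass  # ...do something...
    except Exception as e:
        coord.request_stop(e)   # record the exception and ask every thread to stop
    finally:
        coord.request_stop()    # make sure a stop is requested even on a clean exit

# coord.join(threads) later re-raises the recorded exception in the main thread.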

QueueRunner

example = ...ops to create one example...
# Create a queue, and an op that enqueues examples one at a time in the queue.
# Unlike the filename queue above, this is an example queue; it can hold the
# records that were read and decoded as in the previous section.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)  # define the enqueue op
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...use 'inputs' to build the training part of the graph...

# Create a queue runner that will run 4 threads in parallel to enqueue examples.
# QueueRunner constructor: a QueueRunner multi-threads the enqueue ops of a single queue;
# the second argument is the list of enqueue ops.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
# The QueueRunner creates the threads for the queue and hands their shutdown over to the coordinator.
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in range(1000000):
    if coord.should_stop():
        break
    sess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(enqueue_threads)
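
The CSV example at the top never builds a QueueRunner by hand; it works because ops such as tf.train.string_input_producer register their QueueRunner in the graph's QUEUE_RUNNERS collection, and tf.train.start_queue_runners launches everything in that collection. A hand-built runner can be registered the same way. The sketch below assumes the queue and enqueue_op from the snippet above:

# Register the hand-built runner in the graph collection, so that
# tf.train.start_queue_runners() starts it together with the runners added
# implicitly by string_input_producer and friends.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
tf.train.add_queue_runner(qr)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # ...run the training loop as above...
    coord.request_stop()
    coord.join(threads)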
To be continued...

