Usando ‘data pipes y coroutines’ en Python

El concepto es similar a usar ‘pipes’ en UNIX. Por ejemplo, en UNIX podemos combinar varias herramientas para filtrar los contenidos en un archivo de texto. ¿Que hace el siguiente comando?

# Contar cuantos 'root' hay en el archivo de passwords del servidor Linux
cat /etc/password| egrep -i root

En Python 3 podemos hacer algo como lo siguiente:

#!/usr/bin/env python3
# pipeline.py /etc/passwd root
import sys, re

def grep(expression):
    while True:
        text = (yield) # Espere por el texto enviados por la otra co-rutina
        if re.search(expression, text):
            print(text)

def cat(file, target):
    next(target) # Initialize el otro lado de la tuberia
    with open(file, "r") as fh:
        for line in fh.readlines():
            target.send(line.strip()) # envie la linea a la co-rutina

def main(args):
    cat(args[0], grep(args[1])) # La 'tuberia' se hace de afuera hacia adentro

if __name__ == "__main__":
    main(sys.argv[1:])

Otro ejemplo, vamos a modificar el programa que lee los datos del archivo de datos de DolarToday para filtrar por fecha de inicio, final, valores máximos y mínimos. Note como el uso de los filtros es opcional y si no se aplican entonces retornan todos los datos (para mantener el ejemplo sencillo, removí el código que convierte los datos al formato binario):

#!/usr/bin/env python3
# Simple program to save the 'Dolartoday' extra official dollar rates from CSV to a custom format
# Revisited to use data pipes, couroutines
# @author Jose Vicente Nunez, josevnz@kodegeek.com
import os, sys, datetime, re, gzip, struct
from optparse import OptionParser, OptionValueError
from datetime import datetime

class DollarToday:
    
    __slot__ = ("__date", "__value")
    
    def __init__(self, ddate, value):
        if isinstance(ddate, datetime):
            self.__date = ddate
        else:
            self.__date = datetime.strptime(ddate, "%m-%d-%Y")
        self.__value = float(value)
        assert self.__value > 0.0, "{} for date {} is invalid!".format(value, self.__date)
        
    @property
    def date(self):
        return self.__date
    
    @date.setter
    def date(self, date):
        assert isinstance(date, datetime), "Invalid date {}".format(date)
        self.__date = date
    
    @property
    def value(self):
        return self.__value
    
    def __str__(self):
        return "DollarToday[date={}, value={}]".format(self.__date, self.__value)
    
    def __hash__(self):
        return str(id(self))
            
class DollarCollection(dict):
    
    def values(self):
        for dateId in sorted(self.keys()):
            yield self[dateId]
    
    def items(self):
        for dateId in self.keys():
            yield (dateId, self[dateId])
    
    def __iter__(self):
        for dateId in sorted(super().keys()):
            yield dateId

    def __str__(self):
        return "Records={},\n{}".format(len(self), ",\n".join([str(date) for date in self.values()]))

# Pipe: Read from binary file
def readBinary(file, target):
    next(target)
    FILE_MAGIC = b"DLR\x00"
    FILE_VERSION = b"\x00\x01"
    dollarStruct = struct.Struct(" FILE_VERSION:
          raise "Unsupported file version: {}, expected {}".format(version, FILE_VERSION)
        while True:
          data = fh.read(dollarStruct.size)
          if len(data) == 0:
           break
          numbers = dollarStruct.unpack(data)
          target.send(DollarToday(datetime.fromordinal(numbers[0]), numbers[1]))
    except (Exception) as err:
            raise

# Pipe: Filter elements >= minimum
def min(min, target):
    next(target)
    while True:
        dollar = (yield)
        if dollar.value >= min:
            target.send(dollar)

# Pipe: Filter elements < = max
def max(max, target):
    next(target)
    while True:
        dollar = (yield)
        if dollar.value <= max:
            target.send(dollar)

# Pipe: filter from date
def from_date(fromd, target):
        next(target)
        while True:
            dollar = (yield)
            if dollar.date >= fromd:
                target.send(dollar)

# Pipe: filter from date
def to_date(tod, target):
        next(target)
        while True:
            dollar = (yield)
            if dollar.date < = tod:
                target.send(dollar)

def adder(collection):
    while True:
        dollar = (yield)
        collection[dollar.date] = dollar

def main(options):

    dc = DollarCollection()
    
    pipeline = adder(dc) # Destination of the pipe
    # Add pipes (if needed)
    if options.min != 99999999:
        pipeline = min(options.min, pipeline)
    
    if options.max != -1:
        pipeline = max(options.min, pipeline)

    if options.fromd is not None:
        pipeline = from_date(datetime.strptime(options.fromd, "%Y-%m-%d"), pipeline)
        
    if options.tod is not None:
        pipeline = to_date(datetime.strptime(options.tod, "%Y-%m-%d"), pipeline)

    readBinary(options.report, pipeline) # Start of the pipe that reads the file contents
 
    print("{}".format(dc))

if __name__ == "__main__":
    
    usagetext = """
%prog --report binary.file [--from_date YYYYMMDD] [--to YYYYMMDD] [--min amount] [--max amount]
"""

    op = OptionParser(usage=usagetext)
    op.add_option(
                  "-p", "--report",
                  action="store",
                  dest="report",
                  help="Read the contents from the binary storage and generate a report.")
    op.add_option(
                  "-f", "--from",
                  action="store",
                  dest="fromd",
                  help="Optional filter. Start date yyyy-mm-dd")
    op.add_option(
                  "-t", "--to",
                  action="store",
                  dest="tod",
                  help="Optional filter. End date yyyy-mm-dd")
    op.add_option(
                  "-m", "--min",
                  action="store",
                  dest="min",
                  default=99999999,
                  type="float",
                  help="Optional filter. Minimal amount")
    op.add_option(
                  "-M", "--max",
                  action="store",
                  dest="max",
                  default=-1,
                  type="float",
                  help="Optional filter. Maximum amount")
    
    (options, values) = op.parse_args()
    main(options)

Una salida de ejemplo:

DolarTodayReader.py --report /Users/josevnz/Documents/dolartoday.jose --min 1050 --from 2016-02-23 --to 2016-02-24
Records=2,
DollarToday[date=2016-02-23 00:00:00, value=1060.0],
DollarToday[date=2016-02-24 00:00:00, value=1071.19]

Para cerrar, les recomiendo el siguiente tutorial: http://www.dabeaz.com/coroutines/index.html. La sintaxis es de Python 2.