El concepto es similar a usar ‘pipes’ en UNIX. Por ejemplo, en UNIX podemos combinar varias herramientas para filtrar los contenidos de un archivo de texto. ¿Qué hace el siguiente comando?
# Show the lines that mention 'root' (case-insensitive) in the Linux server's password file
cat /etc/passwd | egrep -i root
En Python 3 podemos hacer algo como lo siguiente:
#!/usr/bin/env python3
# pipeline.py /etc/passwd root
import sys, re
def grep(expression):
    """Coroutine that prints every received line matching a regular expression.

    It must be primed with ``next()`` before the first ``send()`` and keeps
    running until the caller closes it.

    Args:
        expression: Pattern handed to ``re.search`` for each received line.
    """
    while True:
        # Suspend here until the upstream stage sends the next line.
        line = yield
        matched = re.search(expression, line)
        if not matched:
            continue
        print(line)
def cat(file, target):
    """Feed every line of a text file into a coroutine.

    Args:
        file: Path of the text file to read.
        target: Unprimed coroutine. It is primed here with ``next()`` and then
            receives each line, stripped of surrounding whitespace, through
            ``send()``.
    """
    next(target)  # Prime the other end of the pipe so it waits at its first yield.
    with open(file, "r") as fh:
        # Iterate the file object directly: lines are read lazily instead of
        # loading the whole file into memory with readlines().
        for line in fh:
            target.send(line.strip())
def main(args):
    """Run the file-to-grep pipeline.

    Args:
        args: Sequence whose first item is the path of the file to read and
            whose second item is the regular expression to look for.
    """
    path, pattern = args[0], args[1]
    # The pipe is assembled back to front: the sink is created first and then
    # handed to the source that feeds it.
    sink = grep(pattern)
    cat(path, sink)


# Script entry point: pass along the command-line arguments without the program name.
if __name__ == "__main__":
    main(sys.argv[1:])
Otro ejemplo: vamos a modificar el programa que lee los datos del archivo de datos de DolarToday para filtrar por fecha de inicio, fecha final, valores máximos y mínimos. Note cómo el uso de los filtros es opcional y, si no se aplican, entonces se retornan todos los datos (para mantener el ejemplo sencillo, removí el código que convierte los datos al formato binario):
#!/usr/bin/env python3
# Simple program to save the 'Dolartoday' extra official dollar rates from CSV to a custom format
# Revisited to use data pipes and coroutines
# @author Jose Vicente Nunez, josevnz@kodegeek.com
import os, sys, datetime, re, gzip, struct
from optparse import OptionParser, OptionValueError
from datetime import datetime
class DollarToday:
    """A single quote: a date and the dollar rate recorded for it."""

    # Restrict instances to these two attributes. The names are private, so
    # they are name-mangled exactly like the self.__date / self.__value
    # accesses below. (The original spelled this ``__slot__``, which Python
    # ignores.)
    __slots__ = ("__date", "__value")

    def __init__(self, ddate, value):
        """Create a quote.

        Args:
            ddate: Either a ``datetime`` or a string in ``%m-%d-%Y`` format.
            value: Anything ``float()`` accepts; it must be greater than zero.

        Raises:
            ValueError: If ``ddate`` is a string that does not match
                ``%m-%d-%Y`` or ``value`` cannot be converted to float.
            AssertionError: If the converted value is not positive.
        """
        if isinstance(ddate, datetime):
            self.__date = ddate
        else:
            self.__date = datetime.strptime(ddate, "%m-%d-%Y")
        self.__value = float(value)
        assert self.__value > 0.0, "{} for date {} is invalid!".format(value, self.__date)

    @property
    def date(self):
        """The quote's date as a ``datetime``."""
        return self.__date

    @date.setter
    def date(self, date):
        assert isinstance(date, datetime), "Invalid date {}".format(date)
        self.__date = date

    @property
    def value(self):
        """The rate as a float (read-only)."""
        return self.__value

    def __str__(self):
        return "DollarToday[date={}, value={}]".format(self.__date, self.__value)

    def __hash__(self):
        # __hash__ must return an int; the original returned str(id(self)),
        # which makes hash() raise TypeError. Equality is not overridden, so
        # an identity-based hash stays consistent with it.
        return hash(id(self))
class DollarCollection(dict):
    """Dictionary whose iteration, ``values()`` and ``items()`` follow ascending key order."""

    def _sorted_keys(self):
        """Return the keys as a sorted list, bypassing the overridden ``__iter__``."""
        return sorted(super().keys())

    def values(self):
        """Yield the stored values ordered by key."""
        for key in self._sorted_keys():
            yield self[key]

    def items(self):
        """Yield ``(key, value)`` pairs ordered by key.

        The original yielded in insertion order, unlike ``values()`` and
        ``__iter__``; all three now agree.
        """
        for key in self._sorted_keys():
            yield (key, self[key])

    def __iter__(self):
        """Yield the keys in ascending order."""
        yield from self._sorted_keys()

    def __str__(self):
        entries = ",\n".join(str(entry) for entry in self.values())
        return "Records={},\n{}".format(len(self), entries)
# Pipe: Read from binary file
# Source stage of the pipeline: primes `target` with next() and then sends it
# one DollarToday per fixed-size record unpacked from the file.
# NOTE(review): this listing is damaged and does not parse as written; see the
# notes below before reusing it.
def readBinary(file, target):
next(target)
FILE_MAGIC = b"DLR\x00"
FILE_VERSION = b"\x00\x01"
# NOTE(review): the next line is truncated. Everything between the opening
# quote of the struct format and "FILE_VERSION:" is missing, which presumably
# included the format string, the `try:` matching the `except` below, the
# code that opens `fh`, and the reads that define `version`. Restore it from
# the original script; the record layout cannot be determined from here.
dollarStruct = struct.Struct(" FILE_VERSION:
# NOTE(review): raising a plain str is a TypeError in Python 3 (exceptions
# must derive from BaseException); this should raise an exception instance.
raise "Unsupported file version: {}, expected {}".format(version, FILE_VERSION)
# Read one record of dollarStruct.size bytes at a time until read() returns
# nothing.
while True:
data = fh.read(dollarStruct.size)
if len(data) == 0:
break
numbers = dollarStruct.unpack(data)
# numbers[0] is turned into a date with datetime.fromordinal; numbers[1] is
# passed through as the value.
target.send(DollarToday(datetime.fromordinal(numbers[0]), numbers[1]))
# Catching and immediately re-raising leaves the exception unchanged.
except (Exception) as err:
raise
# Pipe: keep only the elements whose value is >= the minimum
def min(min, target):
    """Coroutine stage that forwards quotes whose ``value`` is at least ``min``.

    Args:
        min: Lowest value that is still forwarded.
        target: Downstream coroutine. It is primed when this stage is primed
            and receives every accepted quote through ``send()``.
    """
    next(target)
    while True:
        quote = yield
        if not (quote.value >= min):
            continue  # Below the threshold: drop it.
        target.send(quote)
# Pipe: keep only the elements whose value is <= the maximum
def max(max, target):
    """Coroutine stage that forwards quotes whose ``value`` is at most ``max``.

    Args:
        max: Highest value that is still forwarded.
        target: Downstream coroutine. It is primed when this stage is primed
            and receives every accepted quote through ``send()``.
    """
    next(target)
    while True:
        quote = yield
        if not (quote.value <= max):
            continue  # Above the threshold: drop it.
        target.send(quote)
# Pipe: keep only the elements dated on or after the start date
def from_date(fromd, target):
    """Coroutine stage that forwards quotes whose ``date`` is not before ``fromd``.

    Args:
        fromd: Earliest date that is still forwarded.
        target: Downstream coroutine. It is primed when this stage is primed
            and receives every accepted quote through ``send()``.
    """
    next(target)
    while True:
        quote = yield
        if not (quote.date >= fromd):
            continue  # Too early: drop it.
        target.send(quote)
# Pipe: keep only the elements dated on or before the end date
def to_date(tod, target):
    """Coroutine stage that forwards quotes whose ``date`` is not after ``tod``.

    Args:
        tod: Latest date that is still forwarded (inclusive).
        target: Downstream coroutine. It is primed when this stage is primed
            and receives every accepted quote through ``send()``.
    """
    next(target)
    while True:
        dollar = yield
        # The listing had "< =" here, which is not a valid operator.
        if dollar.date <= tod:
            target.send(dollar)
def adder(collection):
    """Terminal coroutine: store each received quote in ``collection`` under its date.

    Args:
        collection: Mapping that is updated in place. A quote received later
            with the same date replaces the earlier one.
    """
    while True:
        quote = yield
        key = quote.date
        collection[key] = quote
def main(options):
    """Build the requested filter pipeline, run the report file through it and print the result.

    Args:
        options: Parsed command-line options. The attributes read are
            ``report`` (file to read), ``min`` and ``max`` (value bounds),
            and ``fromd`` and ``tod`` (date bounds as ``%Y-%m-%d`` strings,
            or None).
    """
    dc = DollarCollection()
    pipeline = adder(dc)  # Destination of the pipe

    # Add pipes (if needed). Each filter wraps the pipeline built so far, so
    # the stage added last is the first one to see the data. A filter is
    # skipped when its option still has the value this function treats as
    # "not set".
    if options.min != 99999999:
        pipeline = min(options.min, pipeline)
    if options.max != -1:
        # The original passed options.min here, so --max filtered on the
        # wrong threshold.
        pipeline = max(options.max, pipeline)
    if options.fromd is not None:
        pipeline = from_date(datetime.strptime(options.fromd, "%Y-%m-%d"), pipeline)
    if options.tod is not None:
        pipeline = to_date(datetime.strptime(options.tod, "%Y-%m-%d"), pipeline)

    readBinary(options.report, pipeline)  # Start of the pipe that reads the file contents
    print("{}".format(dc))
# Script entry point: parse the command line and run the report.
if __name__ == "__main__":
    # The usage text now names the options and date format that the parser
    # below actually accepts (--from/--to with yyyy-mm-dd).
    usagetext = """
%prog --report binary.file [--from YYYY-MM-DD] [--to YYYY-MM-DD] [--min amount] [--max amount]
"""
    op = OptionParser(usage=usagetext)
    op.add_option(
        "-p", "--report",
        action="store",
        dest="report",
        help="Read the contents from the binary storage and generate a report.")
    op.add_option(
        "-f", "--from",
        action="store",
        dest="fromd",
        help="Optional filter. Start date yyyy-mm-dd")
    op.add_option(
        "-t", "--to",
        action="store",
        dest="tod",
        help="Optional filter. End date yyyy-mm-dd")
    # The defaults of --min and --max double as "filter not requested"
    # markers that main() compares against.
    op.add_option(
        "-m", "--min",
        action="store",
        dest="min",
        default=99999999,
        type="float",
        help="Optional filter. Minimal amount")
    op.add_option(
        "-M", "--max",
        action="store",
        dest="max",
        default=-1,
        type="float",
        help="Optional filter. Maximum amount")
    (options, values) = op.parse_args()
    if options.report is None:
        # Fail with the usage message instead of crashing later when the
        # missing file name is opened.
        op.error("--report is required")
    main(options)
Una salida de ejemplo:
DolarTodayReader.py --report /Users/josevnz/Documents/dolartoday.jose --min 1050 --from 2016-02-23 --to 2016-02-24
Records=2,
DollarToday[date=2016-02-23 00:00:00, value=1060.0],
DollarToday[date=2016-02-24 00:00:00, value=1071.19]
Para cerrar, les recomiendo el siguiente tutorial: http://www.dabeaz.com/coroutines/index.html. La sintaxis es de Python 2.