Usando ‘data pipes y coroutines’ en Python

El concepto es similar a usar ‘pipes’ en UNIX. Por ejemplo, en UNIX podemos combinar varias herramientas para filtrar el contenido de un archivo de texto. ¿Qué hace el siguiente comando?

1
2
# Show the lines that mention 'root' (case-insensitive) in the Linux account file
cat /etc/passwd | grep -i root

En Python 3 podemos hacer algo como lo siguiente:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env python3
# pipeline.py /etc/passwd root
import sys, re
 
def grep(expression):
    """Coroutine sink that prints every received line matching a pattern.

    Prime it with ``next()`` and then feed it lines with ``send()``; the loop
    never ends on its own.

    Args:
        expression: Regular expression handed to ``re.search`` for each line.
    """
    while True:
        line = yield  # Suspend here until the upstream stage sends a line
        matched = re.search(expression, line)
        if matched:
            print(line)
 
def cat(file, target):
    """Send every line of a text file, stripped of whitespace, to a coroutine.

    Args:
        file: Path of the text file to read.
        target: Un-primed coroutine that receives each line through ``send()``.
    """
    next(target)  # Prime the other end of the pipe so it can accept send()
    with open(file, "r") as fh:
        # Iterate the handle directly so lines are streamed one at a time
        # instead of loading the whole file into memory with readlines().
        for line in fh:
            target.send(line.strip())
 
def main(args):
    """Wire the cat -> grep pipeline from the command-line arguments.

    Args:
        args: Sequence whose first item is the file path and whose second
            item is the regular expression to look for.
    """
    path = args[0]
    # The pipe is assembled from the outside in: the sink (grep) is created
    # first and then handed to the source (cat).
    sink = grep(args[1])
    cat(path, sink)
 
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    cli_args = sys.argv[1:]  # Everything after the program name
    main(cli_args)

Otro ejemplo: vamos a modificar el programa que lee los datos del archivo de DolarToday para filtrar por fecha de inicio, fecha final, valor máximo y valor mínimo. Note cómo el uso de los filtros es opcional y, si no se aplican, se retornan todos los datos (para mantener el ejemplo sencillo, removí el código que convierte los datos al formato binario):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/env python3
# Simple program to save the 'Dolartoday' extra official dollar rates from CSV to a custom format
# Revisited to use data pipes and coroutines
# @author Jose Vicente Nunez, josevnz@kodegeek.com
import os, sys, datetime, re, gzip, struct
from optparse import OptionParser, OptionValueError
from datetime import datetime
 
class DollarToday:
    """A single exchange-rate observation: a date and a positive value.

    Instances hash by identity, which matches the identity-based equality
    inherited from ``object``.
    """

    # Private names listed in __slots__ are name-mangled the same way as the
    # attributes assigned in the methods below, so the two line up.
    __slots__ = ("__date", "__value")

    def __init__(self, ddate, value):
        """Create a record.

        Args:
            ddate: A ``datetime`` instance, or a string in ``%m-%d-%Y`` format.
            value: The rate; converted with ``float()`` and must be positive.

        Raises:
            ValueError: If ``ddate`` is a string that does not match the
                expected format, or ``value`` is a string that is not a number.
            AssertionError: If the converted value is not greater than zero.
        """
        if isinstance(ddate, datetime):
            self.__date = ddate
        else:
            self.__date = datetime.strptime(ddate, "%m-%d-%Y")
        self.__value = float(value)
        assert self.__value > 0.0, "{} for date {} is invalid!".format(value, self.__date)

    @property
    def date(self):
        """The observation date as a ``datetime``."""
        return self.__date

    @date.setter
    def date(self, date):
        """Replace the observation date; only ``datetime`` values are accepted."""
        assert isinstance(date, datetime), "Invalid date {}".format(date)
        self.__date = date

    @property
    def value(self):
        """The rate as a float (read-only)."""
        return self.__value

    def __str__(self):
        """Return a human-readable ``DollarToday[date=..., value=...]`` string."""
        return "DollarToday[date={}, value={}]".format(self.__date, self.__value)

    def __hash__(self):
        """Return an identity-based integer hash."""
        # __hash__ must return an int; a str result makes hash() raise
        # TypeError and the object unusable in sets or as a dict key.
        return hash(id(self))
 
class DollarCollection(dict):
    """Dictionary of records whose views iterate in ascending key order.

    ``values()``, ``items()`` and plain iteration all yield entries sorted by
    key. ``values()`` and ``items()`` return generators, not dict views.
    """

    def values(self):
        """Yield the stored records ordered by key."""
        for date_id in sorted(super().keys()):
            yield self[date_id]

    def items(self):
        """Yield ``(key, record)`` pairs ordered by key.

        Sorted so that it agrees with ``values()`` and ``__iter__``; the plain
        ``dict`` key view would give insertion order instead.
        """
        for date_id in sorted(super().keys()):
            yield (date_id, self[date_id])

    def __iter__(self):
        """Yield the keys in ascending order."""
        yield from sorted(super().keys())

    def __str__(self):
        """Return the record count followed by one record per line."""
        lines = ",\n".join(str(record) for record in self.values())
        return "Records={},\n{}".format(len(self), lines)
 
# Pipe: Read from binary file
def readBinary(file, target):
    """Read a gzip-compressed binary rates file and send each record downstream.

    The file starts with the magic bytes and a two-byte version, followed by
    fixed-size little-endian records. Each record is an int (the proleptic
    Gregorian ordinal of the date) and a double (the rate).

    Args:
        file: Path of the gzip file to read.
        target: Un-primed coroutine that receives one ``DollarToday`` per record.

    Raises:
        ValueError: If the magic bytes do not match, or the file version is
            newer than the supported one.
        struct.error: If the file ends in the middle of a record.
    """
    next(target)  # Prime the downstream coroutine so it can accept send()
    FILE_MAGIC = b"DLR\x00"
    FILE_VERSION = b"\x00\x01"
    dollarStruct = struct.Struct("<id")
    with gzip.open(file, "rb") as fh:
        magic = fh.read(len(FILE_MAGIC))
        if magic != FILE_MAGIC:
            # Only exception instances can be raised in Python 3; raising a
            # bare string would fail with TypeError and hide the message.
            raise ValueError("File doesn't look like a KodeGeek.com binary file!")
        version = fh.read(len(FILE_VERSION))
        if version > FILE_VERSION:
            raise ValueError("Unsupported file version: {}, expected {}".format(version, FILE_VERSION))
        while True:
            data = fh.read(dollarStruct.size)
            if not data:  # Clean end of file
                break
            numbers = dollarStruct.unpack(data)
            target.send(DollarToday(datetime.fromordinal(numbers[0]), numbers[1]))
 
# Pipe: Filter elements >= minimum
def min(min, target):
    """Coroutine filter that forwards records whose value is >= ``min``.

    Both the function and its first parameter shadow the ``min`` builtin; the
    names are kept to preserve the existing interface.

    Args:
        min: Inclusive lower bound compared against each record's ``value``.
        target: Un-primed coroutine that receives the records that pass.
    """
    next(target)  # Prime the downstream coroutine
    lower_bound = min
    while True:
        record = yield  # Wait for the next record from upstream
        if record.value >= lower_bound:
            target.send(record)
 
# Pipe: Filter elements <= max
def max(max, target):
    """Coroutine filter that forwards records whose value is <= ``max``.

    Both the function and its first parameter shadow the ``max`` builtin; the
    names are kept to preserve the existing interface.

    Args:
        max: Inclusive upper bound compared against each record's ``value``.
        target: Un-primed coroutine that receives the records that pass.
    """
    next(target)  # Prime the downstream coroutine
    upper_bound = max
    while True:
        record = yield  # Wait for the next record from upstream
        if record.value <= upper_bound:
            target.send(record)
 
# Pipe: filter from date
def from_date(fromd, target):
    """Coroutine filter that forwards records dated on or after ``fromd``.

    Args:
        fromd: Inclusive start date compared against each record's ``date``.
        target: Un-primed coroutine that receives the records that pass.
    """
    next(target)  # Prime the downstream coroutine
    while True:
        record = yield  # Wait for the next record from upstream
        on_or_after = record.date >= fromd
        if on_or_after:
            target.send(record)
 
# Pipe: filter to date
def to_date(tod, target):
    """Coroutine filter that forwards records dated on or before ``tod``.

    Args:
        tod: Inclusive end date compared against each record's ``date``.
        target: Un-primed coroutine that receives the records that pass.
    """
    next(target)  # Prime the downstream coroutine
    while True:
        dollar = (yield)  # Wait for the next record from upstream
        # The operator must be written as a single token, "<=".
        if dollar.date <= tod:
            target.send(dollar)
 
def adder(collection):
    """Coroutine sink that stores every received record under its date.

    Args:
        collection: Mapping that is updated in place; a record whose date is
            already present replaces the earlier one.
    """
    while True:
        record = yield  # Wait for the next record from upstream
        key = record.date
        collection[key] = record
 
def main(options):
    """Build the filter pipeline from the parsed options and print the report.

    Args:
        options: Parsed command-line options with the attributes ``report``
            (path of the binary file), ``min``, ``max``, ``fromd`` and ``tod``.
            ``min == 99999999``, ``max == -1`` and a ``None`` date each mean
            "filter not requested".
    """
    dc = DollarCollection()

    pipeline = adder(dc)  # Destination of the pipe
    # Each optional filter wraps the pipeline built so far, so the filter
    # added last is the first one to see every record.
    if options.min != 99999999:
        pipeline = min(options.min, pipeline)

    if options.max != -1:
        # The upper bound comes from options.max, not options.min.
        pipeline = max(options.max, pipeline)

    if options.fromd is not None:
        pipeline = from_date(datetime.strptime(options.fromd, "%Y-%m-%d"), pipeline)

    if options.tod is not None:
        pipeline = to_date(datetime.strptime(options.tod, "%Y-%m-%d"), pipeline)

    readBinary(options.report, pipeline)  # Start of the pipe that reads the file contents

    print(dc)
 
# Command-line entry point: parse the options and run the report.
if __name__ == "__main__":

    # The synopsis uses the same option names and date format as the options
    # registered below.
    usagetext = """
%prog --report binary.file [--from YYYY-MM-DD] [--to YYYY-MM-DD] [--min amount] [--max amount]
"""

    op = OptionParser(usage=usagetext)
    op.add_option(
                  "-p", "--report",
                  action="store",
                  dest="report",
                  help="Read the contents from the binary storage and generate a report.")
    op.add_option(
                  "-f", "--from",
                  action="store",
                  dest="fromd",
                  help="Optional filter. Start date yyyy-mm-dd")
    op.add_option(
                  "-t", "--to",
                  action="store",
                  dest="tod",
                  help="Optional filter. End date yyyy-mm-dd")
    # The defaults of --min and --max are sentinels meaning "no filter".
    op.add_option(
                  "-m", "--min",
                  action="store",
                  dest="min",
                  default=99999999,
                  type="float",
                  help="Optional filter. Minimal amount")
    op.add_option(
                  "-M", "--max",
                  action="store",
                  dest="max",
                  default=-1,
                  type="float",
                  help="Optional filter. Maximum amount")

    (options, _) = op.parse_args()
    if options.report is None:
        # Without a file there is nothing to read: exit with the usage text
        # instead of failing later when the file is opened.
        op.error("--report is required")
    main(options)

Una salida de ejemplo:

1
2
3
4
DolarTodayReader.py --report /Users/josevnz/Documents/dolartoday.jose --min 1050 --from 2016-02-23 --to 2016-02-24
Records=2,
DollarToday[date=2016-02-23 00:00:00, value=1060.0],
DollarToday[date=2016-02-24 00:00:00, value=1071.19]

Para cerrar, les recomiendo el siguiente tutorial: http://www.dabeaz.com/coroutines/index.html. La sintaxis es de Python 2.