Playing with MongoDB: collecting events (Mongo Tail-Log)

This is an idea I have been toying with for a while: capturing events using MongoDB as the database. I like that disk usage can be bounded by using a circular buffer (in MongoDB these are called 'capped collections'), and that defining the data schema is very simple.

Preparation

The first step is to download and install MongoDB (I use OS X):

cd /usr/share && sudo mkdir mongodb && cd mongodb
sudo tar -xzvf /Users/josevnz/Downloads/mongodb-osx-x86_64-2.2.2.tgz
sudo ln -s mongodb-osx-x86_64-2.2.2 mongo
cd mongo && sudo mkdir data etc log scripts

Next we create a basic configuration file:

Macintosh:mongo josevnz$ cat etc/mongodb.conf 
fork = true
bind_ip = 127.0.0.1
port = 27017
quiet = true
dbpath = /usr/share/mongodb/mongo/data
logpath = /usr/share/mongodb/mongo/log/mongodb.log
pidfilepath = /usr/share/mongodb/mongo/mongodb.pid
logappend = true
journal = true
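
Before starting the server it can help to double-check which directories this file expects to find. What follows is only a minimal sketch in plain Python 3 (no MongoDB needed): `SAMPLE_CONF` repeats the file above, while `parse_conf` and `directories_needed` are hypothetical helper names made up for this illustration.

```python
import posixpath

# Copy of the configuration file shown above.
SAMPLE_CONF = """\
fork = true
bind_ip = 127.0.0.1
port = 27017
quiet = true
dbpath = /usr/share/mongodb/mongo/data
logpath = /usr/share/mongodb/mongo/log/mongodb.log
pidfilepath = /usr/share/mongodb/mongo/mongodb.pid
logappend = true
journal = true
"""


def parse_conf(text):
    """Return the 'key = value' settings as a dict of strings."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


def directories_needed(settings):
    """List the directories that should exist before mongod is started."""
    needed = {settings["dbpath"]}
    for key in ("logpath", "pidfilepath"):
        if key in settings:
            # These settings name files, so we need their parent directory.
            needed.add(posixpath.dirname(settings[key]))
    return sorted(needed)


if __name__ == "__main__":
    conf = parse_conf(SAMPLE_CONF)
    print("port:", conf["port"])
    for directory in directories_needed(conf):
        print("needs:", directory)
```

With the file from this post, the helper lists the `mongo` directory itself plus the `data` and `log` subdirectories created during the preparation step.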

A couple of small helper scripts also make life easier. The commands that follow assume that MONGO_HOME points to /usr/share/mongodb/mongo and that its bin directory is on the PATH.

And we start it:

Macintosh:~ josevnz$ sudo mongod -f $MONGO_HOME/etc/mongodb.conf 
Password:
forked process: 2782
all output going to: /usr/share/mongodb/mongo/log/mongodb.log
child process started successfully, parent exiting

Preparing the collection

Now we need to prepare the place where the data will be stored.

Macintosh:mongo josevnz$ mongo
MongoDB shell version: 2.2.2
connecting to: test
Welcome to the MongoDB shell.
For interactive help, type "help".
For more comprehensive documentation, see
	http://docs.mongodb.org/
Questions? Try the support group
	http://groups.google.com/group/mongodb-user
> use logsdb
switched to db logsdb
> db.createCollection("logs", {capped:true, size:100000})
{ "ok" : 1 }
> db.logs.isCapped()
true
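
The `size` passed to `createCollection` is a limit in bytes: once the collection is full, the oldest documents are dropped to make room for new ones, and insertion order is preserved. To make that behaviour concrete, here is a toy in-memory imitation in plain Python 3. It is not MongoDB code; the `CappedLog` class is hypothetical, and it estimates document size from the JSON text rather than from BSON.

```python
import json
from collections import deque


class CappedLog:
    """Toy stand-in for a capped collection: a byte budget, oldest out first."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.used = 0
        self.docs = deque()  # insertion order, oldest document on the left

    def insert(self, doc):
        # Crude size estimate (JSON text); the real server measures BSON.
        size = len(json.dumps(doc).encode("utf-8"))
        if size > self.max_bytes:
            raise ValueError("document is larger than the whole collection")
        # Drop the oldest documents until the new one fits.
        while self.used + size > self.max_bytes:
            _, old_size = self.docs.popleft()
            self.used -= old_size
        self.docs.append((doc, size))
        self.used += size

    def find(self):
        """Return the surviving documents in insertion order."""
        return [doc for doc, _ in self.docs]


if __name__ == "__main__":
    log = CappedLog(max_bytes=40)  # room for five 8-byte documents
    for n in range(10):
        log.insert({"n": n})
    print(log.find())  # only the five newest documents are left
```

Disk usage stays bounded no matter how long the writer runs, which is the property that makes this kind of collection attractive for logs.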

Client code that inserts the data

#!/usr/bin/env jython
# Simple class that simulates an event writer
# Author: josevnz@kodegeek.com
# BLOG: http://kodegeek.com/blog
# Assumes that you created a database called 'logsdb' and a capped collection called 'logs':
# use logsdb
# db.createCollection("logs", {capped:true, size:100000})
#
from com.mongodb import Mongo, MongoException, WriteConcern, DB, DBCollection, BasicDBObject, DBObject, DBCursor, ServerAddress
from java.util import Arrays, Date, Random
from java.util.concurrent import Executors, TimeUnit
from java.lang import Runnable, Thread
import sys, os
 
class EventWriter(Runnable):
 
        def __init__(self, db):
                self.db = db
                self.col = db.getCollection("logs")
                self.random = Random(1973)
 
        def run(self):
                number = self.random.nextLong()
                event = BasicDBObject('datetime', Date().toString()).append('text', 'This is an event, random # %d' % number)
                print "New event: %s" % event
                self.col.insert(event)
 
def main(args):
 
        initialDelay = 0
        delay = 5
        # Use 127.0.0.1 instead of 'localhost'; with 'localhost' the driver reported an error and hung
        servers = Arrays.asList(ServerAddress("127.0.0.1", 27017))
        m = Mongo(servers)
        # m.setWriteConcern(WriteConcern.JOURNALED)
        m.setWriteConcern(WriteConcern.NONE) # Do not care if the write makes it or not
        db = m.getDB( "logsdb" )
        print "Connected to %s" % db.getName()
 
        command = EventWriter(db)
        # Use a single thread for this example; a real reporter would run in its own thread to avoid blocking the application
        executor = Executors.newSingleThreadScheduledExecutor()
        print "Press Ctrl-C to abort this script, events will be written periodically into the database"
        future = executor.scheduleWithFixedDelay(command, initialDelay, delay, TimeUnit.SECONDS)
        #sys.exit(0)
 
if __name__ == "__main__":
        main(sys.argv[1:])

The output looks like this:

Macintosh:mongodb josevnz$ ./log_writer.py
Connected to logsdb
Press Ctrl-C to abort this script, events will be written periodically into the database
New event: { "datetime" : "Thu Dec 13 10:13:21 EST 2012" , "text" : "This is an event, random # -6901132129250388696"}
New event: { "datetime" : "Thu Dec 13 10:13:27 EST 2012" , "text" : "This is an event, random # 2141911474641068654"}
New event: { "datetime" : "Thu Dec 13 10:13:32 EST 2012" , "text" : "This is an event, random # -7447082860282012741"}
New event: { "datetime" : "Thu Dec 13 10:13:37 EST 2012" , "text" : "This is an event, random # 3042277681337134497"}
New event: { "datetime" : "Thu Dec 13 10:13:42 EST 2012" , "text" : "This is an event, random # -2682038860783877385"}
New event: { "datetime" : "Thu Dec 13 10:13:47 EST 2012" , "text" : "This is an event, random # -6576368686118448135"}
New event: { "datetime" : "Thu Dec 13 10:13:52 EST 2012" , "text" : "This is an event, random # -294840040020254100"}
New event: { "datetime" : "Thu Dec 13 10:13:57 EST 2012" , "text" : "This is an event, random # 4202626908153060298"}
New event: { "datetime" : "Thu Dec 13 10:14:02 EST 2012" , "text" : "This is an event, random # -6313895213434337152"}
New event: { "datetime" : "Thu Dec 13 10:14:07 EST 2012" , "text" : "This is an event, random # 983475561958631366"}
New event: { "datetime" : "Thu Dec 13 10:14:12 EST 2012" , "text" : "This is an event, random # -6651143639772223084"}
New event: { "datetime" : "Thu Dec 13 10:14:17 EST 2012" , "text" : "This is an event, random # -7909942155638967101"}
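
Stripped of the driver calls, the writer is a loop that builds an event, hands it to an insert function and then waits a fixed delay. The sketch below shows that pattern in plain Python 3, without Jython or MongoDB. The names `make_event` and `write_events` are hypothetical, and `insert` can be any callable (here a list's `append`). Note that a fixed seed, like the `Random(1973)` used above, makes the sequence of "random" numbers repeat on every run.

```python
import random
import threading
import time
from datetime import datetime


def make_event(rng):
    """Build a document shaped like the ones the Jython writer inserts."""
    number = rng.randint(-2**63, 2**63 - 1)  # same range as a Java long
    return {
        # Similar to Java's Date.toString(), minus the time zone.
        "datetime": datetime.now().strftime("%a %b %d %H:%M:%S %Y"),
        "text": "This is an event, random # %d" % number,
    }


def write_events(insert, delay, stop, rng=None):
    """Call insert(event) until stop is set, waiting delay seconds in between."""
    if rng is None:
        rng = random.Random(1973)  # fixed seed: same numbers on every run
    while not stop.is_set():
        insert(make_event(rng))
        stop.wait(delay)  # sleeps, but wakes up early when stop is set


if __name__ == "__main__":
    events = []  # stands in for the 'logs' collection
    stop = threading.Event()
    worker = threading.Thread(target=write_events, args=(events.append, 0.05, stop))
    worker.start()
    time.sleep(0.3)  # let a few events accumulate
    stop.set()
    worker.join()
    for event in events:
        print(event)
```

Keeping the destination behind a plain callable makes it easy to try the loop without a server and to swap in a real insert call later.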

In another window we check that events are really arriving, using the mongo shell:

Macintosh:mongodb josevnz$ mongo
MongoDB shell version: 2.2.2
connecting to: test
> use logsdb
switched to db logsdb
// Print the last record received
> db.logs.find().skip(db.logs.count()-1).forEach(printjson)
{
	"_id" : ObjectId("50d6ec7cef869f82b3f656ea"),
	"datetime" : "Sun Dec 23 06:35:24 EST 2012",
	"text" : "This is an event, random # -8356391245437638799"
}
 
// Print the first record received
> db.logs.findOne()
{
	"_id" : ObjectId("50c9f092ef863d028e1ab74b"),
	"datetime" : "Thu Dec 13 10:13:21 EST 2012",
	"text" : "This is an event, random # -6901132129250388696"
}

Client code that continuously reads from the database

It always helps to have the MongoDB-to-SQL mapping at hand. Here we want to mimic the behaviour of the UNIX 'tail' tool, so we use what MongoDB calls 'tailable cursors':

#!/usr/bin/env jython
# Simple class that simulates an event reader
# Author: josevnz@kodegeek.com
# BLOG: http://kodegeek.com/blog
# Assumes that you created a database called 'logsdb' and a capped collection called 'logs':
# use logsdb
# db.createCollection("logs", {capped:true, size:100000})
# Or convert an existing one to capped: db.runCommand({"convertToCapped": "logs", size: 100000})
# It is very important that you get a driver version more recent than 2.7.1 (Collection.isCapped is broken there)
#
from com.mongodb import Mongo, BasicDBObjectBuilder, DB, DBCollection, BasicDBObject, DBObject, ServerAddress, Bytes
from java.util import Arrays, Date
from java.util.concurrent import Executors, TimeUnit
from java.lang import Runnable, Thread
import sys, os
 
class EventReader(Runnable):
 
        def __init__(self, db):
                self.db = db
                if not db.collectionExists("logs"):
                        raise Exception("Logs doesn't exist, please create!")
                self.coll = db.getCollection("logs")
                if not self.coll.isCapped():
                        raise Exception("Logs is not a capped collection!")
                self.sortBy = BasicDBObjectBuilder().start("$natural", 1).get()
                print "Ready to read events..."
 
        def run(self):
                lastVal = None # This could be refined to get the last event
                cursor = self.coll.find(lastVal).sort(self.sortBy).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA)
                while cursor.hasNext():
                        print "%s" % cursor.next()
 
def main(args):
 
        delay = 1
 
        servers = Arrays.asList(ServerAddress("127.0.0.1", 27017))
        m = Mongo(servers)
        db = m.getDB( "logsdb" )
        print "Connected to %s" % db.getName()
 
        command = EventReader(db)
        executor = Executors.newSingleThreadScheduledExecutor()
        print "Press Ctrl-C to abort this script, reading events from the database"
        future = executor.schedule(command, delay, TimeUnit.SECONDS)
        #sys.exit(0)
 
if __name__ == "__main__":
        main(sys.argv[1:])
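
Because `lastVal` is `None`, the reader first replays everything still stored in the collection and only then waits for new events. The comment in `run` hints at starting from the last event instead, the way `tail -f` skips what is already there. The difference can be pictured with a toy follower over a plain Python list. This is only an illustration in plain Python 3: the `Tailer` class is hypothetical, and a real capped collection would also drop old entries, which this list does not.

```python
class Tailer:
    """Remembers how far it has read in an append-only list of events."""

    def __init__(self, events, start_at_end=False):
        self.events = events
        # start_at_end=False replays existing events (like lastVal = None);
        # start_at_end=True only reports events added from now on.
        self.position = len(events) if start_at_end else 0

    def poll(self):
        """Return the events appended since the previous call, oldest first."""
        new_events = self.events[self.position:]
        self.position += len(new_events)
        return new_events


if __name__ == "__main__":
    log = ["event 1", "event 2"]

    replay = Tailer(log)
    print(replay.poll())   # ['event 1', 'event 2']

    follow = Tailer(log, start_at_end=True)
    print(follow.poll())   # []

    log.append("event 3")
    print(replay.poll())   # ['event 3']
    print(follow.poll())   # ['event 3']
```

The tailable cursor does this bookkeeping on the server side; the sketch only shows why the choice of starting point matters.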

I plan to build a tool that uses this at work; the code looks easy to use and shows a lot of promise 🙂