Redis and Python

Redis test memo in Python.

installation:

sudo port install redis
sudo port install py-redis
mkdir ~/.redis
touch ~/.redis.conf

start redis server:

redis-server

test:

insert:
python redistest.py insert

read:
python redistest.py read
page0
page1
page2
page3
page4
page5
page6
page7
page8
page9

pub:
python redistest.py publish 'hello, hoge'
python redistest.py publish 'bye, page'

sub:
python redistest.py subscribe
{'pattern': None, 'type': 'subscribe', 'channel': '2channel', 'data': 1L}
{'pattern': None, 'type': 'message', 'channel': '2channel', 'data': 'hello, hoge'}
{'pattern': None, 'type': 'message', 'channel': '2channel', 'data': 'bye, page'}

test script:

import os
import sys
import redis

# Name of the pub/sub channel used by both data_publish() and data_subscribe().
channel = '2channel'

def connect():
	"""Return a redis.Redis client for localhost on port 6379, database 0."""
	# 6379 is the port passed explicitly here; db=0 selects database index 0.
	return redis.Redis(host='localhost', port=6379, db=0)


def data_delete():
	"""Wipe the server via FLUSHALL.

	This deletes all data in all databases on the current host, not just
	the keys written by data_insert().
	"""
	connect().flushall()

def data_insert():
	"""Store ten sample pairs: keys hoge0..hoge9 with values page0..page9."""
	client = connect()
	for n in range(10):
		key = 'hoge%d' % n
		value = 'page%d' % n
		client.set(key, value)


def data_read():
	"""Print the values stored under keys hoge0..hoge9, one per line."""
	client = connect()
	keys = ['hoge%d' % n for n in range(10)]
	for key in keys:
		print(client.get(key))

def data_publish(message):
	"""Publish *message* on the module-level ``channel``."""
	# ``channel`` is only read here, so no ``global`` declaration is required.
	connect().publish(channel, message)

def data_subscribe():
	"""Subscribe to the module-level ``channel`` and print every event received.

	Keeps printing for as long as the pub/sub ``listen()`` iterator yields
	events.
	"""
	pubsub = connect().pubsub()
	pubsub.subscribe(channel)
	for event in pubsub.listen():
		print(event)



def main():
	"""Run the command named by ``sys.argv[1]``.

	Supported commands are delete, insert, read, publish and subscribe.
	``sys.argv[2]``, when present, is handed to the command as its message
	(only ``publish`` uses it); otherwise ``None`` is passed.

	A missing or unknown command prints a usage line to stderr and exits
	with status 2 instead of failing with IndexError/KeyError.
	"""
	# Every entry takes the optional message so dispatch is uniform.
	functions = {
		"delete": (lambda x: data_delete()),
		"insert": (lambda x: data_insert()),
		"read": (lambda x: data_read()),
		"publish": (lambda x: data_publish(x)),
		"subscribe": (lambda x: data_subscribe()),
	}

	command = sys.argv[1] if len(sys.argv) > 1 else None
	if command not in functions:
		sys.stderr.write("usage: %s [%s] [message]\n"
			% (sys.argv[0], " | ".join(sorted(functions))))
		sys.exit(2)

	param1 = sys.argv[2] if len(sys.argv) > 2 else None
	functions[command](param1)

# Run the command dispatcher only when executed as a script, not on import.
if __name__ == "__main__":
	main()

How Syslog can lose data

From Rainer’s Blog. It looks like syslog, by design, assumes data has been delivered as soon as the TCP send() call succeeds.

I simulated the behavior and confirmed it by running the script below twice: one instance in client mode and the other in server mode. If you kill the server process and start it again while the client process is still running, you’ll see that a message is lost.

import sys
import SocketServer
import socket
import time

def usage():
    """Print the command-line synopsis, prefixed with the script name."""
    synopsis = "%s [client | server]" % sys.argv[0]
    print(synopsis)
    
def server(HOST, PORT):
    """Accept a single TCP connection on (HOST, PORT) and print what arrives.

    Each chunk returned by ``recv()`` is printed. The function returns once
    ``recv()`` yields an empty string or a socket error occurs while
    receiving (the error is printed, not raised). Errors from bind, listen
    or accept propagate to the caller.

    Both the listening socket and the accepted connection are closed on
    every exit path.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((HOST, PORT))
        sock.listen(1)
        conn, _ = sock.accept()
        try:
            while True:
                msg = conn.recv(8192)
                # The empty chunk is printed too, before the loop ends.
                print("* received '%s'" % msg)
                if not msg:
                    break
        except socket.error as e:
            print('Error: %s' % e)
        finally:
            conn.close()
    finally:
        sock.close()

def client_connect(HOST, PORT):
    """Open a TCP connection to (HOST, PORT).

    Returns the connected socket. When the connection attempt raises
    ``socket.error`` the error is printed, the socket is closed, and
    ``None`` is returned.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((HOST, PORT))
    except socket.error as e:
        print("Error: %s" % e)
        # Close the unused socket now rather than leaving it to the garbage
        # collector; client() calls this repeatedly while the server is down.
        sock.close()
        return None

    return sock

def client(HOST, PORT):
    """Send ten numbered messages to (HOST, PORT), one per second.

    A connection is opened with client_connect() whenever none is held; if
    that fails the loop sleeps 3 seconds and tries again. When ``send()``
    raises ``socket.error`` the connection is dropped and the same message
    is retried on the next iteration. The message counter only advances
    after a ``send()`` call that did not raise.
    """
    conn = None
    sent = 0
    while sent < 10:
        if conn is None:
            print("connecting...")
            conn = client_connect(HOST, PORT)
            if conn is None:
                print("no connection. sleeping...")
                time.sleep(3)
                continue

        payload = "* hoge %d" % sent
        print("sending %s" % payload)

        try:
            ret = conn.send(payload)
        except socket.error as err:
            print('Error: %s' % err)
            conn.close()
            conn = None
        else:
            print("Sent:     %s, return: %d" % (payload, ret))
            sent += 1
        print("sleeping...")
        time.sleep(1)

    # The loop only ends right after a successful send, so conn is set here.
    conn.shutdown(socket.SHUT_RDWR)
    conn.close()

def main():
    """Start server or client mode according to the single CLI argument.

    Exactly one argument is required, otherwise the usage line is printed.
    "server" (case-insensitive) starts the server; any other value starts
    the client. Both use localhost:9999.
    """
    args = sys.argv[1:]
    if len(args) != 1:
        usage()
        return

    host = "localhost"
    port = 9999
    mode = args[0].lower()

    if mode == "server":
        print("running server...")
        server(host, port)
        return

    print("running client...")
    client(host, port)

# Run main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

ZeroMQ PUB/SUB memo

def sub_handler(sock, events):
    """IOLoop callback: receive one message from *sock* and print it.

    *events* is accepted to match the handler signature but is not used.
    """
    print(sock.recv())

def sub():
    """Connect a SUB socket to tcp://127.0.0.1:5555 and print incoming messages.

    Registers sub_handler for POLLIN on the socket and then starts the
    IOLoop.
    """
    print("*** sub")
    io_loop = ioloop.IOLoop.instance()
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect('tcp://127.0.0.1:5555')
    # Empty subscription prefix (ZeroMQ treats this as "match every message").
    subscriber.setsockopt(zmq.SUBSCRIBE, '')
    io_loop.add_handler(subscriber, sub_handler, zmq.POLLIN)
    io_loop.start()

def pub():
    """Bind a PUB socket on tcp://127.0.0.1:5555 and publish ten messages.

    Messages are "[0] Hello" through "[9] Hello", sent 0.5 seconds apart.
    """
    print("*** pub")
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind('tcp://127.0.0.1:5555')
    for seq in range(10):
        text = "[%d] Hello" % seq
        print("publishing %s" % text)
        publisher.send(text)
        time.sleep(0.5)

ZeroMQ REQ/REP memo

ZeroMQ request/reply memo.
install:

sudo port install py-zmq

py:

import sys
import zmq
from zmq.eventloop import ioloop

def rep_handler(sock, events):
    """IOLoop callback: echo one received message straight back on *sock*.

    *events* is accepted to match the handler signature but is not used.
    """
    sock.send(sock.recv())

def rep():
    """Bind a REP socket on tcp://127.0.0.1:5555 and serve requests.

    Registers rep_handler for POLLIN on the socket and then starts the
    IOLoop.
    """
    print("*** rep")
    io_loop = ioloop.IOLoop.instance()
    context = zmq.Context()
    responder = context.socket(zmq.REP)
    responder.bind('tcp://127.0.0.1:5555')
    io_loop.add_handler(responder, rep_handler, zmq.POLLIN)
    io_loop.start()

def req():
    """Send 'hello' over a REQ socket to tcp://127.0.0.1:5555 and print the reply."""
    print("*** req")
    context = zmq.Context()
    requester = context.socket(zmq.REQ)
    requester.connect('tcp://127.0.0.1:5555')
    requester.send('hello')
    reply = requester.recv()
    print(reply)

def main():
    """Run the ``req`` or ``rep`` side according to ``sys.argv[1]``.

    The command is matched case-insensitively. A missing or unrecognised
    command prints a usage line to stderr and exits with status 2, instead
    of raising IndexError or silently doing nothing.
    """
    command = sys.argv[1].lower() if len(sys.argv) > 1 else None
    if command == "req":
        req()
    elif command == "rep":
        rep()
    else:
        sys.stderr.write("usage: %s [req | rep]\n" % sys.argv[0])
        sys.exit(2)

# Run the req/rep dispatcher only when executed as a script, not on import.
if __name__ == "__main__":
    main()