class Rosterを試してみるには
d:id:imait:20090221:1235228058に掲載したclass Roster。これを試すのに使っていたコードも載せておこうと思います。こうしてばんばか掲載するのは、なんだかリソースの無駄のような気もするし、私の書くコードにしてもなにか参考になるようなものでもないしで、ちょっと微妙な気分でいるのですが、まあ一応生きて動いて書いている証拠を残すという意味で掲載しておこうと思います。
d:id:imait:20090210:1234268457でも説明しましたように、Sessionはsession.py、HTMLはhtml.py、Rosterはroster.pyというファイルにしてlibというディレクトリに放り込みます。SQLiteのファイルを置くためのディレクトリdbも作っておきます。
それで、テスト用のコードを動かしてやればいいんですが、コマンドラインでは不都合が出るかも知れません。基本的にはCGIとして動かすものとご了解ください。
#! /usr/bin/env python
# -*- coding: utf-8 -*-
'''
CGI test page for the Session, HTML and Roster classes (Python 2).

The page has two parts:

* a session demo: log in with a user name and pass phrase, append values
  to the session data, or throw the session key away;
* a Roster demo: add users, change passwords, toggle the active flag,
  delete users, and list every user together with its stored data.

NOTE(review): the Roster forms are processed without checking the login
state.  That matches the original test code; do not deploy this page as-is.
'''
import codecs
import string
import cgi
import cgitb
cgitb.enable()  # enabled before the remaining imports so their errors are rendered too
import sys
import locale
import datetime
import Cookie
import os
import pickle
import base64
sys.path.append('./lib/')
import html
import session
import roster


def escape_html(text):
    '''Escape &, <, > and double quotes so text can be embedded in HTML.'''
    return cgi.escape(text, True)


def field_text(form, name, strip=True):
    '''Return the first value of form field *name* decoded from UTF-8.

    Returns u'' when the field is absent.  getfirst() is used so that a
    repeated field cannot turn the value into a list.
    '''
    text = form.getfirst(name, '').decode('utf-8')
    if strip:
        text = text.strip()
    return text


def print_page_start():
    '''Emit the HTTP response header followed by the opening HTML.'''
    html.print_resp_header()
    html.print_html_header()


LOGIN_FORM = u'''
<form method="post" action="index.py">
<p>名前:<input type="text" name="namae" value="" /></p>
<p>合言葉:<input type="password" name="aikotoba" id="aikotoba" value="" />
<input type="submit" /></p>
</form>
'''

DELETE_KEY_FORM = u'''
<form method="post" action="index.py">
<input type="hidden" name="delete_key" value="bye" />
<p><input type="submit" value="鍵を捨てる" /></p>
</form>
'''

ADD_VALUE_FORM = u'''
<form method="post" action="index.py">
<p>セッションにデータを追加 : <input type="text" name="c_val" />
<input type="submit" /></p>
</form>
'''

REFRESH_FORM = u'''
<form method="post" action="index.py">
<p><input type="submit" value="ページの更新" /></p>
</form>
'''

ADD_USER_FORM = u'''
<h3>追加</h3>
<form method="post" action="index.py">
<p>ユーザー名:<input type="text" name="add_user" /></p>
<p>パスワード:<input type="password" name="add_password" />
<input type="submit" value="ユーザー追加" /></p>
</form>
'''

CHANGE_PASSWORD_FORM = u'''
<h3>パスワード変更</h3>
<form method="post" action="index.py">
<p>ユーザー名:<input type="text" name="chpw_user" /></p>
<p>旧パスワード:<input type="password" name="chpw_old" /></p>
<p>新パスワード:<input type="password" name="chpw_new" />
<input type="submit" value="パスワード変更" /></p>
</form>
'''

TOGGLE_USER_FORM = u'''
<h3>有効無効切り替え</h3>
<form method="post" action="index.py">
<p>ユーザー名:<input type="text" name="act_user" />
<input type="submit" value="切り替え" /></p>
</form>
'''

DELETE_USER_FORM = u'''
<h3>削除</h3>
<form method="post" action="index.py">
<p>ユーザー名:<input type="text" name="delete_user" />
<input type="submit" value="ユーザー削除" /></p>
</form>
'''

locale.setlocale(locale.LC_ALL, '')
# Wrap stdout so that unicode objects are written out as UTF-8.
sys.stdout = codecs.lookup('utf-8')[-1](sys.stdout)

sitetitle = u'サイト名'
# The instance deliberately keeps the name ``html`` (shadowing the module),
# as in the original script; the module is not needed after this point.
html = html.HTML(encode=u'utf-8', lang=u'ja', sitetitle=sitetitle)
html.set_page_title(u'ページ名')

cookie = Cookie.SimpleCookie(os.environ.get(u'HTTP_COOKIE', u''))
sesid = u''
if u'session' in cookie:
    sesid = cookie['session'].value

ro = roster.Roster(u'./db/roster')
validity = u'15 minutes'
mys = session.Session(u'./db/session', sesid, validity, True)
form = cgi.FieldStorage()

if sesid == mys.get_id():
    # Session hands back the id we sent: the key from the cookie is valid.
    if u'delete_key' in form:
        mys.delete()
        print_page_start()
        print(html.p(u'鍵を捨てました。'))
    else:
        cookie['session'] = sesid
        html.set_cookie(cookie)
        c_vals = mys.get_data()
        if not isinstance(c_vals, list):
            c_vals = []
        if u'c_val' in form:
            # Session entries are printed as HTML fragments (the welcome
            # message contains <strong>), so user text is escaped on the
            # way in rather than on the way out.
            c_vals.append(escape_html(field_text(form, u'c_val', strip=False)))
        mys.set_data(c_vals)
        mys.save_data()
        print_page_start()
        print(html.p(u'鍵は有効です。'))
        print(html.p(escape_html(sesid)))
        print(u'<dl>')
        print(u'<dt>鍵の有効期間</dt><dd>' + validity + u'</dd>')
        print(u'<dt>鍵の作成時刻</dt><dd>' + mys.get_created_time() + u'</dd>')
        print(u'<dt>最終アクセス時</dt><dd>' + mys.get_accessed_time() + u'</dd>')
        print(u'<dt>鍵の廃棄時刻</dt><dd>' + mys.get_expire_time() + u'</dd>')
        print(u'<dt>前回アクセス時のリモートアドレス</dt><dd>' +
              mys.get_remote_addr() + u'</dd>')
        print(u'</dl>')
        print(html.p(u'時間帯はGMTです。'))
        print(DELETE_KEY_FORM)
        print(ADD_VALUE_FORM)
        for cv in c_vals:
            print(html.p(cv))
else:
    # No valid key: Session has issued a fresh id, which is only handed to
    # the browser after a successful login.
    namae = field_text(form, u'namae')
    aikotoba = field_text(form, u'aikotoba')
    if namae and aikotoba:
        if not ro.exists(namae):
            print_page_start()
            print(html.p(u'ユーザー \'' + escape_html(namae) +
                         u'\' は存在しません。'))
        elif not ro.confirm_user(namae, aikotoba):
            print_page_start()
            print(html.p(u'認証されませんでした。'))
        else:
            cookie['session'] = mys.get_id()
            html.set_cookie(cookie)
            print_page_start()
            print(html.p(u'鍵がわたされました。'))
            print(html.p(u'ページを更新してください。'))
            c_vals = mys.get_data()
            if not isinstance(c_vals, list):
                c_vals = []
            c_vals.append(u'ようこそ<strong>' + escape_html(namae) +
                          u'</strong>さん')
            mys.set_data(c_vals)
            mys.save_data()
    else:
        print_page_start()
        print(html.h2(u'ユーザーログイン'))
        print(LOGIN_FORM)

# NOTE(review): the pasted source lost its indentation; this form is assumed
# to be printed for every branch (the login message asks for a page refresh).
print(REFRESH_FORM)

# test code for Roster
print(html.h2(u'ユーザー管理'))
print(ADD_USER_FORM)
print(CHANGE_PASSWORD_FORM)
print(TOGGLE_USER_FORM)
print(DELETE_USER_FORM)

if u'add_user' in form:
    newuid = field_text(form, u'add_user')
    newpassword = field_text(form, u'add_password')
    if not newuid:
        print(html.p(u'ユーザー名 \'' + escape_html(newuid) + u'\' は不正です。'))
    elif not newpassword:
        print(html.p(u'パスワードに空の文字列は使えません。'))
    else:
        try:
            ro.create_user(newuid, newpassword)
        except roster.UserExistsError:
            print(html.p(u'ユーザー \'' + escape_html(newuid) +
                         u'\' は既に存在します。'))

if u'chpw_user' in form:
    targetuid = field_text(form, u'chpw_user')
    pw_old = field_text(form, u'chpw_old')
    pw_new = field_text(form, u'chpw_new')
    if not targetuid:
        print(html.p(u'ユーザー名 \'' + escape_html(targetuid) +
                     u'\' は不正です。'))
    elif not pw_old:
        print(html.p(u'パスワードが不正です。'))
    elif not pw_new:
        print(html.p(u'パスワードに空の文字列は使えません。'))
    elif not ro.exists(targetuid):
        print(html.p(u'ユーザー \'' + escape_html(targetuid) +
                     u'\' は存在しません。'))
    elif not ro.confirm_user(targetuid, pw_old):
        print(html.p(u'認証されませんでした。'))
    else:
        # Any exception propagates to cgitb, exactly as the former
        # ``except: raise`` did.
        ro.change_password(targetuid, pw_new)
        print(html.p(u'パスワードは変更されました。'))

if u'act_user' in form:
    targetuid = field_text(form, u'act_user')
    if not targetuid:
        print(html.p(u'ユーザー名 \'' + escape_html(targetuid) +
                     u'\' は不正です。'))
    else:
        try:
            if ro.is_active(targetuid):
                ro.inactivate_user(targetuid)
                print(html.p(u'ユーザー \'' + escape_html(targetuid) +
                             u'\' を無効化しました。'))
            else:
                ro.activate_user(targetuid)
                print(html.p(u'ユーザー \'' + escape_html(targetuid) +
                             u'\' を有効化しました。'))
        except roster.UserNotExistsError:
            print(html.p(u'ユーザー \'' + escape_html(targetuid) +
                         u'\' は存在しません。'))

if u'delete_user' in form:
    targetuid = field_text(form, u'delete_user')
    if not targetuid:
        print(html.p(u'ユーザー名 \'' + escape_html(targetuid) +
                     u'\' は不正です。'))
    else:
        try:
            ro.delete_user(targetuid)
        except roster.UserNotExistsError:
            print(html.p(u'ユーザー \'' + escape_html(targetuid) +
                         u'\' は存在しません。'))

print(u'<h3>' + u'ユーザー一覧' + u'</h3>')
print(u'<table border="1">\n<thead>')
print(u'<tr><th>%s</th><th>%s</th><th>%s</th><th>%s</th><th>%s</th></tr>' %
      (u'user id', u'status', u'created date', u'changed date', u'data'))
print(u'</thead>\n<tbody>')
for item in ro.list():
    if ro.is_active(item):
        status = u'active'
    else:
        status = u'inactive'
    created_date = ro.get_created_time(item)
    changed_date = ro.get_changed_time(item)
    data = ro.load_user_data(item)
    print(u'<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>' %
          (escape_html(item), status, created_date, changed_date,
           escape_html(u'%s' % (data,))))
    # Exercise save_user_data/load_user_data: every page view replaces the
    # stored value, alternating between the five-element sample and its
    # three-element middle slice.
    if data is not None and len(data) == 5:
        data = data[1:-1]
    else:
        data = [1, 2, 3, {u'key1': u'val1', u'key2': u'val2'}, 5]
    ro.save_user_data(item, data)
print(u'</tbody>\n</table>')

c = ro.count()
print(html.p(u'ユーザー数:' + str(c)))
print(html.p(u'REMOTE_ADDR : ' + os.environ.get('REMOTE_ADDR', u'')))
print(html.p(u'REMOTE_HOST : ' + os.environ.get('REMOTE_HOST', u'')))
html.print_html_close()
HTML
# -*- coding: utf-8 -*-
'''
Helper class for writing an HTML page from a CGI script.
'''
try:
    import Cookie
except ImportError:  # the module is called http.cookies on Python 3
    import http.cookies as Cookie

try:
    _STRING_TYPES = basestring
except NameError:  # Python 3 has no basestring
    _STRING_TYPES = str

try:
    _INTEGER_TYPES = (int, long)
except NameError:  # Python 3 has no long
    _INTEGER_TYPES = (int,)


class HTML:
    '''Print the CGI response header and an XHTML 1.0 Strict page frame,
    and build simple elements as unicode strings.

    The print_* methods write to standard output with print; the element
    methods (h1, h2, h3, p, a, div) only return markup.
    '''

    def __init__(self, encode='utf-8', lang='en', sitetitle=u'Untitled Site', \
                 pagetitle=u'Untitled', titledelimiter=u' :: ', \
                 cssfiles=None, jsfiles=None, cookie=None):
        '''Store the page settings.

        encode         -- charset named in the Content-Type header
        lang           -- value of the xml:lang and lang attributes of <html>
        sitetitle      -- site part of <title>
        pagetitle      -- page part of <title>
        titledelimiter -- text placed between page title and site title
        cssfiles       -- one stylesheet URL (string) or a list of them
        jsfiles        -- one script URL (string) or a list of them
        cookie         -- Cookie.SimpleCookie sent with the response header
        '''
        self.encode = encode
        self.lang = lang
        self.sitetitle = sitetitle
        self.pagetitle = pagetitle
        self.titledelimiter = titledelimiter
        self.cssfiles = cssfiles
        self.jsfiles = jsfiles
        self.cookie = cookie

    # setters
    def set_encode(self, encode):
        '''Set the charset named in the Content-Type header.'''
        self.encode = encode

    def set_lang(self, lang):
        '''Set the language code written on the <html> element.'''
        self.lang = lang

    def set_site_title(self, sitetitle):
        '''Set the site part of <title>.'''
        self.sitetitle = sitetitle

    def set_page_title(self, pagetitle):
        '''Set the page part of <title>.'''
        self.pagetitle = pagetitle

    def set_cookie(self, cookie):
        '''Set the cookie object printed by print_resp_header().'''
        self.cookie = cookie

    # printers
    def print_resp_header(self):
        '''Print the Content-Type line, the cookie header lines (only when
        the cookie is a SimpleCookie) and the blank line ending the header.
        '''
        if self.encode == u'' or not isinstance(self.encode, _STRING_TYPES):
            print(u'Content-Type: text/html')
        else:
            print(u'Content-Type: text/html; charset=' + self.encode)
        if isinstance(self.cookie, Cookie.SimpleCookie):
            print(self.cookie.output())
        print(u'')

    def print_html_header(self):
        '''Print the doctype, the <head> section and the opening <body>.'''
        print(u'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"\n'
              u'\t"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">')
        print(u'<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="' +
              self.lang + u'" lang="' + self.lang + u'">')
        print(u'<head>')
        print(u'<meta http-equiv="Content-Style-Type" content="text/css" />')
        print(u'<meta http-equiv="Content-Script-Type" content="text/javascript" />')
        print(u'<title>' + self.pagetitle + u' ' + self.titledelimiter +
              u' ' + self.sitetitle + u'</title>')
        for cssfile in self._as_url_list(self.cssfiles):
            print(u'<link rel="stylesheet" type="text/css" href="' +
                  cssfile + u'" />')
        for jsfile in self._as_url_list(self.jsfiles):
            print(u'<script type="text/javascript" src="' +
                  jsfile + u'"></script>')
        print(u'</head>')
        print(u'<body>')

    def print_html_close(self):
        '''Print the closing </body> and </html> tags.'''
        print(u'</body>\n</html>')

    # elements
    def h1(self, content, attrs=None):
        '''Return an <h1> element; see _create_element().'''
        return self._create_element(u'h1', content, attrs)

    def h2(self, content, attrs=None):
        '''Return an <h2> element; see _create_element().'''
        return self._create_element(u'h2', content, attrs)

    def h3(self, content, attrs=None):
        '''Return an <h3> element; see _create_element().'''
        return self._create_element(u'h3', content, attrs)

    def p(self, content, attrs=None):
        '''Return a <p> element; see _create_element().'''
        return self._create_element(u'p', content, attrs)

    def a(self, content, attrs=None):
        '''Return an <a> element; see _create_element().'''
        return self._create_element(u'a', content, attrs)

    def div(self, content, attrs=None):
        '''Return a <div> element; see _create_element().'''
        return self._create_element(u'div', content, attrs)

    # internal methods
    def _as_url_list(self, files):
        '''Return files as a list: a list is returned unchanged, a single
        string is wrapped, anything else (e.g. None) gives an empty list.
        '''
        if isinstance(files, list):
            return files
        if isinstance(files, _STRING_TYPES):
            return [files]
        return []

    def _create_start_tag(self, elemname, attrs=None):
        '''Return the start tag for elemname.

        attrs is used only when it is a dict of attribute name -> string
        value.  Attributes are written in sorted name order so the output
        is deterministic, and a double quote inside a value is written as
        &quot; so it cannot end the attribute early.  Other characters are
        passed through unchanged.
        '''
        starttag = u'<' + elemname
        if isinstance(attrs, dict):
            for attrname in sorted(attrs):
                value = attrs[attrname].replace(u'"', u'&quot;')
                starttag = starttag + u' ' + attrname + u'="' + value + u'"'
        return starttag + u'>'

    def _create_end_tag(self, elemname):
        '''Return the end tag for elemname.'''
        return u'</' + elemname + u'>'

    def _create_element(self, elemname, content, attrs=None):
        '''Return start tag + content + end tag.

        content must be a string or an integer.  It is inserted verbatim
        (no escaping), so callers must escape untrusted text themselves.
        Raises TypeError for any other content type.
        '''
        if isinstance(content, _INTEGER_TYPES):
            content = u'%s' % (content,)
        if not isinstance(content, _STRING_TYPES):
            raise TypeError(u'need string or int, got %r' % type(content))
        return self._create_start_tag(elemname, attrs) + content + \
            self._create_end_tag(elemname)
Session
# -*- coding: utf-8 -*-
'''
Session storage backed by an SQLite database file.
'''
import os
import datetime
import sys
import random
import hashlib
import sqlite3
import pickle
import base64

try:
    _STRING_TYPES = basestring
except NameError:  # Python 3 has no basestring
    _STRING_TYPES = str


class Session:
    '''One session record in an SQLite table.

    Creating an instance deletes expired records and then either refreshes
    the record named by sid or, when that record cannot be reused, creates
    a new one under a newly generated id.  Compare get_id() with the id
    that was passed in to find out which of the two happened.

    Every value is passed to SQLite as a bound parameter; only the table
    name, which is a constant set in __init__, is formatted into the SQL.
    '''

    def __init__(self, dbpath, sid=None, validity=u'3 hours', ipmatch=False):
        '''Open (or create) the session.

        dbpath   -- path of the SQLite database file
        sid      -- session id received from the client, or None
        validity -- SQLite date modifier added to 'now' to compute the
                    expiry time, e.g. u'15 minutes'
        ipmatch  -- when true, an existing record is reused only if the
                    stored address equals the current REMOTE_ADDR
        '''
        self.sid = sid
        self.dbpath = dbpath
        self.dbtablename = u'sessiontable'
        self.validity = validity
        self.ipmatch = ipmatch
        self.data = None
        connection = self._open_db()
        try:
            cursor = connection.cursor()
            cursor.execute('create table if not exists %s (id primary key, '
                           'data, created_time, accessed_time, expire_time, '
                           'remote_addr);' % self.dbtablename)
            cursor.execute('delete from %s where expire_time<datetime(\'now\');'
                           % self.dbtablename)
            if self._is_reusable(cursor):
                self._update_session_record(cursor)
            else:
                self._create_session_id(cursor)
                self._insert_session_record(cursor)
            cursor.close()
            connection.commit()
        finally:
            connection.close()

    def get_id(self):
        '''Return the id of this session.'''
        return self.sid

    def get_created_time(self):
        '''Return the stored creation time, or None if the record is gone.'''
        return self._fetch_column('created_time')

    def get_accessed_time(self):
        '''Return the stored last-access time, or None if the record is gone.'''
        return self._fetch_column('accessed_time')

    def get_expire_time(self):
        '''Return the stored expiry time, or None if the record is gone.'''
        return self._fetch_column('expire_time')

    def get_remote_addr(self):
        '''Return the address stored at the last access, or None if the
        record is gone.
        '''
        return self._fetch_column('remote_addr')

    def get_data(self):
        '''Return the session data.

        The value is read from the database only while no data is held in
        memory; afterwards the in-memory value is returned.
        '''
        if self.data is None:
            raw = self._fetch_column('data')
            if raw is not None:
                if not isinstance(raw, bytes):
                    raw = raw.encode('ascii')
                # NOTE(review): pickle.loads executes code embedded in the
                # pickle.  The blob is written by save_data() only, but
                # anyone able to write to the database file can exploit it.
                self.data = pickle.loads(base64.b64decode(raw))
        return self.data

    def set_data(self, data):
        '''Replace the in-memory session data (not saved until save_data()).'''
        self.data = data

    def reset_data(self):
        '''Drop the in-memory session data.'''
        self.data = None

    def save_data(self):
        '''Write the in-memory data to the record as base64-encoded pickle.

        When no data is held, NULL is stored so that get_data() keeps
        returning None.
        '''
        encoded = None
        if self.data is not None:
            encoded = base64.b64encode(pickle.dumps(self.data)).decode('ascii')
        self._run('update %s set data=? where id=?;' % self.dbtablename,
                  (encoded, self.sid), commit=True)

    def delete(self):
        '''Delete this session's record.'''
        self._run('delete from %s where id=?;' % self.dbtablename,
                  (self.sid,), commit=True)

    # internal methods
    def _open_db(self):
        '''Return a new connection to the database file.'''
        return sqlite3.connect(self.dbpath)

    def _run(self, sql, params=(), commit=False):
        '''Execute one statement on its own connection, always closing the
        connection, and return the first result row (None if there is none).
        '''
        connection = self._open_db()
        try:
            cursor = connection.cursor()
            cursor.execute(sql, params)
            row = cursor.fetchone()
            cursor.close()
            if commit:
                connection.commit()
            return row
        finally:
            connection.close()

    def _fetch_column(self, column):
        '''Return one column of this session's record, or None when the
        record does not exist.  column is always a literal supplied by this
        class, never caller input.
        '''
        row = self._run('select %s from %s where id=?;' %
                        (column, self.dbtablename), (self.sid,))
        if row is None:
            return None
        return row[0]

    def _is_reusable(self, cursor):
        '''Return True when self.sid names an existing record that may be
        continued: it must exist and, with ipmatch, its stored address must
        equal the current REMOTE_ADDR.
        '''
        if not isinstance(self.sid, _STRING_TYPES):
            return False
        cursor.execute('select remote_addr from %s where id=?;' %
                       self.dbtablename, (self.sid,))
        row = cursor.fetchone()
        if row is None:
            return False
        if not self.ipmatch:
            return True
        return row[0] == os.environ.get('REMOTE_ADDR', u'')

    def _create_session_id(self, cursor=None):
        '''Set self.sid to a new id that is not present in the table.

        The id is the SHA-256 hex digest of 32 bytes from os.urandom.
        Without a cursor, a temporary connection is opened for the
        uniqueness check.
        '''
        connection = None
        if cursor is None:
            connection = self._open_db()
            cursor = connection.cursor()
        try:
            while True:
                sid = hashlib.sha256(os.urandom(32)).hexdigest()
                cursor.execute('select id from %s where id=?;' %
                               self.dbtablename, (sid,))
                if cursor.fetchone() is None:
                    self.sid = sid
                    break
        finally:
            if connection is not None:
                cursor.close()
                connection.close()

    def _insert_session_record(self, cursor):
        '''Insert the record for self.sid; the caller commits.'''
        # REMOTE_ADDR is stored because that is what ipmatch compares with.
        cursor.execute('insert into %s (id, created_time, accessed_time, '
                       'expire_time, remote_addr) values(?, datetime(\'now\'), '
                       'datetime(\'now\'), datetime(\'now\', ?), ?);' %
                       self.dbtablename,
                       (self.sid, self.validity,
                        os.environ.get('REMOTE_ADDR', u'')))

    def _update_session_record(self, cursor):
        '''Refresh access time, expiry time and address of the record for
        self.sid; the caller commits.
        '''
        cursor.execute('update %s set accessed_time=datetime(\'now\'), '
                       'expire_time=datetime(\'now\', ?), remote_addr=? '
                       'where id=?;' % self.dbtablename,
                       (self.validity, os.environ.get('REMOTE_ADDR', u''),
                        self.sid))