开源地址:
一、搭建开发环境
首先,确认系统安装的Python版本是否为3.6.x:
在Mac上安装Python。如果你正在使用Mac,系统是OS X 10.8~10.10,那么系统自带的Python版本是2.7。要安装最新的Python 3.6,有两个方法:
方法一:从Python官网下载Python 3.6的安装程序,双击运行并安装;
方法二:如果安装了Homebrew,直接通过命令brew install python3安装即可。
然后,用pip安装开发Web App需要的第三方库:
异步框架aiohttp:
前端模板引擎jinja2:
MySQL 5.x数据库,从官方网站下载并安装,安装完毕后,请务必牢记root口令。
MySQL的Python异步驱动程序aiomysql:项目结构
选择一个工作目录,然后,我们建立如下的目录结构:blog-python3-webapp/ <-- 根目录|+- backup/ <-- 备份目录|+- conf/ <-- 配置文件|+- dist/ <-- 打包目录|+- www/ <-- Web目录,存放.py文件| || +- static/ <-- 存放静态文件| || +- templates/ <-- 存放模板文件|+- ios/ <-- 存放iOS App工程|+- android/ <-- 存放Android App工程|+- LICENSE <-- 代码LICENSE
创建好项目的目录结构后,同时建立git仓库并同步至GitHub (地址:https://github.com/leebingbin/Python3.WebAPP.Blog)。
由于我们的blog-python3-webapp建立在asyncio的基础上,因此用aiohttp写一个基本的app.py测试异步web服务环境等是否正常。
在测试之前,需要先安装第三方模块asyncio。asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。python 3.4开始就支持异步IO编程,提供了asyncio库,但是3.4中采用的是@asyncio.coroutine和yield from这和原来的generator关键字yield不好区分,在3.5以后,采用了async(表示协程)和await关键字,这样就好区分多了。由于Web框架使用了基于asyncio的aiohttp,这是基于协程的异步模型。在协程中,不能调用普通的同步IO操作,因为所有用户都是由一个线程服务的,协程的执行速度必须非常快,才能处理大量用户的请求。而耗时的IO操作不能在协程中以同步的方式调用,否则,等待一个IO操作时,系统无法响应任何其他用户。 异步编程的一个原则:一旦决定使用异步,则系统每一层都必须是异步。幸运的是aiomysql为MySQL数据库提供了异步IO的驱动。但在这之前,需要先安装MySQL;毕竟Web App里面有很多地方都要访问数据库。采用dmg的方式安装
1、下载完成后,双击打开
2、右键,选打开,紧接着按照提示,傻瓜式安装即可;但是当弹出一个MYSQL Installer提示框的时候一定打开备忘录复制粘贴记下弹出框的密码(这是你MySQL的root账户的密码)
3、 打开MySQL服务
4、配置路径
用文本编辑器打开.bash_profile,加入PATH=$PATH:/usr/local/mysql/bin并保存。我习惯用 Atom 作为文本编辑器,或者直接用自带的 vim 编辑。在命令行输入source ~/.bash_profile路径就配置好了。5、登陆并修改密码
登录——mysql -u root -p登陆,输入之前保存的密码修改密码——用SET PASSWORD FOR 'root'@'localhost' = PASSWORD('root') ;
修改密码。如果你的版本比较新的话(比如我安装的是5.7.x的版本),就会出现如下提示,这个时候已经更新了密码,但是会有 warning。如果想查看warning,可以用SHOW WARNINGS。
6、Mac下MySQL卸载方法 (如果安装错或忘了密码,还不如卸载重装)
mac下mysql的DMG格式安装内有安装文件,却没有卸载文件……很郁闷的事。 先停止所有mysql有关进程。sudo rm /usr/local/mysqlsudo rm -rf /usr/local/mysql*sudo rm -rf /Library/StartupItems/MySQLCOMsudo rm -rf /Library/PreferencePanes/My*#如果电脑上没有这个文件,跳过这步骤就好#如果有打开文件,去掉MYSQLCOM=-YES-保存即可vim /etc/hostconfig (and removed the line MYSQLCOM=-YES-)rm -rf ~/Library/PreferencePanes/My*sudo rm -rf /Library/Receipts/mysql*sudo rm -rf /Library/Receipts/MySQL*sudo rm -rf /var/db/receipts/com.mysql.*
二、用Python写一个ORM
搭建好环境之后,那就让我们用用Python写一个ORM吧!
#!/usr/bin/env python3# -*- coding: utf-8 -*-__author__ = 'libingbin2015@aliyun.com'import asyncio, loggingimport aiomysqldef log(sql, args=()): logging.info('SQL: %s ' % sql)#创建连接池async def create_pool(loop, **kw): logging.info('create database connection pool...') global __pool __pool = await aiomysql.create_pool( host = kw.get('host', 'localhost'), port = kw.get('port', 3306), user = kw['root'], password = kw['toor'], db = kw['db'], charset = kw.get('charset', 'utf8mb4'), #utf8mb4是utf8的超集,完全兼容utf8,支持emoji直接插入存储 autocommit = kw.get('autocommit', True), maxsize = kw.get('maxsizze', 10), minsize = kw.get('minsize', 1), loop = loop )#CRUD之Rasync def select(sql, args, size=None): log(sql, args) global __pool async with __pool.get() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(sql.replace('?', '%s'), args or ()) #SQL语句的占位符是?,而MySQL的占位符是%s,select()函数在内部自动替换。防止SQL注入攻击:注意要始终坚持使用带参数的SQL,而不是自己拼接SQL字符串 if size: rs = await cur.fetchmany(size) #获取最多指定数量的记录 else: rs = await cur.fetchall() #获取所有记录 logging.info('rows returned: %s' % len(rs)) return rs#定义通用的execute()函数,用于执行CRUD方法async def execute(sql, args, autocommit=True): log(sql) async with __pool.get() as conn: if not autocommit: await conn.begin() try: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(sql.replace('?', '%s'), args) affected = cur.rowcount if not autocommit: await conn.commit() except BaseException as e: if not autocommit: await conn.rollback() raise return affecteddef create_args_string(num): L = [] for n in range(num): L.append('?') return ','.join(L)class Field(object): def __init__(self, name, column_type, primary_key, default): self.name = name self.colum_type = column_type self.primary_key = primary_key self.default = default def __str__(self): return '<%s , $s:%s>' % (self.__class__.__name__, self.colum_type, self.name)class StringField(Field): def __init__(self, name=None, primary_key=False, default=None, ddl='varchar(100)'): super().__init__(name, ddl, primary_key, default)class BooleanField(Field): def __init__(self, name=None, default=False): super().__init__(name, 'boolean', False, default)class IntegerField(Field): def __init__(self, name=None, primary_key=False, default=0): super().__init__(name, 'bigint', primary_key, default)class FloatField(Field): def __init__(self, name=None, primary_key=False, default=0.0): super().__init__(name, 'real', primary_key, default)class TextField(Field): def __init__(self, name=None, default=None): super().__init__(name, 'text', False, default)class ModeMetaclass(type): def __new__(cls, name, bases, attrs): if name == 'Model': return type.__new__(cls, name, bases, attrs) tableName = attrs.get('__table__', None) or name logging.info('found model: %s (table: %s)' % (name, tableName)) mappings = dict() fields = [] primaryKey = None for k, v in attrs.items(): if isinstance(v, Field): logging.info(' found mapping: %s ==> %s' % (k, v)) mappings[k] = v if v.primary_key: # 找到主键 if primaryKey: raise StandardErroe('Duplicate primary key for field: %s' % k) primaryKey = k else: fields.append(k) if not primaryKey: raise StandardError('Primary key not found.') for k in mappings.keys(): attrs.pop(k) escaped_fields = list(map(lambda f: '`%s`' % f, fields)) attrs['__mappings__'] = mappings # 保存属性和列的映射关系 attrs['__table__'] = tableName attrs['__primary_key__'] = primaryKey # 主键属性名 attrs['__fields__'] = fields # 除主键外的属性名 attrs['__select__'] = 'select `%s`, %s from `%s`' % (primaryKey, ', '.join(escaped_fields), tableName) attrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ', '.join(escaped_fields), primaryKey, create_args_string(len(escaped_fields) + 1)) attrs['__update__'] = 'update `%s` set %s where `%s` = ?' % (tableName, ', '.join(map(lambda f: '`%s` = ?' % (mappings.get(f).name or f), fields)), primaryKey) attrs['__delete__'] = 'delete from `%s` where `%s` = ?' % (tableName, primaryKey) return type.__new__(cls, name, bases, attrs)class Model(dict, metaclass=ModelMetaclass): def __init__(self, **kw): super(Model, self).__init__(**kw) def __getattr__(self, key): try: return self[key] except KeyError: raise AttributeError(r"'Model' object has no attribute '%s'" % key) def __setattr__(self, key, value): self[key] = value def getValue(self, key): return getattr(self, key, None) def getValueOrDefault(self, key): value = getattr(self, key, None) if value is None: field = self.__mappings__[key] if field.default is not None: value = field.default() if callable(field.default) else field.default logging.debug('using default value for %s : %s' % (key, str(value))) setattr(self, key, value) return value @classmethod async def findall(cls, where=None, args=None, **kw): ' find objects by where clause.' sql = [cls.__slect__] if where: sql.append('where') sql.append(where) if args is None: args = [] orderBy = kw.get('orderBy', None) if orderBy: sql.append('order by') sql.append(orderBy) limit = kw.get('limit', None) if limit is not None: sql.append('limit') if isinstance(limit, int): sql.append('?') args.append(limit) elif isinstance(limit, tuple) and len(limit) == 2: sql.append('?, ?') args.extend(limit) else: raise ValueError('Invalid limit value: %s' % str(limit)) rs = await select(' '.join(sql), args) return [cls(**r) for r in rs] @classmethod async def findNumber(cls, selectField, where=None, args=None): ' find number by select and where. ' sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)] if where: sql.append('where') sql.append(where) rs = await select(' '.join(sql), args, 1) if len(rs) == 0: return None return rs[0]['_num_'] @classmethod async def find(cls, pk): ' find object by primary key. ' rs = await select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1) if len(rs) == 0: return None return cls(**rs[0]) async def save(self): args = list(map(self.getValueOrDefault, self.__fields__)) args.append(self.getValueOrDefault(self.__primary_key__)) rows = await execute(self.__inset__, args) if rows != 1: logging.warning('failed to update by primary key: affected rows: %s' % rows) async def remove(self): args = [self.getValue(self.__primary_key__)] rows = await execute(self.__delete__, args) if rows != 1: logging.warning('failed to remove by primary key: affected rows: %s' % rows)
有了ORM,我们就可以把需要的3个表用Model表示出来:
#!/usr/bin/env python3# -*- coding: utf-8 -*-''' Models for user, blog, comment.'''__author__ = 'libingbin2015@aliyun.com'import time, uuidfrom orm import Model, StringField, BooleanField, FloatField, TextFielddef next_id(): return '%015%s000' % (int(time.time() * 1000), uuid.uuid4().hex)class User(Model): __table__ = 'users' id = StringField(primary_key=True, default=next_id(), ddl='varchar(50)') email = StringField(ddl='varchar(50)') passwd = StringField(ddl='varchar(50)') admin = BooleanField() name = StringField(ddl='varchar(50)') image = StringField(ddl='varchar(500)') created_at = FloatField(default=time.time)class Blog(Model): __table__ = 'blogs' id = StringField(primary_key=True, default=next_id, ddl='varchar(50)') user_id = StringField(ddl='varchar(50)') user_name = StringField(ddl='varchar(50)') user_image = StringField(ddl='varchar(500)') name = StringField(ddl='varchar(50)') summary = StringField(ddl='varchar(200)') content = TextField() created_at = FloatField(default=time.time)class Comment(Model): __table__ = 'comments' id = StringField(primary_key=True, default=next_id, ddl='varchar(50)') blog_id = StringField(ddl='varchar(50)') user_id = StringField(ddl='varchar(50)') user_name = StringField(ddl='varchar(50)') user_image = StringField(ddl='varchar(500)') content = TextField() created_at = FloatField(default=time.time)
三、编写Web框架
开始Web开发前,我们需要编写一个Web框架。
#!/usr/bin/env python3# -*- coding: utf-8 -*-__author__ = 'libingbin2015@aliyun.com'import asyncio, os, inspect, logging, functoolsfrom urllib import parsefrom aiohttp import webfrom apis import APIErrordef get(path): ''' Define decorator @get('/path') ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): return func(*args, **kw) wrapper.__method__ = 'GET' wrapper.__route__ = path return wrapper return decoratordef post(path): ''' Define decorator @post('/path') ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): return func(*args, **kw) wrapper.__method__ = 'POST' wrapper.__route__ = path return wrapper return decoratordef get_required_kw_args(fn): args = [] params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty: args.append(name) return tuple(args)def get_named_kw_args(fn): args = [] params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY: args.append(name) return tuple(args)def has_named_kw_args(fn): params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY: return Truedef has_var_kw_arg(fn): params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.VAR_KEYWORD: return Truedef has_request_arg(fn): sig = inspect.signature(fn) params = sig.parameters found = False for name, param in params.items(): if name == 'request': found = True continue if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD): raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig))) return foundclass RequestHandler(object): def __init__(self, app, fn): self._app = app self._func = fn self._has_request_arg = has_request_arg(fn) self._has_var_kw_arg = has_var_kw_arg(fn) self._has_named_kw_args = has_named_kw_args(fn) self._named_kw_args = get_named_kw_args(fn) self._required_kw_args = get_required_kw_args(fn) async def __call__(self, request): kw = None if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args: if request.method == 'POST': if not request.content_type: return web.HTTPBadRequest('Missing Content-Type.') ct = request.content_type.lower() if ct.startswith('application/json'): params = await request.json() if not isinstance(params, dict): return web.HTTPBadRequest('JSON body must be object.') kw = params elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'): params = await request.post() kw = dict(**params) else: return web.HTTPBadRequest('Unsupported Content-Type: %s' % request.content_type) if request.method == 'GET': qs = request.query_string if qs: kw = dict() for k, v in parse.parse_qs(qs, True).items(): kw[k] = v[0] if kw is None: kw = dict(**request.match_info) else: if not self._has_var_kw_arg and self._named_kw_args: # remove all unamed kw: copy = dict() for name in self._named_kw_args: if name in kw: copy[name] = kw[name] kw = copy # check named arg: for k, v in request.match_info.items(): if k in kw: logging.warning('Duplicate arg name in named arg and kw args: %s' % k) kw[k] = v if self._has_request_arg: kw['request'] = request # check required kw: if self._required_kw_args: for name in self._required_kw_args: if not name in kw: return web.HTTPBadRequest('Missing argument: %s' % name) logging.info('call with args: %s' % str(kw)) try: r = await self._func(**kw) return r except APIError as e: return dict(error=e.error, data=e.data, message=e.message)def add_static(app): path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') app.router.add_static('/static/', path) logging.info('add static %s => %s' % ('/static/', path))def add_route(app, fn): method = getattr(fn, '__method__', None) path = getattr(fn, '__route__', None) if path is None or method is None: raise ValueError('@get or @post not defined in %s.' % str(fn)) if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn): fn = asyncio.coroutine(fn) logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys()))) app.router.add_route(method, path, RequestHandler(app, fn))def add_routes(app, module_name): n = module_name.rfind('.') if n == (-1): mod = __import__(module_name, globals(), locals()) else: name = module_name[n+1:] mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name) for attr in dir(mod): if attr.startswith('_'): continue fn = getattr(mod, attr) if callable(fn): method = getattr(fn, '__method__', None) path = getattr(fn, '__route__', None) if method and path: add_route(app, fn)
编写配置文件config_default.py :
#!/usr/bin/env python3# -*- coding: utf-8 -*-__author__ = 'libingbin2015@aliyun.com'''' Default configurations.'''# 'secret': secret 是必需的选项,这是用于签名会话ID cookie的密钥。这可以是单个密钥的字符串或多个秘密的数组(只有第一个元素将用于签名会话ID cookie)而在验证请求中的签名时,将考虑所有元素# 另外, 考虑到安全性, 这个密钥是不建议存储在的程序中的. 最好的方法是存储在你的系统环境变量中, 通过 os.getenv(key, default=None) 获得configs = { 'debug': True, 'db': { 'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'password': 'toor', 'db': 'db' }, 'session': { 'secret': 'libingbin2015@aliyun.com' }}
编写配置文件config_override.py :
#!/usr/bin/env python3# -*- coding: utf-8 -*-__author__ = 'libingbin2015@aliyun.com'''' Override configurations.'''configs = { 'db': { 'host': '127.0.0.1' }}
把 config_default.py 作为开发环境的标准配置,把 config_override.py 作为生产环境的标准配置,我们就可以既方便地在本地开发,又可以随时把应用部署到服务器上。应用程序读取配置文件需要优先从config_override.py 读取。为了简化读取配置文件,可以把所有配置读取到统一的 config.py 中:
#!/usr/bin/env python3# -*- coding: utf-8 -*-__author__ = 'libingbin2015@aliyun.com'''' Configuration'''import config_defaultclass Dict(dict): ''' Simple dict but support access as x.y style. ''' def __init__(self, names=(), values=(), **kw): super(Dict, self).__init__(**kw) for k, v in zip(names, values): self[k] = v def __getattr__(self, key): try: return self[key] except KeyError: raise AttributeError(r"'Dict' object has no attribute '%s'" % key) def __setattr__(self, key, value): self[key] = valuedef merge(defaults, override): r = {} for k, v in defaults.items(): if k in override: if isinstance(v, dict): r[k] = merge(v, override[k]) else: r[k] = override[k] else: r[k] = v return rdef toDict(d): D = Dict() for k, v in d.items(): D[k] = toDict(v) if isinstance(v, dict) else v return Dconfigs = config_default.configstry: import config_override configs = merge(configs, config_override.configs)except ImportError: passconfigs = toDict(configs)
本文为博主原创文章,转载请注明出处!
https://my.oschina.net/u/3375733/blog/