Flask

  1. 短小精悍,可扩展强的一个web框架。上下文管理

  2. 安装:pip3 install Flask,我们做web开发时,是站在两大东西之上做的web框架和wsgi,Flask和Django中都是应用的并不是自己写的。Flask中werkzurg就是。

  3. web服务网关接口,wsgi是一个协议,实现该写一个的模块:wsgiref|werkzeug,实现其协议的模块本质上就是socket服务端用于接收用户请求,并处理。一般web框架基于wsgi实现,这样实现关注点分离。

  4. flask就是基于Werkzurg搭建起来的

    from werkzeug.wrappers import Request, Response
    from werkzeug.serving import run_simple
    ​
    @Request.application
    def run(environ, start_response):
        return Response("Hello World!")         # 返回字符串
    if __name__=="__main__":
        run_simple("localhost", 4000, run)      # 监听本机的4000端口,如果访问就执行run函数
  5. 一个最简单的Flask

    from flask import Flask
    app = Flask(__name__)                       # flask名称,一般这样写。
    ​
    @app.route("/index")                        # url路径
    def index():                                #所调用的函数
        return "Hello World!"
    if __name__ == "__main__":
        app.run()                               # 执行口
  6. Flask实现用户登录程序

    from flask import Flask, render_template, request, redirect, session
    app = Flask(__name__, template_folder="templates", static_folder="static")
    app.secret_key = "asdf"                         # session加盐
    # request.form          # 请求体
    # request.args          # 请求头
    # request.method        # 请求类型
    @app.route("/login", methods=["GET", "POST"])
    def login():
        if request.method == "GET":
            return render_template("login.html")
        user = request.from.get("user")
        pwd= request.from.get("pwd")
        if user == "alex" and pwd == "666":
            session["user"] = user
            return redirect("/index")
        return render_template("login.html", error="用户名或密码错误")
    ​
    @app.route("/index")
    def index():
        user = session.get("user")
        if not user:
            return redirect("/login")
        return render_template("index.html")
    ​
    if __name__ == "__main__":
        app.run()

1.配置文件

  1. settings

    class Config(object):
        DEBUG = False
        TESTING = False
        DATABASE_URL = "sqlite://:memory:"
    ​
    class ProductionConfig(Config):
        DATABASE_URL = "mysql://user@localhost/foo"
        
    class DevelopmentConfig(Config):
        DEBUT = True
        
    class TestingConfig(Config):
        TESTING = True
        
    flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:
    {
        'DEBUG':                                get_debug_flag(default=False),  是否开启Debug模式
        'TESTING':                              False,                          是否开启测试模式
        'PROPAGATE_EXCEPTIONS':                 None,                          
        'PRESERVE_CONTEXT_ON_EXCEPTION':        None,
        'SECRET_KEY':                           None,
        'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),
        'USE_X_SENDFILE':                       False,
        'LOGGER_NAME':                          None,
        'LOGGER_HANDLER_POLICY':               'always',
        'SERVER_NAME':                          None,
        'APPLICATION_ROOT':                     None,
        'SESSION_COOKIE_NAME':                  'session',
        'SESSION_COOKIE_DOMAIN':                None,
        'SESSION_COOKIE_PATH':                  None,
        'SESSION_COOKIE_HTTPONLY':              True,
        'SESSION_COOKIE_SECURE':                False,
        'SESSION_REFRESH_EACH_REQUEST':         True,
        'MAX_CONTENT_LENGTH':                   None,
        'SEND_FILE_MAX_AGE_DEFAULT':            timedelta(hours=12),
        'TRAP_BAD_REQUEST_ERRORS':              False,
        'TRAP_HTTP_EXCEPTIONS':                 False,
        'EXPLAIN_TEMPLATE_LOADING':             False,
        'PREFERRED_URL_SCHEME':                 'http',
        'JSON_AS_ASCII':                        True,
        'JSON_SORT_KEYS':                       True,
        'JSONIFY_PRETTYPRINT_REGULAR':          True,
        'JSONIFY_MIMETYPE':                     'application/json',
        'TEMPLATES_AUTO_RELOAD':                None,
    }
  2. xx

    # 如何通过代码找到指定的类
    import importlib
    path = "settings.Foo"
    ​
    p, c = path.rsplit(".", maxsplit=1)
    m = importlib.import_module(p)
    cls = getattr(m, c)
    # 找出类中的所有静态变量
    for key in dir(cls):
        if key.isupper():
            print(key,getattr(cls, key))
  3. Flask中有一个配置文件

    from flask import Flask, render_template, request, redirect, session
    app = Flask(__name__)
    ​
    print(app.config)                           # 这是flask提供的配置文件
    # app.config.["DEBUG"] = True                   # 我们可以通过这样来修改配置,但是flask也提供了另一种方式
    app.config.from_object("sttings.DevelopmentConfig")     # 这种方式就是自己创建一个文件和类,你在类中写的所有配置都会被应用。
    ​
    ​
    if __name__ == "__main__":
        app.run()

     

2.路由和视图

  1. flask的路由系统

    from flask import Flask, render_template, request, redirect, session, url_for
    app = Flask(__name__)
    # int 是值的类型,nid是名称:/login/1234
    """
    # 这是常用的类型,不加类型默认是字符串类型
    DEFAULT_CONVERTERS = {
        'default':          UnicodeConverter,
        'string':           UnicodeConverter,
        'any':              AnyConverter,
        'path':             PathConverter,
        'int':              IntegerConverter,
        'float':            FloatConverter,
        'uuid':             UUIDConverter,
    }
    """
    @app.route("/login/<int:nid>", endpoint="n1")               # endpoint就是设置名字,不写默认是函数名
    def login():
        url_for("n1", nid=122)                      # 反向查找url,nid是携带的值
    ​
    if __name__ == "__main__":
        app.run()
  2. 路由的两种方式

    @app.route("/xxx")
        def index():
            return "index"
    # 其实到内部调用时就是使用了app.add_url_rule来实现的,为了方便所以在外面套了个装饰器。
    def index():
        return "index"
    app.add_url_rule("/xxx", None, index)
    ​
    # 注意事项:
        # - 不用让endpoint重名
        # - 如果重名函数也一定要相同。
  3. app.route和app.add_url_rule参数

    @app.route和app.add_url_rule参数:
        rule,                       URL规则
        view_func,                  视图函数名称
        defaults=None,              默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数
        endpoint=None,              名称,用于反向生成URL,即: url_for('名称')
        methods=None,               允许的请求方式,如:["GET","POST"]
                
        strict_slashes=None,        对URL最后的 / 符号是否严格要求,
            如:
                @app.route('/index',strict_slashes=False),
                    访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可
                @app.route('/index',strict_slashes=True)
                    仅访问 http://www.xx.com/index 
        redirect_to=None,           重定向到指定地址
            如:
                @app.route('/index/<int:nid>', redirect_to='/home/<nid>')
                或
                def func(adapter, nid):
                    return "/home/888"
                @app.route('/index/<int:nid>', redirect_to=func)
        subdomain=None,             子域名访问
            from flask import Flask, views, url_for
    ​
            app = Flask(import_name=__name__)
            app.config['SERVER_NAME'] = 'wupeiqi.com:5000'
    ​
            @app.route("/", subdomain="admin")
            def static_index():
                """Flask supports static subdomains
                This is available at static.your-domain.tld"""
                return "static.your-domain.tld"
    ​
    ​
            @app.route("/dynamic", subdomain="<username>")
            def username_index(username):
                """Dynamic subdomains are also supported
                Try going to user1.your-domain.tld/dynamic"""
                return username + ".your-domain.tld"
    ​
    ​
            if __name__ == '__main__':
            app.run()
  4. CBV

    import functools
    from flask import Flask,views
    app = Flask(__name__)
    ​
    ​
    def wrapper(func):
        @functools.wraps(func)
        def inner(*args,**kwargs):
            return func(*args,**kwargs)
    ​
        return inner
    ​
    ​
    ​
    class UserView(views.MethodView):
        methods = ['GET']
        decorators = [wrapper,]
    ​
        def get(self,*args,**kwargs):
            return 'GET'
    ​
        def post(self,*args,**kwargs):
            return 'POST'
    ​
        app.add_url_rule('/user',None,UserView.as_view('uuuu'))
    ​
        if __name__ == '__main__':
            app.run()
  5. 自定义正则

    from flask import Flask,url_for
    ​
    app = Flask(__name__)
    ​
    # 步骤一:定制类
    from werkzeug.routing import BaseConverter
    class RegexConverter(BaseConverter):
        """
        自定义URL匹配正则表达式
        """
    ​
        def __init__(self, map, regex):
            super(RegexConverter, self).__init__(map)
            self.regex = regex
    ​
        def to_python(self, value):
            """
            路由匹配时,匹配成功后传递给视图函数中参数的值
            :param value:
            :return:
            """
            return int(value)
    ​
        def to_url(self, value):
            """
            使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
            :param value:
            :return:
            """
            val = super(RegexConverter, self).to_url(value)
            return val
    ​
    # 步骤二:添加到转换器
    app.url_map.converters['reg'] = RegexConverter
    ​
    """
    1. 用户发送请求
    2. flask内部进行正则匹配
    3. 调用to_python(正则匹配的结果)方法
    4. to_python方法的返回值会交给视图函数的参数
    """
    ​
    # 步骤三:使用自定义正则
    @app.route('/index/<reg("d+"):nid>')
    def index(nid):
        print(nid,type(nid))
    ​
        print(url_for('index',nid=987))
        return "index"
    ​
    if __name__ == '__main__':
        app.run()

     

3.请求和响应

  1. 请求

    # request.method
    # request.args
    # request.form
    # request.values
    # request.cookies
    # request.headers
    # request.path
    # request.full_path
    # request.script_root
    # request.url
    # request.base_url
    # request.url_root
    # request.host_url
    # request.host
    # request.files
    # obj = request.files['the_file_name']
    # obj.save('/var/www/uploads/' + secure_filename(f.filename))
  2. 响应

    from flask import Flask, render_template, request, redirect, session, url_for, jsonify
    ​
    # 响应相关信息
    # return "字符串"
    # return render_template('html模板路径',**{})
    # return redirect('/index.html')
    # return jsonify({"k1":"v1"})
    ​
    # response = make_response(render_template('index.html'))
    # response是flask.wrappers.Response类型
    # response.delete_cookie('key')
    # response.set_cookie('key', 'value')
    # response.headers['X-Something'] = 'A value'
    # return response
  3. 实现登录后才可查看的效果

    # 版本一:在每个函数中都加入判断
    @app.route('/index')
    def index():
        if not session.get('user'):
            return redirect(url_for('login'))
        return render_template('index.html',stu_dic=STUDENT_DICT)
    # 版本二:使用装饰器,但是需要用到functools的wraps,应为你写的装饰器肯定要先执行,不然无法被应用,但是你写的装饰器到最后被调用的函数是inner,但是我们需要自己的属性才可以实现。wraps的功能就是将inner赋予调用函数的所有属性和特征。
    import functools
    def auth(func):
        @functools.wraps(func)
        def inner(*args,**kwargs):
            if not session.get('user'):
                return redirect(url_for('login'))
            ret = func(*args,**kwargs)
            return ret
        return inner
    ​
    @app.route('/index')
    @auth
    def index():
        return render_template('index.html',stu_dic=STUDENT_DICT)
    ​
    # 应用场景:比较少的函数中需要额外添加功能。
    ​
    # 版本三:before_request,在执行函数时会先执行before_request所定义的函数,返回None就会向后执行,如果返回数据就会直接将其返回到前端了。
    @app.before_request
    def xxxxxx():
        if request.path == '/login':
            return None
    ​
        if session.get('user'):
            return None
    ​
        return redirect('/login')

     

4.模板渲染

  1. 基本数据类型:可以执行python语法,如:dict.get() list['xx']

  2. 传入函数:django,自动执行flask,不自动执行

  3. 全局定义函数

    @app.template_global()
    def sb(a1, a2):
        # {{sb(1,9)}}
        return a1 + a2
    ​
    @app.template_filter()
    def db(a1, a2, a3):
        # {{ 1|db(2,3) }}
        return a1 + a2 + a3
  4. 模板继承

    layout.html
    <!DOCTYPE html>
    <html lang="zh-CN">
    <head>
    <meta charset="UTF-8">
    <title>Title</title>
    <meta name="viewport" content="width=device-width, initial-scale=1">
    </head>
    <body>
    <h1>模板</h1>
    {% block content %}{% endblock %}
    </body>
    </html>
    ​
    tpl.html
    {% extends "layout.html"%}
    ​
    ​
    {% block content %}
    {{users.0}}
    ​
    ​
    {% endblock %}  
  5. include

    {% include "form.html" %}
    			
    			
    form.html 
        <form>
            asdfasdf
            asdfasdf
            asdf
            asdf
        </form>
  6. {% macro ccccc(name, type='text', value='') %}
    	<h1>宏</h1>
    	<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
    	<input type="submit" value="提交">
    {% endmacro %}
    
    {{ ccccc('n1') }}
    
    {{ ccccc('n2') }}
  7. flask默认时设置了xss的后端将html代码返回到前端时需要使用安全MarkUp("<div>asdf</div>"),前端也可以直接将其应用{{u|safe}}

5.session

  1. 当请求刚到来:flask读取cookie中session对应的值:eyJrMiI6NDU2LCJ1c2VyIjoib2xkYm95,将该值解密并反序列化成字典,放入内存以便视图函数使用。

    @app.route('/ses')
    def ses():
        session['k1'] = 123
        session['k2'] = 456
        del session['k1']
    
        return "Session"
    
    session['xxx'] = 123
    session['xxx']
  2. 当请求结束时,flask会读取内存中字典的值,进行序列化+加密,写入到用户cookie中。

  3. threading.local【和flask无任何关系】

    1. 为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)。

      import threading
      from threading import local
      import time
      
      obj = local()
      
      def task(i):
          obj.xxxxx = i
          time.sleep(2)
          print(obj.xxxxx,i)
      
          for i in range(10):
              t = threading.Thread(target=task,args=(i,))
              t.start()
    2. 根据字典自定义一个类似于threading.local功能?

      import time
      import threading
      
      DIC = {}
      
      def task(i):
          ident = threading.get_ident()
          if ident in DIC:
              DIC[ident]['xxxxx'] = i
          else:
              DIC[ident] = {'xxxxx':i }
              time.sleep(2)
      
      	print(DIC[ident]['xxxxx'],i)
      
      for i in range(10):
      	t = threading.Thread(target=task,args=(i,))
      	t.start()
    3. 通过getattr/setattr 构造出来 threading.local的加强版(协程)

      import time
      import threading
      try:
          import greenlet
          get_ident =  greenlet.getcurrent						# 获取协程唯一标识
      except Exception as e:
          get_ident = threading.get_ident							# 获取线程唯一标识
      
          class Local(object):
              DIC = {}
      
              def __getattr__(self, item):			# obj.xxx时触发,item等于xxx
                  ident = get_ident()
                  if ident in self.DIC:
                      return self.DIC[ident].get(item)
                  return None
      
              def __setattr__(self, key, value):		# obj.xxx = "123"时触发,key等于xxx,value等于123
                  ident = get_ident()
                  if ident in self.DIC:
                      self.DIC[ident][key] = value
                      else:
                          self.DIC[ident] = {key:value}
      						
      
      				obj = Local()
      
      				def task(i):
      					obj.xxxxx = i
      					time.sleep(2)
      					print(obj.xxxxx,i)
      
      				for i in range(10):
      					t = threading.Thread(target=task,args=(i,))
      					t.start()

5.1 flask-session

  1. 安装:pip3 install flask-session

  2. 使用

    import redis
    from flask import Flask,request,session
    from flask.sessions import SecureCookieSessionInterface
    from flask_session import Session
    
    app = Flask(__name__)
    
    # app.session_interface = SecureCookieSessionInterface()
    # app.session_interface = RedisSessionInterface()
    app.config['SESSION_TYPE'] = 'redis'
    app.config['SESSION_REDIS'] = redis.Redis(host='140.143.227.206',port=6379,password='1234')
    Session(app)
    
    @app.route('/login')
    def login():
        session['user'] = 'alex'
        return 'asdfasfd'
    
    @app.route('/home')
    def index():
        print(session.get('user'))
    
        return '...'
    
    
    if __name__ == '__main__':
        app.run()

     

6.闪现和中间件

  1. 闪现,在session中存储一个数据,读取时通过pop将数据移除。

    from flask import Flask,flash,get_flashed_messages
    @app.route('/page1')
    def page1():
    	flash('临时数据存储','error')
    	flash('sdfsdf234234','error')
    	flash('adasdfasdf','info')
    	return "Session"
    
    @app.route('/page2')
    def page2():
    	print(get_flashed_messages(category_filter=['error']))
    	return "Session"
  2. 中间件

    call方法什么时候出发?
    	用户发起请求时,才执行。
    任务:在执行call方法之前,做一个操作,call方法执行之后做一个操作。
    class Middleware(object):
        def __init__(self,old):
            self.old = old
    
        def __call__(self, *args, **kwargs):
            ret = self.old(*args, **kwargs)
                return ret
    
    if __name__ == '__main__':
    	app.wsgi_app = Middleware(app.wsgi_app)
     	app.run()

7.蓝图

  1. 完整目录图

    ProjectName
    -ProjectName
    --static
    --templates
    --views
    ---account.py
    	from flask import Blueprint,render_template
    	# 创建蓝图对象
        ac = Blueprint('ac',__name__)
    	# 只有调用该文件中的页面时才会触发
        @ac.before_request
        def x1():
            print('app.before_request')
    
    
        @ac.route('/login')
        def login():
            return render_template('login.html')
    
    
        @ac.route('/logout')
        def logout():
            return 'Logout'
    --__init__.py
    	from flask import Flask
        from .views.account import ac
        from .views.user import uc
    
        def create_app():
    
            app = Flask(__name__)
    
            # @app.before_request
            # def x1():
            #     print('app.before_request')
    
    		# 注册蓝图
            app.register_blueprint(ac)
            # 注册蓝图和编写url前缀
            app.register_blueprint(uc,url_prefix='/api')
            return app
    
    
    -manage.py
    	from crm import create_app
    
        app = create_app()
    
        if __name__ == '__main__':
            app.run()

8.特殊装饰器

  1. before_request and after_request

    from flask import Flask
    app = Flask(__name__)
    
    
    @app.before_request
    def x1():
        print('before:x1')
        return '滚'
    
    @app.before_request
    def xx1():
        print('before:xx1')
    
    
    @app.after_request
    def x2(response):
        print('after:x2')
        return response
    
    @app.after_request
    def xx2(response):
        print('after:xx2')
        return response
    
    
    @app.route('/index')
    def index():
        print('index')
        return "Index"
    
    
    @app.route('/order')
    def order():
        print('order')
        return "order"
    
    
    if __name__ == '__main__':
    
        app.run()
    
    # 执行结果是,before的返回会将其返回到页面,但是after还是会执行,并不像django返回后后面的都不会执行
    before:x1
    before:xx1
    request
    after:xx2
    after:x2
  2. before_first_request,只有第一次时才会被触发

    from flask import Flask
    app = Flask(__name__)
    
    @app.before_first_request
    def x1():
        print('123123')
    
    
    @app.route('/index')
    def index():
        print('index')
        return "Index"
    
    
    @app.route('/order')
    def order():
        print('order')
        return "order"
    
    if __name__ == '__main__':
        app.run()
  3. 页面渲染的全局装饰器

    @app.template_global()
    def sb(a1, a2):
        # {{sb(1,9)}}
        return a1 + a2
    
    @app.template_filter()
    def db(a1, a2, a3):
        # {{ 1|db(2,3) }}
        return a1 + a2 + a3
  4. errorhandler,当报指定的错误时会触发。

    @app.errorhandler(404)
    def not_found(arg):
        print(arg)
        return "没找到"

9.上下文管理

  1. 偏函数

    import functools
    
    def index(a1,a2):
        return a1 + a2
    
    # 原来的调用方式
    # ret = index(1,23)
    # print(ret)
    
    # 偏函数,帮助开发者自动传递参数
    new_func = functools.partial(index,666)
    ret = new_func(1)
    print(ret)
  2. 父类方法的执行

    """
    class Base(object):
    
        def func(self):
            print('Base.func')
    
    class Foo(Base):
    
        def func(self):
            # 方式一:根据mro的顺序执行方法
            # super(Foo,self).func()
            # 方式二:主动执行Base类的方法
            # Base.func(self)
    
            print('Foo.func')
    
    
    obj = Foo()
    obj.func()
    """
    ####################################
    class Base(object):
    
        def func(self):
            super(Base, self).func()
            print('Base.func')
    
    class Bar(object):
        def func(self):
            print('Bar.func')
    
    class Foo(Base,Bar):
        pass
    
    # 示例一
    # obj = Foo()
    # obj.func()
    # print(Foo.__mro__)
    
    # 示例二
    # obj = Base()
    # obj.func()
  3. # by luffycity.com
    
    class Stack(object):
    
        def __init__(self):
            self.data = []
    
        def push(self,val):
            self.data.append(val)
    
        def pop(self):
            return self.data.pop()
    
        def top(self):
            return self.data[-1]
    
    _stack = Stack()
    
    _stack.push('佳俊')
    _stack.push('咸鱼')
    
    print(_stack.pop())
    print(_stack.pop())
  4. Local

    """
    {
        1232:{k:v}
    }
    
    
    """
    try:
        from greenlet import getcurrent as get_ident
    except:
        from threading import get_ident
    
    """
    class Local(object):
        
        def __init__(self):
            object.__setattr__(self,'storage',{})
        
        def __setattr__(self, key, value):
            ident = get_ident()
            if ident not in self.storage:
                self.storage[ident] = {key:value}
            else:
                self.storage[ident][key] = value
                
        def __getattr__(self, item):
            ident = get_ident()
            if ident in self.storage:
                return self.storage[ident].get(item)
    """
    
    
    class Local(object):
        __slots__ = ('__storage__', '__ident_func__')
    
        def __init__(self):
            # __storage__ = {1231:{'stack':[]}}
            object.__setattr__(self, '__storage__', {})
            object.__setattr__(self, '__ident_func__', get_ident)
    
        def __getattr__(self, name):
            try:
                return self.__storage__[self.__ident_func__()][name]
            except KeyError:
                raise AttributeError(name)
    
        def __setattr__(self, name, value):
            ident = self.__ident_func__()
            storage = self.__storage__
            try:
                storage[ident][name] = value
            except KeyError:
                storage[ident] = {name: value}
    
        def __delattr__(self, name):
            try:
                del self.__storage__[self.__ident_func__()][name]
            except KeyError:
                raise AttributeError(name)
    
    
    
    obj = Local()
    obj.stack = []
    obj.stack.append('佳俊')
    obj.stack.append('咸鱼')
    print(obj.stack)
    print(obj.stack.pop())
    print(obj.stack)
  5. LocalStack

    import functools
    try:
        from greenlet import getcurrent as get_ident
    except:
        from threading import get_ident
    
    class Local(object):
        __slots__ = ('__storage__', '__ident_func__')
    
        def __init__(self):
            # __storage__ = {1231:{'stack':[]}}
            object.__setattr__(self, '__storage__', {})
            object.__setattr__(self, '__ident_func__', get_ident)
    
        def __getattr__(self, name):
            try:
                return self.__storage__[self.__ident_func__()][name]
            except KeyError:
                raise AttributeError(name)
    
        def __setattr__(self, name, value):
            # name=stack
            # value=[]
            ident = self.__ident_func__()
            storage = self.__storage__
            try:
                storage[ident][name] = value
            except KeyError:
                storage[ident] = {name: value}
    
        def __delattr__(self, name):
            try:
                del self.__storage__[self.__ident_func__()][name]
            except KeyError:
                raise AttributeError(name)
    
    
    """
    __storage__ = {
        12312: {stack:[ctx(session/request) ,]}
    }
    
    """
    
    # obj = Local()
    # obj.stack = []
    # obj.stack.append('佳俊')
    # obj.stack.append('咸鱼')
    # print(obj.stack)
    # print(obj.stack.pop())
    # print(obj.stack)
    
    
    class LocalStack(object):
        def __init__(self):
            self._local = Local()
    
        def push(self,value):
            rv = getattr(self._local, 'stack', None) # self._local.stack =>local.getattr
            if rv is None:
                self._local.stack = rv = [] #  self._local.stack =>local.setattr
            rv.append(value) # self._local.stack.append(666)
            return rv
    
    
        def pop(self):
            """Removes the topmost item from the stack, will return the
            old value or `None` if the stack was already empty.
            """
            stack = getattr(self._local, 'stack', None)
            if stack is None:
                return None
            elif len(stack) == 1:
                return stack[-1]
            else:
                return stack.pop()
    
        def top(self):
            try:
                return self._local.stack[-1]
            except (AttributeError, IndexError):
                return None
    
    
    
    class RequestContext(object):
        def __init__(self):
            self.request = "xx"
            self.session = 'oo'
    
    
    
    _request_ctx_stack = LocalStack()
    
    _request_ctx_stack.push(RequestContext())
    
    
    def _lookup_req_object(arg):
    
        ctx = _request_ctx_stack.top()
    
        return getattr(ctx,arg) # ctx.request / ctx.session
    
    request = functools.partial(_lookup_req_object,'request')
    session = functools.partial(_lookup_req_object,'session')
    
    
    print(request())
    print(session())
  6. slots

    class Foo(object):
        __slots__ = ('name',)
        def __init__(self):
            self.name = 'alex'
            # self.age = 18
    
    
    obj = Foo()
    print(obj.name)
    # print(obj.age)
  7. 请求上下文管理(ctx):request,session

    - 请求到来之后wsgi会触发__call__方法,由__call__方法再次调用wsgi_app方法
    - 在wsgi_app方法中:
    - 首先将 请求相关+空session 封装到一个RequestContext对象中,即:ctx。
    - 将ctx交给LocalStack对象,再由LocalStack将ctx添加到Local中,Local结构:
        __storage__ = {
            1231:{stack:[ctx,] }
        }
    - 根据请求中的cookie中提取名称为sessionid对应的值,对cookie进行加密+反序列化,再次赋值给ctx中的session
    
    -> 视图函数
    
    - 把session中的数据再次写入到cookie中。
    - 将ctx删除
    - 结果返回给用户浏览器
    - 断开socket连接
  8. LocalProxy

    from flask import Flask,request,session
    
    app = Flask(__name__)
    
    
    @app.route('/index')
    def index():
        # 1. request是LocalProxy对象
        # 2. 对象中有method、执行__getattr__
        print(request.method)
        # request['method']
        # request + 1
    
        # 1. session是LocalProxy对象
        # 2. LocalProxy对象的__setitem__
        session['x'] = 123
    
        return "Index"
    
    
    if __name__ == '__main__':
        app.run()
        # app.__call__
        # app.wsgi_app
    
    """
    第一阶段:请求到来
        将request和Session相关数据封装到ctx=RequestContext对象中。
        再通过LocalStack将ctx添加到Local中。
        __storage__ = {
            1231:{'stack':[ctx(request,session)]}
        }
    第二阶段:视图函数中获取request或session
        方式一:直接找LocalStack获取
                from flask.globals import _request_ctx_stack
                print(_request_ctx_stack.top.request.method)
                
        方式二:通过代理LocalProxy(小东北)获取
                from flask import Flask,request
                print(request.method)
                
    """
  9. g只存在在一次请求中当你return时g就会被清除。

10.文件的上传和解压

  1. view.py

    from flask import Blueprint, render_template, Flask, request, redirect,session
    import os
    import uuid
    from ..utils import helper
    
    
    ind = Blueprint('ind', __name__)
    ind.config["MAX_CONTENT_LENGTH"] =  = 1024 * 1024 * 7
    @ind.before_request
    def process_request():
        if not session.get("user_info"):
            return redirect("/login")
        return None
    
    
    @ind.route('/home')
    def home():
        return render_template('home.html')
    
    @ind.route('/user_list')
    def user_list():
        # import pymysql
        # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
        # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        # cursor.execute("SELECT id,user,nickname FROM userinfo")
        # data_list = cursor.fetchall()
        # cursor.close()
        # conn.close()
        data_list = helper.fetch_all("SELECT id,user,nickname FROM userinfo",[])
    
        return render_template('user_list.html',data_list=data_list)
    
    @ind.route('/detail/<int:nid>')
    def detail(nid):
        # import pymysql
        # conn = Config.POOL.connection()
        # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        # cursor.execute("SELECT id,line,ctime FROM record where user_id=%s",(nid,))
        # record_list = cursor.fetchall()
        # cursor.close()
        # conn.close()
        record_list = helper.fetch_all("SELECT id,line,ctime FROM record where user_id=%s",(nid,))
    
        return render_template('detail.html',record_list=record_list)
    
    @ind.route('/upload',methods=['GET','POST'])
    def upload():
        if request.method == "GET":
            return render_template('upload.html')
        from werkzeug.datastructures import FileStorage
        file_obj = request.files.get('code')
    
        # 1. 检查上传文件后缀名
        name_ext = file_obj.filename.rsplit('.',maxsplit=1)
        if len(name_ext) != 2:
            return "请上传zip压缩文件"
        if name_ext[1] != 'zip':
            return "请上传zip压缩文件"
    
        """
        # 2. 接收用户上传文件,并写入到服务器本地.
        file_path = os.path.join("files",file_obj.filename)
        # 从file_obj.stream中读取内容,写入到文件
        file_obj.save(file_path)
    
        # 3. 解压zip文件
        import shutil
        # 通过open打开压缩文件,读取内容再进行解压。
        shutil._unpack_zipfile(file_path,'xsadfasdfasdf')
        """
    
        # 2+3, 接收用户上传文件,并解压到指定目录
        import shutil
        target_path = os.path.join('files',str(uuid.uuid4()))
        shutil._unpack_zipfile(file_obj.stream,target_path)
    
        # 4. 遍历某目录下的所有文件
        # for item in os.listdir(target_path):
        #     print(item)
        total_num = 0
        for base_path,folder_list,file_list in os.walk(target_path):
            for file_name in file_list:
                file_path = os.path.join(base_path,file_name)
                file_ext = file_path.rsplit('.',maxsplit=1)
                if len(file_ext) != 2:
                    continue
                if file_ext[1] != 'py':
                    continue
                file_num = 0
                with open(file_path,'rb') as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        if line.startswith(b'#'):
                            continue
                        file_num += 1
                total_num += file_num
        
        # 获取当前时间
        import datetime
        ctime = datetime.date.today()
        print(total_num,ctime,session['user_info']['id'])
    
        # import pymysql
        # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
        # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        # cursor.execute("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))
        # data = cursor.fetchone()
        # cursor.close()
        # conn.close()
        data = helper.fetch_one("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))
        if data:
            return "今天已经上传"
    
    
        # import pymysql
        # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
        # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        # cursor.execute("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))
        # conn.commit()
        # cursor.close()
        # conn.close()
        helper.insert("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))
    
        return "上传成功"
  2. html.py

    {% extends "layout.html"%}
    
    {% block content %}
    
        <h1>代码上传</h1>
    
        <form method="post" enctype="multipart/form-data">
            <input type="file" name="code">
            <input type="submit" value="上传">
        </form>
    
    
    {% endblock %}

11.Database Join 池

  1. 安装pip3 install DBUtils

  2. 创建连接池可以共享出去,谁要用就来获取一个连接,这样可以管理连接的数量,提高链接的利用率。

    from DBUtils.PooledDB import PooledDB, SharedDBConnection
    import pymysql
    
    POOL = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            maxshared=3,
            # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
            ping=0,
            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='123456',
            database='s9day118',
            charset='utf8'
        )
    # 获取连接
    conn = POOL.connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    cursor.execute("select * from tb1")
    result = cursor.fetchall()
    cursor.close()
    # 归还连接
    conn.close()

12.wtforms

  1. WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证。

  2. 安装pip3 install wtforms

12.1 用户登录注册示例

  1. 用户登录

    1. 当用户登录时候,需要对用户提交的用户名和密码进行多种格式校验。如:

    2. 用户不能为空;用户长度必须大于6;

    3. 密码不能为空;密码长度必须大于12;密码必须包含 字母、数字、特殊字符等(自定义正则);

      #!/usr/bin/env python
      # -*- coding:utf-8 -*-
      from flask import Flask, render_template, request, redirect
      from wtforms import Form
      from wtforms.fields import core
      from wtforms.fields import html5
      from wtforms.fields import simple
      from wtforms import validators
      from wtforms import widgets
      
      app = Flask(__name__, template_folder='templates')
      app.debug = True
      
      
      class LoginForm(Form):
          name = simple.StringField(
              label='用户名',
              validators=[
                  validators.DataRequired(message='用户名不能为空.'),
                  validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
              ],
              widget=widgets.TextInput(),
              render_kw={'class': 'form-control'}
      
          )
          pwd = simple.PasswordField(
              label='密码',
              validators=[
                  validators.DataRequired(message='密码不能为空.'),
                  validators.Length(min=8, message='用户名长度必须大于%(min)d'),
                  validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                                    message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
      
              ],
              widget=widgets.PasswordInput(),
              render_kw={'class': 'form-control'}
          )
      
      
      
      @app.route('/login', methods=['GET', 'POST'])
      def login():
          if request.method == 'GET':
              form = LoginForm()
              return render_template('login.html', form=form)
          else:
              form = LoginForm(formdata=request.form)
              if form.validate():
                  print('用户提交数据通过格式验证,提交的值为:', form.data)
              else:
                  print(form.errors)
              return render_template('login.html', form=form)
      
      if __name__ == '__main__':
          app.run()
      #######################################################
      
      <!DOCTYPE html>
      <html lang="en">
      <head>
          <meta charset="UTF-8">
          <title>Title</title>
      </head>
      <body>
      <h1>登录</h1>
      <form method="post">
          <!--<input type="text" name="name">-->
          <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
      
          <!--<input type="password" name="pwd">-->
          <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
          <input type="submit" value="提交">
      </form>
      </body>
      </html>
  2. 用户注册

    1. 注册页面需要让用户输入:用户名、密码、密码重复、性别、爱好等。

      from flask import Flask, render_template, request, redirect
      from wtforms import Form
      from wtforms.fields import core
      from wtforms.fields import html5
      from wtforms.fields import simple
      from wtforms import validators
      from wtforms import widgets
      
      app = Flask(__name__, template_folder='templates')
      app.debug = True
      
      
      
      class RegisterForm(Form):
          name = simple.StringField(
              label='用户名',
              validators=[
                  validators.DataRequired()
              ],
              widget=widgets.TextInput(),
              render_kw={'class': 'form-control'},
              default='alex'
          )
      
          pwd = simple.PasswordField(
              label='密码',
              validators=[
                  validators.DataRequired(message='密码不能为空.')
              ],
              widget=widgets.PasswordInput(),
              render_kw={'class': 'form-control'}
          )
      
          pwd_confirm = simple.PasswordField(
              label='重复密码',
              validators=[
                  validators.DataRequired(message='重复密码不能为空.'),
                  validators.EqualTo('pwd', message="两次密码输入不一致")
              ],
              widget=widgets.PasswordInput(),
              render_kw={'class': 'form-control'}
          )
      
          email = html5.EmailField(
              label='邮箱',
              validators=[
                  validators.DataRequired(message='邮箱不能为空.'),
                  validators.Email(message='邮箱格式错误')
              ],
              widget=widgets.TextInput(input_type='email'),
              render_kw={'class': 'form-control'}
          )
      
          gender = core.RadioField(
              label='性别',
              choices=(
                  (1, '男'),
                  (2, '女'),
              ),
              coerce=int
          )
          city = core.SelectField(
              label='城市',
              choices=(
                  ('bj', '北京'),
                  ('sh', '上海'),
              )
          )
      
          hobby = core.SelectMultipleField(
              label='爱好',
              choices=(
                  (1, '篮球'),
                  (2, '足球'),
              ),
              coerce=int
          )
      
          favor = core.SelectMultipleField(
              label='喜好',
              choices=(
                  (1, '篮球'),
                  (2, '足球'),
              ),
              widget=widgets.ListWidget(prefix_label=False),
              option_widget=widgets.CheckboxInput(),
              coerce=int,
              default=[1, 2]
          )
      
          def __init__(self, *args, **kwargs):
              super(RegisterForm, self).__init__(*args, **kwargs)
              self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
      
          def validate_pwd_confirm(self, field):
              """
              自定义pwd_confirm字段规则,例:与pwd字段是否一致
              :param field: 
              :return: 
              """
              # 最开始初始化时,self.data中已经有所有的值
      
              if field.data != self.data['pwd']:
                  # raise validators.ValidationError("密码不一致") # 继续后续验证
                  raise validators.StopValidation("密码不一致")  # 不再继续后续验证
      
      
      @app.route('/register', methods=['GET', 'POST'])
      def register():
          if request.method == 'GET':
              form = RegisterForm(data={'gender': 1})
              return render_template('register.html', form=form)
          else:
              form = RegisterForm(formdata=request.form)
              if form.validate():
                  print('用户提交数据通过格式验证,提交的值为:', form.data)
              else:
                  print(form.errors)
              return render_template('register.html', form=form)
      
      
      
      if __name__ == '__main__':
          app.run()
          
          
      ############################################################
      
      <!DOCTYPE html>
      <html lang="en">
      <head>
          <meta charset="UTF-8">
          <title>Title</title>
      </head>
      <body>
      <h1>用户注册</h1>
      <form method="post" novalidate style="padding:0  50px">
          {% for item in form %}
          <p>{{item.label}}: {{item}} {{item.errors[0] }}</p>
          {% endfor %}
          <input type="submit" value="提交">
      </form>
      </body>
      </html>
  3. 动态获取数据库数据

    from flask import Flask,request,render_template,session,current_app,g,redirect
    from wtforms import Form
    from wtforms.fields import simple
    from wtforms.fields import html5
    from wtforms.fields import core
    
    from wtforms import widgets
    from wtforms import validators
    
    app = Flask(__name__)
    
    
    class LoginForm(Form):
        name = simple.StringField(
            validators=[
                validators.DataRequired(message='用户名不能为空.'),
                # validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
            ],
            widget=widgets.TextInput(),
            render_kw={'placeholder':'请输入用户名'}
        )
        pwd = simple.PasswordField(
            validators=[
                validators.DataRequired(message='密码不能为空.'),
                # validators.Length(min=8, message='用户名长度必须大于%(min)d'),
                # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                #                   message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
    
            ],
            render_kw={'placeholder':'请输入密码'}
        )
    
    
    @app.route('/login',methods=['GET','POST'])
    def login():
        if request.method == "GET":
            form = LoginForm()
            # print(form.name,type(form.name)) # form.name是StringField()对象, StringField().__str__
            # print(form.pwd,type(form.pwd))   # form.pwd是PasswordField()对象,PasswordField().__str__
            return render_template('login.html',form=form)
    
        form = LoginForm(formdata=request.form)
        if form.validate():
            print(form.data)
            return redirect('https://www.luffycity.com/home')
        else:
            # print(form.errors)
            return render_template('login.html', form=form)
    
    
    import helper
    class UserForm(Form):
        city = core.SelectField(
            label='城市',
            choices=(),
            coerce=int		# 从数据库获取的数据返回后是字符串类型的,把他写上后会先将其转为int后再返回到前端
        )
        name = simple.StringField(label='姓名')
    	# 当你定义成静态字段是,只有启动时才会加载一次,但是从数据库需要实时拿到数据,下面是解决方法。你应该可以看懂。
        def __init__(self,*args,**kwargs):
            super(UserForm,self).__init__(*args,**kwargs)
    
            self.city.choices=helper.fetch_all('select id,name from tb1',[],type=None)
    
    
    @app.route('/user')
    def user():
        if request.method == "GET":
            #form = UserForm(data={'name':'alex','city':3})
            form = UserForm()
            return render_template('user.html',form=form)
    
    
    if __name__ == '__main__':
        app.run()

     

  4. meta

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from flask import Flask, render_template, request, redirect, session
    from wtforms import Form
    from wtforms.csrf.core import CSRF
    from wtforms.fields import core
    from wtforms.fields import html5
    from wtforms.fields import simple
    from wtforms import validators
    from wtforms import widgets
    from hashlib import md5
    
    app = Flask(__name__, template_folder='templates')
    app.debug = True
    
    
    class MyCSRF(CSRF):
        """
        Generate a CSRF token based on the user's IP. I am probably not very
        secure, so don't use me.
        """
    
        def setup_form(self, form):
            self.csrf_context = form.meta.csrf_context()
            self.csrf_secret = form.meta.csrf_secret
            return super(MyCSRF, self).setup_form(form)
    
        def generate_csrf_token(self, csrf_token):
            gid = self.csrf_secret + self.csrf_context
            token = md5(gid.encode('utf-8')).hexdigest()
            return token
    
        def validate_csrf_token(self, form, field):
            print(field.data, field.current_token)
            if field.data != field.current_token:
                raise ValueError('Invalid CSRF')
    
    
    class TestForm(Form):
        name = html5.EmailField(label='用户名')
        pwd = simple.StringField(label='密码')
    
        class Meta:
            # -- CSRF
            # 是否自动生成CSRF标签
            csrf = True
            # 生成CSRF标签name
            csrf_field_name = 'csrf_token'
    
            # 自动生成标签的值,加密用的csrf_secret
            csrf_secret = 'xxxxxx'
            # 自动生成标签的值,加密用的csrf_context
            csrf_context = lambda x: request.url
            # 生成和比较csrf标签
            csrf_class = MyCSRF
    
            # -- i18n
            # 是否支持本地化
            # locales = False
            locales = ('zh', 'en')
            # 是否对本地化进行缓存
            cache_translations = True
            # 保存本地化缓存信息的字段
            translations_cache = {}
    
    
    @app.route('/index/', methods=['GET', 'POST'])
    def index():
        if request.method == 'GET':
            form = TestForm()
        else:
            form = TestForm(formdata=request.form)
            if form.validate():
                print(form)
        return render_template('index.html', form=form)
    
    
    if __name__ == '__main__':
        app.run()

12.2 其他

  1. metaclass

    # new方法的返回值决定对象到底是什么?
    class Bar(object):
        pass
    
    class Foo(object):
    
        def __new__(cls, *args, **kwargs):
            # return super(Foo,cls).__new__(cls,*args, **kwargs)
            return Bar()
        obj = Foo()
        print(obj)
    # 当Foo()时先执行__new__方法,返回一个Bar,obj就等于Bar对象
    
    
    
    ##########################################################################
    
    
    
    
    class MyType(type):
        def __init__(self, *args, **kwargs):
            print('MyType创建类',self)
            super(MyType, self).__init__(*args, **kwargs)
    
        def __call__(self, *args, **kwargs):
            obj = super(MyType, self).__call__(*args, **kwargs)
            print('类创建对象', self, obj)
            return obj
    
    
    class Foo(object,metaclass=MyType):
        user = 'wupeiqi'
        age = 18
    
    obj = Foo()
    # 在创建Foo对象前先执行mytype的__call__方法,__call__方法后才会执行Foo。不写metaclass默认是由type类来做的。
    # 是先执行父类的__call__后再执行自己的__init__方法。
    
    """
    创建类时,先执行type的__init__。
    类的实例化时,执行type的__call__,__call__方法的的返回值就是实例化的对象。
    __call__内部调用:
    	类.__new__,创建对象
    	类.__init__,对象的初始化
    """
  2. 实例化流程分析

        # 源码流程
        1. 执行type的 __call__ 方法,读取字段到静态字段 cls._unbound_fields 中; meta类读取到cls._wtforms_meta中
        2. 执行构造方法
            
            a. 循环cls._unbound_fields中的字段,并执行字段的bind方法,然后将返回值添加到 self._fields[name] 中。
                即:
                    _fields = {
                        name: wtforms.fields.core.StringField(),
                    }
                    
                PS:由于字段中的__new__方法,实例化时:name = simple.StringField(label='用户名'),创建的是UnboundField(cls, *args, **kwargs),当执行完bind之后,才变成执行 wtforms.fields.core.StringField()
            
            b. 循环_fields,为对象设置属性
                for name, field in iteritems(self._fields):
                    # Set all the fields to attributes so that they obscure the class
                    # attributes with the same names.
                    setattr(self, name, field)
            c. 执行process,为字段设置默认值:self.process(formdata, obj, data=data, **kwargs)
                优先级:obj,data,formdata;
                
                再循环执行每个字段的process方法,为每个字段设置值:
                for name, field, in iteritems(self._fields):
                    if obj is not None and hasattr(obj, name):
                        field.process(formdata, getattr(obj, name))
                    elif name in kwargs:
                        field.process(formdata, kwargs[name])
                    else:
                        field.process(formdata)
                
                执行每个字段的process方法,为字段的data和字段的raw_data赋值
                def process(self, formdata, data=unset_value):
                    self.process_errors = []
                    if data is unset_value:
                        try:
                            data = self.default()
                        except TypeError:
                            data = self.default
            
                    self.object_data = data
            
                    try:
                        self.process_data(data)
                    except ValueError as e:
                        self.process_errors.append(e.args[0])
            
                    if formdata:
                        try:
                            if self.name in formdata:
                                self.raw_data = formdata.getlist(self.name)
                            else:
                                self.raw_data = []
                            self.process_formdata(self.raw_data)
                        except ValueError as e:
                            self.process_errors.append(e.args[0])
            
                    try:
                        for filter in self.filters:
                            self.data = filter(self.data)
                    except ValueError as e:
                        self.process_errors.append(e.args[0])
                    
            d. 页面上执行print(form.name) 时,打印标签
                
                因为执行了:
                    字段的 __str__ 方法
                    字符的 __call__ 方法
                    self.meta.render_field(self, kwargs)
                        def render_field(self, field, render_kw):
                            other_kw = getattr(field, 'render_kw', None)
                            if other_kw is not None:
                                render_kw = dict(other_kw, **render_kw)
                            return field.widget(field, **render_kw)
                    执行字段的插件对象的 __call__ 方法,返回标签字符串
  3. 验证流程分析

            a. 执行form的validate方法,获取钩子方法
                def validate(self):
                    extra = {}
                    for name in self._fields:
                        inline = getattr(self.__class__, 'validate_%s' % name, None)
                        if inline is not None:
                            extra[name] = [inline]
            
                    return super(Form, self).validate(extra)
            b. 循环每一个字段,执行字段的 validate 方法进行校验(参数传递了钩子函数)
                def validate(self, extra_validators=None):
                    self._errors = None
                    success = True
                    for name, field in iteritems(self._fields):
                        if extra_validators is not None and name in extra_validators:
                            extra = extra_validators[name]
                        else:
                            extra = tuple()
                        if not field.validate(self, extra):
                            success = False
                    return success
            c. 每个字段进行验证时候
                字段的pre_validate 【预留的扩展】
                字段的_run_validation_chain,对正则和字段的钩子函数进行校验
                字段的post_validate【预留的扩展】

13.SQLAlchemy

  1. MySQLdb,只支持python2.0的,而pymysql是支持python2.0和3.0的。

    import MySQLdb
      
    conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb')
      
    cur = conn.cursor()
      
    reCount = cur.execute('insert into UserInfo(Name,Address) values(%s,%s)',('alex','usa'))
    # reCount = cur.execute('insert into UserInfo(Name,Address) values(%(id)s, %(name)s)',{'id':12345,'name':'wupeiqi'})
      
    conn.commit()
      
    cur.close()
    conn.close()
      
    print reCount
  2. SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

  3. 安装pip3 install sqlalchemy

  4. 组成部分:

    • Engine,框架的引擎

    • Connection Pooling ,数据库连接池

    • Dialect,选择连接数据库的DB API种类

    • Schema/Types,架构和类型

    • SQL Exprression Language,SQL表达式语言

  5. SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

    MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
        
    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
        
    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
        
    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
        
    更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
  6. 执行原生SQL语句

    import time
    import threading
    import sqlalchemy
    from sqlalchemy import create_engine
    from sqlalchemy.engine.base import Engine
     
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
     
     
    def task(arg):
        conn = engine.raw_connection()
        cursor = conn.cursor()
        cursor.execute(
            "select * from t1"
        )
        result = cursor.fetchall()
        cursor.close()
        conn.close()
     
     
    for i in range(20):
        t = threading.Thread(target=task, args=(i,))
        t.start()
        
        
    ########################################################################################
    
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import time
    import threading
    import sqlalchemy
    from sqlalchemy import create_engine
    from sqlalchemy.engine.base import Engine
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)
    
    
    def task(arg):
        conn = engine.contextual_connect()
        with conn:
            cur = conn.execute(
                "select * from t1"
            )
            result = cur.fetchall()
            print(result)
    
    
    for i in range(20):
        t = threading.Thread(target=task, args=(i,))
        t.start()
        
    
    ########################################################################################
    
    
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import time
    import threading
    import sqlalchemy
    from sqlalchemy import create_engine
    from sqlalchemy.engine.base import Engine
    from sqlalchemy.engine.result import ResultProxy
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)
    
    
    def task(arg):
        cur = engine.execute("select * from t1")
        result = cur.fetchall()
        cur.close()
        print(result)
    
    
    for i in range(20):
        t = threading.Thread(target=task, args=(i,))
        t.start()
        
        
    # 注意: 查看连接 show status like 'Threads%';

13.1 ORM

  1. 创建单表

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    
    Base = declarative_base()
    
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        # email = Column(String(32), unique=True)
        # ctime = Column(DateTime, default=datetime.datetime.now)
        # extra = Column(Text, nullable=True)
    
        __table_args__ = (
            # UniqueConstraint('id', 'name', name='uix_id_name'),
            # Index('ix_id_name', 'name', 'email'),
        )
    
    
    def init_db():
        """
        根据类创建数据库表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    
    def drop_db():
        """
        根据类删除数据库表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.drop_all(engine)
    
    
    if __name__ == '__main__':
        drop_db()
        init_db()
  2. 创建多个表并包含Fk、M2M关系

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    from sqlalchemy.orm import relationship
    
    Base = declarative_base()
    
    
    # ##################### 单表示例 #########################
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True)
        age = Column(Integer, default=18)
        email = Column(String(32), unique=True)
        ctime = Column(DateTime, default=datetime.datetime.now)
        extra = Column(Text, nullable=True)
    
        __table_args__ = (
            # UniqueConstraint('id', 'name', name='uix_id_name'),
            # Index('ix_id_name', 'name', 'extra'),
        )
    
    
    class Hosts(Base):
        __tablename__ = 'hosts'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True)
        ctime = Column(DateTime, default=datetime.datetime.now)
    
    
    # ##################### 一对多示例 #########################
    class Hobby(Base):
        __tablename__ = 'hobby'
        id = Column(Integer, primary_key=True)
        caption = Column(String(50), default='篮球')
    
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        hobby_id = Column(Integer, ForeignKey("hobby.id"))
    
        # 与生成表结构无关,仅用于查询方便
        hobby = relationship("Hobby", backref='pers')
    
    
    # ##################### 多对多示例 #########################
    
    class Server2Group(Base):
        __tablename__ = 'server2group'
        id = Column(Integer, primary_key=True, autoincrement=True)
        server_id = Column(Integer, ForeignKey('server.id'))
        group_id = Column(Integer, ForeignKey('group.id'))
    
    
    class Group(Base):
        __tablename__ = 'group'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
    
        # 与生成表结构无关,仅用于查询方便
        servers = relationship('Server', secondary='server2group', backref='groups')
    
    
    class Server(Base):
        __tablename__ = 'server'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
    
    
    def init_db():
        """
        根据类创建数据库表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    
    def drop_db():
        """
        根据类删除数据库表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.drop_all(engine)
    
    
    if __name__ == '__main__':
        drop_db()
        init_db()
  3. 操作数据库表

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
      
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
      
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()
      
    # ############# 执行ORM操作 #############
    obj1 = Users(name="alex1")
    session.add(obj1)
      
    # 提交事务
    session.commit()
    # 关闭session
    session.close()
  4. 基于scoped_session实现线程安全

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Users
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    """
    # 线程安全,基于本地线程实现每个线程用同一个session
    # 特殊的:scoped_session中有原来方法的Session中的一下方法:
    
    public_methods = (
        '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
        'close', 'commit', 'connection', 'delete', 'execute', 'expire',
        'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
        'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
        'bulk_update_mappings',
        'merge', 'query', 'refresh', 'rollback',
        'scalar'
    )
    """
    session = scoped_session(Session)
    
    
    # ############# 执行ORM操作 #############
    obj1 = Users(name="alex1")
    session.add(obj1)
    
    
    
    # 提交事务
    session.commit()
    # 关闭session
    session.close()
  5. 多线程执行示例

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from db import Users
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    
    def task(arg):
        session = Session()
    
        obj1 = Users(name="alex1")
        session.add(obj1)
    
        session.commit()
    
    
    for i in range(10):
        t = threading.Thread(target=task, args=(i,))
        t.start()
  6. 基本增删改查示例

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text
    
    from db import Users, Hosts
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    session = Session()
    
    # ################ 添加 ################
    """
    obj1 = Users(name="wupeiqi")
    session.add(obj1)
    
    session.add_all([
        Users(name="wupeiqi"),
        Users(name="alex"),
        Hosts(name="c1.com"),
    ])
    session.commit()
    """
    
    # ################ 删除 ################
    """
    session.query(Users).filter(Users.id > 2).delete()
    session.commit()
    """
    # ################ 修改 ################
    """
    session.query(Users).filter(Users.id > 0).update({"name" : "099"})
    session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
    session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
    session.commit()
    """
    # ################ 查询 ################
    """
    r1 = session.query(Users).all()
    r2 = session.query(Users.name.label('xx'), Users.age).all()
    r3 = session.query(Users).filter(Users.name == "alex").all()
    r4 = session.query(Users).filter_by(name='alex').all()
    r5 = session.query(Users).filter_by(name='alex').first()
    r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
    r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
    """
    
    
    session.close()
  7. 常用操作

    # 条件
    ret = session.query(Users).filter_by(name='alex').all()
    ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
    ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
    ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
    ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
    ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
    from sqlalchemy import and_, or_
    ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    ret = session.query(Users).filter(
        or_(
            Users.id < 2,
            and_(Users.name == 'eric', Users.id > 3),
            Users.extra != ""
        )).all()
    
    
    # 通配符
    ret = session.query(Users).filter(Users.name.like('e%')).all()
    ret = session.query(Users).filter(~Users.name.like('e%')).all()
    
    # 限制
    ret = session.query(Users)[1:2]
    
    # 排序
    ret = session.query(Users).order_by(Users.name.desc()).all()
    ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
    
    # 分组
    from sqlalchemy.sql import func
    
    ret = session.query(Users).group_by(Users.extra).all()
    ret = session.query(
        func.max(Users.id),
        func.sum(Users.id),
        func.min(Users.id)).group_by(Users.name).all()
    
    ret = session.query(
        func.max(Users.id),
        func.sum(Users.id),
        func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
    
    # 连表
    
    ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
    
    ret = session.query(Person).join(Favor).all()
    
    ret = session.query(Person).join(Favor, isouter=True).all()
    
    
    # 组合
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union(q2).all()
    
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union_all(q2).all()
  8. 原生SQL语句

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    session = Session()
    
    # 查询
    # cursor = session.execute('select * from users')
    # result = cursor.fetchall()
    
    # 添加
    cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
    session.commit()
    print(cursor.lastrowid)
    
    session.close()
  9. 基于relationship操作ForeignKey

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts, Hobby, Person
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session = Session()
    # 添加
    """
    session.add_all([
        Hobby(caption='乒乓球'),
        Hobby(caption='羽毛球'),
        Person(name='张三', hobby_id=3),
        Person(name='李四', hobby_id=4),
    ])
    
    person = Person(name='张九', hobby=Hobby(caption='姑娘'))
    session.add(person)
    
    hb = Hobby(caption='人妖')
    hb.pers = [Person(name='文飞'), Person(name='博雅')]
    session.add(hb)
    
    session.commit()
    """
    
    # 使用relationship正向查询
    """
    v = session.query(Person).first()
    print(v.name)
    print(v.hobby.caption)
    """
    
    # 使用relationship反向查询
    """
    v = session.query(Hobby).first()
    print(v.caption)
    print(v.pers)
    """
    
    session.close()
  10. 基于relationship操作m2m

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session = Session()
    # 添加
    """
    session.add_all([
        Server(hostname='c1.com'),
        Server(hostname='c2.com'),
        Group(name='A组'),
        Group(name='B组'),
    ])
    session.commit()
    
    s2g = Server2Group(server_id=1, group_id=1)
    session.add(s2g)
    session.commit()
    
    
    gp = Group(name='C组')
    gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
    session.add(gp)
    session.commit()
    
    
    ser = Server(hostname='c6.com')
    ser.groups = [Group(name='F组'),Group(name='G组')]
    session.add(ser)
    session.commit()
    """
    
    
    # 使用relationship正向查询
    """
    v = session.query(Group).first()
    print(v.name)
    print(v.servers)
    """
    
    # 使用relationship反向查询
    """
    v = session.query(Server).first()
    print(v.hostname)
    print(v.groups)
    """
    
    
    session.close()
  11. 其他

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text, func
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session = Session()
    
    # 关联子查询
    subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
    result = session.query(Group.name, subqry)
    """
    SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
    FROM server 
    WHERE server.id = `group`.id) AS anon_1 
    FROM `group`
    """
    
    
    # 原生SQL
    """
    # 查询
    cursor = session.execute('select * from users')
    result = cursor.fetchall()
    
    # 添加
    cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
    session.commit()
    print(cursor.lastrowid)
    """
    
    session.close()

14.flask武装

  1. Flask_SQLAlchemy

    a. 下载安装
        pip3 install flask-sqlalchemy
    b. chun.__init__.py 
        导入并实例化SQLAlchemy
        from flask_sqlalchemy import SQLAlchemy
        db = SQLAlchemy()
        
        注意事项:
             - 必须在导入蓝图之前
             - 必须导入models.py 
    c. 初始化
        db.init_app(app)
        
    d. 在配置文件中写入配置
        # ##### SQLALchemy配置文件 #####
        SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123456@127.0.0.1:3306/s9day122?charset=utf8"
        SQLALCHEMY_POOL_SIZE = 10
        SQLALCHEMY_MAX_OVERFLOW = 5
    
    e. 创建models.py中的类(对应数据库表)
        chun/models.py 
            from sqlalchemy.ext.declarative import declarative_base
            from sqlalchemy import Column
            from sqlalchemy import Integer,String,Text,Date,DateTime
            from sqlalchemy import create_engine
            from chun import db
    
    
            class Users(db.Model):
                __tablename__ = 'users'
    
                id = Column(Integer, primary_key=True)
                name = Column(String(32), index=True, nullable=False)
                depart_id = Column(Integer)
    
    f. 生成表(使用app上下文)
        from chun import db,create_app
    
        app = create_app()
        app_ctx = app.app_context() # app_ctx = app/g
        with app_ctx: # __enter__,通过LocalStack放入Local中
            db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置
    
            
    g. 基于ORM对数据库进行操作。
        from flask import Blueprint
        from chun import db
        from chun import models
        us = Blueprint('us',__name__)
    
    
        @us.route('/index')
        def index():
            # 使用SQLAlchemy在数据库中插入一条数据
            # db.session.add(models.Users(name='高件套',depart_id=1))
            # db.session.commit()
            # db.session.remove()
            result = db.session.query(models.Users).all()
            print(result)
            db.session.remove()
    
            return 'Index'
  2. flask-script

    install the module "pip3 install flask-script "
    
    a. 增加 runserver
        from chun import create_app
        from flask_script import Manager
    
    
        app = create_app()
        manager = Manager(app)
    
        if __name__ == '__main__':
            # app.run()
            manager.run()
            
    b. 位置传参
        from chun import create_app
        from flask_script import Manager
    
    
        app = create_app()
        manager = Manager(app)
    
        @manager.command
        def custom(arg):
            """
            自定义命令
            python manage.py custom 123
            :param arg:
            :return:
            """
            print(arg)
    
    
        if __name__ == '__main__':
            # app.run()
            manager.run()
    c. 关键字传参
        from chun import create_app
        from flask_script import Manager
    
    
        app = create_app()
        manager = Manager(app)
    
        @manager.option('-n', '--name', dest='name')
        @manager.option('-u', '--url', dest='url')
        def cmd(name, url):
            """
            自定义命令
            执行: python manage.py  cmd -n wupeiqi -u http://www.oldboyedu.com
            :param name:
            :param url:
            :return:
            """
            print(name, url)
    
    
        if __name__ == '__main__':
            # app.run()
            manager.run()
  3. flask-migrate

    install the module "pip3 install flask-migrate"
    flask-migrate rely on flask-script.
    
    
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from sansa import create_app
    from sansa import db
    
    from flask_script import Manager
    from flask_migrate import Migrate, MigrateCommand
    
    app = create_app()
    manager = Manager(app)
    Migrate(app, db)
    
    """
    # 数据库迁移命名
        python manage.py db init
        python manage.py db migrate
        python manage.py db upgrade
    """
    manager.add_command('db', MigrateCommand)
    
    
    if __name__ == '__main__':
        manager.run()
        # app.run()
  4. 找到项目使用的所有组件和版本。

    install the module "pip install pipreqs"
    # 终端输入后就可以找到所有的依赖
    pipreqs ./ --encoding=utf-8 
  5. python虚拟环境

    install module "pip3 install virtualenv"
    
    create "virtualenv env1  --no-site-packages"
    		
    找到虚拟环境的script后输入"activate"激活或"deactivate"退出。
内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/InvincibleGrass/p/17562878.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!