利用Flask-Socketio扩展构建实时流应用

原创
2017/03/16 11:20
阅读数 5.6K

模块简介:

说明: 此模块主要用于构建支持实时,双向基于事件的通信,将Websocket和Polling等其它实时通信方式封装成了通用接口,从而可在各个平台/浏览器/设备上稳定工作.

 

快速安装:

1

2

pip install flask-socketio

<script src="https://cdn.socket.io/socket.io-1.4.5.js"></script>

 

应用场景:

1. 实时分析, 服务端将数据推送到客户端,客户端可以为实时计数器,图表,日志等

2. 实时聊天, 通过命名空间和房间实现服务端Socket多路复用.

3. 流式传输, 已经支持任何二进制文件的传输,包括图片,视频,音频.

4. 文档合并, 运行多个用户同时编辑一个文档,并且能够看到每个用户做出的修改.

 

原理介绍:

wKiom1hs3QPRH_G_AACGnZkTY9w659.png

客户端: 利用基于flashsocket/websocket/iframe等封装的的socket对象和通用抽象方法(transport接口),包含数据编码/解码/心跳处理等

服务端: 利用namespace+room实现服务端socket多路复用,namespace基于客户端url中path部分区分应用,不同应用相互隔离,默认为/,room基于客户端指定namespace和room限制应用消息有效范围,如果没有指定,则除了自己外其它属于此namespace的socket都会收到消息.

 

常用事件:

服务端
connect 当客户端与服务端连接成功后被触发
message 当客户端使用send发送数据时被触发
disconnect 当客户端与服务端失去连接时被触发,如关闭浏览器,主动断开,掉线等任何断开连接的情况.
客户端
connect 当客户端与服务端连接成功后被触发
message 当服务端使用send发送数据时被触发
disconnect 当客户端主动断开连接时被触发.

 

接收消息: [客户端发送消息<- 回调确认 -> 服务接收消息]

# 方式一: 客户端通过send发送的未命名事件数据,服务端只能使用默认message事件接收处理, 客户端定义的事件回调函数接收的数据来自于服务端message事件处理函数的返回值

socket.send([data], ...., callback)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

<script type="text/javascript">

    var socket = io.connect(

        location.protocol+

        '//'+

        document.domain+

        ':'+

        location.port

    );

    socket.on('connect'function(){

        socket.send({'data''hello word!'}, function(data){

            console.log('#=> recive server data', data.data);

        });

    });

</script>

1

2

3

4

@io.on('message')

def message_handler(*args):

    print '#=> recive {0} data from client'.format(type(args[0])), args

    return args

# 方式二: 客户端通过emit发送的命名事件数据,服务端只能使用对应自定义事件接收处理, 客户端定义的事件回调函数接收的数据来自于服务端对应事件处理函数的返回值

socket.emit(event_name, [data], ...., callback)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

<script type="text/javascript">

    var socket = io.connect(

        location.protocol+

        '//'+

        document.domain+

        ':'+

        location.port

    );

    socket.on('connect'function(){

        socket.emit('connect event', {'data''hello word!'}, function(data){

            console.log('#=> recive server data', data.data);

        });

    });

</script>

1

2

3

4

@io.on('connect event')

def connect_event_handler(*args):

    print '#=> recive {0} data from client'.format(type(args[0])), args

    return args

 

发送消息: [服务端发送消息 <- 回调确认 -> 客户端接收消息]

# 方式一: 服务端通过send发送的未命名事件数据,可指定namespace, callback, broadcast, room, include_self额外参数,客户端只能使用默认message事件接收处理,服务端自定义的回调函数接收的数据来自于客户端对回调函数的调用,不是return的值.

send(message, namespace, callback, broadcast, room, include_self)

1

2

3

4

5

6

7

8

9

10

11

12

13

<script type="text/javascript">

    var socket = io.connect(

        location.protocol+

        '//'+

        document.domain+

        ':'+

        location.port

    );

    socket.on('message'function(data, func){

      console.log('#=> recive server data', data.data);

      func();

    });

</script>

1

2

3

4

5

def message_event_callback(*args):

    print '#=> client called {0}'.format(inspect.stack()[0][-4:-2])

@io.on('connect')

def connect_event_handler():

    io.send({'data''hello word!'}, callback=message_event_callback)

# 方式二: 服务端通过emit发送的命名事件数据,可指定namespace, callback, broadcast, room, include_self额外参数,客户端只能使用对应自定义事件接收处理,服务端自定义的回调函数接收的数据来自于客户端对回调函数的调用,不是return的值.

emit(event, args, namespace, callback, broadcast, room, include_self)

1

2

3

4

5

6

7

8

9

10

11

12

13

<script type="text/javascript">

    var socket = io.connect(

        location.protocol+

        '//'+

        document.domain+

        ':'+

        location.port

    );

    socket.on('connect event'function(data, func){

      console.log('#=> recive server data', data.data);

      func();

    });

</script>

1

2

3

4

5

6

7

8

9

def connect_event_callback(*args):

    print '#=> client called {0}'.format(inspect.stack()[0][-4:-2])

@io.on('connect')

def connect_event_handler():

    io.emit(

        'connect event',

        {'data''hello word!'},

        callback=message_event_callback

    )

 

广播消息: [服务端发送消息 <- 回调确认 -> 客户端接收消息]

# 方式一: 服务端通过send发送的未命名事件数据,指定broadcast=True额外参数,可配合namespace/room/include_self额外参数来控制消息发往的应用,发往的房间,是否发给自己,客户端只能使用默认message事件接收处理

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

<script type="text/javascript">

    var socket = io.connect(

        location.protocol+

        '//'+

        document.domain+

        ':'+

        location.port

    );

    var user = {

        name: ['zmanman''qmanman''smanman''lmanman'][Math.ceil(Math.random()*3)],

    };

    console.log('#=> current user: ', user);

    socket.on('connect'function(){

        socket.send(user);

    });

    socket.on('message'function(data){

        var $content = $('<p>广播: '+ data.name +'上线.</p>');

        $('#socketio').append($content);

    });

</script>

1

2

3

4

@io.on('message', namespace='/')

def online_notify_handler(*args):

    print '#=> recive {0} data from client'.format(type(args[0])), args

    send(args[0], broadcast=True)

# 方式二: 服务端通过emit发送的命名事件数据,指定broadcast=True额外参数,可配合namespace/room/include_self额外参数来控制消息发往的应用,发往的房间,是否发给自己,客户端只能使用对应自定义事件接收处理

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

<script type="text/javascript">

    var socket = io.connect(

        location.protocol+

        '//'+

        document.domain+

        ':'+

        location.port

    );

    var user = {

        name: ['zmanman''qmanman''smanman''lmanman'][Math.ceil(Math.random()*3)],

    };

    console.log('#=> current user: ', user);

    socket.on('connect'function(){

        socket.emit('online notify', user);

    });

    socket.on('online notify'function(data){

        var $content = $('<p>广播: '+ data.name +'上线.</p>');

        $('#socketio').append($content);

    });

</script>

1

2

3

4

@io.on('online notify', namespace='/')

def online_notify_handler(*args):

    print '#=> recive {0} data from client'.format(type(args[0])), args

    emit('online notify', args[0], broadcast=True)

 

分组广播: [服务端发送消息 <- 回调确认 -> 客户端接收消息]

# 方式一: 服务端通过send发送的未命名事件数据,指定broadcast=True和room=xxoo额外参数,可配合namespace/include_self额外参数来控制消息发往的应用,发往的房间,是否发给自己,服务端提供了join_room和leave_room函数来对请求分组,客户端只能使用默认message事件接收处理

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

<script type="text/javascript">

    var socket = io.connect(

        location.protocol+

        '//'+

        document.domain+

        ':'+

        location.port

    );

    var name = [

                    'zmanman'

                    'qmanman'

                    'smanman'

                    'lmanman',

               ][Math.ceil(Math.random()*3)];

    socket.on('connect'function(){

        socket.send('user join', {'room''room1''user': name});

    });

    socket.on('message'function(data){

        if(data.action=='sys boradcast'){

            console.log('公告: ' + data.message);

        };

    });

</script>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Date    : 2016-12-27 19:22:04

# @Author  : 李满满 (xmdevops@vip.qq.com)

# @Link    : http://xmdevops.blog.51cto.com/

# @Version : $Id$

from __future__ import absolute_import

# 说明: 导入公共模块

from flask import request

from datetime import datetime

from flask_socketio import join_room, leave_room

# 说明: 导入其它模块

from import io

io.room_users = {}

io.reqid_info = {}

@io.on('connect')

def connect_event_handler():

    @io.on('message')

    def user_join_handler(action, data):

        if action == 'user join':

            room = data['room']

            user = data['user']

            susr = '{0}_{1}'.format(user, request.sid)

            io.room_users.setdefault(room, []).append(susr)

            join_room(room)

            io.reqid_info.setdefault(request.sid, {}).update({

                'room': room,

                'user': user,

            })

            message = '#=> [{0}] user: {1} action: {2} room {3}.'.format(

                datetime.now(), susr, 'join', room

            )

            print message

            print '#=> current rooms:', io.room_users.keys()

            users = []

            for item_users in io.room_users.itervalues():

                users.extend(item_users)

            print '#=> current users:', users

            io.send({'action''sys boradcast''message': message})

    @io.on('disconnect')

    def disconnect_handler():

        user = io.reqid_info[request.sid]['user']

        susr = '{0}_{1}'.format(user, request.sid)

        room = io.reqid_info[request.sid]['room']

        io.room_users[room].remove(susr)

        message = '#=> [{0}] user: {1} action: {2} room {3}.'.format(

            datetime.now(), susr, 'leave', room

        )

        print message

        print '#=> current rooms:', io.room_users.keys()

        users = []

        for item_users in io.room_users.values():

            users.extend(item_users)

        print '#=> current users:', users

        io.send({'action''sys boradcast''message': message})

        leave_room(room)

说明: 如上定义io.room_users = {},io.reqid_info = {}分别存储房间用户和请求信息,主要是为了存储在线用户以及用户信息,实际生产中可用Redis等后端替换.

扩展: 此插件运行时内存中也维护了一份儿request.sid属于哪个namespace哪个room,但由于connect时没有指定namespace和room而room并没有默认值,作者使用了None代替,并且为了后续的双向回调跟踪,所以在room中也包含了request.sid,而且在接口方面也做的不尽人意,所以还是推荐自己手动实现几个全局对象来存储这些信息.

# 方式二: 服务端通过emit发送的命名事件数据,指定broadcast=True和room=xxoo额外参数,可配合namespace/include_self额外参数来控制消息发往的应用,发往的房间,是否发给自己,服务端提供了join_room和leave_room函数来对请求分组,客户端只能使用自定义事件接收处理

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

<script type="text/javascript">

    var socket = io.connect(

        location.protocol+

        '//'+

        document.domain+

        ':'+

        location.port

    );

    var name = [

                    'zmanman'

                    'qmanman'

                    'smanman'

                    'lmanman',

               ][Math.ceil(Math.random()*3)];

    socket.on('connect'function(){

        socket.emit('user join', {'room''room1''user': name});

    });

    socket.on('sys broadcast'function(data){

        console.log('公告: ' + data.message);

    });

</script>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Date    : 2016-12-27 19:22:04

# @Author  : 李满满 (xmdevops@vip.qq.com)

# @Link    : http://xmdevops.blog.51cto.com/

# @Version : $Id$

from __future__ import absolute_import

# 说明: 导入公共模块

import inspect

from flask import request

from datetime import datetime

from flask_socketio import join_room, leave_room

# 说明: 导入其它模块

from import io

io.room_users = {}

io.reqid_info = {}

@io.on('connect')

def connect_event_handler():

    @io.on('user join')

    def user_join_handler(data):

        user = data['user']

        room = data['room']

        susr = '{0}_{1}'.format(user, request.sid)

        io.room_users.setdefault(room, []).append(susr)

        io.reqid_info.setdefault(request.sid, {}).update({

            'room': room,

            'user': user,

        })

        join_room(room)

        message = '#=> [{0}] user: {1} action: {2} room: {3}.'.format(

            datetime.now(), susr, inspect.stack()[0][-4:-2], room

        )

        print message

        users = []

        for item_users in io.room_users.itervalues():

            users.extend(item_users)

        print '#=> current users: ', users

        io.emit('sys broadcast', {'message': message})

    @io.on('disconnect')

    def disconnect_handler():

        user = io.reqid_info[request.sid]['user']

        susr = '{0}_{1}'.format(user, request.sid)

        room = io.reqid_info[request.sid]['room']

        io.room_users[room].remove(susr)

        message = '#=> [{0}] user: {1} action: {2} room: {3}.'.format(

            datetime.now(), susr, inspect.stack()[0][-4:-2], room

        )

        print message

        io.emit('sys broadcast', {'message': message})

        leave_room(room)

 

错误处理:

1

2

3

4

5

6

7

8

9

10

# 说明: 处理指定NameSpace的异常

@io.on_error()

def error_handler(e):

    print request.event['message']

    print request.event['args']

# 说明: 处理所有NameSpace的异常

@io.on_error_default

def default_error_handler(e):

    print request.event['message']

    print request.event['args']

说明: request.event是在被io.on装饰的时候被附加上去的属性,是一个字典,默认包含message和args,也就是事件名和事件相关的参数,在出现异常时可通过打印它们来获取异常请求信息.

 

异步切换:

说明: flask-socketio和客户端和服务端的交互是双向的,当你循环send/emit的时候会出现缓冲区阻塞,可通过io.sleep(0.1)或import eventlet;eventlet.monkey_patch()打补丁来实现异步IO,但是这样发送极快,如果使用eventlet或基于eventlet的gevent异步协程库的时候使用eventlet.sleep(0.1),减慢切换时间,这样防止浏览器端卡死.

 

全局对象:

说明: 作为FLASK插件,基于程序上下文可使用current_app, g全局对象,基于请求上下文可使用request, session全局对象, 所有的会话是基于request.sid, 且运行时会自动注册event和namespace到request对象.

 

用户验证:

说明: flask-socketio默认是不支持用户验证的,可借助flask-login插件通过login_user()函数来将用户ID信息存储到本地,然后通过程序上下文中的current_user对象来还原用户对象,判断是否登录是否有权限访问等.

 

消息队列:

说明: flask-socketio还支持多消费者分布式横向扩展,只需在实例化SocketIO时指定async_mode和message_queue,message_queue目前只支持redis://和amqp://,接口分别使用的是redis和kombu.由于每个节点都维护着一份儿客户端连接集,所以如果使用了前端Nginx负载均衡,需要Session同步或IP_HASH算法负载,节点启动时会自动订阅flask-socketio频道,所以我们可以用PY-REDIS客户端去发布消息,那么所有节点都可以获取到消息然后再分发给指定的客户端.

 

 

登录乐搏学院官网http://www.learnbo.com/

或关注我们的官方微博微信,还有更多惊喜哦~

 

本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1889053

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部