文档章节

Node——微服务架构(一)

o
 osc_4nmshwhm
发布于 2018/08/07 05:06
字数 6423
阅读 0
收藏 0

行业解决方案、产品招募中!想赚钱就来传!>>>

new ServiceBroker

default settings

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker();

custom settings

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
    logLevel: "info"
});

communicate with remote nodes

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
    nodeID: "node-1",
    transporter: "nats://localhost:4222",
    logLevel: "debug",
    requestTimeout: 5 * 1000,
    requestRetry: 3
});

broker options

  • logLevel
    • type:string
    • default:info
    • des:可选项目还有 trace、debug、 info、 warn、 error、 fatal
  • middlewares
    • type:Array<Function>
    • default:null
    • des:中间件
  • created
    • type:Function
    • default:null
    • des:broker 实例被创建的时候将会触发此函数
  • started
    • type:Function
    • default:null
    • des:broker 实例开始执行时触发此函数
  • stopped
    • type:Function
    • default:null
    • des:broker 实例停止执行时触发此函数
  • hotReload
    • type:Boolean
    • default:false
    • des:是否启动热加载
  • cacher
    • type:String、Object、Cacher
    • default:null
    • des:若是启动缓存,两个相同模型的 broker.call,只有第一个 call 会让 action 中对应 handler 完整的执行一遍,第二个 call 就不会了,它会直接从缓存中取数据,不常用
    • https://moleculer.services/docs/0.13/caching.html
  • transporter
  • serializer
  • nodeID
    • type:string
    • default:hostname + PID
    • des:这是节点的id,挂载在 某一个 namespace 中是不能够同名的
  • namespace
    • type:string
    • defalut:”“
    • des:分割一个网咯中的不同区域,基本上用不到,除非项目特别复杂,子服务特别多
  • requestTimeout
    • type:Number
    • default:0
    • des:请求超时设置,单位毫秒

createService

该服务表示Moleculer框架中的一个微服务。您可以定义操作并订阅事件。若要创建服务,必须定义架构。服务模式类似于VueJS的一个组件

// 定义了两个actions
broker.createService({
    name: "math",
    actions: {
        add(ctx) {
            return Number(ctx.params.a) + Number(ctx.params.b);
        },

        sub(ctx) {
            return Number(ctx.params.a) - Number(ctx.params.b);
        }
    }
});

name

  • 强制属性,最后去 call 某一个微服务的时候必须带上 name

version

settings

  • 此属性相当于仓库
    • 赋值可以是对象,对象中设置任意键值对,action 中通过 this.settings.xxxx 能够访问到设置项
    • 远程节点上可以获得这些设置项
    • 有一些内部设置是由核心模块使用的。这些设置名称以$(美元符号)开头
      • $noVersionPrefix
        • type:Boolean
        • default:false
        • des:禁用 action 版本前缀
      • $noServiceNamePrefix
        • type:Boolean
        • default:false
        • des:禁用 action 中的服务名称前缀。
      • $dependencyTimeout
        • type:Number
        • default:0
        • des:依赖等待超时
      • $shutdownTimeout
        • type:Number
        • default:0
        • des:关闭时等待活动请求的超时

mixins

Mixins是一种为Moleculer服务分发可重用功能的灵活方法。服务构造函数将这些混合与当前架构合并。它是在您的服务中扩展其他服务。当服务使用混音时,混音中的所有属性都将“混合”到当前服务中。

const ApiGwService = require("moleculer-web");

module.exports = {
    name: "api",
    mixins: [ApiGwService]
    settings: {
        // Change port setting
        port: 8080
    },
    actions: {
        myAction() {
            // Add a new action to apiGwService service
        }
    }
}

上面的示例创建了一个API服务,该服务继承了ApiGwService的所有内容,但是覆盖了端口设置,并使用新的myAction操作对其进行了扩展

actions

  • action 是服务中可调用的公共方法,broker.call 或 ctx.call,具体 action 必须在 action 中,可以是一个函数,可以是一个对象

events

  • 事件订阅

lifecycle events

  • 有一些生命周期服务事件,这些事件将由代理触发。它们被放置在模式的根中
    • created:broker.loadService 或者 broker.createService 触发
    • started:broker.start() 触发
    • stopped:broker.stop() 触发

methods

  • 创建私有方法,以供 action、event、lifecycle event 使用

dependencies

  • 如果您的服务依赖于其他服务,请使用架构中的依赖项属性。服务在调用已启动的生命周期事件处理程序之前等待依赖服务
  • 除了配做中添加 dependencies 属性,也可以用 broker 实例进行外部设置
    • broker.waitForServices(["posts", "users"]),返回以恶 promise 对象
    • broker.waitForServices("accounts", 10 * 1000, 500),设置超时事件和

metadata

  • 元数据属性,您可以在这里存储有关服务的任何元信息。在服务函数中可以访问到元数据
  • 元数据时可以被远程节点获取的

this

broker.createService

// 创建微服务实例方式之一
broker.createService({
    name: "math",
    actions: {
        add(ctx) {
            return Number(ctx.params.a) + Number(ctx.params.b);
        }
    }
});

load service from file

math.service.js

// Export the schema of service
module.exports = {
    name: "math",
    actions: {
        add(ctx) {
            return Number(ctx.params.a) + Number(ctx.params.b);
        },
        sub(ctx) {
            return Number(ctx.params.a) - Number(ctx.params.b);
        }
    }
}
// Create broker
const broker = new ServiceBroker();

// Load service
broker.loadService("./math.service");

// Start broker
broker.start();

推荐使用这样的方式,一目了然,不会在一个文件写过多的代码

Load multiple services from a folder

如果您有很多服务,建议将它们放到一个服务文件夹中,并使用 Serge.loadService s方法加载所有这些服务

broker.loadServices(folder = "./services", fileMask = "**/*.service.js");
// 从 ./services 文件夹(包括子文件夹)加载每个 *.service.js 文件
broker.loadServices();
// 从当前文件夹(包括子文件夹)加载每个 *.service.js 文件
broker.loadServices("./");
// 从“./svc”文件夹加载每个用户*.service.js文件
broker.loadServices("./svc", "user*.service.js");

hot reloading services

Moleculer具有内置的热重加载功能.在开发期间,注意只针对 service.js 文件的修改被启动热重启,其他位置可以使用 nodemon

const broker = new ServiceBroker({
    hotReload: true
});

broker.loadService("./services/test.service.js");

Internal services

// 列出所有已知节点(包括本地节点)
broker.call("$node.list").then(res => console.log(res))

// 列出所有注册的服务(本地和远程)
broker.call("$node.services").then(res => console.log(res))

// 列出所有已注册 action(本地和远程)。
broker.call("$node.actions").then(res => console.log(res))

// 列出所有订阅的事件
broker.call("$node.events").then(res => console.log(res))

// 列出本地节点的健康信息(包括进程和OS信息)
broker.call("$node.health").then(res => console.log(res));

action

action 是服务的可调用的公共方法。action 调用表示远程过程调用(RPC)。它有请求参数&返回响应,就像HTTP请求一样。如果您有多个服务实例,代理将在实例之间负载平衡请求

call services

若要调用服务,请使用 broke.Call 方法。代理查找具有给定 action service (可能在某一个节点上)并调用它。调用之后将会返回一个承诺

const res = await broker.call(actionName, params, opts)
  • params:参数是作为上下文的一部分传递给 action,action service 可以通过 ctx.params 访问传递参数,这是可选的
  • ops:一个对象,用于设置或者覆盖某些请求参数,例如:timeout、retry Count,这是可选的
    • tiemout:请求超时,以毫秒为单位。如果请求超时,而您没有定义应急响应,将会报错。若要禁用设置0,请执行以下操作。如果未定义,将会启用 new ServiceBroker 中的 requestTimeout 设置
    • retries :请求重试次数,如果请求超时,代理将再次尝试调用。若要禁用设置0。如果没有定义,将启用 new ServiceBroker 中的配置
    • fallbackResponse :若请求失败就返回,这是一个 Function
    • nodeID:目标节点,如果设置,它将直接调用给定的节点
    • meta:请求元数据,通过操作处理程序中的 ctx.meta 访问它,它也将在嵌套调用中被传输和合并
    • parentCtx:父亲的上下文实例
    • requestID:请求ID或相关ID。它出现在标准事件中
broker.call("user.recommendation", { limit: 5 }, {
    timeout: 500,
    retries: 3,
    fallbackResponse: defaultRecommendation
}).then(res => console.log("Result: ", res));

meta

  • 元信息发送到具有元属性的服务,通过 action 处理程序中的ctx.meta访问它。请注意,在嵌套调用时,元被合并。

Streaming

  • Moleculer支持Node.js流作为请求参数和响应。使用它来传输从网关上传的文件,或者编码/解码或压缩/解压缩流
const stream = fs.createReadStream(fileName);

broker.call("storage.save", stream, { meta: { filename: "avatar-123.jpg" }});	
  • 请注意,参数应该是一个流,您不能向参数中添加更多的变量。使用元属性传输其他数据。
  • 服务中接受流
module.exports = {
    name: "storage",
    actions: {
        save(ctx) {
            const s = fs.createWriteStream(`/tmp/${ctx.meta.filename}`);
            ctx.params.pipe(s);
        }
    }
};
  • 将流作为服务中的响应返回
module.exports = {
    name: "storage",
    actions: {
        get: {
            params: {
                filename: "string"
            },
            handler(ctx) {
                return fs.createReadStream(`/tmp/${ctx.params.filename}`);
            }
        }
    }
};
  • 调用方接受流
const filename = "avatar-123.jpg";
broker.call("storage.get", { filename })
    .then(stream => {
        const s = fs.createWriteStream(`./${filename}`);
        stream.pipe(s);
        s.on("close", () => broker.logger.info("File has been received"));
    })
  • AES编解码示例服务
const crypto = require("crypto");
const password = "moleculer";

module.exports = {
    name: "aes",
    actions: {
        encrypt(ctx) {
            const encrypt = crypto.createCipher("aes-256-ctr", password);
            return ctx.params.pipe(encrypt);
        },

        decrypt(ctx) {
            const decrypt = crypto.createDecipher("aes-256-ctr", password);
            return ctx.params.pipe(decrypt);
        }
    }
};

action visibility

  • visibility:该属性控制 action service 是否可见、可调用
    • published:公共的 action,它可以在本地调用,也可以远程调用,并且可以通过API网关发布
    • pulic:公共的 action ,可以在本地或者远程调用,但不能通过APIGW发布
    • protected:只能在本地 action service 调用(从本地服务调用)
    • private:只能在内部调用(通过 this.actions.xy() 内部服务)
    • 不设置,默认是 null,也就是 published,公共的
module.exports = {
    name: "posts",
    actions: {
        // It's published by default
        find(ctx) {},
        clean: {
            // Callable only via `this.actions.clean`
            visibility: "private",
            handler(ctx) {}
        }
    },
    methods: {
        cleanEntities() {
            // Call the action directly
            return this.actions.clean();
        }
    }
}

action hooks

  • 定义 action 钩子来包装来自混合器的某些 action
  • 有 before、after、error 钩子,将其分配给指定的 action 或者所有 action service (*)
  • 钩子可以是函数,也可以是字符串。字符串必须是本地服务方法名。
const DbService = require("moleculer-db");
// before hook
module.exports = {
    name: "posts",
    mixins: [DbService]
    hooks: {
        before: {
            // Define a global hook for all actions
            // The hook will call the `resolveLoggedUser` method.
            "*": "resolveLoggedUser",

            // Define multiple hooks
            remove: [
                function isAuthenticated(ctx) {
                    if (!ctx.user)
                        throw new Error("Forbidden");
                },
                function isOwner(ctx) {
                    if (!this.checkOwner(ctx.params.id, ctx.user.id))
                        throw new Error("Only owner can remove it.");
                }
            ]
        }
    },

    methods: {
        async resolveLoggedUser(ctx) {
            if (ctx.meta.user)
                ctx.user = await ctx.call("users.get", { id: ctx.meta.user.id });
        }
    }
}
const DbService = require("moleculer-db");
// after hook
// error hook
module.exports = {
    name: "users",
    mixins: [DbService]
    hooks: {
        after: {
            // Define a global hook for all actions to remove sensitive data
            "*": function(ctx, res) {
                // Remove password
                delete res.password;

                // Please note, must return result (either the original or a new)
                return res;
            },
            get: [
                // Add a new virtual field to the entity
                async function (ctx, res) {
                    res.friends = await ctx.call("friends.count", { query: { follower: res._id }});

                    return res;
                },
                // Populate the `referrer` field
                async function (ctx, res) {
                    if (res.referrer)
                        res.referrer = await ctx.call("users.get", { id: res._id });

                    return res;
                }
            ]
        },
        error: {
            // Global error handler
            "*": function(ctx, err) {
                this.logger.error(`Error occurred when '${ctx.action.name}' action was called`, err);

                // Throw further the error
                throw err;
            }
        }
    }
};
  • 推荐的用例是创建混合元素,用方法填充服务,并在钩子中设置方法名
module.exports = {
    methods: {
        checkIsAuthenticated(ctx) {
            if (!ctx.meta.user)
                throw new Error("Unauthenticated");
        },
        checkUserRole(ctx) {
            if (ctx.action.role && ctx.meta.user.role != ctx.action.role)
                throw new Error("Forbidden");
        },
        checkOwner(ctx) {
            // Check the owner of entity
        }
    }
}
// Use mixin methods in hooks
const MyAuthMixin = require("./my.mixin");

module.exports = {
    name: "posts",
    mixins: [MyAuthMixin]
    hooks: {
        before: {
            "*": ["checkIsAuthenticated"],
            create: ["checkUserRole"],
            update: ["checkUserRole", "checkOwner"],
            remove: ["checkUserRole", "checkOwner"]
        }
    },

    actions: {
        find: {
            // No required role
            handler(ctx) {}
        },
        create: {
            role: "admin",
            handler(ctx) {}
        },
        update: {
            role: "user",
            handler(ctx) {}
        }
    }
};

context

  • 当你去 call 一个 action service,broker 就会创建一个上下文 context 实例,这个实例包含着所有的请求信息,最后这些信息都会被当做 action service 中 handler 的一个参数 ctx 进行传递使用
  • 在 handler 中可以点出的上下文信息(属性或者方法)
    • ctx.id:context id
    • ctx.broker:broker 对象实例
    • ctx.action:action 定义实例
    • ctx.nodeID:caller 或者 目标节点 id
    • ctx.requestID:请求ID,如果在 nested-calls 中使用,它将是相同的ID。
    • ctx.parentID:父亲上下文实例 id(在 nested-calls 中使用)
    • ctx.params:请求参数,也就是 broker.call 中第二个参数具体设置
    • ctx.meta:请求元数据,它将会传递到 nested-calls 中
    • ctx.level:请求等级(在 nested-calls 内部使用),第一层等级为1
    • ctx.call():在 nested-calls 中触发 action service,参数形式与 broker.call 一样
    • ctx.emit():emit an event,same as broker.emit
    • ctx.broadcast():Broadcast an event, same as broker.broadcast
  • 优雅地关闭服务,请在代理选项中启用上下文跟踪功能。如果启用它,所有服务都将在关闭之前等待所有正在运行的上下文
    • 一个超时值可以通过关闭Timeout Broker选项来定义。默认值为5秒
    • 在 action services 中,关闭超时设置可以通过 $Shupdown Timeout 属性重写
const broker = new ServiceBroker({
    nodeID: "node-1",
    tracking: {
        enabled: true,
        shutdownTimeout: 10 * 1000
    }
});
broker.call("posts.find", {}, { tracking: false }) // 关闭追踪

event

  • Broker 有一个内置的事件总线来支持事件驱动体系结构,并将事件发送到本地和远程服务
  • 事件侦听器被排列成逻辑组,这意味着每个组中只触发一个侦听器
  • 例如你有两个主要服务 users、payments,这两个服务都订阅了 user.created 事件。此时,从 users 服务上注册 3 个具体实例,同时从 paymengs 服务上注册 2 个具体实例,当 emit 触发 user.created 事件,只有一个 user 和一个 payments 服务会被触发,效果如下

  • 组名来自服务名称,但可以在服务中的事件定义中覆盖它。
module.exports = {
    name: "payment",
    events: {
        "order.created": {
            // Register handler to the "other" group instead of "payment" group.
            group: "other",
            handler(payload) {
                // ...
            }
        }
    }
}

Emit balanced events

  • broker.emit 函数发送平衡的事件,第一个参数是事件的名称,第二个参数是传递的载荷,如果是复杂数据,可以传递一个对象
// The `user` will be serialized to transportation.
broker.emit("user.created", user);
  • 指定哪些组/服务接收事件
// Only the `mail` & `payments` services receives it
broker.emit("user.created", user, ["mail", "payments"]);

Broadcast event

  • 广播事件被发送到所有可用的本地和远程服务,它是不平衡的,所有服务实例都会收到它

  • 利用 broker.broadcast 发送广播
broker.broadcast("config.changed", config);
  • 指定哪些组/服务接收事件
// Send to all "mail" service instances
broker.broadcast("user.created", { user }, "mail");

// Send to all "user" & "purchase" service instances.
broker.broadcast("user.created", { user }, ["user", "purchase"]);

Local broadcast event

  • Send broadcast events to only all local services with broker.broadcastLocal method
broker.broadcastLocal("config.changed", config);

Subscribe to events

  • 通过 service 中的属性 event 可以订阅具体事件,在事件名称中可以使用通配符
module.exports = {
    events: {
        // Subscribe to `user.created` event
        "user.created"(user) {
            console.log("User created:", user);
        },

        // Subscribe to all `user` events
        "user.*"(user) {
            console.log("User event:", user);
        }

        // Subscribe to all internal events
        "$**"(payload, sender, event) {
            console.log(`Event '${event}' received from ${sender} node:`, payload);
        }
    }
}

Internal events

  • broker broadcasts 广播内部事件,这些事件总是以$前缀开头
  • $services.changed
    • 如果本地节点或远程节点加载或破坏服务,代理将发送此事件
  • $circuit-breaker.opened
    • The broker sends this event when the circuit breaker module change its state to open
  • $circuit-breaker.half-opened
    • The broker sends this event when the circuit breaker module change its state to half-open.
  • $circuit-breaker.closed
    • The broker sends this event when the circuit breaker module change its state to closed.
  • $node.connected
    • The broker sends this event when a node connected or reconnected.
  • $node.updated
    • The broker sends this event when it has received an INFO message from a node, (i.e. a service is loaded or destroyed).
  • $node.disconnected
    • The broker sends this event when a node disconnected (gracefully or unexpectedly).
  • $broker.started
    • The broker sends this event once broker.start() is called and all local services are started.
  • $broker.stopped
    • The broker sends this event once broker.stop() is called and all local services are stopped.
  • $transporter.connected
    • The transporter sends this event once the transporter is connected.
  • $transporter.disconnected
    • The transporter sends this event once the transporter is disconnected.

lifecycle

Broker lifecycle

  • starting logic
    • broker 启动传输连接,但是不会将本地服务列表发送到远程节点
    • 完成后,broker 将启动所有服务(call service started handler)
    • 一旦所有服务启动成功,broker 就会将本地服务列表发布到远程节点上
    • 因此,远程节点只有在所有本地服务正确启动之后才能发送请求

  • avoid deadlocks
    • broker start...
    • user service has dependencies: ["post"]
    • posts service has dependencies: ["users"]
    • 这就死锁了,按照顺序加载,user 永元无法加载到依赖项 post
  • stopping logic
    • call broker.stop 或者停止进程
    • 首先,broker 会向远程节点发送一个空的服务列表,所以他们可以将请求路由到其他实例而不是停止服务
    • 之后,broker 开始停止所有本地服务,之后 transporter 断开连接

Service lifecycle

  • created event handler
    • broker.createService or broker.loadService 会触发此事件
    • 函数内部拿到 broker 实例(this),还可以创建其他模块实例,例如 http 服务器、数据库模块
const http = require("http");

module.exports = {
    name: "www",
    created() {
        // Create HTTP server
        this.server = http.createServer(this.httpHandler);
    }
};
// created function is sync event handler,can not use async/await
  • started event handler
    • 它被触发的时候,代理会启动所有的本地服务,而 broker 会启动所有的本地服务。使用它连接到数据库,侦听服务器…等
module.exports = {
    name: "users",
    async started() {
        try {
            await this.db.connect();
        } catch(e) {
            throw new MoleculerServerError("Unable to connect to database.", e.message);
        }
    }
};
// started function is async handler. you can use async/await
  • stopped event handler
    • 它被触发的时候,broker.stop 被调用和 broker 开始停止所有的本地服务。使用它关闭数据库连接,关闭套接字…等
module.exports = {
    name: "users",
    async stopped() {
        try {
            await this.db.disconnect();
        } catch(e) {
            this.logger.warn("Unable to stop database connection gracefully.", e);
        }
    }
};
// stopped function is async handler. you can use async/await

logging

  • 在Moleculer框架中,所有核心模块都有一个自定义记录器实例。它们是从Broker记录器实例继承的,该实例可以在Broker选项中进行配置。

Built-in logger

  • Moleculer有一个内置控制台记录器。这是默认的记录器
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
    nodeID: "node-100",
    // logger: true,
    logLevel: "info"
});

broker.createService({
    name: "posts",
    actions: {
        get(ctx) {
            this.logger.info("Log message via Service logger");
        }
    }
});

broker.start()
    .then(() => broker.call("posts.get"))
    .then(() => broker.logger.info("Log message via Broker logger"));
[2018-06-26T11:38:06.728Z] INFO  node-100/POSTS: Log message via Service logger
[2018-06-26T11:38:06.728Z] INFO  node-100/BROKER: Log message via Broker logger
[2018-06-26T11:38:06.730Z] INFO  node-100/BROKER: ServiceBroker is stopped. Good bye.
  • 可以使用Broker选项中的logLevel选项更改日志级别。只与内置控制台记录器一起使用
const broker = new ServiceBroker({
    logger: true, // the `true` is same as `console`
    logLevel: "warn" // only logs the 'warn' & 'error' entries to the console
});
  • Available log levels: fatalerrorwarninfodebugtrace
  • 可以为每个Moleculer模块设置日志级别。允许通配符使用
const broker = new ServiceBroker({
    logLevel: {
        "MY.**": false,         // Disable log
        "TRANS": "warn",        // Only 'warn ' and 'error' log entries
        "*.GREETER": "debug",   // All log entries
        "**": "info",           // All other modules use this level
    }
});
// 此设置是从上到下计算的,因此*级别必须是最后一项。
  • 有一些内置的日志格式化程序
    • default:[2018-06-26T13:36:05.761Z] INFO node-100/BROKER: Message
    • simple:INFO - Message
    • short:[13:36:30.968Z] INFO BROKER: Message
  • 可以为内置控制台记录器设置自定义日志格式化程序函数
const broker = new ServiceBroker({ 
    logFormatter(level, args, bindings) {
        return level.toUpperCase() + " " + bindings.nodeID + ": " + args.join(" ");
    }
});
broker.logger.warn("Warn message");
broker.logger.error("Error message");
WARN dev-pc: Warn message
ERROR dev-pc: Error message
  • 自定义对象&数组打印格式化程序

    • 设置一个自定义格式化程序函数来打印对象和数组。默认函数将对象和数组打印到一行,以便便于使用外部日志工具进行处理。但是,当您正在开发时,将对象打印成人类可读的多行格式将是有用的。为此,在代理选项中覆盖logObjectPrint函数。
const util = require("util");

const broker = new ServiceBroker({  
    logObjectPrinter: o => util.inspect(o, { depth: 4, breakLength: 100 })
});
broker.logger.warn(process.release);
[2017-08-18T12:37:25.720Z] INFO  dev-pc/BROKER: { name: 'node',
  lts: 'Carbon',
  sourceUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0.tar.gz',
  headersUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0-headers.tar.gz' }

External loggers

  • 外部记录器可以与Moleculer一起使用。在这种情况下,将创建者函数设置为LOGER。当一个新模块继承一个新的记录器实例时,ServiceBroker将调用它
// pino
const pino = require("pino")({ level: "info" });
const broker = new ServiceBroker({ 
    logger: bindings => pino.child(bindings)
});
// bunyan
const bunyan = require("bunyan");
const logger = bunyan.createLogger({ name: "moleculer", level: "info" });
const broker = new ServiceBroker({ 
    logger: bindings => logger.child(bindings)
});

middlewares

networking

要通信其他节点(ServiceBrokers),您需要配置一个传输程序。大多数传输者连接到中心消息代理服务器,该服务器负责节点之间的消息传输。这些消息代理主要支持发布/订阅消息传递模式

Transporters

如果要在多个节点上运行服务,传输程序是一个重要的模块。传送器与其他节点通信。它传输事件、调用请求和处理响应… 如果一个服务在不同节点上的多个实例上运行,则请求将在活动节点之间实现负载平衡

整个通信逻辑是在传输类之外的。这意味着在不改变代码行的情况下,在传送器之间切换是很容易的。

Moleculer框架中有几个内置的运输机。

NATS

NATS服务器是一个简单、高性能的开源消息传递系统,用于云本机应用程序、物联网消息传递和微服务体系结构。

let { ServiceBroker } = require("moleculer");

const broker = new ServiceBroker({
    nodeID: "server-1",
    transporter: "nats://nats.server:4222"
});

使用 nats 传输需要安装 nats 模块 npm install nats

// Connect to 'nats://localhost:4222'
const broker = new ServiceBroker({
    transporter: "NATS"
});

// Connect to a remote NATS server
const broker = new ServiceBroker({
    transporter: "nats://nats-server:4222"
});

// Connect with options
const broker = new ServiceBroker({
    transporter: {
        type: "NATS",
        options: {
            url: "nats://localhost:4222"
            user: "admin",
            pass: "1234"
        }
    }
});

// Connect with TLS
const broker = new ServiceBroker({
    transporter: {
        type: "NATS",
        options: {
            url: "nats://localhost:4222"
            // More info: https://github.com/nats-io/node-nats#tls
            tls: {
                key: fs.readFileSync('./client-key.pem'),
                cert: fs.readFileSync('./client-cert.pem'),
                ca: [ fs.readFileSync('./ca.pem') ]
            }
        }
    }
});

Serialization

传输程序需要一个序列化模块来序列化和反序列化传输的数据包。默认的串行化程序是JSONS序列化程序,但是有几个内置的串行化程序。

const { ServiceBroker } = require("moleculer");

const broker = new ServiceBroker({
    nodeID: "server-1",
    transporter: "NATS",
    serializer: "ProtoBuf"
});
  • JSON serializer:这是内置的默认序列化程序。它将数据包序列化为JSON字符串,并将接收到的数据反序列化为数据包。
const broker = new ServiceBroker({
    // serializer: "JSON" // don't need to set, because it is the default
});

Load balancing

Built-in strategies

若要配置策略,请在注册表属性下设置策略代理选项。它可以是一个名称(在内置策略的情况下),也可以是一个策略类(在自定义策略的情况下)。

Random strategy

const broker = new ServiceBroker({
    registry: {
        strategy: "Random"
    }
});

RoundRobin strategy

const broker = new ServiceBroker({
    registry: {
        strategy: "RoundRobin"
    }
});

CPU usage-based strategy

const broker = new ServiceBroker({
    registry: {
        strategy: "CpuUsage"
    }
});

Fault tolerance

Circuit Breaker

  • Moleculer有一个内置的断路器解决方案.这是一个基于阈值的实现。它使用一个时间窗口来检查失败的请求率。一旦达到阈值,它就会触发断路器。
  • 电路断路器可以防止应用程序重复尝试执行可能失败的操作。允许它继续,而不等待故障被修复或浪费CPU周期,而它确定故障是长期的。断路器模式还允许应用程序检测故障是否已经解决。如果问题似乎已经解决,应用程序可以尝试调用操作。
  • 如果启用它,所有服务调用都将受到此内置断路器的保护。
  • 在代理选项中启用它
const broker = new ServiceBroker({
    circuitBreaker: {
        enabled: true,
        threshold: 0.5,
        minRequestCount: 20,
        windowTime: 60, // in seconds
        halfOpenTime: 5 * 1000, // in milliseconds
        check: err => err && err.code >= 500
    }
});
  • settings
    • enabled:是否启动此功能,默认是 false
    • threshold:阈值,默认0.5,意味着50%的跳闸失败
    • minRequestCount:最小请求数,默认20,在它下面,回调函数不会触发
    • windowTime:时间窗口的秒数,默认60秒
    • halfOpenTime:从打开状态切换到半打开状态的毫秒数,默认10000毫秒
    • check:检查失败请求的函数,默认 err && err.code >= 500
  • 如果断路器状态发生更改,ServiceBroker将发送内部事件
  • 这些全局选项也可以在操作定义中重写。
// users.service.js
module.export = {
    name: "users",
    actions: {
        create: {
            circuitBreaker: {
                // All CB options can be overwritten from broker options.
                threshold: 0.3,
                windowTime: 30
            },
            handler(ctx) {}
        }
    }
};

Retry

  • 重试解决方案
const broker = new ServiceBroker({
    retryPolicy: {
        enabled: true,
        retries: 5,
        delay: 100,
        maxDelay: 2000,
        factor: 2,
        check: err => err && !!err.retryable
    }
});
  • settings
    • enabled:是否启用,默认 false
    • retries:重试的次数,默认5次
    • delay:第一次延迟以毫秒为单位,默认 100
    • maxDelay:最大延迟(以毫秒为单位),默认 2000
    • factor:延迟退避系数,默认是 2,表示指数退避
    • check:检查失败请求的函数,err && !!err.retryable
  • 在调用选项中覆盖retry值
broker.call("posts.find", {}, { retries: 3 });
  • 在操作定义中覆盖重试策略值
// users.service.js
module.export = {
    name: "users",
    actions: {
        find: {
            retryPolicy: {
                // All Retry policy options can be overwritten from broker options.
                retries: 3,
                delay: 500
            },
            handler(ctx) {}
        },
        create: {
            retryPolicy: {
                // Disable retries for this action
                enabled: false
            },
            handler(ctx) {}
        }
    }
};

Timeout

  • 可以为服务调用设置超时。它可以在代理选项或调用选项中全局设置。如果定义了超时并且请求超时,代理将抛出RequestTimeoutError错误。
const broker = new ServiceBroker({
    requestTimeout: 5 * 1000 // in seconds
});
  • 覆盖调用选项中的超时值
broker.call("posts.find", {}, { timeout: 3000 });
  • 分布式超时:Moleculer使用分布式超时。在嵌套调用的情况下,超时值会随着时间的推移而递减。如果超时值小于或等于0,则跳过下一个嵌套调用(RequestSkippedError),因为第一个调用已被RequestTimeoutError错误拒绝。

Bulkhead

  • 在Moleculer框架中实现了舱壁特性,以控制动作的并发请求处理。
const broker = new ServiceBroker({
    bulkhead: {
        enabled: true,
        concurrency: 3,
        maxQueueSize: 10,
    }
});
  • settings

    • enabled:是否启动,默认 false

    • concurreny:最大限度的并行数量,默认3

    • maxQueueSize:最大队列大小,默认10

    • concurreny 值限制并发请求执行

    • 如果 maxQueueSize大于0,则如果所有插槽都被占用,则 broker 将额外的请求存储在队列中

    • 如果队列大小达到maxQueueSize限制或为0,则 Broker 将对每个添加请求抛出QueueIsFull异常

  • 这些全局选项也可以在操作定义中重写

// users.service.js
// 在操作定义中覆盖重试策略值
module.export = {
    name: "users",
    actions: {
        find: {
            bulkhead: {
                enabled: false
            },
            handler(ctx) {}
        },
        create: {
            bulkhead: {
                // Increment the concurrency value
                // for this action
                concurrency: 10
            },
            handler(ctx) {}
        }
    }
};

Fallback

  • 当您不想将错误返回给用户时,回退功能是非常有用的。相反,调用其他操作或返回一些常见的内容。可以在调用选项或操作定义中设置回退响应。
  • 它应该是一个返回包含任何内容的承诺的函数。borker 将当前 context&Error对象作为参数传递给此函数。
// fallback settings in calling options 
const result = await broker.call("users.recommendation", { userID: 5 }, {
    timeout: 500,
    fallbackResponse(ctx, err) {
        // Return a common response from cache
        return broker.cacher.get("users.fallbackRecommendation:" + ctx.params.userID);
    }
});
  • 回退响应也可以在接收端,在 action 中定义
  • 请注意,只有在action 处理程序中发生错误时,才会使用此回退响应。如果从远程节点调用请求,并且请求在远程节点上超时,则不使用回退响应。在这种情况下,在调用选项中使用回退响应。
// fallback as a function
module.exports = {
    name: "recommends",
    actions: {
        add: {
            fallback: (ctx, err) => "Some cached result",
            //fallback: "fakeResult",
            handler(ctx) {
                // Do something
            }
        }
    }
};
// fallback as method name
module.exports = {
    name: "recommends",
    actions: {
        add: {
            // Call the 'getCachedResult' method when error occurred
            fallback: "getCachedResult",
            handler(ctx) {
                // Do something
            }
        }
    },

    methods: {
        getCachedResult(ctx, err) {
            return "Some cached result";
        }
    }
};

NATS

帮助文档

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。

暂无文章

基于反射实现DBUtils封装(读取数据库数据生成对象或对象集合以及对数据库的CRUD)version2.0

DBUtils version2.0 附带jdbc.properties配置文件 支持操作: 1.加载驱动 2.获取数据库连接对象 3.关闭资源 4.封装通用的更新操作:INSERT UPDATE DELETE 5.封装通用查询单条数据的方法 (JDB...

osc_dh3qbz0a
15分钟前
9
0
标准驼峰命名转数据库字段

碰到一个这样的场景,数据库字段bill_no 代表单据编号,然后返回前端json 是billNo,严格按照驼峰命名法,现在前端需要自定义按照箭头进行排序,但是并不知道数据库字段,所以前端只能给你"...

osc_kvlhvh2u
16分钟前
0
0
突然的立秋

前几天在某app上面耍到说“8月7号就立秋了,等我们再见面就该穿长袖了,不,我们应该不会再见到了”。 就很突然了,今天立秋了。 秋天到了,和夏天的人和事好好道个别吧。 还记得以前,每年的...

osc_z3ivsxnp
18分钟前
8
0
第一届华数杯A题思路分析

** 华数杯a题浅见 需要本文的话请加2574364134 ** 当我刚拿到这个题目的时候,惊呆了,这个不就是2018年国赛的A题吗?2018年的国赛A题是为了进行高温防护,这道题现在就是低温防护服御寒,所...

osc_zken4nb1
19分钟前
0
0
想象自己在前方等自己-纯内心戏

以下为一年级某个时刻的痛苦挣扎,就是个经历而已,记录经历。 论文的初初稿终于在昨天发给了老师。客观的讲我写的真的很差,很多时候感觉自己写不下去了,很多放弃的念头不是一闪而过,而是...

osc_b67rw1ne
21分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部