文档章节

elixir官方教程Mix与OTP(七) 任务与gen_tcp

ljzn
 ljzn
发布于 2016/08/11 17:01
字数 2270
阅读 47
收藏 0
点赞 0
评论 0

#任务与gen_tcp

  1. 回显服务器
  2. 任务
  3. 任务主管

本章,我们将学习如何使用Erlang的:gen_tcp模块来处理请求.这是一个探索Elixir的Task模块的好机会.在后面的章节我们将扩展我们的服务器,让它能够执行命令.

#回显服务器(Echo server)

我们将实现一个回显服务器,作为我们的TCP服务器的开始.它会简单地发送一个回复,内容就是在请求中收到的文本.我们将缓慢地升级服务器,直到它被监督并准备好处理多重连接.

一个TCP服务器,概括地说,会有如下表现:

  1. 监听一个端口,直到端口可用,并获得套接字
  2. 在那个端口等待客户端连接并接受它
  3. 读取客户端请求并进行回复

让我们来实现这些步骤.进入apps/kv_server应用,打开lib/kv_server.ex,并添加下列函数:

require Logger

def accept(port) do
  # 下列选项的意思是:
  #
  # 1. `:binary` - 以二进制数接受数据 (而非列表)
  # 2. `packet: :line` - 逐行接收数据
  # 3. `active: false` - 阻塞在 `:gen_tcp.recv/2` 直到数据可用
  # 4. `reuseaddr: true` - 允许我们重用数据当监听器崩溃时
  {:ok, socket} = :gen_tcp.listen(port,
                    [:binary, packet: :line, active: false, reuseaddr: true])
  Logger.info "Accepting connections on port #{port}"
  loop_acceptor(socket)
end

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

defp serve(socket) do
  socket
  |> read_line()
  |> write_line(socket)

  serve(socket)
end

defp read_line(socket) do
  {:ok, data} = :gen_tcp.recv(socket, 0)
  data
end

defp write_line(line, socket) do
  :gen_tcp.send(socket, line)
end

我们将通过调用KVServer.accept(4040)来开启我们的服务器,这里的4040是端口.accept/1的第一步就是监听端口,直到套接字可用,然后调用loop_acceptor/1.loop_acceptor/1只是一个接收客户端连接的循环.对于每个被接收的连接,我们调用serve/1.

serve/1是另一个循环,它从套接字中读取一行并将那些行写回套接字中.注意serve/1函数使用了管道操作符|>来表示这个操作流.管道操作符的左边会被执行,并将结果作为右边函数的第一个参数.上面的例子:

socket |> read_line() |> write_line(socket)

等同于:

write_line(read_line(socket), socket)

read_line/1通过:gen_tcp.recv/2实现了从套接字中接收数据,而write_line/2使用:gen_tcp.send/2向套接字中写入.

这些就是我们实现回显服务器所需的一切.让我们来试一试!

通过iex -S mixkv_server应用中运行一个IEx会话.在IEx中,运行:

iex> KVServer.accept(4040)

现在服务器已经运行了,你会发现控制台阻塞了.让我们使用一个telnet客户端来访问我们的服务器.这些客户端在大多数操作系统中都是可用的,它们的命令行也是类似的:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?

输入"hello",敲击回车,将返回"hello".完美!

我的telnet客户端可以通过输入ctrl + ],输入quit,再敲击<Enter>来退出,但你的客户端可能步骤会有不同.

一旦你退出了telnet客户端,你会在IEx会话中看到一个错误:

** (MatchError) no match of right hand side value: {:error, :closed}
    (kv_server) lib/kv_server.ex:41: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:33: KVServer.serve/1
    (kv_server) lib/kv_server.ex:27: KVServer.loop_acceptor/1

这是因为我们期望从:gen_tcp.recv/2中获得数据,但客户端关闭了连接.我们将在之后对服务器的修改中更好地处理这种情形.

现在,这里有一个更加重要的bug需要我们去修复:当我们的TCP接收器崩溃时会发生什么?因为这里没有监督,所以服务器死亡后,我们不能再处理更多请求,因为它不会重启.这就是为什么我们必须将服务器移动到一个监督树上.

#任务(Tasks)

我们已经学习了代理,通用服务器,以及主管.它们都需要处理多重的消息或消息状态.但是当我们只需要执行一些任务时应该使用什么呢?

Task模块提供了这个功能.例如,它有一个start_link/3函数用于接收模块,函数和参数,允许我们运行一个给定的函数作为监督树的一部分.

来试一下.打开lib/kv_server.ex,将start/2函数中的主管修改成:

def start(_type, _args) do
  import Supervisor.Spec

  children = [
    worker(Task, [KVServer, :accept, [4040]])
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end

通过这个改动,我们将KVServer.accept(4040)作为一个工人来运行.我们一直在编写端口,而等一下我们将会讨论它可以被如何改动.

现在服务器是监督树的一部分了,它会在我们运行应用时自动启动.在终端中输入mix run --no-halt,并再次使用telnet客户端来确认一切正常:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me

正常!如果你杀死客户端,导致整个服务器崩溃,你会看到另一个立刻启动了.然而,它是规模化(scale)的吗?

试着同时连接两个telnet客户端.当你这么做时,你会发现,第二个客户端没有回声:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?

它似乎完全不能工作.原因是我们接受连接和处理请求是在同一个进程中.当一个客户端连接上,我们就不能再接受其它客户端了.

#任务主管(Task Supervisor)

为了使我们的服务器能够处理同时连接,我们需要有一个接收器进程,它能够生成其它进程来处理请求.一个方法要修改一下:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

要使用Task.start_link/1,它类似于Task.start_link/3,但是它接收的是一个匿名函数而非模块,函数和参数:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.start_link(fn -> serve(client) end)
  loop_acceptor(socket)
end

我们直接在接收器进程中启动了一个链接任务.但我们已经犯过这种错误了.你还记得吗?

这类似于我们从注册表中直接调用KV.Bucket.start_link/0时所犯的错误.那意味着任何桶的失败都会导致整个注册表崩溃.

上述代码也有同样的缺陷:如果我们将serve(client)任务链接到接收器,一个处理请求时的崩溃将会传递给接收器,从而导致其它所有连接失败.

我们为注册表修正了这个问题,通过使用一个简单地一对一主管.我们在此将如法炮制,由于这个模式太常用于任务了,Task已经提供了一个方案:在我们的监督树上,可以使用一个简单的一对一主管,附带临时的工人.

让我们再次修改start/2,添加一个主管到我们的树上:

def start(_type, _args) do
  import Supervisor.Spec

  children = [
    supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
    worker(Task, [KVServer, :accept, [4040]])
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end

我们简单地启动了一个名为KVSever.TaskSupervisorTask.Supervisor进程.记住,因为接受其任务依赖于该主管,所以主管必须先启动.

现在我们只需要修改loop_acceptor/1来使用Task.Supervisor处理每个请求:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
  :ok = :gen_tcp.controlling_process(client, pid)
  loop_acceptor(socket)
end

你可能注意到我们添加了一行,:ok = :gen_tcp.controlling_process(client, pid).这使得子进程成为了client套接字的"控制进程".如果不这样做的话,一旦接收器崩溃,它就会关闭所有客户端,因为套接字默认被绑定到了那个接收它们的进程上.

通过mix run --no-halt启动一个新的服务器,现在我们可以打开许多并发的telnet客户端了.你也会发现退出一个客户端不会导致接收器崩溃了.优秀!

这里是完整的回显服务器实现,在单独的模块中:

defmodule KVServer do
  use Application
  require Logger

  @doc false
  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
      worker(Task, [KVServer, :accept, [4040]])
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

  @doc """
  Starts accepting connections on the given `port`.
  """
  def accept(port) do
    {:ok, socket} = :gen_tcp.listen(port,
                      [:binary, packet: :line, active: false, reuseaddr: true])
    Logger.info "Accepting connections on port #{port}"
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
    :ok = :gen_tcp.controlling_process(client, pid)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

由于我们改变了主管的规则,我们不禁要问:我们的监督策略还正确吗?

在这种情况下,是正确的:如果接收器崩溃,不需要关闭现存的连接.换句话说,如果任务主管崩溃,也不需要关闭接收器.

下一章我们将开始解析客户端请求和发送回复,完成我们的服务器.

© 著作权归作者所有

共有 人打赏支持
ljzn
粉丝 29
博文 69
码字总数 96245
作品 0
南平
程序员
elixir官方教程Mix与OTP(一) Mix入门

Mix入门 我们的第一个项目 编辑项目 执行测试 环境 探索 在本教程中,我们将学习如何构建一个完整的Elixir应用,包括监督树,配置,测试等等. 这个应用的功能是分布式键值仓库.我们将把键值对安排...

ljzn ⋅ 2016/08/07 ⋅ 2

elixir官方入门教程 学习资料

下一步该去哪 构建你的第一个Elixir项目 元编程 社区与其它资源 Erlang基础 想要学习更多?继续阅读! 构建你的第一个Elixir项目 为了开始你的第一个项目,Elixir装载了一个叫做Mix的构建工具....

ljzn ⋅ 2016/08/06 ⋅ 0

elixir官方教程Mix与OTP(九) 分布式任务与配置

分布式任务与配置 我们的第一个分布式代码 同步/等待 分布式任务 路由层 测试过滤器与标签 应用环境与配置 总结 本章,我们将回到应用并添加一个路由层,它能让我们根据桶名来在节点之间分布请...

ljzn ⋅ 2016/08/12 ⋅ 0

elixir官方入门教程 介绍

介绍 安装 交互模式 运行脚本 提出疑问 欢迎! 在本教程中我们将教给你Elixir的基础,语法,如何定义模块,如何操作常用数据结构的特性等等.本章将确保Elixir安装好了,并且你能够成功运行Elixir的...

ljzn ⋅ 2016/08/06 ⋅ 0

elixir官方教程Mix与OTP(八) 文档,测试与with

文档,测试与with 文档测试 with 运行命令 本章,我们将实现能够解析我们在第一章中描述的命令的代码: 解析完成后,我们将更新我们的服务器来调遣解析后的命令到应用中. 文档测试(Doctests) 在语...

ljzn ⋅ 2016/08/11 ⋅ 0

(译)循序渐进学习Elixir

——Chris Bell Learning Elixir at Made by Many 在今年奥兰多的Elixir大会上演讲之后,就经常有人问我:你们团队是如何学习Elixir和OTP的? 和学习其他语言一样,学习Elixir也需要付出时间...

ljzn ⋅ 2016/09/29 ⋅ 0

elixir官方入门教程 进程

进程 和 链接 任务 状态 在Elixir中,所有代码都运行在进程内。进程相互独立,并发地运行,通过传送信息来交流。进程不是Elixir中唯一的并发基础,但它意味着能够构建分布式的,可容错的程序...

ljzn ⋅ 2016/08/04 ⋅ 0

Elixir v1.2.0-rc.1 发布,函数式编程语言

Elixir v1.2.0-rc.1 发布,Elixir v1.2 依赖于 Erlang 18 的大量特性,至少要求 Erlang 18+ 版本。 此版本更新内容如下: 改进 [Mix] Raise readable error message when parsertools is not...

oschina ⋅ 2015/12/31 ⋅ 2

elixir官方教程Mix与OTP(六) 依赖和伞形项目

依赖和雨伞项目 外部依赖 内部依赖 雨伞项目 伞内依赖 总结 本章,我们将讨论如何在Mix中管理依赖. 我们的应用已经完成,所以是时候实现能够处理我们在第一章中定义的请求的服务器了: 然而,我们...

ljzn ⋅ 2016/08/11 ⋅ 0

Phoenix官方教程 (一) 构建和运行

首个教程的目的在于尽可能快地让一个Phoenix应用构建好并运行起来. 在我们开始前,请花一分钟阅读Installation Guide.安装好了所有必要的依赖,我们才能让应用流畅地构建和运行. 所以,我们需要...

ljzn ⋅ 2016/08/15 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

vbs 取文件大小 字节

dim namedim fs, s'name = Inputbox("姓名")'msgbox(name)set fs = wscript.createobject("scripting.filesystemobject") 'fs为FSO实例if (fs.folderexists("c:\temp"))......

vga ⋅ 9分钟前 ⋅ 0

高并发之Nginx的限流

首先Nginx的版本号有要求,最低为1.11.5 如果低于这个版本,在Nginx的配置中 upstream web_app { server 到达Ip1:端口 max_conns=10; server 到达Ip2:端口 max_conns=10; } server { listen ...

算法之名 ⋅ 今天 ⋅ 0

Spring | IOC AOP 注解 简单使用

写在前面的话 很久没更新笔记了,有人会抱怨:小冯啊,你是不是在偷懒啊,没有学习了。老哥,真的冤枉:我觉得我自己很菜,还在努力学习呢,正在学习Vue.js做管理系统呢。即便这样,我还是不...

Wenyi_Feng ⋅ 今天 ⋅ 0

博客迁移到 https://www.jianshu.com/u/aa501451a235

博客迁移到 https://www.jianshu.com/u/aa501451a235 本博客不再更新

为为02 ⋅ 今天 ⋅ 0

win10怎么彻底关闭自动更新

win10自带的更新每天都很多,每一次下载都要占用大量网络,而且安装要等得时间也蛮久的。 工具/原料 Win10 方法/步骤 单击左下角开始菜单点击设置图标进入设置界面 在设置窗口中输入“服务”...

阿K1225 ⋅ 今天 ⋅ 0

Elasticsearch 6.3.0 SQL功能使用案例分享

The best elasticsearch highlevel java rest api-----bboss Elasticsearch 6.3.0 官方新推出的SQL检索插件非常不错,本文一个实际案例来介绍其使用方法。 1.代码中的sql检索 @Testpu...

bboss ⋅ 今天 ⋅ 0

informix数据库在linux中的安装以及用java/c/c++访问

一、安装前准备 安装JDK(略) 到IBM官网上下载informix软件:iif.12.10.FC9DE.linux-x86_64.tar放在某个大家都可以访问的目录比如:/mypkg,并解压到该目录下。 我也放到了百度云和天翼云上...

wangxuwei ⋅ 今天 ⋅ 0

PHP语言系统ZBLOG或许无法重现月光博客的闪耀历史[图]

最近在写博客,希望通过自己努力打造一个优秀的教育类主题博客,名动江湖,但是问题来了,现在写博客还有前途吗?面对强大的自媒体站点围剿,还有信心和可能型吗? 至于程序部分,我选择了P...

原创小博客 ⋅ 今天 ⋅ 0

IntelliJ IDEA 2018.1新特性

工欲善其事必先利其器,如果有一款IDE可以让你更高效地专注于开发以及源码阅读,为什么不试一试? 本文转载自:netty技术内幕 3月27日,jetbrains正式发布期待已久的IntelliJ IDEA 2018.1,再...

Romane ⋅ 今天 ⋅ 0

浅谈设计模式之工厂模式

工厂模式(Factory Pattern)是 Java 中最常用的设计模式之一。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。 在工厂模式中,我们在创建对象时不会对客户端暴露创建逻...

佛系程序猿灬 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部