套接字编程
我(作者)编写的大多数更为有趣的程序,都以一种或另一方式,地涉及到网络套接字。套接字下的编程很有趣,因为他允许应用,与互联网上的其他机器进行交互,这远比仅执行一些本地操作,更具潜力。
所谓套接字,是允许机器使用互联网协议(IP),经由互联网通信的某个信道。在本章中,我们将着重于互联网的两个核心协议:传输控制协议,TCP,与 用户数据报协议,UDP。
UDP 允许应用程序相互发送一些短的报文(称为 数据报,datagrams),但没有这些报文送达的保证。这些报文还会不按顺序到达。相反 TCP 则提供了一旦连接建立,就会按顺序送达的可靠字节流。以 TCP 发送数据,会比以 UDP 发送数据产生更大开销。咱们可选择可靠但较慢的信道(TCP),或较快但不可靠的信道(UDP)。
供套接字编程的主要库有两个:用于编写 TCP 应用的 gen_tcp
,以及用于编写 UDP 应用的 gen_udp
。
在本章中,我们将学习如何使用 TCP 和 UDP 套接字,编程一些客户端及服务器。我们将介绍服务器的各种可能形式(并行、顺序、阻塞与非阻塞),并了解如何编程一些可控制到应用数据流的流量整形应用。
使用 TCP
我们将从一个从服务器获取数据的简单 TCP 程序,开始我们的套接字编程之旅。这之后,我们将编写一个简单顺序 TCP 服务器,并展示其如何并行化,来处理多个并行会话。
获取服务器上的数据
我们来以编写一个名为 nano_get_url/0
,使用 TCP 套接字从 http://httpforever.com 获取 HTML 页面的小函数开始。
]).
-import(lists, [reverse/1]).
nano_get_url() ->
nano_get_url("httpforever.com").
nano_get_url(Host) ->
{ok, Socket} = gen_tcp:connect(Host, 80, [binary, {packet, 0}]), %% 1
ok = gen_tcp:send(Socket, "GET / HTTP/1.0\r\n\r\n"), %% 2
receive_data(Socket, []).
receive_data(Socket, SoFar) ->
receive
{tcp,Socket,Bin} -> %% 3
receive_data(Socket, [Bin|SoFar]);
这段代码会如下工作:
-
通过调用
gen_tcp:connect
,我们打开一个到 http://www.httpforever.com 上 80 端口的 TCP 套接字。该连接调用中的参数binary
,告诉系统以 “二进制” 模式打开这个套接字,并以将全部数据作为一些二进制值,投送给该应用。{packet,0}
表示 TCP 数据会以未经修改的形式,直接投送到该应用; -
我们调用了
gen_tcp:send
,并将消息GET / HTTP/1.0\r\n
发送到这个套接字。然后我们等待回复。回复不会都在一个数据包中,而是会分片到来,每次到来一点。这些分片将作为发送到打开(或控制)该套接字进程的报文序列被接收; -
我们会收到一条
{tcp,Socket,Bin}
消息。这个元组中的第三个参数,是个二进制值。这是因为我们是以二进制模式,打开的套接字。这条信息是 web 服务器发送给我们的数据分片之一。我们要将其添加到我们已收到分片的列表,并等待下一个分片; -
我们收到一条
{tcp_closed, Socket}
消息。当服务器已完成向我们发送数据时,这就会发生; -
当所有分片都到来时,我们已将他们已错误顺序存储下来,因此我们要逆转顺序,并将所有分片连接起来。
重组数据分片的代码,看起来如下:
ok = gen_tcp:send(Socket, "GET / HTTP/1.0\r\n\r\n"), %% 2
receive_data(Socket, []).
receive_data(Socket, SoFar) ->
receive
{tcp,Socket,Bin} -> %% 3
receive_data(Socket, [Bin|SoFar]);
那么,当分片到达时,我们只需将他们添加到 SoFar
这个列表的头部。当所有片段都到达且套接字关闭时,我们就反转这个列表,并将所有分片连接起来。
咱们可能会认为,向下面这样编写代码累积数据分片的代码会更好:
receive_data(Socket, SoFar) ->
receive
{tcp,Socket,Bin} ->
receive_data(Socket, list_to_binary([SoFar, Bin]));
{tcp_closed,Socket} ->
SoFar
end.
这段代码正确,但会比原始版本低效。原因是在后一版中,我们在持续将一个新的二进制值,追加到缓冲区末尾,这会涉及大量的数据拷贝。将所有分片累积到一个列表(这将以错误顺序结束),然后反转整个列表,并在一次操作中连接所有分片,会好的多。
我们来测试一下,我们的 HTTP 客户端是否工作。
1> B = socket_examples:nano_get_url().
<<"HTTP/1.1 200 OK\r\nServer: nginx/1.18.0 (Ubuntu)\r\nDate: Tue, 07 Oct 2025 08:42:42 GMT\r\nContent-Type: text/html\r\nConten"...>>
注意:当咱们运行 nano_get_url
时,结果是个二进制值,所以咱们将看到一个二进制值在 Erlang shell 中,美化打印时的样子。在二进制值被美化打印时,所有控制字符都会以转义格式显示。同时这个二进制值会被截断,表现为打印输出末尾处的三个点(...>>
)。当咱们想要查看这个二进制值的全部时,咱们可使用 io:format
打印,或使用 string:tokens
将其拆分为若干片段。
编写 Web 服务器
编写像是 Web 客户端或服务器之类的东西非常有趣。当然,其他人已经写过这些东西了,但当我们真的想要了解他们的原理时,深入挖掘表象,找出他们的工作原理,就非常有启发性。谁知道呢 -- 也许我们的 web 服务器,将比最好的服务器还要更好。
要构建一个 web 服务器,或者任何一个实现了标准互联网协议的软件,我们都需要用到正确的工具,并准确了解要实现哪些协议。
在我们获取 web 页面的示例代码中,我们打开了 80 端口,并发送给他一条
GET / HTTP/1.0\r\n
命令。我们使用了定义在 RFC 1945 中的 HTTP 协议。互联网服务的所有主要协议,都定义在 请求评议,RFC 中。全部 RFC 的官方网站为 ietf.org(互联网工程任务组的主页)。另一个宝贵的信息来源,是 数据包嗅探器。使用数据包嗅探器,我们可捕获并分析进出我们应用的所有 IP 数据包。大多数数据包嗅探器都包含了可解码及分析数据包中的数据,并以有意义方式呈现这些数据的软件。其中最著名也可能是最好的软件,是 Wireshark(以前称为 Ethereal),可从 wireshark.org 获取。
有了数据包嗅探器转储和相应 RFC 的加持,我们就可以编写下咱们的一个杀手级应用了。
2> io:format("~p~n", [B]).
<<"HTTP/1.1 200 OK\r\nServer: nginx/1.18.0 (Ubuntu)\r\nDate: Tue, 07 Oct 2025 09:00:22 GMT\r\nContent-Type: text/html\r\nContent-Length: 5
124\r\nLast-Modified: Wed, 22 Mar 2023 14:54:48
... several lines omitted ...
>>
3> string:tokens(binary_to_list(B), "\r\n").
["HTTP/1.1 200 OK","Server: nginx/1.18.0 (Ubuntu)",
"Date: Tue, 07 Oct 2025 09:00:22 GMT",
"Content-Type: text/html","Content-Length: 5124",
"Last-Modified: Wed, 22 Mar 2023 14:54:48 GMT",
"Connection: close","ETag: \"641b16b8-1404\"",
"Referrer-Policy: strict-origin-when-cross-origin",
"X-Content-Type-Options: nosniff",
... lines omitted ...
请注意,响应代码 302 不是个报错;其是这个命令的预期响应,即重定向到某个新地址。还请注意,这个示例展示套接字通信的原理,而并未严格遵循 HTTP 协议。
译注:译者并未使用 www.google.com 作为目标服务器,而是特意使用了 httpforever.com 这个服务器,因此相应代码为 200。
这或多或少就是 web 客户端的工作方式(重点是 或少 -- 我们将必须完成大量工作,才能在 web 浏览器中正确渲染出结果数据)。不过,前面的代码是咱们自己实验的一个好起点。咱们或许会尝试修改这段代码,获取并存储完整的某个网站,或者自动前往并阅读咱们的电子邮件。可能性是无限的。
一个简单的 TCP 服务器
在上一小节中,我们编写了个简单的客户端。现在我们来编写个服务器。
这个服务器会打开 2345 端口,然后等待一条信息。该消息是包含一个 Erlang 项的二进制值。这个项是个包含一个表达式的 Erlang 字符串。服务器会计算这个表达式,并通过将结果写入套接字,将其发送给客户端。
要编写这个程序(及实际上任何透过 TCP/IP 运行的程序),我们必须回答几个简单问题。
- 数据如何组织?我们怎样知道多少数据构成了一次请求或响应?
- 某个请求或响应中的数据,是如何编码及解码的? (编码数据有时称为 marshaling,解码数据有时称为 demarshaling。)
译注:marshaling, 又作 encoding/serialization;与之对应,demarshaling 又作 decoding/deserilization。
TCP 的套接字数据,只是无差别的字节流。在传输期间,这些数据会被分割成任意大小的分片,因此我们需要一些约定,以便清楚多少数据代表一个请求或响应。
参考:
The terms "bit," "frame," "packet," and "fragment" represent different units of data at various layers of the network model, illustrating the process of data encapsulation and potential division for transmission.
Bit: The most fundamental unit of data in computing and networking, representing a binary value of either 0 or 1. All other data units are composed of sequences of bits.
Packet: At the Network Layer (Layer 3 of the OSI model), data is referred to as a packet. An IP packet, for instance, contains the source and destination IP addresses, along with the data payload.
Frame: At the Data Link Layer (Layer 2 of the OSI model), a packet is encapsulated within a frame. A frame includes additional information relevant to the local network, such as MAC addresses for source and destination, and error checking mechanisms. An Ethernet frame is a common example.
Fragment: When a packet is too large to fit within the Maximum Transmission Unit (MTU) of a particular network link, it can be divided into smaller pieces called fragments. Each fragment is essentially a smaller packet containing a portion of the original data, along with information (like fragment offset and identification) that allows the receiving device to reassemble the original packet. Fragmentation typically occurs at the Network Layer.
在 Erlang 情形下,我们使用一个简单约定,即每个逻辑的请求或响应之前,都会冠以一个 N
(1、2 或 4)的字节长度计数。这就是 gen_tcp:connect
及 gen_tcp:listen
两个函数中,{packet, N}
这个参数的含义。这里 packet
一词,指一条应用请求或响应消息的长度,而不是在线路上所见到的物理数据包长度。请注意,客户端和服务器所使用的 packet
参数 必须 一致。当服务器是以参数 {packet,2}
打开,而客户端则以 {packet,4}
打开时,那么就无法工作。
当咱们以 {packet,N}
选项打开某个套接字后,我们就无需担心数据碎片问题。Erlang 的驱动将确保所有碎片数据消息,在传送到应用前都被重新组装成正确长度。
接下来的关注点,是数据的 编码 和 解码。我们将使用最编码与解码消息的最简单可行方式,即使用 term_to_binary
编码 Erlang 项,并使用其反义词 binary_to_term
解码数据。
请注意,客户端与服务器对话所需的打包约定和编码规则,被打包在了两行代码中。即在我们打开套接字时,使用 {packet,4}
这个选项;以及使用 term_too_binary
及其反义词,编码与解码数据。
与 HTTP 或 XML 等基于文本的方法相比,我们能打包及编码 Erlang 项的这种便利,为我们提供了显著优势。使用 Erlang 的 BIF term_too_binary
及其反义词 binary_to_term
,通常要比使用 XML 项执行同等操作,快一个数量级以上,同时还涉及发送少得多的数据。现在来看程序。首先,下面是个极其简单的服务器:
list_to_binary(reverse(SoFar)) %% 5
end.
start_nano_server() ->
{ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, %% 1
{reuseaddr, true},
{active, true}]),
{ok, Socket} = gen_tcp:accept(Listen), %% 2
gen_tcp:close(Listen), %% 3
loop(Socket).
loop(Socket) ->
receive
{tcp, Socket, Bin} ->
io:format("Server received binary = ~p~n", [Bin]),
Str = binary_to_term(Bin), %% 4
io:format("Server (unpacked) ~p~n", [Str]),
Reply = lib_misc:string2value(Str), %% 5
io:format("Server replying = ~p~n", [Reply]),
gen_tcp:send(Socket, term_to_binary(Reply)), %% 6
loop(Socket);
这段代码工作如下:
-
首先我们调用
gen_tcp:listen
,监听 2345 端口上的连接,并设置好报文打包约定。{packet,4}
表示每条应用报文,都将冠以一个长度为 4 字节长的报文头。随后gen_tcp:listen(...)
会返回{ok, Listen}
或{error, Why}
,但我们只对我们能打开一个套接字的返回情形感兴趣。因此,我们写下了下面的代码:{ok, Listen} = gen_tcp:listen(...),
当
gen_tcp:listen
返回{error, ...}
时,这会导致程序抛出一个模式匹配异常。而在成功情形下下,这条语句会将Listen
,绑定到那个新的监听套接字。对于某个监听套接字,我们只能做一件事,那就是将其用作gen_tcp:accept
的参数; -
现在我们调用
gen_tcp:accept(Listen)
。此刻,程序将暂停并等待某个连接。当我们获得一个连接时,该函数将返回绑定到一个套接字的变量Socket
,其可用于与进行连接客户端对话; -
在
accept
返回后,我们立即调用gen_tcp:close(Listen)
。这会关闭那个监听套接字,因此服务器将不再接受任何新的连接。这不会影响即有连接;其只阻止新的连接; -
我们解码输入数据(unmarshaling);
-
然后我们计算该字符串;
-
然后我们编码回复数据(marshaling),并将其发送回那个套接字。
请注意,这个程序只会接受一次请求;一旦程序运行完成,随后别的连接都将不会被接受。
这是演示如何打包和编码应用数据的最简单服务器。他会接受一次请求、计算出一个回复、发送该回复并终止。
要测试该服务器,我们需要个对应的客户端。
io:format("Server socket closed~n")
end.
nano_client_eval(Str) ->
{ok, Socket} = gen_tcp:connect("localhost", 2345,
[binary, {packet, 4}]),
ok = gen_tcp:send(Socket, term_to_binary(Str)),
receive
{tcp,Socket,Bin} ->
io:format("Client received binary = ~p~n", [Bin]),
Val = binary_to_term(Bin),
为测试我们的代码,我们将在同一台机器上,运行客户端和服务器,因此 gen_tcp:connect
函数中的主机名,被硬连线到了 localhost
。
请留意客户端中 term_too_binary
被调用来编码消息的方式,以及在服务器上 binary_too_term
被调用来重构出消息的方式。
要运行这个示例,我们需要打开两个终端窗口,并在各个窗口中启动 Erlang shell。
首先我们要启动服务器。
1> socket_examples:start_nano_server().
在服务器窗口中,我们将看不到任何输出,因为什么都还没发生。然后移步客户端窗口,并执行以下命令:
译注:执行下面的命令,观察得到的结果,就会发现在 2345 端口上已经有程序在监听。
$ ss -ntlp | grep 2345 ✔ LISTEN 0 5 0.0.0.0:2345 0.0.0.0:* users:(("beam.smp",pid=183649,fd=17))
1> socket_examples:nano_client_eval("list_to_tuple([2+3*4,10+20])").
在服务器的窗口,我们就会看到如下内容:
Server received binary = <<131,107,0,28,108,105,115,116,95,116,111,95,116,117,
112,108,101,40,91,50,43,51,42,52,44,49,48,43,50,48,
93,41>>
Server (unpacked) "list_to_tuple([2+3*4,10+20])"
Server replying = {14,30}
Server socket closed
ok
最后,在服务器窗口中,我们将看到下面这个输出:
Server socket closed
顺序与并行服务器
在上一小节中,我们构造了个只接受一个连接并随后终止的服务器。稍微修改一下这些代码,我们就可以构造出两种不同类型的服务器。
- 顺序服务器 -- 一种一次只接受一个连接的服务器;
- 并行服务器 -- 一种同时接受多个并行连接的服务器。
原先的代码像下面这样启动:
start_nano_server() ->
{ok, Listen} = gen_tcp:listen(...),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).
...
我们将修改这种方式,构造出我们的两个服务器变种。
顺序服务器
要构造一个顺序服务器,我们就要将这段代码,修改为如下:
start_seq_server() ->
{ok, Listen} = gen_tcp:listen(...),
seq_loop(Listen).
seq_loop(Listen) ->
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket),
seq_loop(Listen).
loop(..) -> %% as before
这与上一示例中的工作方式基本相同,但由于我们想要服务多个请求,所以我们就让监听套接字打开,而未调用 gen_tcp:close(Listen)
。另一区别是在 loop(Socket)
结束后,我们再次调用了 seq_loop(Listen)
,其会等待下一连接。
当某个客户端在服务器忙于某个现有连接时,尝试连接到该服务器,那么这个连接将被排队,直到服务器完成那个现有连接。当排队连接数超出监听积压队列大小,那么该连接将被拒绝。
我们只给出的是启动服务器的代码。停止服务器很简单(停止并行服务器也很简单);只要杀死启动服务器的进程。gen_tcp
会将自身链接到控制进程,并在控制进程死亡时,他就会关闭套接字。
并行服务器
构造并行服务器的诀窍在于,当每次 gen_tcp:accept
获取一个新连接时,都立即生成一个新的进程。
start_parallel_server() ->
{ok, Listen} = gen_tcp:listen(...),
spawn(fun() -> par_connect(Listen) end).
par_connect(Listen) ->
{ok, Socket} = gen_tcp:accept(Listen),
spawn(fun() -> par_connect(Listen) end),
loop(Socket).
loop(..) -> %% as before
这段代码类似于我们前面看到的顺序服务器。最重要的区别,是增加了个 spawn
,其确保我们会对每个新的套接字连接,创建一个并行进程。现在是比较两者的好机会。咱们应该关注两个 spawn
语句的位置,并看看他们如何把顺序服务器,变成了并行服务器。
译注:运行这个并行服务器,并在另一窗口发起请求时,得到的响应如下:
1> socket_examples:nano_client_eval("list_to_tuple([3+3*4,10+45,10*100])"). Client received binary = <<131,104,3,97,15,97,55,98,0,0,3,232>> Client result = {15,55,1000} ok 2> socket_examples:nano_client_eval("list_to_tuple([3+3*4,10+45,10*1000])"). Client received binary = <<131,104,3,97,15,97,55,98,0,0,39,16>> Client result = {15,55,10000} ok
服务器上的输出为:
2> socket_examples:start_parallel_server(). <0.92.0> Server received binary = <<131,107,0,35,108,105,115,116,95,116,111,95,116,117, 112,108,101,40,91,51,43,51,42,52,44,49,48,43,52,53, 44,49,48,42,49,48,48,93,41>> Server (unpacked) "list_to_tuple([3+3*4,10+45,10*100])" Server replying = {15,55,1000} Server socket closed Server received binary = <<131,107,0,36,108,105,115,116,95,116,111,95,116,117, 112,108,101,40,91,51,43,51,42,52,44,49,48,43,52,53, 44,49,48,42,49,48,48,48,93,41>> Server (unpacked) "list_to_tuple([3+3*4,10+45,10*1000])" Server replying = {15,55,10000} Server socket closed
全部三种服务器,都调用了 gen_tcp:listen
和 gen_tcp:accept
;唯一的区别是,我们是在并行程序中,还是在顺序程序中,调用的这些函数。
注意事项
请留意以下几点:
-
创建出套接字(通过调用
gen_tcp:accept
或gen_tcp:connect
)的进程,叫做该套接字的 控制进程。该套接字上的所有信息,都将发送给这个控制进程;当控制进程死亡时,那么这个套接字将被关闭。通过调用gen_tcp:controlling_process(Socket,NewPid)
,套接字的控制进程可被更改为NewPid
; -
我们的并行服务器,可潜在创建出上万连接。我们可能打算限制同时连接的最大数量。这可以通过维护一个任一时间处于活动状态的连接计数器实现。每次得到个新连接时,我们就会递增这个计数器,而在每次某个连接结束时,我们会递减这个计数器。我们可以此限制系统中,同时连接的总数;
-
在我们已接受某个连接后,显式地设置要求的套接字选项是个好主意,就像这样:
{ok, Socket} = gen_tcp:accept(Listen), inet:setopts(Socket, [{packet,4},binary, {nodelay,true},{active,true}]), loop(Socket)
-
从 Erlang R11B-3 版本开始,就允许多个 Erlang 进程,在同一个监听套接字上调用
gen_tcp:accept/1
。这简化了构造并行服务器,因为咱们可以有一个预生成进程池,其中所有进程都在gen_tcp:accept/1
状态下等待。
主动与被动套接字
Erlang 的套接字,可以三种模式打开:
- 主动,active;
- 主动一次, active once;
- 或 被动, passive。
这是通过在 gen_tcp:connect(Address, Port, Options)
或 gen_tcp:listen(Port, Options)
的 Options
参数中,加入 {active, true | false | once}
选项完成的。
当 {active, true}
被指定时,那么一个主动套接字将被创建;{active, false}
则会指定一个被动套接字。{active,once}
会创建一个主动套接字,但针对一个报文的接收;在其接收完这条报文后,在可接收下一条报文前,其必须被重新启用。
在接下来的几个小节中,我们将介绍这些不同类型套接字的用法。
主动套接字和被动套接字的区别,在于在信息被套接字收到时,会发生什么。
-
在某个主动套接字被创建后,当数据(在该套接字)接收到时,会有
{tcp, Socket, Data}
信息发送到控制进程。控制进程无法控制这些信息的流向。某个异常客户端可能会将数千条消息发送到系统,而这些消息都会发送到控制进程。控制进程无法阻止这种消息流; -
当套接字是在被动模式下打开时,那么控制进程必须调用
gen_tcp:recv(Socket,N)
,从接收该套接字上的数据。然后,他将尝试精准接收该套接字上的N
个字节。当N = 0
时,就会返回所有可用字节。在这种情况下,服务器可通过选择何时调用gen_tcp:recv
,控制来自客户端的消息流。
被动套接字用于控制到服务器的数据流。为了说明这点,我们可以三种方式,编写某个服务器的消息接收循环。
- 主动的消息接收(非阻塞)
- 被动消息接收(阻塞)
- 混合的消息接收(部分阻塞)
主动的消息接收(非阻塞)
我们的首个示例,会以主动模式打开一个套接字,然后接收该套接字上的消息。
{ok, Listen} = gen_tcp:(Port, [..,{active,true}...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).
loop(Socket) ->
receive
{tcp, Socket, Data} ->
... do something with the data ...
{tcp_closed, Socket} ->
...
end.
此进程无法控制到那个服务器循环的消息流。当客户端产生数据速度,快于服务器消费该数据速度时,那么系统就会被消息 淹没 -- 消息缓冲区将被填满,进而系统可能崩溃,或行为异常。
这种服务器被称为 非阻塞 的服务器,因为他无法阻塞客户端。只有当我们确信非阻塞服务器能够满足客户端需求时,我们才应编写非阻塞服务器。
被动消息接收(阻塞式)
在这一小节中,我们将编写个阻塞式的服务器。该服务器会通过设置 {active, false}
选项,打开一个被动套接字。这个服务器不会被试图以过多数据,淹没他的过度活跃客户端崩溃掉。
服务器循环种的代码,在其每次打算接收数据时,都要调用 gen_tcp:recv
。在服务器调用 recv
之前,客户端将出现阻塞。请注意,即使 recv
未被调用,操作系统也会完成一些允许客户端在阻塞前,发送少量数据的缓冲。
{ok, Listen} = gen_tcp:listen(Port, [..,{active,false}...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).
loop(Socket) ->
case gen_tcp:recv(Socket, N) of
{ok, B} ->
... do something with the data ...
loop(Socket);
{error, closed} ->
...
end.
混合方式(部分阻塞)
咱们可能会认为,将被动模式用于所有服务器,便是对的做法。不幸的是,当我们处于被动模式中时,我们只能等待一个套接字上的数据。这对于编写那些必须等待多个套接字上数据的服务器,毫无用处。
幸运的是,我们可以采取一种既非阻塞,也非非阻塞的混合方式。我们以选项 {active, once}
打开套接字。在这种模式下,套接字是主动的,但只对一条消息主动。在向控制进程发送一条信息后,他必须显式调用 inet:setopts
,重新启用下一条消息的接收。在此之前,系统将阻塞。这是两全其美的办法。代码如下:
{ok, Listen} = gen_tcp:listen(Port, [..,{active, once}...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).
loop(Socket) ->
receive
{tcp, Socket, Data} ->
... do something with the data ...
%% when you're ready enable the next messsage
inet:setopts(Socket, [{active, once}]),
loop(Socket);
{tcp_closed, Socket} ->
...
end.
使用 {active, once}
这个选项,用户就可实现流量控制的一些高级形式(有时称为 流量整形),从而防止服务器被过多信息淹没。
找出连接来自何处
设想我们编写了某种在线服务器,发现有人不断向我们的网站发送垃圾数据。要尝试防止这种情况,我们就需要指导连接来自何处。要发现这点,我们可调用
inet:peername(Socket)
。
@spec inet:peername(Socket) -> {ok, {IP_Address, Port}} | {error, Why}
这个函数会返回连接另一端的 IP 地址和端口,从而服务器可以发现是谁发起的该连接。
IP_Address
是个整数元组,{N1,N2,N3,N4}
表示 IPv4 的 IP 地址,{K1,K2,K3,K4,K5,K6,K7,K8}
表示 IPv6 的 IP 地址。这里,Ni
是范围 0 至 255 之间的整数,Ki
是范围在 0 至 65535 间的整数。
套接字下的错误处理
套接字的错误处理极其简单 -- 基本上咱们无需做什么。如前所述,每个套接字都有个控制进程(即创建出该套接字的进程)。当控制进程死掉时,那么该套接字将自动关闭。
这意味着,比如当我们有个客户端和服务器,而由于编程错误服务器死掉时,服务器所拥有的那个套接字将被自动关闭,同时客户端将被发送一条 {tcp_closed, Socket}
消息。
我们可以下面的小的程序,测试这种机制:
error_test() ->
spawn(fun() -> error_test_server() end),
lib_misc:sleep(2000),
{ok, Socket} = gen_tcp:connect("localhost",4321,[binary, {packet, 2}]),
io:format("connected to: ~p~n", [Socket]),
gen_tcp:send(Socket, <<"123">>),
receive
Any ->
io:format("Any=~p~n", [Any])
end.
error_test_server() ->
{ok, Listen} = gen_tcp:listen(4321, [binary, {packet, 2}]),
{ok, Socket} = gen_tcp:accept(Listen),
error_test_server_loop(Socket).
error_test_server_loop(Socket) ->
receive
{tcp, Socket, Data} ->
io:format("received: ~p~n", [Data]),
_ = atom_to_list(Data),
error_test_server_loop(Socket)
end.
当我们运行他时,我们就会看到如下结果:
1> socket_examples:error_test().
connected to: #Port<0.5>
received: <<"123">>
Any={tcp_closed,#Port<0.5>}
=ERROR REPORT==== 10-Oct-2025::10:33:26.068146 ===
Error in process <0.87.0> with exit value:
{badarg,[{erlang,atom_to_list,
[<<"123">>],
[{error_info,#{module => erl_erts_errors}}]},
{socket_examples,error_test_server_loop,1,
[{file,"socket_examples.erl"},{line,105}]}]}
ok
我们生成了个服务器,休眠两秒钟给他启动的机会,然后发送给他一条包含二进制值 <<"123">>
的消息。当这条信息到达服务器时,服务器会尝试计算 atom_to_list(Data)
,其中 Data
是个二进制值,因此会立即崩溃。系统监视器会打印出咱们在 shell 中看到的诊断结果。既然该套接字的服务器侧控制进程已经崩溃,那么(服务器侧)的套接字,就会被自动关闭。然后客户端即会被发送一条 {tcp_closed, Socket}
消息。
UDP
现在我们来看看用户数据报协议(UDP)。使用 UDP,互联网上的机器可互相发送一些称为 数据报 的短信息。UDP 数据报是不可靠的。这意味着,当某个客户端将某个 UDP 数据报的序列,发送给某个服务器时,随后这些数据报可能不按顺序到达,或者根本没有到达,或甚至不止一次到达,但当单个数据报到达时,他们将是非受损的。大的数据报可被分割成较小的分片,而 IP 协议会在将这些片段传送给应用前,将重新组装这些分片。
UDP 是种 无连接 协议,表示在向服务器发送信息前,客户端不必建立连接。这意味着 UDP 非常适合那些有大量客户端,向服务器发送小的消息应用。
编写 Erlang 的 UDP 客户端和服务器,要比编写 TCP 情形下的容易得多,因为我们不必担心维护与服务器的连接。
最简单的 UDP 服务器与客户端
首先我们来谈谈服务器。UDP 服务器的一般形式如下:
server(Port) ->
{ok, Socket} -> gen_udp:open(Port, [binary]),
loop(Socket).
loop(Socket) ->
receive
{udp, Socket, Host, Port, Bin} ->
BinReply = ...,
gen_udp:send(Socket, Host, Port, BinReply),
loop(Socket)
end.
这要比 TCP 情况简单,因为我们无需担心我们的进程会收到 "socket closed"
消息。请注意,我们打开的是 二进制 模式套接字,这会告诉驱动程序,将所有消息作为二进制数据,发送给控制进程。
现在是客户端。这是个非常简单的客户端。他只是打开个 UDP 套接字、发送一条信息到服务器、等待回复(或超时),然后关闭套接字并返回由服务器返回的值。
client(Request) ->
{ok, Socket} = gen_udp:open(0, [binary]),
ok = gen_udp:send(Socket, "localhost", 4000, Request),
Value = receive
{udp, Socket, _, _, Bin} ->
{ok, Bin}
after 2000 ->
error
end,
gen_udp:close(Socket),
Value.
我们必须要有个超时时间,因为 UDP 是不可靠的,进而我们可能不会真正得到一个回复。
UDP 阶乘服务器
我们可轻松构建一个计算发送给他任何数字阶乘的 UDP 服务器。代码是以上一小节中的代码为蓝本。
-module(udp_test).
-export([start_server/0, client/1]).
start_server() ->
spawn(fun() -> server(4000) end).
%% The server
server(Port) ->
{ok, Socket} = gen_udp:open(Port, [binary]),
io:format("server opened socket: ~p~n", [Socket]),
loop(Socket).
loop(Socket) ->
receive
{udp, Socket, Host, Port, Bin} = Msg ->
io:format("server received: ~p~n", [Msg]),
N = binary_to_term(Bin),
Fac = fac(N),
gen_udp:send(Socket, Host, Port, term_to_binary(Fac)),
loop(Socket)
end.
fac(0) -> 1;
fac(N) -> N * fac(N-1).
%% The client
client(N) ->
{ok, Socket} = gen_udp:open(0, [binary]),
io:format("client opened socket = ~p~n", [Socket]),
ok = gen_udp:send(Socket, "localhost", 4000, term_to_binary(N)),
Value = receive
{udp, Socket, _, _, Bin} = Msg ->
io:format("client received: ~p~n", [Msg]),
binary_to_term(Bin)
after 2000 ->
0
end,
gen_udp:close(Socket),
Value.
请注意,我(作者)添加了一些打印语句,这样我们就能看到,在我们运行程序时发生了什么。在开发程序时,我(作者)总是会添加一些打印语句,然后在程序可工作时,编辑或注释掉他们。
现在我们来运行这个示例。首先我们启动服务器。
1> udp_test:start_server().
<0.96.0>
server opened socket: #Port<0.7>
这会在后台运行,因此我们可以构造一个求 40 阶乘值的客户端请求。
1> udp_test:client(40).
client opened socket = #Port<0.3>
client received: {udp,#Port<0.3>,
{127,0,0,1},
4000,
<<131,110,20,0,0,0,0,0,64,37,5,255,100,222,15,8,126,242,
199,132,27,232,234,142>>}
815915283247897734345611269596115894272000000000
译注:原文是在同一个 Erlang shell 下启动服务器,并构造该客户端请求。那样客户端和服务器的打印输出混在一起,影响可读性。这里是在另一个 Erlang shell 窗口中构造客户端请求,而在服务器启动 Erlang shell 窗口中的输出如下。
server received: {udp,#Port<0.3>,{127,0,0,1},49950,<<131,97,10>>}
若在同一个 Erlang shell 窗口中构造客户端请求,则混合的输出如下。
2> udp_test:client(20). client opened socket = #Port<0.4> server received: {udp,#Port<0.3>,{127,0,0,1},58999,<<131,97,20>>} client received: {udp,#Port<0.4>, {127,0,0,1}, 4000, <<131,110,8,0,0,0,180,130,124,103,195,33>>} 2432902008176640000
现在我们就有了个小小的 UDP 的阶乘服务器。为了好玩,咱们可尝试编写与此程序等价的 TCP 对等程序,并对两者进行基准测试。
UDP 数据包陷阱
我们应该注意到,由于 UDP 是种无连接协议,服务器无法通过拒绝读取来自客户端数据来阻止客户端 -- 服务器不知道客户机是谁。
大的 UDP 数据包在通过网络时,可能会被分片。当 UDP 数据的大小,大于了数据包在网络上传输时,所经过路由器的允许最大传输单元 (MTU) 大小时,就会出现分片。优化 UDP 网络的一个常见建议,便是从一个较小的数据包大小(比如约 500 字节)开始,然后在测量吞吐量的同时,逐渐将其增大。当在某个时刻吞吐量急剧下降时,咱们就知道数据包太大了。
某个 UDP 数据包可被两次投送(这会惊讶到一些人),因此咱们必须仔细编写远程过程调用的代码。比如就可能出现对某个第二次查询的回复,实际上是对第一次查询的一个重复回复。为避免这种情况,我们可将客户端代码,修改为包含一个唯一引用,并检查该引用是否被服务器返回。为生成唯一引用,我们要调用 Erlang 的内建函数 make_ref
,其保证会返回一个全局唯一的引用。现在的远程过程调用代码,看起来如下:
client(Request) ->
{ok, Socket} = gen_udp:open(0, [binary]),
Ref = make_ref(), %% make a unique reference
B1 = term_to_binary({Ref, Request}),
ok = gen_udp:send(Socket, "localhost", 4000, B1),
wait_for_ref(Socket, Ref).
wait_for_ref(Socket, Ref) ->
receive
{udp, Socket, _, _, Bin} ->
case binary_to_term(Bin) of
{Ref, Val} ->
%% got the correct value
Val;
{_SomeOtherRef, _} ->
%% some other value throw it away
wait_for_ref(Socket, Ref)
end
after 1000 ->
...
end.
现在我们学完了 UDP。UDP 通常用于有低延迟要求,同时即使偶尔丢失数据包也无所谓的在线游戏。
广播到多台机器
最后,我们将看看设置广播信道的方式。下面这段代码非常简单。
-module(broadcast).
-export([send/1, listen/0]).
send(IoList) ->
case inet:ifget("virbr0", [broadaddr]) of
{ok, [{broadaddr, Ip}]} ->
io:format("Broadcast IP: ~p~n", [Ip]),
{ok, S} = gen_udp:open(5010, [{broadcast, true}]),
gen_udp:send(S, Ip, 6000, IoList),
gen_udp:close(S);
_ ->
io:format("Bad interface name, or\n"
"broadcasting not supported\n")
end.
listen() ->
{ok, _} = gen_udp:open(6000),
loop().
loop() ->
receive
Any ->
io:format("received: ~p~n", [Any]),
loop()
end.
这里我们需要两个端口,一个发送广播,另一个监听应答。我们已选择端口 5010 发送广播请求,端口 6000 监听广播消息(这两个数字没有意义;我(作者)只是在我的系统上,选择了两个空闲端口)。
只有执行广播的那个进程,才会打开端口 5000,但网络中的所有机器,都要调用 broadcast:listen()
,他会打开端口 6000 并监听广播消息。
broadcast:send(IoList)
会将 IoList
广播到局域网上的所有机器。
注意:要使其正常工作,(网络)接口的名字必须正确,并且广播必须被支持。例如,在我(作者)的 iMac 上,我使用了名字为 "en0"
,而不是 "eth0"
。还要注意的是,当运行 UDP 监听器的主机,位于不同网络的子网上,那么 UDP 的广播就不可能到达他们,因为默认情况下路由器会丢弃此类 UDP 广播。
译注:以下是笔者在一个由宿主机及虚拟机组成的本地网络上的实验输出。
- 在一台虚拟机上发送广播消息
1> broadcast:send("test"). Broadcast IP: {192,168,122,255} ok 2> broadcast:send(["weath", " is ", "good"]). Broadcast IP: {192,168,122,255} ok
- 在宿主机及其他虚拟机上收到消息
$ erl -noshell -s broadcast listen received: {udp,#Port<0.3>,{192,168,122,122},5010,"test"} received: {udp,#Port<0.3>,{192,168,122,122},5010,"weath is good"}
SHOUTcast 服务器
我们将运用我们新掌握的套接字编程技能,编写一个 SHOUTcast 服务器,来结束本章。SHOUTcast 是个由 Nullsoft 公司的人,开发的用于串流音频数据的协议。SHOUTcast 使用 HTTP 作为传输协议,发送 MP3 或 AAC 编码的音频数据。
要了解其工作原理,我们将首先了解这个 SHOUTcast 协议。然后,我们将了解服务器的整体结构。我们将完成代码。
SHOUTcast 协议
SHOUTcast 协议相当简单。
SHOUTcast 协议相当简单。
- 首先,客户端(可以是 XMMS、Winamp 或 iTunes 等)向 SHOUTcast 服务器发送一个 HTTP 请求。下面是我(作者)在家中运行 SHOUTcast 服务器时,XMMS 生成的请求:
GET / HTTP/1.1
Host: localhost
User-Agent: xmms/1.2.10
Icy-MetaData:1
- 我(作者)的 SHOUTcast 会以下面的内容回复:
ICY 200 OK
icy-notice1: <BR>This stream requires
<a href=http://www.winamp.com/>;Winamp</a><BR>
icy-notice2: Erlang Shoutcast server<BR>
icy-name: Erlang mix
icy-genre: Pop Top 40 Dance Rock
icy-url: http://localhost:3000
content-type: audio/mpeg
icy-pub: 1
icy-metaint: 24576
icy-br: 96
... data ...
- 现在,SHOUTcast 服务器就发送连续的数据流。而数据则有着如下结构:
F H F H F H F ...
其中 F
是 MP3 音频数据的块,长度必须正好为 24 576 字节(该值是 icy-metaint
参数所给定的)。H
是个头部块。头部块由单个字节 K
后跟刚好 16*K
字节数据组成。因此,能用二进制表示的最小头部块便是 <<0>>
。下一个头部块可被表示如下:
<<1,B1,B2,...,B16>>
头部数据部分的内容,是个 StreamTitle=' ...';StreamUrl='http:// ...';
形式的字符串,其会被向右填充为零,以填满整个块。
SHOUTcast 服务器工作原理
要构造一个服务器,我们必须注意以下细节:
-
构造一个 播放列表。我们的服务器会使用一个包含着我们在 读取 MP3 元数据 中,所创建歌曲标题的列表。音频文件会从该列表中随机选择;
-
构造一个并行服务器,这样我们就能以并行方式提供多个串流。我们可使用 并行服务器 中描述的技术,完成这点;
-
对于每个音频文件,我们打算只发送音频数据,而 不 发送嵌入的 ID3 标签到客户端。音频编码器应会跳过坏的数据,因此原则上我们可以与数据一起发送 ID3 标签。实际上,当我们移除 ID3 标签时,程序似乎会运行得更好。
要移除这些标签,我们就要使用位于本书可下载源代码中的
code/id3_tag_lengths.erl
中代码。
SHOUTcast 服务器的伪代码
在看最终程序前,我们先来看看细节省略后,代码的整体流程。
start_parallel_server(Port) ->
{ok, Listen} = gen_tcp:listen(Port, ..),
%% create a song server -- this just knows about all our music
PidSongServer = spawn(fun() -> songs() end),
spawn(fun() -> par_connect(Listen, PidSongServer) end)..
%% spawn one of these processes per connection
par_connect(Listen, PidSongServer) ->
{ok, Socket} = gen_tcp:accept(Listen),
%% when accept returns spawn a new process to
%% wait for the next connection
spawn(fun() -> par_connect(Listen, PidSongServer) end),
inet:setopts(Socket, [{packet,0},binary,{nodelay,true},
{active,true}]),
%% deal with the request
get_request(Socket, PidSongServer, []).
%% wait for the TCP request
get_request(Socket, PidSongServer, L) ->
receive
{tcp, Socket, Bin} ->
... Bin contains the request from the client
... if the request is fragmented we call loop again ...
... otherwise we call
.... got_request(Data, Socket, PidSongServer)
{tcp_closed, Socket} ->
... this happens if the client aborts
... before it has sent a request (very unlikely)
end.
%% we got the request -- send a reply
got_request(Data, Socket, PidSongServer) ->
.. data is the request from the client ...
.. analyze it ...
.. we'll always allow the request ..
gen_tcp:send(Socket, [response()]),
play_songs(Socket, PidSongServer).
%% play songs forever or until the client quits
play_songs(Socket, PidSongServer) ->
... PidSongServer keeps a list of all our MP3 files
Song = rpc(PidSongServer, random_song),
... Song is a random song ...
Header = make_header(Song),
... make the header ...
{ok, S} = file:open(File, [read,binary,raw]),
send_file(1, S, Header, 1, Socket),
file:close(S),
play_songs(Socket, PidSongServer).
send_file(K, S, Header, OffSet, Socket) ->
... send the file in chunks to the client ...
... returns when the entire file is sent ...
... but exits if we get an error when writing to
... the sockets -- this happens if the client quits
当咱们看到真实代码时,就会发现细节略有不同,但原则是同样的。完整的代码列表为在这里展示,而是在文件 code/shout.erl
中。
运行这个 SHOUTcast 服务器
要运行这个服务器并测试其是否工作,我们需要执行以下三个步骤:
- 构造一个播放列表;
- 启动该服务器;
- 将某个客户端指向该服务器。
要构造播放列表,请完成以下步骤:
-
切换到代码目录;
-
编辑
mp3_manager.erl
这个文件中,start1
函数中的路径,为指向包含咱们要提供服务的音频文件目录的根目录; -
编译
mp3_manager
,并执行mp3_manager:start1()
。咱们将看到如下的一些输出:1> c(mp3_manager). mp3_manager.erl:11:2: Warning: export_all flag enabled - all functions will be exported % 11| -compile(export_all). % | ^ {ok,mp3_manager} 2> mp3_manager:start1(). ** dumping to mp3data.tmp ok
若咱们感兴趣,咱们现在可以查看
mp3data
这个文件,看看分析结果。
现在我们就可以启动这个 SHOUTcast 服务器了。
1> shout:start().
<0.87.0>
要测试该服务器,请完成以下步骤:
-
前往另一个窗口启动某个音频播放器,并将其指向名为
http://localhost:3000
的串流;在我(作者)的系统上,我使用 XMMS 并执行了以下命令:
xmms http://localhost:3000
注意:若咱们想要从另一台电脑访问该服务器,咱们将必须提供服务器正运行在的机器 IP 地址。比如要使用我 Windows 机器上的 Winamp 访问该服务器,那么我就要使用 Winamp 中的
Play > URL
菜单,并在Open URL
对话框中,输入地址http://192.168.1.168:3000
。而在我(作者)的 iMac 使用 iTunes 时,我使用了
Advanced > Open Stream
菜单,并提供前面的 URL 访问该服务器。 -
咱们将在咱们启动该服务器的窗口中,看到一些诊断输出;
-
享用吧!
译注:译者在使用
ffplay http://localhost:3000
测试服务器是否工作时,出现以下报错。... [http @ 0x7fea240013c0] Stream ends prematurely at 0, should be 18446744073709551615 http://localhost:3000: Input/output error nan : 0.000 fd= 0 aq= 0KB vq= 0KB sq= 0B
同时服务器窗口中的报错如下。
... {shout,play_songs,3,[{file,"shout.erl"},{line,76}]}]} =ERROR REPORT==== 11-Oct-2025::18:30:02.877365 === Error in process <0.1189.0> with exit value: {function_clause, [{shout,make_header1, [{function_clause, [{mp3_manager,parse_txt, [<<1>>], [{file,"mp3_manager.erl"},{line,74}]}, {mp3_manager,parse_frame,1, [{file,"mp3_manager.erl"},{line,68}]}, {lists,map_1,2,[{file,"lists.erl"},{line,2390}]}, {lists,map_1,2,[{file,"lists.erl"},{line,2390}]}, {lists,map,2,[{file,"lists.erl"},{line,2385}]}, {mp3_manager,parse_frames,1, [{file,"mp3_manager.erl"},{line,64}]}, {mp3_manager,read_id3_tag,1, [{file,"mp3_manager.erl"},{line,30}]}, {mp3_manager,handle,1,[{file,"mp3_manager.erl"},{line,24}]}]}], [{file,"shout.erl"},{line,187}]}, {shout,unpack_song_descriptor,1,[{file,"shout.erl"},{line,175}]}, {shout,play_songs,3,[{file,"shout.erl"},{line,75}]}]}
在使用 VLC 连接该服务器后,服务器窗口中弹出了诊断输出,但播放器侧没有播放音乐。后期可能需要进一步调试该服务器。
在本章中,我们只介绍了操作套接字的一些最常用到的函数。有关套接字 API 的更多信息,请参阅 gen_tcp
、gen_udp
及 inet
的手册页面。
套接字接口与 term_too_binary/1
及其反义词 binary_too_term
两个 BIFs 的简单组合,就会使网络通讯非常容易。我(作者)建议咱们,完成下面的练习亲身体验一下。
在下一章中,我们将学习 websocket。使用 websocket,Erlang 进程可以直接与 web 浏览器通信,而无需遵循 HTTP 协议。这非常适合于实现一些低延迟的 web 应用,并为 web 应用编程,提供了一种简便方法。
练习
-
请修改
nano_get_url/0
的代码(获取服务器上的数据 小节),在必要处添加一些适当的头部,并执行在需要时进行重定向,以便获取任何网页。请在多个网站上测试这一特性; -
请输入 一个简单的 TCP 服务器 代码。然后修改代码以接收
{Mod, Func, Args}
元组而非字符串。最后计算Reply = apply(Mod, Func, Args)
并将值发送回到套接字上;请编写与本章中前面所给出版本类似的一个函数
nano_client_eval(Mod, Func, Args)
,要将Mod
、Func
及Arity
编码为可被修改后服务器代码,所能理解的形式。请首先在同一台机器上测试客户端与服务器代码是否正确工作,然后在同一局域网内的两台机器上,最后在互联网上的两台机器上测试。
-
请使用 UDP 代替 TCP 重复前一练习;
-
通过在将二进制值发送到传出套接字上前立即对其进行编码,并在传入套接字上收到后立即对其进行解码,从而增加一个加密层;
-
请构造创建一个简单的 "类电子邮件" 系统。使用 Erlang 的项作为消息,并将其存储在
${HOME}/mbox
目录下。