Youmai の Blog


  • 首页

  • 分类

  • 关于

  • 归档

  • 标签

nginx代理socket.io服务踩坑

发表于 2018-10-29 | 分类于 socket.io

场景

nginx代理了两台socket.io服务器。socket.io的工作模式是polling升级到websocket

现象

通过nginx请求服务时,出现了大量的400错误,有时候能升级到websocket,有时候会一直报错。但是直接通过ip+端口访问时,100%能成功。

400

分析

sid

sid是我们这个问题的关键。在初始创建连接时(polling模式就是在模拟一个长连接),客户端会发起这样的请求:

1
https://***/?EIO=3&transport=polling&t=1540820717277-0

服务端收到后会创建一个对象,绑定在这个连接上,同时返回一个sid(session id),来标记这个会话。会话指什么呢,会话是一连串的交互,这些交互之间是有联系的,在我们这个场景下就是,下一次的http请求到来,我需要找到之前绑定在理论上的长连接(这里还没有websocket,所以是理论上的)上的那个对象。我们知道http请求是无状态的,每个请求之间独立,所以socket.io引入了sid来做这件事。服务端收到请求后会生成一个sid,看下response:

1
{"sid":"EoGaL3fRQlpTOaLp5eST","upgrades":["websocket"],"pingInterval":8000,"pingTimeout":10000}

之后每次请求都需要带上这个sid,建立websocket请求的连接也不例外。所以说,sid是polling,以及polling升级到websocket的关键。这之后的请求类似于:

1
2
3
4
5
https://***/?EIO=3&transport=polling&t=1540820717314-1&sid=EoGaL3fRQlpTOaLp5eST
or
wss://***/?EIO=3&transport=websocket&t=1540820717314-1&sid=EoGaL3fRQlpTOaLp5eST

那么问题来了,如果请求是带上的sid不是服务端生成的会怎样呢?服务端会不认识,给你返回一个400,并告诉你

1
invalid sid

我们遇到的便是这个问题,nginx默认的负载均衡策略是轮询,所以请求有可能会打到不是生成这个sid的机器上去,这时候我们就会收到一个400,如果运气好,可能也会打到原来的机器上,运气更好一点,甚至能坚持到websocket连接建立。

解决

这里提出两种方案

  1. nginx的负载均衡采用ip_hash,这样能保证一个客户端的请求都走到一台服务器上
  2. 不使用polling模式,只使用websocket

这两种方案各有利弊。第二种显而易见,不支持websocket的古老浏览器和客户端将没法工作。第一种的问题隐藏得比较深,试想,如果你增减了机器会怎样,这时候ip_hash策略的模将变化,之前的连接将全部失效,而对于微服务,扩缩容是很频繁的操作(特别是产品处于发展期),这种有损的扩缩容很大概率是不能接受的。

综上,建议直接使用websocket,毕竟不支持websocket的老版本占比很少,而且相对于先polling,耗时也会减少。

真实世界中的WebRTC:STUN, TURN and signaling

发表于 2018-08-01 | 分类于 WebRTC

WebRTC使端到端能够通信。

但是…

WebRTC仍然需要服务器:

  • 让客户端交换元数据来协调通信:这被称为信令(signaling)
  • 处理网络地址转换(NATs)和防火墙

这篇文章将会展示如何搭建一个信令服务

在本文中,我们将向您展示如何构建信令服务,以及如何使用STUN和TURN服务器处理真实连接的难题。 我们还解释了WebRTC应用程序如何处理多方通话以及与VoIP和PSTN(又称电话)等服务进行交互。

什么是信令

信令是协调通信的过程。 为了使WebRTC应用程序能够建立一个“通话”,其客户端需要交换以下信息:

  • 会话控制消息用于打开或关闭通信
  • 错误消息
  • 媒体元数据,如编解码器和编解码器设置,带宽和媒体类型
  • 密钥数据,用于建立安全的连接
  • 网络数据,如:外界看到的主机IP地址和端口

此信令过程需要一种方法让客户端来回传递消息。 WebRTC API不实现该机制:你需要自己构建它。 我们在下面描述了构建信令服务的一些方法。 首先,需要一点背景…

为什么信令不是由WebRTC定义的?

为了避免出现冗余,并最大限度地提高与已有技术的兼容性,WebRTC标准并没有规定信令方法和协议。JavaScript会话建立协议JSEP概述了这种方法:

WebRTC通话建立的思想是完全指定和控制媒体平面,但是尽可能将信令平面留给应用程序。其原理是,不同的应用程序可能更喜欢使用不同的协议,例如现有的SIP或Jingle呼叫信令协议,或者对于特定应用程序定制的东西,可能是针对新颖的用例。在这种方法中,需要交换的关键信息是多媒体会话描述,其指定了建立媒体平面所必需的传输和媒体配置信息。

JSEP的架构也避免了浏览器不得不保存状态,即作为一个信令状态机。如果信令数据在每次刷新页面的时候都会发生丢失,就会出现问题。相反,信令状态机可以保存在服务器上。

jsep arch

JSEP要求端之间交换offer和answer:上面提到的媒体元数据。offer和answer以会话描述协议(SDP)的格式传递,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
v=0
o=- 7614219274584779017 2 IN IP4 127.0.0.1
s=-
t=0 0
a=group:BUNDLE audio video
a=msid-semantic: WMS
m=audio 1 RTP/SAVPF 111 103 104 0 8 107 106 105 13 126
c=IN IP4 0.0.0.0
a=rtcp:1 IN IP4 0.0.0.0
a=ice-ufrag:W2TGCZw2NZHuwlnf
a=ice-pwd:xdQEccP40E+P0L5qTyzDgfmW
a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level
a=mid:audio
a=rtcp-mux
a=crypto:1 AES_CM_128_HMAC_SHA1_80 inline:9c1AHz27dZ9xPI91YNfSlI67/EMkjHHIHORiClQe
a=rtpmap:111 opus/48000/2
…

想知道所有这些SDP gobbledygook实际意味着什么? 看看IETF的例子。

需要记住的是,WebRTC被设计为在被设置为本地或者远端描述之前,通过编辑SDP文本中的值,可以扭转offer或者answer。例如,apprtc.tc中的preferAudioCodec()函数可用于设置默认编解码器和比特率。使用 JavaScript处理SDP有些困难,有一些关于WebRTC的未来版本中是否应该使用JSON的讨论,但是坚持使用SDP还是有一些优势的。

RTCPeerConnection + signaling: offer, answer and candidate

RTCPeerConnection是WebRTC应用程序用来创建端对端连接并传输音视频的API。

为初始化这个过程,RTCPeerConnection有两个工作要做:

  • 确定本地媒体条件,如分辨率和编解码器功能。这是用于offer和answer机制的元数据。
  • 获取应用程序主机的潜在网络地址,成为候选人(candidates)

一旦确定了本地数据,就必须通过信令机制与远端进行交换。

让我们假设一个场景:Alice正在尝试呼叫Eve。下面是完整的offer/answer机制:

  1. Alice创建一个RTCPeerConnection对象。
  2. Alice使用RTCPeerConnection createOffer()方法产生一个offer(一个SDP会话描述)。
  3. Alice用他的offer调用setLocalDescription()。
  4. Alice将offer字符串化,并使用信令机制将其发送给Eve。
  5. Eve用Alice的offer调用setRemoteDescription(),以便她的RTCPeerConnection知道Alice的设置。
  6. Eve调用createAnswer(),成功的回调是传入一个本地的会话描述:Eve的answer。
  7. Eve通过调用setLocalDescription()将其answer设置为本地描述。
  8. Eve然后使用信令机制将她的字符串化的answer发回给Alice。
  9. Alice使用setRemoteDescription()将Eve的应答设置为远程会话描述。

Alice和Eve也需要交换网络信息。“查找候选人(find candidate)”这个表达是指使用ICE框架查找网络接口和端口的过程。

  1. Alice使用onicecandidate handler创建一个RTCPeerConnection对象。
  2. handler在网络候选人变得可用时被调用。
  3. 在handler中,Alice通过他们的信令通道将字符串化的候选数据发送给Eve。
  4. 当Eve从Alice那里获得候选消息时,她调用addIceCandidate(),将候选项添加到远端描述中。

JSEP支持ICE Candidate Trickling,它允许主叫方(caller)在最初的offer之后递增地向被叫方提供候选项(candidates),并使被叫方开始在通话中进行操作并建立连接而不用等所有候选项到达。

WebRTC信令代码

下面的W3C代码示例总结了一个完整的信令过程。该代码假定存在一些信令机制,SignalingChannel。我们会在下文中更详细地讨论信令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// handles JSON.stringify/parse
const signaling = new SignalingChannel();
const constraints = {audio: true, video: true};
const configuration = {iceServers: [{urls: 'stuns:stun.example.org'}]};
const pc = new RTCPeerConnection(configuration);
// send any ice candidates to the other peer
pc.onicecandidate = ({candidate}) => signaling.send({candidate});
// let the "negotiationneeded" event trigger offer generation
pc.onnegotiationneeded = async () => {
try {
await pc.setLocalDescription(await pc.createOffer());
// send the offer to the other peer
signaling.send({desc: pc.localDescription});
} catch (err) {
console.error(err);
}
};
// once remote track media arrives, show it in remote video element
pc.ontrack = (event) => {
// don't set srcObject again if it is already set.
if (remoteView.srcObject) return;
remoteView.srcObject = event.streams[0];
};
// call start() to initiate
async function start() {
try {
// get local stream, show it in self-view and add it to be sent
const stream =
await navigator.mediaDevices.getUserMedia(constraints);
stream.getTracks().forEach((track) =>
pc.addTrack(track, stream));
selfView.srcObject = stream;
} catch (err) {
console.error(err);
}
}
signaling.onmessage = async ({desc, candidate}) => {
try {
if (desc) {
// if we get an offer, we need to reply with an answer
if (desc.type === 'offer') {
await pc.setRemoteDescription(desc);
const stream =
await navigator.mediaDevices.getUserMedia(constraints);
stream.getTracks().forEach((track) =>
pc.addTrack(track, stream));
await pc.setLocalDescription(await pc.createAnswer());
signaling.send({desc: pc.localDescription});
} else if (desc.type === 'answer') {
await pc.setRemoteDescription(desc);
} else {
console.log('Unsupported SDP type.');
}
} else if (candidate) {
await pc.addIceCandidate(candidate);
}
} catch (err) {
console.error(err);
}
};

要查看实际offer/answer和候选项的交流过程,请参阅simpl.info/pc上的“单页”视频聊天示例的控制台日志。如果你想知道更多,请从Chrome的chrome://webrtc-internals页面或者Opera中的opera://webrtc-internals页面下载WebRTC 信令和统计数据的完整转储。

对端发现

这是“我该如何找到我要交谈的人”的一种高端说法。

对于电话来说,我们有电话号码和目录。对于在线视频聊天和消息发送,我们需要身份和状态管理系统,以及用户启动会话的方式。WebRTC应用程序需要一种方式让客户互相通知他们想要开始或加入一个通话。

端发现机制不是由WebRTC定义的。这个过程可以像发送电子邮件或发送一个URL一样简单:对于视频聊天应用程序,比如talky.io,tawk.com和browsermeeting.com,您可以通过共享自定义链接来邀请人们进行通话。开发人员Chris Ball已经搭建了一个有趣的无服务器的webrtc实验,使WebRTC呼叫参与者能够通过他们喜欢的任何消息服务来交换元数据。

我怎么才能建立一个信令服务?

重申:信令协议和机制不是由WebRTC标准定义的。无论你选择什么,你都需要一个中间服务器来在客户端之间交换信令消息和应用程序数据。不走运的是,一个网络应用程序不能简单地向互联网喊“连接到我朋友那!”

幸亏信令消息很小,而且大多在通话开始的时候进行交换。在使用appr.tc进行测试时,我们发现对于视频聊天会话,信令服务总共处理了30-45条消息,总共消息的大小大约为10KB。

WebRTC 信令业务在带宽方面的要求相对较低,因为它们只需要中继消息并保留少量的会话状态数据(如连接的客户端),同样不会消耗太多的处理或存储空间。

小贴士:用于交换会话元数据的信令机制也可用于传送应用程序数据。这只是一个消息服务而已!。

将消息从服务器推送到客户端

信令的消息服务需要是双向的:客户端到服务器和服务器到客户端。双向通信违背了HTTP客户端/服务器的请求/响应模型,但是为了将数据从运行在Web服务器上的服务推送到运行在浏览器中的Web应用程序,多年来已经开发了诸如长轮询)之类的各种hac
k方法。

最近,EventSource API已经得到了广泛的实现。这开启了 “服务器发送的事件”:通过HTTP从Web服务器发送到浏览器客户端的数据。在simpl.info/es上有一个简单的演示。EventSource是为单向消息传递而设计的,但是它可以和XHR结合使用来搭建交换信令消息的服务:信令服务器通过XHR请求传递来自呼叫者的消息,通过EventSource推送给被叫者。

WebSocket是一个更自然的解决方案,专为全双工客户端-服务器通信而设计(消息可以同时在两个方向上传输)。使用纯WebSocket或Server-Sent Events(EventSource)构建的信令服务的一个优点是这些API的后端可以在大多数Web托管软件包通用的各种Web框架上实现,比如PHP,Python和Ruby。

大约四分之三的浏览器都支持WebSocket,更重要的是,所有支持WebRTC的浏览器都支持WebSocket,无论是在台式机还是手机上。应该为所有连接都使用TLS,以确保消息不会因为没有加密而被截取,并且减少代理遍历的问题。(有关WebSocket和代理遍历的更多信息,请参阅Ilya Grigorik的高性能浏览器网络中的WebRTC章节。Peter Lubber的WebSocket备忘单提供了有关WebSocket客户端和服务器的更多信息)

用于标准的appr.tc WebRTC视频聊天应用程序的信令通过Google App Engine Channel API完成,该API使用Comet)技术(长轮询)来启用App Engine后端与Web客户端之间推送通信的信令传输。HTML5 Rocks WebRTC文章中详细介绍了该应用程序的代码演示。

apprtc_in_action

也可以通过让WebRTC客户端通过Ajax轮询消息服务器来处理信令,但这回导致大量的冗余网络请求,尤其对于移动设备是有问题的。即使在会话建立之后,对端也需要轮询信令消息,以防其他端发生改变或终止会话。WebRTC Book app示例使用此选项,并对轮询频率进行了一些优化。

扩展信令

尽管信令服务消耗客户端带宽和CPU相对较少,但是一个很受欢迎的应用程序的信令服务器可能需要处理来自不同位置的大量消息,而且并发性较高。有大量流量的WebRTC应用程序需要能够处理相当大负载的信令服务器。

在这里我们不会对其进行详细讨论,但是对于大容量、高性能的消息传递有很多选择,包括:

  • eXtensible Messaging and Presence Protocol(XMPP),最初是叫Jabber:一个为即时消息而开发的可用于信令的协议。服务器实现包括ejabberd和Openfire。Strophe.js等 JavaScript等客户端使用BOSH模拟双向流,但由于各种原因,BOSH可能不如WebSocket高效,同样的原因可能导致无法很好地扩展缩放。(跳离正题:Jingle是一个支持语音和视频的XMPP扩展;WebRTC项目使用来自libjingle库,一个Jingle的C++实现,的网络和传输组件。)
  • 开源的库,如ZeroMQ(TokBox在他们的Rumor服务中使用它)和OpenMQ。NullMQ通过WebSocket使用STOMP协议将ZeroMQ概念应用于Web平台。
  • 使用WebSocket的商业云消息平台(尽管可能会回退到长轮询),例如Pusher,Kaazing和PubNub。(PubNub也有WebRTC的API)
  • 像vLine这样的商业WebRTC平台。

在Node上使用Socket.io构建信令服务

下面是一个简单的Web应用程序的代码,它使用Node上的Socket.io构建的信令服务。Socket.io的设计使构建服务、交换信息变得简单,而且因为它内置了“房间”的概念,Socket.io特别适用于WebRTC 信令。这个例子不是为了扩大产品级别的信令服务而设计的,但是对于相对较少的用户来说效果很好。

Socket.io使用带有回退的WebSocket:AJAX长轮询,AJAX多部分流,Forever Iframe和JSONP轮询。它已被移植到各种后端,但也许最有名的是它的Node版本,我们将在下面的例子中使用它。

在这个例子中没有WebRTC:它的设计目的只是为了展示如何在一个Web应用程序中构建信令。查看控制台日志以查看客户端加入房间并且交换消息时发生的情况。我们的WebRTC codelab提供了分步说明,解释了如何将这个例子集成到一个完整的WebRTC视频聊天应用程序。

下面是客户端, index.html:

1
2
3
4
5
6
7
8
9
10
<!DOCTYPE html>
<html>
<head>
<title>WebRTC client</title>
</head>
<body>
<script src='/socket.io/socket.io.js'></script>
<script src='js/main.js'></script>
</body>
</html>

以及客户端中引用的JavaScript文件main.js::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const isInitiator;
room = prompt('Enter room name:');
const socket = io.connect();
if (room !== '') {
console.log('Joining room ' + room);
socket.emit('create or join', room);
}
socket.on('full', (room) => {
console.log('Room ' + room + ' is full');
});
socket.on('empty', (room) => {
isInitiator = true;
console.log('Room ' + room + ' is empty');
});
socket.on('join', (room) => {
console.log('Making request to join room ' + room);
console.log('You are the initiator!');
});
socket.on('log', (array) => {
console.log.apply(console, array);
});

完整的服务端app:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
const static = require('node-static');
const http = require('http');
const file = new(static.Server)();
const app = http.createServer(function (req, res) {
file.serve(req, res);
}).listen(2013);
const io = require('socket.io').listen(app);
io.sockets.on('connection', (socket) => {
// convenience function to log server messages to the client
function log(){
const array = ['>>> Message from server: '];
for (const i = 0; i < arguments.length; i++) {
array.push(arguments[i]);
}
socket.emit('log', array);
}
socket.on('message', (message) => {
log('Got message:', message);
// for a real app, would be room only (not broadcast)
socket.broadcast.emit('message', message);
});
socket.on('create or join', (room) => {
const numClients = io.sockets.clients(room).length;
log('Room ' + room + ' has ' + numClients + ' client(s)');
log('Request to create or join room ' + room);
if (numClients === 0){
socket.join(room);
socket.emit('created', room);
} else if (numClients === 1) {
io.sockets.in(room).emit('join', room);
socket.join(room);
socket.emit('joined', room);
} else { // max two clients
socket.emit('full', room);
}
socket.emit('emit(): client ' + socket.id +
' joined room ' + room);
socket.broadcast.emit('broadcast(): client ' + socket.id +
' joined room ' + room);
});
});

(你不需要去学习node-static;它只是碰巧在这个例子里用到)

在localhost上运行这个程序,你需要安装Node,socket.io和node-static。Node可以从nodejs.org下载。要安装socket.io和node-static,请从你的应用程序目录中的终端运行Node Package Manager:

1
2
npm install socket.io
npm install node-static

启动服务器,在应用目录下运行下面的命令:

1
node server.js

在浏览器中打开localhost:2013。在任何浏览器中打开新的标签页或窗口,然后再次打开localhost:2013。如果想要查看发生了什么,请查看控制台:在Chrome和Opera中,可以通过Command-Option-J或Ctrl-Shift-J通过DevTools访问。

不管你选择使用什么方式进行信号传输,后端和客户端app都至少需要提供类似于此示例的服务。

信令陷阱

  • 在setLocalDescription()被调用之前,RTCPeerConnection并不会开始收集候选:这是在JSEP IETF草案中规定的。

  • 利用Trickle ICE(见上文):候选到达后立即调用addIceCandidate()。

现成的信令服务器

如果你不想自己做WebRTC 信令服务器,有一些现成的服务器可用,它们可以使用上面例子中使用Socket.io,并与WebRTC客户端 JavaScript库集成:

  • webRTC.io:WebRTC最先出现的几个抽象库之一。
  • easyRTC:完整的WebRTC包。
  • Signalmaster:一个与SimpleWebRTC JavaScript客户端库一起使用的信号服务器。

如果你一点代码都不想写的话,还可以从像vLine,OpenTok和Asterisk等公司获得完整的商业WebRTC平台。

爱立信在WebRTC早期的时候建立了一个在Apache上使用PHP的信令服务器。虽说这现在已经过时了,但是如果你正在考虑类似的东西,那么还是值得看一下代码的。

信令安全

安全是无所作为的艺术。

— Salman Rushdie

加密对于所有WebRTC组件来说都是强制性的。

但是信令机制并不是由WebRTC标准定义的,所以保护信令安全的责任就全在你的身上了。如果攻击者设法劫持了信令,他们就可以停止会话,重新定向连接和记录,更改或注入其他内容。

确保信令安全的最重要因素是使用安全协议,即HTTPS和WSS(即TLS),确保消息不会因未加密而截获。另外要小心,不要以可以被其他呼叫方能够获取的方式用同一个信令服务器广播信令消息。

在信令之后:使用 ICE来对付NAT和防火墙

对于元数据信令,WebRTC应用程序使用中介服务器,但对于实际的媒体和数据流,一旦建立对话的话,RTCPeerConnection就会尝试点对点地直接连接客户端。

在简单的情况中,每个WebRTC端点都有一个唯一的地址,可以与其他端进行交换以便直接通信。

world without nat

实际上大多数设备都是处在一层或者多层NAT之后的,其中有一些包含可以阻挡某些端口和协议的防病毒软件,还有很多设备是在代理和公司防火墙之后的。防火墙和NAT实际上可以由相同的设备实现,比如说家庭WiFi路由器。

nat_real_world

WebRTC应用程序可以使用ICE框架来消除实际网络的复杂性。为了实现这一点,你的应用程序必须将 ICE服务器的URL传递给RTCPeerConnection,就像下面所描述的那样。

ICE试图找到连接对方的最佳途径。它会并行地尝试所有可能性,并选择最有效的选项。 ICE首先尝试使用从设备操作系统和网卡获取的主机地址进行连接;如果不成功的话(对于NAT后面的设备就会失败), ICE会使用 STUN服务器获取外部地址,如果还是失败的话,则通过 TURN中继服务器路由数据。

换句话说:

  • STUN服务器是用来获取外部地址的。

  • TURN服务器是用来在直接连接(点到点)失败的情况下进行中继数据流量的

每个 TURN服务器都支持 STUN: TURN服务器也是一个增加了内置中继功能的 STUN服务器。 ICE还可以应付NAT设置的复杂性:实际上,NAT“打孔”可能不仅仅需要一个公共IP:端口地址。

STUN 和/或 TURN服务器的URL(可选择地)由iceServers配置对象中的WebRTC应用程序指定,该配置对象是RTCPeerConnection构造函数的第一个参数。对于appr.tc来说,值看起来是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
'iceServers': [
{
'urls': 'stun:stun.l.google.com:19302'
},
{
'urls': 'turn:192.158.29.39:3478?transport=udp',
'credential': 'JZEOEt2V3Qb0y27GRntt2u2PAYA=',
'username': '28224511:1379330808'
},
{
'urls': 'turn:192.158.29.39:3478?transport=tcp',
'credential': 'JZEOEt2V3Qb0y27GRntt2u2PAYA=',
'username': '28224511:1379330808'
}
]
}

注意:上面显示的 TURN 证书是有时间限制的,在2013年9月到期。 TURN服务器运行起来很昂贵,你需要为自己的服务器付费或者找一个服务提供商。要测试证书,你可以使用候选收集样本,并检查是否获得了类型为中继的候选。

一旦RTCPeerConnection具有该信息, ICE的作用就会自动发生:RTCPeerConnection使用 ICE框架 来计算到对端之间的最佳路径,并根据需要使用 STUN和 TURN服务器。

STUN

NAT给设备提供了一个IP地址以使用专用局域网,但是这个地址不能在外部使用。由于没有公用地址,WebRTC端对端就无法进行通信。而WebRTC使用STUN来解决这个问题。

STUN服务器位于公共网络上,并且有一个简单的任务:检查传入请求的IP地址(来自运行在NAT后面的应用程序),并将该地址作为响应发送回去。换句话说,应用程序使用 STUN服务器从公共角度发现其IP:端口。这个过程使得WebRTC一端为自己获得一个可公开访问的地址,然后通过信令机制将其传递给另一端以建立直接连接。(实际上不同NAT工作方式都有所不同,可能有多个NAT层,但是原理是一样的)。

因为 STUN服务器不需要做太多的工作或者记特别多的东西,所以相对低规格的 STUN服务器就可以处理大量的请求。

根据webrtcstats.com的统计(2013年),大多数WebRTC通话都成功地使用 STUN进行连接,有86%。尽管对于防火墙之后的两端之间的呼叫以及复杂的NAT配置,成功通话量会更少一些。

stun

TURN

RTCPeerConnection尝试通过UDP建立对等端之间的直接通信。如果失败的话,RTCPeerConnection就会使用TCP进行连接。如果使用TCP还失败的话,可以用 TURN服务器作为后备,在终端之间转发数据。

重申: TURN用于中继对等端之间的音频/视频/数据流,而不是信令数据。

TURN服务器具有公共地址,因此即使对等端位于防火墙或代理之后也可以与其他人联系。 TURN服务器有一个概念上来讲简单的任务—中继数据流—但是与 STUN服务器不同的是,他们会消耗大量的带宽。换句话说, TURN服务器需要更加的强大。

turn

上图显示了 TURN的作用:单纯的 STUN没有成功建立连接,所以每个对等端还需要使用 TURN服务器。

部署 STUN和 TURN服务器

为了进行测试,Google运行了一个公共 STUN服务器 stun.l.google.com:19302,就是appr.tc所使用的那样。对于产品的 STUN/ TURN服务,我们推荐使用rfc5766-turn-server; STUN和 TURN服务器的源代码可从code.google.com/p/rfc5766-turn-server获得,该代码还提供了有关服务器安装的多个信息源的链接。Amazon Web Services的VM映像也可用。

另一个 TURN服务器是restund,提供源代码,也有AWS服务。以下是如何在Google Compute Engine上设置restund的说明。

  1. 根据需要打开防火墙,对于tcp = 443,udp/tcp = 3478
  2. 创建四个实例,每个公共IP标准一个,Standard Ubuntu 12.06映像
  3. 设置本地防火墙配置
  4. 安装工具

    1
    2
    sudo apt-get install make
    sudo apt-get install gcc
  5. 从creytiv.com/re.html安装libre

  6. 从creytiv.com/restund.html获取并解压缩restund
  7. wget hancke.name/restund-auth.patch并且使用patch – p1
  8. 对libre和restund运行make, sudo make install
  9. 根据你的需要(替换IP地址并确保它包含相同的共享密钥)对restund.conf进行调整,并复制到/etc
  10. 复制restund/etc/restund到/etc/init.d/
  11. 配置restund:
    • 设置LD_LIBRARY_PATH
    • 复制restund.conf到/etc/restund.conf
    • 设置restund.conf以使用正确的 10. IP地址
  12. 运行restund
  13. 从远端机上使用社团的客户端进行测试:./client IP:port

多方WebRTC

你可能还想查看一下Justin Uberti提出的用于访问TURN服务的REST API的IETF标准。

很容易想象媒体流的使用情况超出了简单的一对一呼叫:例如,一组同事之间的视频会议,或一个发言者和数百(数百万)个观众的公共事件。

WebRTC应用程序可以使用多个RTCPeerConnection,以便每个端点都可以连接到网格配置中的每个其他端点。这是talky.io等应用程序所采取的方法,对于只有少数几个对等端的情况来说可以很好的工作。除此之外,处理和带宽会过度消耗,对于移动客户端来说尤其是这样。

mesh_topo

或者,WebRTC应用程序可以选择一个端点以星形配置将流分配给所有其他端点。也可以在服务器上运行WebRTC端点并构建自己的重新分配机制。(webrtc.org提供了一个客户端应用示例)

从Chrome 31和Opera 18开始,来自一个RTCPeerConnection的MediaStream可以用作另一个的输入:在simpl.info/multi上有一个演示。这可以启用更灵活的体系结构,因为它使Web应用程序能够通过选择要连接的其他对等端来处理呼叫路由。

多点控制单元

大量endpoint情况的更好选择是使用多点控制单元(Multipoint Control Unit,MCU)。它是一个服务器,可以作为在大量参与者之间分发媒体的桥。MCU可以处理视频会议中的不同分辨率,编解码器和帧速率,处理转码,选择性流转发,混音或录制音频和视频。对于多方通话,需要考虑许多问题:特别是如何显示多个视频输入并混合来自多个来源的音频。

你可以购买一个完整的MCU硬件包,或者建立自己的MCU。

mcu

有几个开源的MCU软件可供选择。比如说,Licode为WebRTC做了一个开源MCU;OpenTok也有Mantis。
Several open source MCU software options are available. For example, Licode (previously know as Lynckia) produces an open source MCU for WebRTC; OpenTok has Mantis.

除了浏览器以外还有:VoIP,电话和消息

WebRTC的标准化特性使得在浏览器中运行的WebRTC应用程序与另一个通信平台运行的设备或停牌(例如电话或视频会议系统)之间建立通信成为可能。

SIP是VoIP和视频会议系统使用的信令协议。为了实现WebRTC应用程序与视频会议系统等SIP客户端之间的通信,WebRTC需要代理服务器来调解信令。信令必须通过网关流动,但一旦通信建立,SRTP流量(视频和音频)就可以直接流向对等端。

公共交换电话网(PSTN)是所有“普通老式”模拟电话的电路交换网络。对于WebRTC应用程序和电话之间的通话,通信必须通过PSTN网关。同样,WebRTC应用程序需要中间的XMPP服务器来与Jingle端点(如IM客户端)进行通信。Jingle由Google开发,作为XMPP的扩展,为语音和视频提供消息传递服务:当前的WebRTC实现是基于C++ libjingle库的,这是一个最初为Google Talk开发的Jingle实现。

许多应用程序,库,和平台利用WebRTC与外部世界的沟通能力:sipML5,jsSIP,Phono,Zingaya,Twilio和Uberconference等等。

sipML5开发者也构建了webrtc2sip网关。Tethr和Tropo展示了一个在灾难通信框架,使用OpenBTS单元通过WebRTC实现手机和计算机之间的通信。这是一个没有运营商在中间的电话通信!

[翻译]使用一个新的hash一致性算法提升负载均衡

发表于 2018-07-19 | 分类于 算法

原文: Improving load balancing with a new consistent-hashing algorithm

我们在云上运行Vimeo的动态视频打包器Skyfire,每天服务近10亿个DASH和HLS请求。 这个请求量是非常大的! 我们对它的表现非常满意,但将其扩展到适应今天的流量以及更高的流量是一个有趣的挑战。 今天我想谈谈一个新的算法,有限负载一致性哈希(bounded-load consistent hashing),以及它是如何消除我们视频传输的瓶颈的。

动态打包

Vimeo的视频文件存储为MP4文件,与浏览器中用于下载或“渐进式”播放的格式相同。但是,DASH和HLS不使用单个文件 - 它们使用单​​独的视频短片段。当播放器请求某个片段时,Skyfire会动态处理该请求。它仅获取MP4文件的必要部分,针对DASH或HLS格式进行一些调整,并将结果发送回用户。

但是,当播放器请求(例如,文件的第37段)时,Skyfire如何知道需要获取哪些字节?它需要查看一个索引,该索引知道所有关键帧的位置以及文件中的所有数据包。在索引可以被查询之前,你需要生成它。这需要至少一个HTTP请求和一点CPU时间 - 或者,对于很长的视频,需要大量的CPU时间。由于我们对同一视频文件的请求很多,因此缓存索引并在以后重复使用是有意义的。

当我们第一次在现实世界中开始测试Skyfire时,我们采用了一种简单的缓存方法:我们将索引缓存在生成它们的云服务器的内存中,并在HAProxy中使用一致性哈希将相同视频文件的请求发送到相同的云服务器。这样,我们可以重用缓存的数据。

理解一致性哈希

在继续前进之前,让我们深入研究一下一致性哈希,这是一种在多个服务器之间分配负载的技术。如果你已经熟悉一致性哈希,请随时跳转到下一部分。

要使用一致性哈希在服务器之间分发请求,HAProxy会获取部分请求的哈希值(我们使用的是包含视频ID的URL的一部分),并使用该哈希值来选择可用的后端服务器。使用传统的“模状哈希”,您只需将请求哈希值视为一个非常大的数字。如果以可用服务器数量为模数,则获取的是要使用的服务器的索引。这很简单,只要服务器列表稳定,它就能很好地工作。但是,当添加或删除服务器时,会出现问题:大多数请求将散列到与之前不同的服务器。如果你有九台服务器并且添加了十分之一,那么只有十分之一的请求会(通过运气)散列到与之前相同的服务器。

所以有了一致性哈希。一致性哈希使用更复杂的方案,其中每个服务器根据其名称或ID分配多个哈希值,并且每个请求都根据“最近”哈希值分配给服务器。这种增加的复杂性的好处是,当添加或删除服务器时,大多数请求将映射到他们之前执行的相同服务器。因此,如果您有九台服务器并添加十分之一,则大约1/10的请求将落在新添加的服务器哈希值附近,而另外9/10将落到与之前相同的最近服务器。好多了!因此,一致性哈希使我们可以添加和删除服务器,而不会完全干扰每个服务器所拥有的缓存。当这些服务器在云中运行时,这是一个非常重要的属性。

一致性哈希 - 不理想的负载均衡

但是,一致性哈希有其自身的问题:请求分布不均匀。由于其数学属性,当请求的分布均匀时,一致性哈希仅平衡负载以及为每个请求随机选择服务器。但是,如果某些内容比其他内容(互联网中很常见)更受欢迎,那可能会很糟糕。一致性哈希会将该流行内容的所有请求发送到相同的服务器,比其他服务器接收更多流量。这可能导致服务器过载,视频播放效果不佳以及用户不满意。

到2015年11月,由于Vimeo已经准备好向一群精心挑选的成员推出Skyfire,我们认为这个超载问题太严重,不容忽视,所以我们改变了缓存使用方法。我们在HAProxy中使用了“least connections”的负载均衡策略,而不是基于一致性哈希的均衡,因此负载将在服务器之间均匀分配。我们使用memcache添加了一个二级缓存,在服务器之间共享,这样一个服务器生成的索引可以被另一个服务器检索。共享缓存需要一些额外的带宽,但负载在服务器之间更均衡地平衡。这就是我们第二年愉快运行的方式。

两者都有不是更好吗?

为什么没有办法说“使用一致性哈希,但请不要超载任何服务器”?早在2015年8月,我就试图提出一种算法,该算法基于两个随机选择的功能,这样做可以做到这一点,但是一些模拟表明它不起作用。向非理想服务器发送了太多请求。我很失望,但是我们没有浪费时间去拯救它,而是采用了上面最少的连接和共享缓存方法。

快进到2016年8月。我注意到不可估量的Damian Gryski推文中的一个URL,这是一篇名为Consistent Hashing with Bounded Loads的arXiv论文。我阅读了摘要,它似乎正是我想要的:一种算法,它将一致性哈希与任何一台服务器负载的上限相结合,相对于整个池的平均负载。我读了这篇论文,算法非常简单。实际上,该论文表示:

虽然一致性哈希搭配转发来满足容量限制的想法似乎非常明显,但似乎以前没有被考虑过。

有界负载算法(The bounded-load algorithm)

这是算法的简化草图。遗漏了一些细节,如果你打算自己实现它,你一定要去原始论文获取信息。

首先,定义一个大于1的平衡因子c。c控制服务器之间允许的不平衡程度。例如,如果c = 1.25,则服务器不应超过平均负载的125%。在c增加到∞的极限中,算法变得等效于普通一致性哈希,没有平衡;当c减小到接近1时,它变得更像是最少连接策略,并且哈希变得不那么重要。根据我的经验,1.25和2之间的值更适用于实际场景。

当请求到达时,计算平均负载(未完成请求的数量,m,包括刚刚到达的请求数除以可用服务器数n)。将平均负载乘以c得到“目标负载”,t。在原始论文中,将容量分配给服务器,以便每个服务器的容量为⌊t⌋或⌈t⌉,总容量为⌈cm⌉。因此,服务器的最大容量为⌈cm/n⌉,大于平均负载的c倍,小于1个请求。为了支持给服务器不同的“权重”,正如HAProxy所做的那样,算法必须略有改变,但精神是相同的 - 没有服务器可以超过负载份额1个请求。

分发一个请求时,像往常一样计算其哈希值和最近的服务器。如果该服务器负载低于其容量,则将请求分配给该服务器。否则,转到哈希环中的下一个服务器并检查其负载,继续,直到找到有剩余容量的服务器。肯定有一个服务器符合条件,因为最高容量高于平均负载,并且每个服务器的负载不可能高于平均值。这保证了一些不错的东西:

  1. 不允许服务器负载超过(平均负载 * c 加 1) 个请求。
  2. 只要服务器未过载,请求的分配策略与一致性哈希相同。
  3. 如果服务器过载,则所选择的回退服务器列表对于相同的请求哈希将是相同的 - 即,相同的服务器将始终是流行的内容的“第二选择”。这对缓存很有用。
  4. 如果服务器过载,则回退服务器列表对于不同的请求哈希值通常会有所不同 - 即,过载服务器的溢出负载将在可用服务器之间分配,而不是全部落到某个服务器上。这取决于每个服务器在一致性哈希环中分配多少个点。

实际应用的结果

在模拟器中测试算法并获得比我的简单算法更积极的结果后,我开始搞清楚如何将其hack到HAProxy。向HAProxy添加代码并不算太糟糕。HAProxy代码非常干净,组织良好,经过几天的工作,我得到了一些运行良好的程序,我可以通过它重放一些流量,观察算法的作用。它奏效了!数学证明和模拟是很好的,但是直到你看到真正的流量击中正确的服务器才能真正相信。

有了这个成功,我在9月份向HAProxy发送了一个概念验证补丁。 HAProxy维护者Willy Tarreau(非常高兴能与之合作),认识到算法的价值。他并没有告诉我我的补丁有多糟糕。他做了彻底的代码审查,并提供了一些非常有价值的反馈。我花了一点时间来处理这些建议并把事情搞定了,几周之后我就有了一个精美的版本准备发送到列表中。还有一些小的调整,它在10月26日发布的HAProxy 1.7.0-dev5及时被接受。11月25日,HAProxy 1.7.0被指定为稳定版本,因此现在普遍可以使用有限负载一致性哈希。

但我确定你想知道的是,我们从这一切中获得了什么?

这是更改HAProxy配置之前和之后的缓存行为图。

haproxy conf

每日变化是由弹性伸缩引起的:在白天,流量会增加,因此我们启动更多服务器来处理它,本地缓存只能解决更少的请求。在晚上,流量较少,因此关闭服务器,本地缓存性能有所提升。切换到有界负载算法后,无论运行多少台服务器,都会有更大比例的请求到达本地缓存。

下面是共享缓存带宽在同一时间的图表:在更改之前,每个memcache服务器在高峰时段的出站带宽达到400到500 Mbit / s(总共大约8Gbit / s)。改进算法之后,变化较小,服务器带宽保持在100 Mbit / s以下。

bandwidth

从响应时间的角度来看,我们没有画出性能提升的图。为什么?因为他们保持几乎完全相同。最少连接策略在保持服务器不会过载方面做得很好,从memcache中获取内容的速度足够快,以至于它对响应时间没有可测量的影响。但是现在更少部分的请求会依赖于共享缓存,并且由于该部分不依赖于我们运行的服务器数量,因此我们可以期待在不使memcached服务器饱和的情况下处理更多流量。此外,如果memcache服务器出现故障,它对Skyfire的整体影响将会大大降低。

总而言之,很高兴看到一点算法工作将单点问题变得更好。

[翻译]不使用第三方库

发表于 2018-05-15 | 分类于 go

原文

译者:不少golang开源项目的Contrib Guidelines里会表示在提交PR时尽量不要引入第三方的包,这篇文章正是阐述了这样一个观点。原文标题下还有一行是:”我如何不再担心包管理并开始喜欢上无版本的包管理”,可以作为本文的另一个标题。也许很多人并不认同本文的观点,甚至认为是胡说八道,但是确实有很多项目在使用这样一个规则:influxdb的coding guidlines明确表示Try not to use third-party libraries,web框架gin的router实现copy了github.com/julienschmidt/httprouter的代码实现并做了少量更改,几乎所有的大型golang程序都会自己实现一些通用的函数,例如slice的union、unique等操作,自己扩展log的程序也有很多,比如nsq。


如果你看过golang-nuts邮件列表,会发现包管理比任何其他话题都容易引起争论。在我刚开始写Go的时候同样认为缺少一个真正的包管理是一个明显的失误,但是,当我写的Go代码越多我越是开始欣赏go get的简单易用。

版本化软件包的好处很明显。它使团队中的每个人都有一份一致的代码,并且大多数时候包的版本号采用固定的模式(语义化版本),可以传达API的变更信息。但是包版本控制并非没有缺点。

在这篇文章中,我们将来探讨应用程序开发人员可用的选项,库开发人员的职责以及无版本包管理的副作用。

只使用标准库

许多第三方Go库试图提供较现有的标准库Package更丰富的实现。然而,很多时候,他们的功能集变得太大而无法正确测试,并且需要新的开发人员了解额外的框架,从而增加了项目的认知开销。

在GitHub上有200多个Go Web框架,我一个都没用过。基于HTTP的项目有这样一系列的要求和权衡,我很欣赏net/http提供的功能和实用性的平衡。如果我的应用程序需要中间件,那么写一个适合我的项目的http.Handler是很简单的。没有万能的Web框架。

日志记录(Logging)是另一个少即是多(less is more)的领域。 GitHub上有超过500多个Go日志库,但我更喜欢标准日志包(log)的简单。它提供了一个小型的、固定范围的库,将时间戳信息打印到标准错误。这就是我需要的。添加其他功能,如日志等级,让开发人员选择何时使用DEBUG而不是INFO级别,会增加剩余代码的复杂性。如果足够重要,那就应该记录日志。

Ctrl-C, Ctrl-V

DRY的坚定支持者可能会对此建议感到畏缩,但多次将小部分代码复制到项目中可能是导入整个库的更好选择。 如果需要,将小功能复制过来使你可以根据自己的特定需求对其进行调整,并随时扩展它们。 它们还限制了项目中其他开发人员所需的知识,因为他们不需要读取另一个库的完整文档来了解依赖关系。

几个月前,我发布了一个没有.go文件的Go测试库,因为它足够小,我希望用户只需复制他们需要的功能。 另一个很好的例子是使用一些简单的算法,如一致的哈希。 很多时候他们少于50行代码。 在复制任何代码之前,请查看包的证书。

固定范围的库

尽管我试图坚持使用标准库,但有时候我还是需要使用第三方库。因为go get不提供版本控制,所以与使用像rubygems等工具时相比,我对第三方库的看法非常不同。

在Ruby中,在引用程序中肆意添加第三方库很常见。一个三年的Rails项目很容易在其Gemfile中拥有50个依赖项。随着时间的推移,我需要升级库,毫无疑问,它们的功能或API会改变,并与我使用的其他库冲突。所以我将不得不升级那些可能导致更多冲突的其他库。

那时我意识到一个有30个版本的库实际上是30个独立的库。

在Go中,我尝试编写和使用具有固定范围的库,因此他们只需要一个版本。例如,css库实现了W3C的CSS语法模块级别3规范,因此其范围是固定的。一旦实施和测试,变更和bug修复都会很小。

另一个例子是Bolt库。它的目标是提供一个简单,快速的事务密钥/值存储。通过限制它的问题空间,Bolt能够保持最小的API并经过良好测试。经过近六个月的各种生产系统运行没有问题,Bolt升级到了1.0版并停止了开发。该项目不会被放弃 - 它只是已经完成了。添加功能会影响项目的稳定性。 Bolt的下一个版本不会是2.0,它会被简单地称为别的东西。

高质量建立信任&信任建立社区

无版本软件包管理的一个意想不到的副作用是,我现在需要知道并信任编写我使用的库的开发人员。 正因为如此,我在Go社区的参与度比我在任何其他语言社区的参与度都要高。

我定期阅读我使用的项目源代码,看看Package维护人员的风格和项目质量是否与我期望的一致。它让我对社区中的开发者更加赞赏。了解某人的代码是非常个人化的事,我相信这是Go社区蓬勃发展的一个小原因。

结论

Go语言提供了惊人的高质量和深思熟虑的通用库的实现。很多时候,这是项目所需的一切。 很多时候你也会需要超出标准库范围的功能。在这些情况下,我希望你能花时间了解一下你使用的库的代码和开发人员。

nsqd源码分析和代码流程图

发表于 2018-04-27 | 分类于 go

nsqd的代码量在1w行左右,包含了项目下的nsqd,internal目录,以及go-diskqueue项目,代码量虽然不大,但是开启了各种goroutine,有各种channel用于通信,应该算得上有点复杂了。

看源码的时候建议分模块来看,topic的功能,channel的功能,延迟消息,超时重传,写入磁盘,与nsqlookdup的通信等。可以从入口函数先了解个大概,不建议一开始就一行行细看,goroutine和channel很多,非常容易绕进去。

我画了一张代码的流程图(图在最下面),供大家参考,图中的节点是我觉得比较重要的地方。这张图本身就有点乱(一方面我水平有限,另一方面代码确实略复杂),看上去各种线飞来飞去,但是我私认为还是反映了nsqd的代码流程的,图中的线确实代表了节点之间的一些关系,如果去掉,反而可能会有误导。

流程图只是一个辅助工具,觉得不好的同学大可以不看,这并不会妨碍你对代码的理解。详细的对于代码的注释参考这里,网上有不少对于源码的分析,但大多只是某一个模块的,这应该算是一个比较全面的注释了。

有问题欢迎提出:-D

nsqd流程图

database/sql 一点深入理解

发表于 2018-03-30 | 分类于 go

我们主要来分析一下查询的实现,涉及的是database/sql和go-sql-driver/mysql的部分源码。database/sql是go对于db抽象出的一个标准库,go-sql-driver/mysql是实现了database/sql驱动接口的mysql驱动。

什么是数据库驱动?

简单来讲,数据库驱动实现了mysql协议,比如连接数据库,驱动会给数据库服务器发送握手初始化报文和登陆认证报文,拼出报文头,将username,password放在报文的固定位置,将[]byte数据写入到socket,这就是驱动的主要功能,他给我们(这里指database/sql)完成了底层的操作

查询的接口主要有两个:

1
2
3
4
5
6
7
8
9
10
/*
执行一个查询并返回多个数据行, 这个查询通常是一个 SELECT 。 方法的 arg 部分用于填写查询语句中包含的占位符的实际参数。
*/
func (db *DB) Query(query string, args ...interface{}) (*Rows, error)
/*
执行一个预期最多只会返回一个数据行的查询。
这个方法总是会返回一个非空的值, 而它引起的错误则会被推延到数据行的 Scan 方法被调用为止。
*/
func (db *DB) QueryRow(query string, args ...interface{}) *Row

再来看看返回值的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Rows struct {
dc *driverConn // owned; must call releaseConn when closed to release
releaseConn func(error)
rowsi driver.Rows
cancel func() // called when Rows is closed, may be nil.
closeStmt *driverStmt // if non-nil, statement to Close on close
closemu sync.RWMutex
closed bool
lasterr error // non-nil only if closed is true
// lastcols is only used in Scan, Next, and NextResultSet which are expected
// not to be called concurrently.
lastcols []driver.Value
}
// Row is the result of calling QueryRow to select a single row.
type Row struct {
// One of these two will be non-nil:
err error // deferred error for easy chaining
rows *Rows
}

可以明显地看到Row将Rows包了一层,加了一个err字段,上面注释中说的而它引起的错误则会被推延到数据行的 Scan 方法被调用为止也就可以理解了,错误在error上而没有直接返回。

个人认为看源码最好带着问题看,有目的往往更能坚持。我们先来看一段最基本的调用代码,看看能不能找出几个我们感兴趣的问题:

1
2
3
4
5
6
7
8
9
rows, err := db.Query("SELECT * FROM User WHERE id=?", id_param)
...
defer rows.Close()
for rows.Next() {
var id int
var name string
err = rows.Scan(&id, &name)
...
}

我的问题来了:

  1. 这里query用的sql格式看上去是prepared statement,我们知道prepared statement是可以防注入的。我们也知道不能够直接拼接字符串,直接拼接定会引入注入问题。那么问题来了,prepared statement会增加与服务器的交互,影响性能,是否能不使用prepared statement,如果不使用,像上面这样的query能否防注入呢?另外上面我们也只是说了看上去是,底层是否真的在使用prepared statement?

    要想使用prepared statement在数据库提前编译,复用的特性需要在客户端做很多事,比如JDBC就实现了一整套方案。目前看来database/sql没有这样的实现,所以这里可以忽略编译的优势。

  2. 这个for的写法真是奇怪,简直看不懂到底在遍历什么,每次都是rows.Scan, rows里面到底存了些什么可以这么玩

下面我们从这些问题出发来研究一下实现,我们会忽略掉与问题无关的部分(比如连接池),只关注我们需要的链路。

Prepared Statement & 防注入

Query之后还有几个函数调用才能到下面的函数,包括了取连接和错误处理的逻辑,这些对我们的问题没有影响,暂且忽略它们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
// 我们的driver实现了这个接口,这里可以走进去
if queryer, ok := dc.ci.(driver.Queryer); ok {
dargs, err := driverArgs(dc.ci, nil, args)
if err != nil {
releaseConn(err)
return nil, err
}
var rowsi driver.Rows
withLock(dc, func() {
rowsi, err = ctxDriverQuery(ctx, queryer, query, dargs) // 尝试不使用`Prepared Statement`来执行
})
// 这个错误非常重要,不想使用`Prepared Statement`,上面就不能返回这个错误
if err != driver.ErrSkip {
if err != nil {
releaseConn(err)
return nil, err
}
// Note: ownership of dc passes to the *Rows, to be freed
// with releaseConn.
rows := &Rows{
dc: dc,
releaseConn: releaseConn,
rowsi: rowsi,
}
rows.initContextClose(ctx, txctx)
return rows, nil
}
}
// 下面将会使用Prepared Statement,如果不想使用就要保证上面的代码能顺利return
var si driver.Stmt
var err error
withLock(dc, func() {
si, err = ctxDriverPrepare(ctx, dc.ci, query)
})
.....
}

从上面的注释可以看到,Query默认不使用prepared statement,但是要保证正常查询不返回driver.ErrSkip,一旦返回这个错误,则会使用prepared statement继续查询。再往下跟我们将会走到driver中,我们来看看真实的query,在go-sql-driver/mysql的connection.go。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (mc *mysqlConn) query(query string, args []driver.Value) (*textRows, error) {
if mc.closed.IsSet() {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
// 考虑到注入问题所以一定有args,将会走进if
if len(args) != 0 {
if !mc.cfg.InterpolateParams {
return nil, driver.ErrSkip // 我们就是不想返回这个错误,所以要保证mc.cfg.InterpolateParams为true
}
// try client-side prepare to reduce roundtrip
prepared, err := mc.interpolateParams(query, args) // 这里将进行插值,能不能防注入就看这里了
if err != nil {
return nil, err
}
query = prepared
}
// Send command
err := mc.writeCommandPacketStr(comQuery, query)
......
}

InterpolateParams从字面上看意思是是否可以插值,它其实是dsn的一个参数,见dsn.go

1
2
3
4
5
6
7
// Enable client side placeholder substitution
case "interpolateParams":
var isBool bool
cfg.InterpolateParams, isBool = readBool(value)
if !isBool {
return errors.New("invalid bool value: " + value)
}

这里我们就清楚了,如果不想使用prepared statement,就要在dsn中加入interpolateParams=true,允许驱动进行对sql进行插值。下一个问题,这里的插值能不能防注入?我们来看interpolateParams方法,我们只看参数为string的情况:

1
2
3
4
5
6
7
8
case string:
buf = append(buf, '\'')
if mc.status&statusNoBackslashEscapes == 0 {
buf = escapeStringBackslash(buf, v) // 将会走到这里
} else {
buf = escapeStringQuotes(buf, v)
}
buf = append(buf, '\'')

从函数名我相信大家已经知道了,我们将会对字符串参数进行转义,而转义特殊字符是可以防止注入的,不妨再看看escapeStringBackslash:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func escapeBytesBackslash(buf, v []byte) []byte {
pos := len(buf)
buf = reserveBuffer(buf, len(v)*2)
for _, c := range v {
switch c {
case '\x00':
buf[pos] = '\\'
buf[pos+1] = '0'
pos += 2
case '\n':
buf[pos] = '\\'
buf[pos+1] = 'n'
pos += 2
case '\r':
buf[pos] = '\\'
buf[pos+1] = 'r'
pos += 2
case '\x1a':
buf[pos] = '\\'
buf[pos+1] = 'Z'
pos += 2
case '\'':
buf[pos] = '\\'
buf[pos+1] = '\''
pos += 2
case '"':
buf[pos] = '\\'
buf[pos+1] = '"'
pos += 2
case '\\':
buf[pos] = '\\'
buf[pos+1] = '\\'
pos += 2
default:
buf[pos] = c
pos++
}
}
return buf[:pos]
}

一目了然。上面的分析基本解决了我们的第一个问题:dsn中加入interpolateParams=true可以不使用prepared statement,将sql的参数传入Query方法可以防止注入,防注入是驱动通过转义特殊字符来实现的。

第二个问题我们可以接着第一个问题的代码往下看,上面我们看了拼接好sql,下面就是真正的查询了,代码在statement.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (stmt *mysqlStmt) query(args []driver.Value) (*binaryRows, error) {
if stmt.mc.closed.IsSet() {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
// Send command
// 这个方法背后就是实现了`客户端命令请求报文`,非常值得一看,但是我们这里关心的是返回值,暂且跳过
err := stmt.writeExecutePacket(args)
if err != nil {
return nil, stmt.mc.markBadConn(err)
}
mc := stmt.mc
// Read Result
// 这里虽然说是read result,但是只是根据协议读出了列的数量,用于下面的readColumns
// 注意,即使数据库中没有找到对应的记录,数据库仍然会将字段的信息返回,只是在返回的报文中
// Row Data没有数据
resLen, err := mc.readResultSetHeaderPacket()
if err != nil {
return nil, err
}
rows := new(binaryRows)
if resLen > 0 {
rows.mc = mc
rows.rs.columns, err = mc.readColumns(resLen) // 这里会将字段名(列名)读出然后存起来
} else {
rows.rs.done = true
switch err := rows.NextResultSet(); err {
case nil, io.EOF:
return rows, nil
default:
return nil, err
}
}
// 竟然就这么返回了
return rows, err
}

看到了吗,query最后返回的时候,只是把字段名读出来了而已,根本没有读到我们需要的数据库中的记录。不妨大胆猜测:for Next将会每次从buffer中读出一条记录,将其赋值给某个变量,Scan就是在解析这个变量。让我们回到database/sql,在sql.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (rs *Rows) nextLocked() (doClose, ok bool) {
if rs.closed {
return false, false
}
if rs.lastcols == nil {
rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns())) // 哇哈哈,我想就是存在这里了
}
// Lock the driver connection before calling the driver interface
// rowsi to prevent a Tx from rolling back the connection at the same time.
rs.dc.Lock()
defer rs.dc.Unlock()
rs.lasterr = rs.rowsi.Next(rs.lastcols) // 这里应该就是在读数据然后存到rs.lastcols
if rs.lasterr != nil {
......
}
return false, true
}

再回到go-sql-driver/mysql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// rows.go
func (rows *binaryRows) Next(dest []driver.Value) error {
if mc := rows.mc; mc != nil {
if err := mc.error(); err != nil {
return err
}
// Fetch next row from stream
// 读数据并存到dest中,readRow中实现了针对Row Data 结构的解析
// 复杂但真的值得一看
return rows.readRow(dest)
}
return io.EOF
}

来来回回啊,再看database/sql的sql.go

1
2
3
4
5
6
7
8
9
10
11
func (rs *Rows) Scan(dest ...interface{}) error {
......
// 看到了吧,就是lastcols,遍历lastcols,将字段值放入到dest中
for i, sv := range rs.lastcols {
err := convertAssign(dest[i], sv)
if err != nil {
return fmt.Errorf("sql: Scan error on column index %d: %v", i, err)
}
}
return nil
}

至此我们的第二个问题也解决了,Query返回的时候只是取出了字段信息,真实的数据库记录还留在buffer中,for循环Next,每次从buffer中读取一条记录,存在rows结构体的lastcols字段中,调用Scan的时候就是从lastcols取出值。

上面只是对大致流程的分析,里面还有大量的细节没有涉及到,特别是对于协议的实现,非常值得一看。

本文如有错误,欢迎联系O(∩_∩)O

路由查找之Radix Tree

发表于 2018-02-10 | 分类于 go

什么是Radix Tree

在计算机科学中,基数树,或称Patricia trie/tree,或crit bit tree,压缩前缀树,是一种更节省空间的Trie(前缀树)。对于基数树的每个节点,如果该节点是唯一的子树的话,就和父节点合并。

golang的web框架echo和gin都使用了radix tree作为路由查找的算法,我们以gin的实现来分析一下。

在gin的路由中,每一个Http Method(GET, PUT, POST…)都对应了一棵 radix tree

1
2
3
4
5
6
7
8
9
10
11
12
func (engine *Engine) addRoute(method, path string, handlers HandlersChain) {
// ...
// 获取method对应的树,如果没有就创建
root := engine.trees.get(method)
if root == nil {
// 创建radix tree,只有根节点
root = new(node)
engine.trees = append(engine.trees, methodTree{method: method, root: root})
}
root.addRoute(path, handlers)
}

radix tree可以被认为是一棵简洁版的前缀树。拥有共同前缀的节点也共享同一个父节点。下面是一个GET方法对应的路由树的结构:

1
2
3
4
5
6
7
8
9
10
11
Priority Path Handle
9 \ *<1>
3 ├s nil
2 |├earch\ *<2>
1 |└upport\ *<3>
2 ├blog\ *<4>
1 | └:post nil
1 | └\ *<5>
2 ├about-us\ *<6>
1 | └team\ *<7>
1 └contact\ *<8>

*<num>是方法(handler)对应的指针,从根节点遍历到叶子节点我们就能得到完整的路由表,图中的示例实现了以下路由:

1
2
3
4
5
6
7
8
GET("/", func1)
GET("/search/", func2)
GET("/support/", func3)
GET("/blog/", func4)
GET("/blog/:post/", func5)
GET("/about-us/", func6)
GET("/about-us/team/", func7)
GET("/contact/", func8)

:post是真实的post name的一个占位符(就是一个参数)。这里体现了radix tree相较于hash-map的一个优点,树结构允许我们的路径中存在动态的部分(参数),因为我们匹配的是路由的模式而不是hash值

为了更具扩展性,每一层的节点按照priority排序,priority是节点的子节点(儿子节点,孙子节点等)注册的handler的数量,这样做有两个好处:

  1. 被最多路径包含的节点会被最先评估。这样可以让尽量多的路由快速被定位。
  2. 有点像成本补偿。最长的路径可以被最先评估,补偿体现在最长的路径需要花费更长的时间来定位,如果最长路径的节点能被优先评估(即每次拿子节点都命中),那么所花时间不一定比短路径的路由长。下面展示了节点(每个-可以看做一个节点)评估的路径:从左到右,从上到下
1
2
3
4
5
6
7
├------------
├---------
├-----
├----
├--
├--
└-

节点数据结构

节点的数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type node struct {
// 节点路径,比如上面的s,earch,和upport
path string
// 节点是否是参数节点,比如上面的:post
wildChild bool
// 节点类型,包括static, root, param, catchAll
// static: 静态节点,比如上面的s,earch等节点
// root: 树的根节点
// catchAll: 有*匹配的节点
// param: 参数节点
nType nodeType
// 路径上最大参数个数
maxParams uint8
// 和children字段对应, 保存的是分裂的分支的第一个字符
// 例如search和support, 那么s节点的indices对应的"eu"
// 代表有两个分支, 分支的首字母分别是e和u
indices string
// 儿子节点
children []*node
// 处理函数
handlers HandlersChain
// 优先级,子节点注册的handler数量
priority uint32
}

添加路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
func (n *node) addRoute(path string, handlers HandlersChain) {
fullPath := path
n.priority++
numParams := countParams(path)
// non-empty tree
if len(n.path) > 0 || len(n.children) > 0 {
walk:
for {
// Update maxParams of the current node
if numParams > n.maxParams {
n.maxParams = numParams
}
// Find the longest common prefix.
// This also implies that the common prefix contains no ':' or '*'
// since the existing key can't contain those chars.
i := 0
max := min(len(path), len(n.path))
for i < max && path[i] == n.path[i] {
i++
}
// Split edge
// 开始分裂,比如一开始path是search,新来了support,s是他们匹配的部分,
// 那么会将s拿出来作为parent节点,增加earch和upport作为child节点
if i < len(n.path) {
child := node{
path: n.path[i:], // 不匹配的部分作为child节点
wildChild: n.wildChild,
indices: n.indices,
children: n.children,
handlers: n.handlers,
priority: n.priority - 1, // 降级成子节点,priority减1
}
// Update maxParams (max of all children)
for i := range child.children {
if child.children[i].maxParams > child.maxParams {
child.maxParams = child.children[i].maxParams
}
}
// 当前节点的子节点变成刚刚分裂的出来的节点
n.children = []*node{&child}
// []byte for proper unicode char conversion, see #65
n.indices = string([]byte{n.path[i]})
n.path = path[:i]
n.handlers = nil
n.wildChild = false
}
// Make new node a child of this node
// 将新来的节点插入新的parent节点作为子节点
if i < len(path) {
path = path[i:]
// 如果是参数节点(包含:或*)
if n.wildChild {
n = n.children[0]
n.priority++
// Update maxParams of the child node
if numParams > n.maxParams {
n.maxParams = numParams
}
numParams--
// Check if the wildcard matches
// 例如:/blog/:pp 和 /blog/:ppp,需要检查更长的通配符
if len(path) >= len(n.path) && n.path == path[:len(n.path)] {
// check for longer wildcard, e.g. :name and :names
if len(n.path) >= len(path) || path[len(n.path)] == '/' {
continue walk
}
}
panic("path segment '" + path +
"' conflicts with existing wildcard '" + n.path +
"' in path '" + fullPath + "'")
}
// 首字母,用来与indices做比较
c := path[0]
// slash after param
if n.nType == param && c == '/' && len(n.children) == 1 {
n = n.children[0]
n.priority++
continue walk
}
// Check if a child with the next path byte exists
// 判断子节点中是否有和当前path有匹配的,只需要查看子节点path的第一个字母即可,即indices
// 比如s的子节点现在是earch和upport,indices为eu
// 如果新来的路由为super,那么就是和upport有匹配的部分u,将继续分类现在的upport节点
for i := 0; i < len(n.indices); i++ {
if c == n.indices[i] {
i = n.incrementChildPrio(i)
n = n.children[i]
continue walk
}
}
// Otherwise insert it
if c != ':' && c != '*' {
// []byte for proper unicode char conversion, see #65
// 记录第一个字符,放在indices中
n.indices += string([]byte{c})
child := &node{
maxParams: numParams,
}
// 增加子节点
n.children = append(n.children, child)
n.incrementChildPrio(len(n.indices) - 1)
n = child
}
n.insertChild(numParams, path, fullPath, handlers)
return
} else if i == len(path) { // Make node a (in-path) leaf
// 路径相同,如果已有handler就报错,没有就赋值
if n.handlers != nil {
panic("handlers are already registered for path ''" + fullPath + "'")
}
n.handlers = handlers
}
return
}
} else { // Empty tree,空树,插入节点,节点种类是root
n.insertChild(numParams, path, fullPath, handlers)
n.nType = root
}
}

此函数的主要目的是找到插入节点的位置,如果和现有节点存在相同的前缀,那么要将现有节点进行分裂,然后再插入,下面是insertChild函数

插入子节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// @1: 参数个数
// @2: 路径
// @3: 完整路径
// @4: 处理函数
func (n *node) insertChild(numParams uint8, path string, fullPath string, handlers HandlersChain) {
var offset int // already handled bytes of the path
// find prefix until first wildcard (beginning with ':'' or '*'')
// 找到前缀,只要匹配到wildcard
for i, max := 0, len(path); numParams > 0; i++ {
c := path[i]
if c != ':' && c != '*' {
continue
}
// find wildcard end (either '/' or path end)
end := i + 1
for end < max && path[end] != '/' {
switch path[end] {
// the wildcard name must not contain ':' and '*'
case ':', '*':
panic("only one wildcard per path segment is allowed, has: '" +
path[i:] + "' in path '" + fullPath + "'")
default:
end++
}
}
// check if this Node existing children which would be
// unreachable if we insert the wildcard here
if len(n.children) > 0 {
panic("wildcard route '" + path[i:end] +
"' conflicts with existing children in path '" + fullPath + "'")
}
// check if the wildcard has a name
if end-i < 2 {
panic("wildcards must be named with a non-empty name in path '" + fullPath + "'")
}
if c == ':' { // param
// split path at the beginning of the wildcard
if i > 0 {
n.path = path[offset:i]
offset = i
}
child := &node{
nType: param,
maxParams: numParams,
}
n.children = []*node{child}
n.wildChild = true
n = child
n.priority++
numParams--
// if the path doesn't end with the wildcard, then there
// will be another non-wildcard subpath starting with '/'
if end < max {
n.path = path[offset:end]
offset = end
child := &node{
maxParams: numParams,
priority: 1,
}
n.children = []*node{child}
// 下次循环这个新的child节点
n = child
}
} else { // catchAll
if end != max || numParams > 1 {
panic("catch-all routes are only allowed at the end of the path in path '" + fullPath + "'")
}
if len(n.path) > 0 && n.path[len(n.path)-1] == '/' {
panic("catch-all conflicts with existing handle for the path segment root in path '" + fullPath + "'")
}
// currently fixed width 1 for '/'
i--
if path[i] != '/' {
panic("no / before catch-all in path '" + fullPath + "'")
}
n.path = path[offset:i]
// first node: catchAll node with empty path
child := &node{
wildChild: true,
nType: catchAll,
maxParams: 1,
}
n.children = []*node{child}
n.indices = string(path[i])
n = child
n.priority++
// second node: node holding the variable
child = &node{
path: path[i:],
nType: catchAll,
maxParams: 1,
handlers: handlers,
priority: 1,
}
n.children = []*node{child}
return
}
}
// insert remaining path part and handle to the leaf
n.path = path[offset:]
n.handlers = handlers
}

insertChild函数是根据path本身进行分割, 将/分开的部分分别作为节点保存, 形成一棵树结构. 注意参数匹配中的:和*的区别, 前者是匹配一个字段, 后者是匹配后面所有的路径

路径查找

匹配每个children的path,最长匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Returns the handle registered with the given path (key). The values of
// wildcards are saved to a map.
// If no handle can be found, a TSR (trailing slash redirect) recommendation is
// made if a handle exists with an extra (without the) trailing slash for the
// given path.
func (n *node) getValue(path string, po Params, unescape bool) (handlers HandlersChain, p Params, tsr bool) {
p = po
walk: // Outer loop for walking the tree
for {
// 尚未到达path的终点
if len(path) > len(n.path) {
// 前面一段需要一致
if path[:len(n.path)] == n.path {
path = path[len(n.path):]
// If this node does not have a wildcard (param or catchAll)
// child, we can just look up the next child node and continue
// to walk down the tree
if !n.wildChild {
c := path[0]
for i := 0; i < len(n.indices); i++ {
if c == n.indices[i] {
n = n.children[i]
continue walk
}
}
// Nothing found.
// We can recommend to redirect to the same URL without a
// trailing slash if a leaf exists for that path.
tsr = (path == "/" && n.handlers != nil)
return
}
// handle wildcard child
n = n.children[0]
switch n.nType {
case param:
// find param end (either '/' or path end)
end := 0
for end < len(path) && path[end] != '/' {
end++
}
// save param value
if cap(p) < int(n.maxParams) {
p = make(Params, 0, n.maxParams)
}
i := len(p)
p = p[:i+1] // expand slice within preallocated capacity
p[i].Key = n.path[1:]
val := path[:end]
if unescape {
var err error
if p[i].Value, err = url.QueryUnescape(val); err != nil {
p[i].Value = val // fallback, in case of error
}
} else {
p[i].Value = val
}
// we need to go deeper!
if end < len(path) {
if len(n.children) > 0 {
path = path[end:]
n = n.children[0]
continue walk
}
// ... but we can't
tsr = (len(path) == end+1)
return
}
if handlers = n.handlers; handlers != nil {
return
}
if len(n.children) == 1 {
// No handle found. Check if a handle for this path + a
// trailing slash exists for TSR recommendation
n = n.children[0]
tsr = (n.path == "/" && n.handlers != nil)
}
return
case catchAll:
// save param value
if cap(p) < int(n.maxParams) {
p = make(Params, 0, n.maxParams)
}
i := len(p)
p = p[:i+1] // expand slice within preallocated capacity
p[i].Key = n.path[2:]
if unescape {
var err error
if p[i].Value, err = url.QueryUnescape(path); err != nil {
p[i].Value = path // fallback, in case of error
}
} else {
p[i].Value = path
}
handlers = n.handlers
return
default:
panic("invalid node type")
}
}
} else if path == n.path {
// We should have reached the node containing the handle.
// Check if this node has a handle registered.
if handlers = n.handlers; handlers != nil {
return
}
if path == "/" && n.wildChild && n.nType != root {
tsr = true
return
}
// No handle found. Check if a handle for this path + a
// trailing slash exists for trailing slash recommendation
for i := 0; i < len(n.indices); i++ {
if n.indices[i] == '/' {
n = n.children[i]
tsr = (len(n.path) == 1 && n.handlers != nil) ||
(n.nType == catchAll && n.children[0].handlers != nil)
return
}
}
return
}
// Nothing found. We can recommend to redirect to the same URL with an
// extra trailing slash if a leaf exists for that path
tsr = (path == "/") ||
(len(n.path) == len(path)+1 && n.path[len(path)] == '/' &&
path == n.path[:len(n.path)-1] && n.handlers != nil)
return
}
}

之前总听大家说数据结构与算法有什么用,工作中又用不到,上面就是一个很好的示例。我们平时还是要多关注底层原理,做后端的同学多看看框架的代码,一定受益匪浅~

go pprof 采样何时进行

发表于 2017-12-06 | 分类于 go
1
2
3
4
5
6
7
8
9
import (
_ "net/http/pprof"
)
func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
}

这段代码大家应该都很熟悉,go tool pprof工具的通用代码,用来做性能等分析。我这两天尝试用了一下,不免有了一个疑问:pprof是要采样的,这个采样是何时进行的?是程序启动就开始采样还是当我curl localhost:$PORT/debug/pprof/$PROFILE_TYPE开始?

为什么会考虑这个呢,主要是在看的框架里集成了pprof,默认打开,如果是程序一开始就采样,那对于性能是有损耗的。这种问题别人回答你,你也不一定相信,我们还是来看看pprof的代码吧。

标准库的代码是在$GOROOT/src下面,我们找一下pprof,在我的mac上路径是/usr/local/Cellar/go/1.9.2/libexec/src/net/http/pprof/pprof.go

首先是init函数,注册了我们用到的分析urls:

1
2
3
4
5
6
7
func init() {
http.Handle("/debug/pprof/", http.HandlerFunc(Index))
http.Handle("/debug/pprof/cmdline", http.HandlerFunc(Cmdline))
http.Handle("/debug/pprof/profile", http.HandlerFunc(Profile))
http.Handle("/debug/pprof/symbol", http.HandlerFunc(Symbol))
http.Handle("/debug/pprof/trace", http.HandlerFunc(Trace))
}

这是在程序启动就执行的,这也是我们感觉只要import就万事大吉的原因。其实看到这里应该就明白了,profile只是普通的函数调用而已,程序启动只是注册了handler,真正的采样应该是在请求之后执行的。

但是来都来了,我们不妨往下看看,毕竟需要看标准库的机会不多。我们来看一下cpu profile,也就是Profile函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Profile responds with the pprof-formatted cpu profile.
// The package initialization registers it as /debug/pprof/profile.
func Profile(w http.ResponseWriter, r *http.Request) {
sec, _ := strconv.ParseInt(r.FormValue("seconds"), 10, 64)
if sec == 0 {
sec = 30
}
if durationExceedsWriteTimeout(r, float64(sec)) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Go-Pprof", "1")
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintln(w, "profile duration exceeds server's WriteTimeout")
return
}
// Set Content Type assuming StartCPUProfile will work,
// because if it does it starts writing.
w.Header().Set("Content-Type", "application/octet-stream")
if err := pprof.StartCPUProfile(w); err != nil {
// StartCPUProfile failed, so no writes yet.
// Can change header back to text content
// and send error code.
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Go-Pprof", "1")
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Could not enable CPU profiling: %s\n", err)
return
}
sleep(w, time.Duration(sec)*time.Second)
pprof.StopCPUProfile()
}

获取了一个默认30秒的时间,执行StartCPUProfile,里面应该是个goroutine,调用返回后sleep了一下,结束cpu profile。采样应该是在StartCPUProfile,执行seconds时间。

再来看看StartCPUProfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func StartCPUProfile(w io.Writer) error {
// The runtime routines allow a variable profiling rate,
// but in practice operating systems cannot trigger signals
// at more than about 500 Hz, and our processing of the
// signal is not cheap (mostly getting the stack trace).
// 100 Hz is a reasonable choice: it is frequent enough to
// produce useful data, rare enough not to bog down the
// system, and a nice round number to make it easy to
// convert sample counts to seconds. Instead of requiring
// each client to specify the frequency, we hard code it.
const hz = 100
cpu.Lock()
defer cpu.Unlock()
if cpu.done == nil {
cpu.done = make(chan bool)
}
// Double-check.
if cpu.profiling {
return fmt.Errorf("cpu profiling already in use")
}
cpu.profiling = true
runtime.SetCPUProfileRate(hz)
go profileWriter(w)
return nil
}

注释解释了取样频率的由来,我们先看一下CPU主频的概念:

CPU的主频,即CPU内核工作的时钟频率(CPU Clock Speed)。CPU的主频的基本单位是赫兹(Hz),但更多的是以兆赫兹(MHz)或吉赫兹(GHz)为单位。时钟频率的倒数即为时钟周期。时钟周期的基本单位为秒(s),但更多的是以毫秒(ms)、微妙(us)或纳秒(ns)为单位。在一个时钟周期内,CPU执行一条运算指令。也就是说,在1000 Hz的CPU主频下,每1毫秒可以执行一条CPU运算指令。在1 MHz的CPU主频下,每1微妙可以执行一条CPU运算指令。而在1 GHz的CPU主频下,每1纳秒可以执行一条CPU运算指令。

在默认情况下,Go语言的运行时系统会以100 Hz的的频率对CPU使用情况进行取样。也就是说每秒取样100次,即每10毫秒会取样一次。为什么使用这个频率呢?因为100 Hz既足够产生有用的数据,又不至于让系统产生停顿。并且100这个数上也很容易做换算,比如把总取样计数换算为每秒的取样数。实际上,这里所说的对CPU使用情况的取样就是对当前的Goroutine的堆栈上的程序计数器的取样。由此,我们就可以从样本记录中分析出哪些代码是计算时间最长或者说最耗CPU资源的部分了。

代码很容易理解,锁住cpu,设置runtime的采样频率,profileWriter就是实际采样了。

所以结论是,采样在请求之后才会进行,线上打开采样的接口没有问题。

参考资料

  • Profiling Go Programs
  • Profiling Go programs with pprof
  • go tool pprof

go语言死循环分析

发表于 2017-12-05 | 分类于 go

最近看了一篇文章,如何定位 golang 进程 hang 死的 bug,里面有这样一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main
import (
"fmt"
"io"
"log"
"net/http"
"runtime"
"time"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
go server()
go printNum()
var i = 1
for {
// will block here, and never go out
i++
}
fmt.Println("for loop end")
time.Sleep(time.Second * 3600)
}
func printNum() {
i := 0
for {
fmt.Println(i)
i++
}
}
func HelloServer(w http.ResponseWriter, req *http.Request) {
fmt.Println("hello world")
io.WriteString(w, "hello, world!\n")
}
func server() {
http.HandleFunc("/", HelloServer)
err := http.ListenAndServe(":12345", nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}

运行,会发现打印一会儿数字后停了,我们执行

1
curl localhost:12345

程序卡死。关于程序挂在哪里借助dlv是很好定位的:

1
dlv debug hang.go

进去之后运行程序,打印停止进入卡死状态,我们执行ctrl C,dlv会显示断开的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
received SIGINT, stopping process (will not forward signal)> main.main() ./hang.go:17 (PC: 0x12dd7c8)
12: func main() {
13: runtime.GOMAXPROCS(runtime.NumCPU())
14: go server()
15: go printNum()
16: var i = 1
=> 17: for {
18: // will block here, and never go out
19: i++
20: }
21: fmt.Println("for loop end")
22: time.Sleep(time.Second * 3600)
(dlv)

但是我还是不明白,不明白的地方主要是因为:

  • 我又看了两篇文章Goroutine调度实例简要分析和也谈goroutine调度器,是同一位作者Tony Bai写的,写得非常好。第二篇文章解释了goroutine的调度和cpu数量的关系(不多加解释,建议大家看看),我的mac是双核四线程(这里不明白的同学自行google cpu 超线程),go版本是1.9,理论上讲可以跑4个goroutine而不用考虑死循环,一个死循环最多把一个cpu打死,上面的代码中只有3个goroutine,而且他们看上去都挂住了。
  • 上面说的理论上讲,不是我主观臆测的,我跑了1中第一篇文章中的一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main
import (
"fmt"
"time"
)
func deadloop() {
for {
}
}
func main() {
go deadloop()
for {
time.Sleep(time.Second * 1)
fmt.Println("I got scheduled!")
}
}

上面代码有两个goroutine,一个是main goroutine,一个是deadloop goroutine,跑得时候deadloop gouroutine不会对main goroutine造成影响,打印一直在持续,作者的文章解释了原因。

  • 如何定位 golang 进程 hang 死的 bug这篇文章提到了gcwaiting,然而没有解释。

在如何定位 golang 进程 hang 死的 bug有这样一段话:

因为在 for 循环中没有函数调用的话,编译器不会插入调度代码,所以这个执行 for 循环的 goroutine 没有办法被调出,而在循环期间碰到 gc,那么就会卡在 gcwaiting 阶段,并且整个进程永远 hang 死在这个循环上。并不再对外响应。

这个其实就是我们的第一段代码卡死的原因,也是我们第二段代码没有卡死的原因,就是在gc上!

我们再看一篇文章,golang的垃圾回收(GC)机制,这篇文章很短,但每句话都很重要:

  1. 设置gcwaiting=1,这个在每一个G任务之前会检查一次这个状态,如是,则会将当前M 休眠;
  2. 如果这个M里面正在运行一个长时间的G任务,咋办呢,难道会等待这个G任务自己切换吗?这样的话可要等10ms啊,不能等!坚决不能等!
    所以会主动发出抢占标记(类似于上一篇),让当前G任务中断,再运行下一个G任务的时候,就会走到第1步

那么如果这时候运行的是没有函数调用的死循环呢,gc也发出了抢占标记,但是如果死循环没有函数调用,就没有地方被标记,无法被抢占,那就只能设置gcwaiting=1,而M没有休眠,stop the world卡住了(死锁),gcwaiting一直是1,整个程序都卡住了!

这里其实已经解释了第一份代码的现象,第二份代码为什么没有hang住相信大家也能猜到了:代码里没有触发gc!我们来手动触发一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main
import (
"fmt"
"log"
"net/http"
_ "net/http/pprof"
// "runtime"
"time"
)
func deadloop() {
for {
}
}
func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
go deadloop()
i := 3
for {
time.Sleep(time.Second * 1)
i--
fmt.Println("I got scheduled!")
if i == 0 {
runtime.GC()
}
}
}

会发现打印了3行之后,程序也卡死了,bingo🎉

我们来看看gcwaiting是不是等于1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ go build hang2.go
$ GODEBUG="schedtrace=300,scheddetail=1" ./hang2
SCHED 2443ms: gomaxprocs=4 idleprocs=3 threads=7 spinningthreads=0 idlethreads=2 runqueue=0 gcwaiting=0 nmidlelocked=0 stopwait=0 sysmonwait=0
P0: status=1 schedtick=4 syscalltick=5 m=5 runqsize=0 gfreecnt=1
P1: status=0 schedtick=14 syscalltick=0 m=-1 runqsize=0 gfreecnt=0
P2: status=0 schedtick=3 syscalltick=4 m=-1 runqsize=0 gfreecnt=0
......
SCHED 2751ms: gomaxprocs=4 idleprocs=0 threads=7 spinningthreads=0 idlethreads=2 runqueue=0 gcwaiting=1 nmidlelocked=0 stopwait=1 sysmonwait=0
P0: status=1 schedtick=4 syscalltick=5 m=5 runqsize=0 gfreecnt=1
P1: status=3 schedtick=14 syscalltick=0 m=-1 runqsize=0 gfreecnt=0
P2: status=3 schedtick=3 syscalltick=10 m=-1 runqsize=0 gfreecnt=0
P3: status=3 schedtick=1 syscalltick=26 m=0 runqsize=0 gfreecnt=0
M6: p=-1 curg=-1 mallocing=0 throwing=0 preemptoff= locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1
M5: p=0 curg=19 mallocing=0 throwing=0 preemptoff= locks=0 dying=0 helpg

代码诚不欺我也!

参考资料

  • 如何定位 golang 进程 hang 死的 bug
  • Goroutine调度实例简要分析
  • 也谈goroutine调度器
  • golang的垃圾回收(GC)机制

hystrix-go简介

发表于 2017-11-27 | 分类于 go

hystrix是一个容错库,旨在隔离指向远程系统,服务和第三方库的请求,杜绝级联故障,并在复杂的分布式系统中实现弹性,毕竟在分布式系统中,故障是不可避免的。

此项目脱胎于由Netflix开源的同名java项目。https://github.com/Netflix/Hystrix

像Hystrix命令一样执行代码

定义依赖于外部系统的应用逻辑,将函数传给Go。当外部系统处于健康状态,这个函数将是唯一被执行的代码。

1
2
3
4
hystrix.Go("my_command", func() error {
// talk to other services
return nil
}, nil)

定义fallback行为

如果希望外部系统挂了的时候执行一些动作,可以给Go传递第二个函数。理想情况下,这里的逻辑可以让你的应用优雅地处理外部系统不可用的情况。

当第一个函数返回error,或者在一系列健康检查的情况下函数无法运行结束,都会触发fallback。更详细的参考在这里

1
2
3
4
5
6
7
hystrix.Go("my_command", func() error {
// talk to other services
return nil
}, func(err error) error {
// do this when services are down
return nil
})

等待输出

调用Go就像执行了一个goroutine,除了你能获取到一个error的channel并且监控它。

1
2
3
4
5
6
7
8
9
10
11
12
13
output := make(chan bool, 1)
errors := hystrix.Go("my_command", func() error {
// talk to other services
output <- true
return nil
}, nil)
select {
case out := <-output:
// success
case err := <-errors:
// failure
}

同步API

调用一个借口并且等待返回是一个常见的场景(对应于goroutine),Hystrix提供了一个Do函数,返回一个error

1
2
3
4
err := hystrix.Do("my_command", func() error {
// talk to other services
return nil
}, nil)

配置

在应用启动期间,你可以调用ConfigureCommand来为每个command添加配置:

1
2
3
4
5
hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
Timeout: 1000,
MaxConcurrentRequests: 100,
ErrorPercentThreshold: 25,
})

也有别的配置方法,更详细的介绍请参考官方文档。

一个例子

最后给大家举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main
import (
"fmt"
"github.com/afex/hystrix-go/hystrix"
"net/http"
"time"
)
func main() {
hystrix.Go("get_baidu", func() error {
// talk to other services
_, err := http.Get("https://www.baidu.com/")
if err != nil {
fmt.Println("get error")
return err
}
return nil
}, func(err error) error {
fmt.Println("get an error, handle it")
return nil
})
time.Sleep(2 * time.Second) // 调用Go方法就是起了一个goroutine,这里要sleep一下,不然看不到效果
}

网络请求,大家把网络断开后就能够模拟外部服务挂掉的情况。

总结

熔断机制在分布式系统中几乎是必备的组件,下面总结一下:

特点

  1. hystrix作用在客户端,客户端程序依赖hystrix相关的第三方包,使得客户端与所依赖的服务,形成隔离(goroutine的隔离)。依赖服务的延迟与失败变的可控。保护调用者goroutine的执行。

  2. 避免了分布式系统中,单个组件的失败导致的级联影响。

  3. 快速失败,迅速恢复。 hystrix有快速失败机制,单个组件服务失败率到一定程度后,再请求,会直接响应失败。再这之后,会有重试机制。减少系统在错误服务调用上的开销。

  4. 降级应用

hystrix的设计原则

  1. 防止任何单个依赖服务耗尽所有用户线程

  2. 直接响应失败,而不是一直等待

  3. 提供错误返回接口,而不是让用户线程直接处理依赖服务抛出的异常

  4. 使用隔离或熔断技术来降低并限制单个依赖对整个系统造成的影响

12…18
You Wangqiu

You Wangqiu

世之奇伟、瑰怪,非常之观,常在于险远,而人之所罕至焉,故非有志者不能至也

171 日志
21 分类
24 标签
GitHub 知乎 E-Mail
© 2018 You Wangqiu
由 Hexo 强力驱动
主题 - NexT.Muse