C0reFast记事本


Translated from The complete guide to Go net/http timeouts

Server-side timeouts

For the http.Server side, two timeouts can be set: ReadTimeout and WriteTimeout.

srv := &http.Server{
    ReadTimeout: 5 * time.Second,
    WriteTimeout: 10 * time.Second,
}
log.Println(srv.ListenAndServe())

The span each timeout covers is shown in the figure:

(figure: Server timeout)

Note that WriteTimeout is set twice: once while the HTTP headers are being read, and again after the headers have been read.

Client-side timeouts

The http.Client side is a bit more involved; typical initialization code looks like this:

c := &http.Client{
    Transport: &http.Transport{
        Dial: (&net.Dialer{
                Timeout:   30 * time.Second,
                KeepAlive: 30 * time.Second,
        }).Dial,
        TLSHandshakeTimeout:   10 * time.Second,
        ResponseHeaderTimeout: 10 * time.Second,
        ExpectContinueTimeout: 1 * time.Second,
    },
}

The span each of these timeouts covers is shown in:

(figure: Client timeout)

Of all the plugins provided by default with CNI, bridge is probably the simplest, and among the IPAM plugins the simplest is host-local. These two are also exactly the plugins required by kubenet, the Kubernetes network. So let's take a look at the code of these two plugins.

All the officially maintained plugins are open-sourced in the containernetworking/plugins project.

The bridge code lives in the plugins/main/bridge directory. The most important pieces are the two functions cmdAdd and cmdDel, which correspond to the ADD and DEL operations in the CNI SPEC. Let's focus on the implementation of cmdAdd; trimmed of some error handling, the code is as follows:

func cmdAdd(args *skel.CmdArgs) error {
	n, cniVersion, err := loadNetConf(args.StdinData)

	if n.IsDefaultGW {
		n.IsGW = true
	}

	br, brInterface, err := setupBridge(n)
	if err != nil {
		return err
	}

	netns, err := ns.GetNS(args.Netns)
	defer netns.Close()

	hostInterface, containerInterface, err := setupVeth(netns, br, args.IfName, n.MTU, n.HairpinMode)

	r, err := ipam.ExecAdd(n.IPAM.Type, args.StdinData)
	if err != nil {
		return err
	}

	result, err := current.NewResultFromResult(r)

	if len(result.IPs) == 0 {
		return errors.New("IPAM plugin returned missing IP config")
	}

	result.Interfaces = []*current.Interface{brInterface, hostInterface, containerInterface}

	gwsV4, gwsV6, err := calcGateways(result, n)
	if err != nil {
		return err
	}

	if err := netns.Do(func(_ ns.NetNS) error {
		contVeth, err := net.InterfaceByName(args.IfName)

		for _, ipc := range result.IPs {
			if ipc.Version == "6" && (n.HairpinMode || n.PromiscMode) {
				if err := disableIPV6DAD(args.IfName); err != nil {
					return err
				}
				break
			}
		}

		if err := ipam.ConfigureIface(args.IfName, result); err != nil {
			return err
		}

		for _, ipc := range result.IPs {
			if ipc.Version == "4" {
				_ = arping.GratuitousArpOverIface(ipc.Address.IP, *contVeth)
			}
		}
		return nil
	}); err != nil {
		return err
	}

	if n.IsGW {
		var firstV4Addr net.IP
		for _, gws := range []*gwInfo{gwsV4, gwsV6} {
			for _, gw := range gws.gws {
				if gw.IP.To4() != nil && firstV4Addr == nil {
					firstV4Addr = gw.IP
				}

				err = ensureBridgeAddr(br, gws.family, &gw, n.ForceAddress)
				if err != nil {
					return fmt.Errorf("failed to set bridge addr: %v", err)
				}
			}

			if gws.gws != nil {
				if err = enableIPForward(gws.family); err != nil {
					return fmt.Errorf("failed to enable forwarding: %v", err)
				}
			}
		}
	}

	if n.IPMasq {
		chain := utils.FormatChainName(n.Name, args.ContainerID)
		comment := utils.FormatComment(n.Name, args.ContainerID)
		for _, ipc := range result.IPs {
			if err = ip.SetupIPMasq(ip.Network(&ipc.Address), chain, comment); err != nil {
				return err
			}
		}
	}

	br, err = bridgeByName(n.BrName)

	brInterface.Mac = br.Attrs().HardwareAddr.String()

	result.DNS = n.DNS

	return types.PrintResult(result, cniVersion)
}

This post gives an overview of the Container Network Interface (CNI) SPEC, based mainly on SPEC v0.3.1 as well as the current latest SPEC. The changes in the newest SPEC relative to v0.3.1 are small, especially regarding the inputs and outputs of the interfaces, so the two can be treated as equivalent.

Overview

Every CNI plugin must be implemented as an executable that can be invoked by a container management system (e.g. rkt or Kubernetes).
A CNI plugin is responsible for inserting a network interface into the container's network namespace (for example, one end of a veth pair) and making any necessary changes on the host (for example, attaching the other end of the veth pair to a bridge). It should then assign an IP address to the interface by invoking the appropriate IPAM plugin, consistent with the "IP Address Management" section, and set up the corresponding routes.

Operations

Every CNI plugin must implement the following operations:

  • Add a container to a network

    • Parameters:
      • Version. The version of CNI spec that the caller is using (container management system or the invoking plugin).
      • Container ID. A unique plaintext identifier for a container, allocated by the runtime. Must not be empty.
      • Network namespace path. This represents the path to the network namespace to be added, i.e. /proc/[pid]/ns/net or a bind-mount/link to it.
      • Network configuration. This is a JSON document describing a network to which a container can be joined. The schema is described below.
      • Extra arguments. This provides an alternative mechanism to allow simple configuration of CNI plugins on a per-container basis.
      • Name of the interface inside the container. This is the name that should be assigned to the interface created inside the container (network namespace); consequently it must comply with the standard Linux restrictions on interface names.
    • Result:
      • Interfaces list. Depending on the plugin, this can include the sandbox (eg, container or hypervisor) interface name and/or the host interface name, the hardware addresses of each interface, and details about the sandbox (if any) the interface is in.
      • IP configuration assigned to each interface. The IPv4 and/or IPv6 addresses, gateways, and routes assigned to sandbox and/or host interfaces.
      • DNS information. Dictionary that includes DNS information for nameservers, domain, search domains and options.
  • Delete a container from a network

    • Parameters:
      • Version. The version of CNI spec that the caller is using (container management system or the invoking plugin).
      • Container ID, as defined above.
      • Network namespace path, as defined above.
      • Network configuration, as defined above.
      • Extra arguments, as defined above.
      • Name of the interface inside the container, as defined above.
    • All parameters should be the same as those passed to the corresponding add operation.
    • A delete operation should release all resources held by the supplied containerid in the configured network.
  • Report the CNI spec versions supported by the plugin

    • Parameters: NONE.

    • Result: information about the CNI spec versions supported by the plugin

      {
        "cniVersion": "0.3.1", // the version of the CNI spec in use for this output
        "supportedVersions": [ "0.1.0", "0.2.0", "0.3.0", "0.3.1" ] // the list of CNI spec versions that this plugin supports
      }
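To make the parameter passing above concrete: per the CNI SPEC, the runtime passes the operation and its parameters to the plugin through CNI_* environment variables, feeds the network configuration JSON on the plugin's stdin, and reads the Result back from stdout. A hypothetical sketch (the helper name `cni_add` and its signature are my own, not from the SPEC) of how a runtime might perform an ADD:

```python
import json
import os
import subprocess

def cni_add(plugin_path, net_conf, container_id, netns, ifname):
    """Sketch: invoke a CNI plugin's ADD operation the way a runtime would.

    Parameters travel via CNI_* environment variables, the network
    configuration JSON via stdin; the Result JSON comes back on stdout.
    """
    env = dict(os.environ,
               CNI_COMMAND="ADD",
               CNI_CONTAINERID=container_id,
               CNI_NETNS=netns,
               CNI_IFNAME=ifname,
               CNI_PATH=os.path.dirname(plugin_path))
    proc = subprocess.run([plugin_path],
                          input=json.dumps(net_conf),
                          env=env, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr)
    return json.loads(proc.stdout)
```

A DEL is the same call with CNI_COMMAND="DEL" and the same parameters as the original ADD.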

By default, rpmbuild strips every installed file during packaging, removing debugging information and placing it into a separate debuginfo package. Often we neither need rpmbuild to run strip for us nor want a debuginfo package, so we can tweak the spec file to turn these steps off.

The strip step is defined in the __os_install_post macro. Run rpmbuild --showrc to see what the default __os_install_post does:

...
-14: __os_install_post
    /usr/lib/rpm/redhat/brp-compress
    %{!?__debug_package:/usr/lib/rpm/redhat/brp-strip %{__strip}}
    /usr/lib/rpm/redhat/brp-strip-static-archive %{__strip}
    /usr/lib/rpm/redhat/brp-strip-comment-note %{__strip} %{__objdump}
    /usr/lib/rpm/brp-python-bytecompile
    /usr/lib/rpm/redhat/brp-python-hardlink
    %{!?__jar_repack:/usr/lib/rpm/redhat/brp-java-repack-jars}
...

As you can see, packaging runs a series of steps over the files: compression, strip, byte-compiling Python scripts, and so on. So we only need to add %define __os_install_post %{nil} to the spec file to set __os_install_post to empty; the steps above will then be skipped and the files will no longer be stripped. Likewise, if the debuginfo package is not needed, simply add %define debug_package %{nil} as well.
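Putting both together, adding these two lines near the top of the spec file disables both the strip steps and the debuginfo subpackage:

```
# Disable the post-install strip steps and the debuginfo subpackage
%define __os_install_post %{nil}
%define debug_package %{nil}
```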

This problem was solved quite a while ago; looking back, the troubleshooting process is still worth writing up.
It started with an internal socket proxy: all user requests to their dedicated databases go through this proxy. One day a user reported that after switching to a dedicated database, page response time became abnormally slow, jumping from about 1s to about 60s, so something was clearly wrong. We first had the user enable xhprof, which showed that a single page issued more than 1000 SQL queries, and those queries accounted for the vast majority of the time. Since only the database had been switched, the cause had to be database-related.
The scenario is a bit unusual: a page design with more than 1000 SQL queries is hardly reasonable. So we wrote a test case in PHP that queries the database 1000 times, comparing a direct connection against a connection through the socket proxy. The direct connection was very fast, while going through the proxy was unacceptably slow; clearly the proxy was the problem.
So we captured packets on the proxy machine:

(figure: tcpdump capture)

Here 10.67.15.102 is the machine running the web environment, 10.67.15.212 is the machine hosting the socket proxy, and 10.13.144.139 is the database machine. Packets 22953 through 22957 cover the full journey of one SQL query from the web environment to the database:

  • Packet 22953: web host 102 sends the SELECT statement to socket proxy 212, packet length 296 bytes. Time: 23.877515
  • Packet 22954: socket proxy 212 forwards the first 128 bytes of the SELECT to database 139.          Time: 23.877611
  • Packet 22955: socket proxy 212 ACKs web host 102.                                                   Time: 23.917294
  • Packet 22956: database 139 ACKs socket proxy 212.                                                   Time: 23.918398
  • Packet 22957: socket proxy 212 sends the remaining 168 bytes of the SELECT to database 139.         Time: 23.918415
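The pattern in the trace above, where the proxy holds back the rest of the query (packet 22957) until the peer's ACK arrives roughly 40ms later, is the classic interaction between Nagle's algorithm and delayed ACK. Assuming that is indeed the cause here, a common mitigation is to disable Nagle on the proxy's sockets; a minimal Python sketch:

```python
import socket

def nodelay_socket():
    """Create a TCP socket with Nagle's algorithm disabled, so small
    writes go out immediately instead of waiting for the peer's ACK."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    return s
```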

A syslog message consists of three parts: PRI, HEADER, and MSG. PRI is a number enclosed in angle brackets (<>); it encodes the message's Facility and Severity.
The numeric codes for Facility and Severity are listed below:

Facility:

Code Keyword Description
0 kern kernel messages
1 user user-level messages
2 mail mail system
3 daemon system daemons
4 auth security/authorization messages
5 syslog messages generated internally by syslogd
6 lpr line printer subsystem (archaic subsystem)
7 news network news subsystem (archaic subsystem)
8 uucp UUCP subsystem (archaic subsystem)
9 - clock daemon
10 authpriv security/authorization messages
11 ftp FTP daemon
12 - NTP subsystem
13 - log audit
14 - log alert
15 cron scheduling daemon
16 local0 local use 0 (local0)
17 local1 local use 1 (local1)
18 local2 local use 2 (local2)
19 local3 local use 3 (local3)
20 local4 local use 4 (local4)
21 local5 local use 5 (local5)
22 local6 local use 6 (local6)
23 local7 local use 7 (local7)

Severity:

Code Keyword Description
0 emerg System is unusable
1 alert Should be corrected immediately
2 crit Critical conditions
3 err Error conditions
4 warning May indicate that an error will occur if action is not taken.
5 notice Events that are unusual, but not error conditions.
6 info Normal operational messages that require no action.
7 debug Information useful to developers for debugging the application.

PRI is computed as: PRI = FacilityCode * 8 + SeverityCode
For example, a local3.info message has PRI 19*8+6 = 158, so on the wire it is transmitted as <158> {HEADER} {MSG}
Another example: a message with PRI 14 is actually user.info (1*8+6 = 14)
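The formula and both worked examples can be checked with a few lines of Python (the helper names are mine, chosen for illustration):

```python
def encode_pri(facility_code, severity_code):
    """PRI = FacilityCode * 8 + SeverityCode, per RFC 3164."""
    return facility_code * 8 + severity_code

def decode_pri(pri):
    """Recover (facility_code, severity_code) from a PRI value."""
    return divmod(pri, 8)

print(encode_pri(19, 6))  # local3.info -> 158
print(decode_pri(14))     # -> (1, 6), i.e. user.info
```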

References:

  1. https://tools.ietf.org/html/rfc3164
  2. https://wiki.archlinux.org/index.php/systemd#Journal

While browsing through one of our projects recently, I came across a very simple proxy program implemented with poll. It can dump the data stream while proxying, practically a textbook example of using poll, so I'm sharing it here:

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cctype>
#include <iterator>
#include <iostream>
#include <unistd.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <poll.h>

#define PROXY_PORT  8888
#define BIN_TXT     0
#define RAW_TXT     1

static const char *backendHost = 0;
static int backendPort;
static int oflag = RAW_TXT;

bool proxy(int cfd);

// g++ -o proxy proxy.cpp -I. -Wall
int main(int argc, char *argv[])
{
    if (argc == 3 || argc == 4) {
        backendHost = argv[1];  
        backendPort = atoi(argv[2]);    
        if (argc == 4) oflag = atoi(argv[3]);
    } else {
        fprintf(stderr, "usage: %s BackendHost BackendPort oflag\n", argv[0]);  
        exit(-1);
    }

    int fd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in addr;
    memset(&addr, 0x00, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PROXY_PORT);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int flags = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));

    if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
        fprintf(stderr, "[error] bind error, %s\n", strerror(errno));
        return EXIT_FAILURE;
    }

    listen(fd, 10);

    int cfd;
    while ((cfd = accept(fd, NULL, NULL)) > 0) {
#ifdef USE_FORK
        pid_t pid = fork();
        if (pid == 0) {
#endif
            proxy(cfd);
            close(cfd);
#ifdef USE_FORK
            exit(0);
        } else if (pid == -1) {
            fprintf(stderr, "[error] fork error, %s\n", strerror(errno));   
        } else {
            close(cfd); 
        }
#endif
    }
    close(fd);

    return EXIT_SUCCESS;
}

static int connectBackend()
{
    int sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd == -1) {
        fprintf(stderr, "socket error, %s\n", strerror(errno)); 
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0x00, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(backendPort);
    inet_pton(AF_INET, backendHost, &addr.sin_addr);

    if (connect(sfd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
        fprintf(stderr, "connect backed error, %s\n", strerror(errno));
        close(sfd);
        return -1;
    }

    fprintf(stdout, "[debug %d] connect backend ok\n", getpid());
    return sfd;
}

static void txtPrint(const char *buffer, size_t size)
{
    /* stderr unbuffered */
    if (oflag == BIN_TXT) {
        for (size_t i = 0; i < size; ++i) {
            fprintf(stderr, "%02x", (unsigned char) buffer[i]);
        }
    } else {
        for (size_t i = 0; i < size; ++i) {
            if (isprint((unsigned char) buffer[i])) {
                fprintf(stderr, "%c", buffer[i]);   
            } else {
                fprintf(stderr, "%%%02x", (unsigned char) buffer[i]);
            }
        }
    }
}

bool proxy(int cfd)
{
    const size_t len = 128;
    char buf[len];
    ssize_t n;

    fcntl(cfd, F_SETFL, fcntl(cfd, F_GETFL) | O_NONBLOCK);

    int sfd = connectBackend();
    if (sfd == -1) return false;

    fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK);

    struct pollfd pfd[2] = {
        {cfd, POLLIN, 0},   
        {sfd, POLLIN, 0}
    };

    int nfds = 2;

    bool stop = false;
    while (!stop) {
        int nready = poll(pfd, nfds, -1);
        if (nready == -1) {
            fprintf(stderr, "[error] poll, %s\n", strerror(errno)); 
            close(sfd);
            return false;
        }

        for (int i = 0; i < nfds; ++i) {
            if (pfd[i].revents & POLLIN) {
                while ((n = recv(pfd[i].fd, buf, len, 0)) > 0) {
                    int fd = (pfd[i].fd == cfd) ? sfd : cfd;
                    ssize_t nn;
                    size_t offset = 0;
                    while ((nn = send(fd, buf + offset, n - offset, 0)) > 0) {
                        offset += nn;
                        if (offset == (size_t) n) break;
                    }
                    struct timeval tv;
                    gettimeofday(&tv, 0);
                    if (pfd[i].fd == cfd) {
                        fprintf(stdout, "[debug] %lu:%lu client read\n", (unsigned long) tv.tv_sec, (unsigned long) tv.tv_usec);
                    } else {
                        fprintf(stdout, "[debug] %lu:%lu server read\n", (unsigned long) tv.tv_sec, (unsigned long) tv.tv_usec);
                    }
                    txtPrint(buf, n);
                }
                if (n == 0) {
                    if (pfd[i].fd == cfd) {
                        fprintf(stdout, "[debug] client closed\n");
                    } else {
                        fprintf(stdout, "[debug] server closed\n");
                    }
                    stop = true;
                } else if (n == -1 && errno != EAGAIN) {
                    fprintf(stderr, "[error] read %s error, %s\n", pfd[i].fd == cfd ? "client" : "backend",
                                                                 strerror(errno));  
                    close(sfd);
                    return false;
                }
            }
        }
    }

    fprintf(stdout, "[debug %d] disconnect\n", getpid());

    close(sfd);
    return true;
}

Compile with g++ -o proxy proxy.cpp -I. -Wall. At runtime, just specify the backend IP and port to proxy to; compiling with -DUSE_FORK also enables fork so that multiple connections can be handled.

Context managers in Python allow precise allocation and release of resources; the most common form is the with statement, for example:

with open('/tmp/file_x', 'w') as file_x:
    file_x.write('Hello')

When the with block ends, the file is safely closed; there is no need to worry about releasing the resource.

If a custom class wants to support the same usage, it needs to implement the two methods __enter__(self) and __exit__(self, type, value, traceback). Concretely:

class File(object):
    def __init__(self, file_name, method):
        self.file_obj = open(file_name, method)
    def __enter__(self):
        return self.file_obj
    def __exit__(self, type, value, traceback):
        self.file_obj.close()

The __enter__ method returns the opened file object to the with statement.
The __exit__(self, type, value, traceback) method is called when the with statement exits. If an exception was raised during execution, its type, value, and traceback are passed to __exit__, which can handle the exception as appropriate. If __exit__ returns a true value, the exception is considered handled and is suppressed; if it returns None (or any other falsy value), the exception is re-raised by the with statement for the caller to deal with.

Besides the approach above, Python also provides the contextlib module; combined with decorators and generators, it achieves the same effect:

from contextlib import contextmanager

@contextmanager
def open_file(name):
    f = open(name, 'w')
    yield f
    f.close()

With this, open_file becomes a generator, and contextmanager controls when the resource is released by advancing the generator with next(). The actual source code is here:

# Code abridged; for the full version see: https://github.com/python/cpython/blob/master/Lib/contextlib.py
class _GeneratorContextManager(_GeneratorContextManagerBase,
                               AbstractContextManager,
                               ContextDecorator):
    """Helper for @contextmanager decorator."""

    def _recreate_cm(self):
        return self.__class__(self.func, self.args, self.kwds)

    def __enter__(self):
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):
        if type is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                value = type()
            try:
                self.gen.throw(type, value, traceback)
            except StopIteration as exc:
                return exc is not value
            except RuntimeError as exc:
                if exc is value:
                    return False
                if type is StopIteration and exc.__cause__ is value:
                    return False
                raise
            except:
                if sys.exc_info()[1] is value:
                    return False
                raise
            raise RuntimeError("generator didn't stop after throw()")

def contextmanager(func):
    @wraps(func)
    def helper(*args, **kwds):
        return _GeneratorContextManager(func, args, kwds)
    return helper
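One caveat about the open_file generator example above: if the with body raises, the exception is thrown into the generator at the yield point, so without a try/finally the f.close() line never runs. A safer version wraps the yield:

```python
from contextlib import contextmanager

@contextmanager
def open_file(name):
    f = open(name, 'w')
    try:
        yield f
    finally:
        f.close()  # runs even if the with body raises
```

This mirrors how the standard library documents @contextmanager usage: cleanup after the yield belongs in a finally block.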

References:

  1. http://book.pythontips.com/en/latest/context_managers.html
  2. https://github.com/python/cpython/blob/master/Lib/contextlib.py

I found a command-line version of speedtest.net that can run network speed tests without a browser: sivel/speedtest-cli (https://github.com/sivel/speedtest-cli).

It is a single file depending only on Python, so you can download it directly:

curl -Lo speedtest-cli https://raw.githubusercontent.com/sivel/speedtest-cli/master/speedtest.py
chmod +x speedtest-cli

Once downloaded, just run it:

[root@test ~]# ./speedtest-cli
Retrieving speedtest.net configuration...
Testing from XXX Networks (x.x.x.x)...
Retrieving speedtest.net server list...
Selecting best server based on ping...
Hosted by Atlantic Metro (Los Angeles, CA) [1.30 km]: 1.565 ms
Testing download speed................................................................................
Download: 631.41 Mbit/s
Testing upload speed....................................................................................................
Upload: 48.73 Mbit/s

In OpenStack Swift, the object replicator's job is to bring the system back to a consistent state after events such as transient network interruptions or disk failures. The object replicator compares local data with each remote copy to make sure all of them hold the latest version. Below is a brief walk through the object replicator code to understand the overall replication workflow.
