C0reFast记事本

to inspire confidence in somebody.

最近的一个项目需要用到Kubernetes的CronJob,主要用来定时执行一个备份任务,刚开始使用的时候发现没有按照预期的情况运行,所以决定看看CronJob Controller的代码,看看他是怎么实现对应的功能的,正好发现网上也没有其他人写过关于CronJob Controller代码的解析(可能是太简单了不用写吧)。所以也就正好记录一下。

CronJob Controller的代码在kubernetes/pkg/controller/cronjob路径下,主要的逻辑实现在这个目录的cronjob_controller.go,这里分析的是v1.10.2版本的代码,可以直接链接到Github查看。

阅读全文 »

之前一直听说Designing Data-Intensive Applications (DDIA) 这本书是神书,也决定读一下,顺便做些笔记,也算巩固一下学到的东西吧。

书的第一部分,主要关注于一些基础的知识,第一章的标题Reliable, Scalable, and Maintainable Applications就讲了三个当前应用的最主要特点:可靠性,可扩展性和可维护性。

可靠性

首先是Reliability也就是可靠性,主要包含下面的几个预期:

  • 应用可以按用户所期待的功能正常运行
  • 可以容忍用户犯的错误或者不正确的使用方式
  • 在对应的系统容量下性能能满足正常的使用要求
  • 能拒绝未授权或者滥用的情况

如果能满足上面的需求,可以说一个软件是可靠的,但是并不是所有的东西都能满足预期,当一些意料之外的东西发生了,称之为faults,系统正确应对这些faults的能力则称作fault-tolerant or resilient(即容错能力或者弹性),虽然容错能力很重要,但也不是意味着可以实现一个能容忍任何错误的系统(比如地球爆炸了)。
需要注意的是,faultfailure是不一样的,前者一般指的是系统某个部分没有按照设计正常工作,而后者一般就意味着整个系统都无法正常提供服务了。当然,我们没有办法去降低fault发生的概率,特别是降低到0,能做的,就是当fault发生时,系统不会因为这些faults变成failure状态,这也是一个容错系统的设计目标。
针对一个系统,我们可以人为的提升faults的发生概率,来验证系统的可靠性,比如kill某个进程,或者断开网络等等。一般情况下,比较严重的bug都是因为对错误的处理不完善导致的。
当然尽管我们可以通过设计容忍很多错误,但是预防错误的发生,远远比发生后再去修复来的好,毕竟很多错误是没办法被修复的,比如数据库被黑客入侵,这个操作无法被修复到原始的样子(数据已经泄漏)。
常见的错误主要有三个:

阅读全文 »

转自The complete guide to Go net/http timeouts

服务端超时

对于http.Server服务端有两个超时可以设置:ReadTimeoutWriteTimeout

srv := &http.Server{
    ReadTimeout: 5 * time.Second,
    WriteTimeout: 10 * time.Second,
}
log.Println(srv.ListenAndServe())

各自的作用时间见图:

Server timeout

需要注意的是WriteTimeout被设置了两次,一次是在读取Http头过程中,另一次是在读取Http头结束后。

客户端超时

对于http.Client客户端,相对要复杂一点,一般的初始化代码如下:

c := &http.Client{
    Transport: &http.Transport{
        Dial: (&net.Dialer{
                Timeout:   30 * time.Second,
                KeepAlive: 30 * time.Second,
        }).Dial,
        TLSHandshakeTimeout:   10 * time.Second,
        ResponseHeaderTimeout: 10 * time.Second,
        ExpectContinueTimeout: 1 * time.Second,
    }
}

这些Timeout各自的作用时间见:

Client timeout

在CNI所以默认提供的Plugin中,bridge应该算是最简单的插件了,针对IPAM Plugin,最简单的应该是host-local,这两个插件也是Kubernetes网络kubenet需要的两个插件。所以这里看一下这两个插件的代码。

所有官方维护的代码,都开源在containernetworking/plugins项目中了。

其中bridge的代码在plugins/main/bridge目录,最重要的是cmdAddcmdDel两个函数,对应CNI SPEC中的ADD和DEL两个主要操作。主要来看一下cmdAdd的实现,精简(删除一些错误处理)后的代码如下:

func cmdAdd(args *skel.CmdArgs) error {
	n, cniVersion, err := loadNetConf(args.StdinData)

	if n.IsDefaultGW {
		n.IsGW = true
	}

	br, brInterface, err := setupBridge(n)
	if err != nil {
		return err
	}

	netns, err := ns.GetNS(args.Netns)
	defer netns.Close()

	hostInterface, containerInterface, err := setupVeth(netns, br, args.IfName, n.MTU, n.HairpinMode)

	r, err := ipam.ExecAdd(n.IPAM.Type, args.StdinData)
	if err != nil {
		return err
	}

	result, err := current.NewResultFromResult(r)

	if len(result.IPs) == 0 {
		return errors.New("IPAM plugin returned missing IP config")
	}

	result.Interfaces = []*current.Interface{brInterface, hostInterface, containerInterface}

	gwsV4, gwsV6, err := calcGateways(result, n)
	if err != nil {
		return err
	}

	if err := netns.Do(func(_ ns.NetNS) error {
		contVeth, err := net.InterfaceByName(args.IfName)

		for _, ipc := range result.IPs {
			if ipc.Version == "6" && (n.HairpinMode || n.PromiscMode) {
				if err := disableIPV6DAD(args.IfName); err != nil {
					return err
				}
				break
			}
		}

		if err := ipam.ConfigureIface(args.IfName, result); err != nil {
			return err
		}

		for _, ipc := range result.IPs {
			if ipc.Version == "4" {
				_ = arping.GratuitousArpOverIface(ipc.Address.IP, *contVeth)
			}
		}
		return nil
	}); err != nil {
		return err
	}

	if n.IsGW {
		var firstV4Addr net.IP
		for _, gws := range []*gwInfo{gwsV4, gwsV6} {
			for _, gw := range gws.gws {
				if gw.IP.To4() != nil && firstV4Addr == nil {
					firstV4Addr = gw.IP
				}

				err = ensureBridgeAddr(br, gws.family, &gw, n.ForceAddress)
				if err != nil {
					return fmt.Errorf("failed to set bridge addr: %v", err)
				}
			}

			if gws.gws != nil {
				if err = enableIPForward(gws.family); err != nil {
					return fmt.Errorf("failed to enable forwarding: %v", err)
				}
			}
		}
	}

	if n.IPMasq {
		chain := utils.FormatChainName(n.Name, args.ContainerID)
		comment := utils.FormatComment(n.Name, args.ContainerID)
		for _, ipc := range result.IPs {
			if err = ip.SetupIPMasq(ip.Network(&ipc.Address), chain, comment); err != nil {
				return err
			}
		}
	}

	br, err = bridgeByName(n.BrName)

	brInterface.Mac = br.Attrs().HardwareAddr.String()

	result.DNS = n.DNS

	return types.PrintResult(result, cniVersion)
}
阅读全文 »

本文主要介绍一下容器网络接口(CNI) 的SPEC,主要参考SPEC v0.3.1,以及目前最新的SPEC,目前新SPEC针对v0.3.1版本改动不是很大,特别是相关接口输入输出方面,因此可以看作是一样的。

总览

所有的CNI Plugin,都必须实现为可以被容器管理系统(如rtk、Kubernetes等)调用的可执行文件。
CNI插件负责将网络接口插入容器网络命名空间(例如veth pair的其中一端),并在主机上进行任何必要的改变(例如将veth pair的另一端连接到网桥)。然后应该将IP分配给接口,并通过调用适当的IPAM插件将与“IP地址管理”部分一致的IP地址分配给该网络接口,并设置好对应的路由。

参数

所有的CNI Plugin必须实现以下操作:

  • 添加一个容器到网络

    • Parameters:
      • Version. The version of CNI spec that the caller is using (container management system or the invoking plugin).
      • Container ID. A unique plaintext identifier for a container, allocated by the runtime. Must not be empty.
      • Network namespace path. This represents the path to the network namespace to be added, i.e. /proc/[pid]/ns/net or a bind-mount/link to it.
      • Network configuration. This is a JSON document describing a network to which a container can be joined. The schema is described below.
      • Extra arguments. This provides an alternative mechanism to allow simple configuration of CNI plugins on a per-container basis.
      • Name of the interface inside the container. This is the name that should be assigned to the interface created inside the container (network namespace); consequently it must comply with the standard Linux restrictions on interface names.
    • Result:
      • Interfaces list. Depending on the plugin, this can include the sandbox (eg, container or hypervisor) interface name and/or the host interface name, the hardware addresses of each interface, and details about the sandbox (if any) the interface is in.
      • IP configuration assigned to each interface. The IPv4 and/or IPv6 addresses, gateways, and routes assigned to sandbox and/or host interfaces.
      • DNS information. Dictionary that includes DNS information for nameservers, domain, search domains and options.
  • 从网络中删除一个容器

    • Parameters:
      • Version. The version of CNI spec that the caller is using (container management system or the invoking plugin).
      • Container ID, as defined above.
      • Network namespace path, as defined above.
      • Network configuration, as defined above.
      • Extra arguments, as defined above.
      • Name of the interface inside the container, as defined above.
    • All parameters should be the same as those passed to the corresponding add operation.
    • A delete operation should release all resources held by the supplied containerid in the configured network.
  • 报告插件支持的CNI版本

    • Parameters: NONE.

    • Result: information about the CNI spec versions supported by the plugin

      {
        "cniVersion": "0.3.1", // the version of the CNI spec in use for this output
        "supportedVersions": [ "0.1.0", "0.2.0", "0.3.0", "0.3.1" ] // the list of CNI spec versions that this plugin supports
      }
阅读全文 »

默认情况下,在使用rpmbuild打包时,会对安装的所有文件进行strip操作,去除文件的一些调试信息,并将这些调试信息放到debuginfo包中,但在很多时候,我们并不需要rpmbuild帮我们执行strip,也不需要生成debuginfo包,所以我们可以修改一下spec文件,关闭这些选项。

针对文件的strip操作是在__os_install_post这个宏中定义的,我们可以运行一下rpmbuild --showrc看一下原始的__os_install_post做了哪些操作:

...
-14: __os_install_post
    /usr/lib/rpm/redhat/brp-compress
    %{!?__debug_package:/usr/lib/rpm/redhat/brp-strip %{__strip}}
    /usr/lib/rpm/redhat/brp-strip-static-archive %{__strip}
    /usr/lib/rpm/redhat/brp-strip-comment-note %{__strip} %{__objdump}
    /usr/lib/rpm/brp-python-bytecompile
    /usr/lib/rpm/redhat/brp-python-hardlink
    %{!?__jar_repack:/usr/lib/rpm/redhat/brp-java-repack-jars}
...

可以看到在打包时会对文件进行一系列操作,比如压缩,strip,编译Python脚本等,所以,我们只需要在spec文件中,加上%define __os_install_post %{nil},将__os_install_post设置为空,这样在打包的时候,就不会执行上面的这些操作了,也就不会对文件进行strip操作了。同样的,如果不需要生成debuginfo包,只需要再加上%define debug_package %{nil}就可以了。

这个问题是很久之前解决的,现在想起来,还是把之前的问题解决过程总结一下。
问题的起因是内部的一个Socket代理,用户对独享数据库的所有请求都需要经过这个Socket代理,某天一个用户反馈,切换到独享数据库之后,页面响应变得异常的慢,大概从1s左右直接到了60s左右,明显是有问题的,首先让用户开了xhprof看了一下,发现用户一个页面牵涉到了超过1000次SQL查询,这1000多次查询占据了绝大部分的时间,因为仅仅切换了数据库,所以问题的原因肯定还是数据库相关。
这个场景还是稍微有点特殊,一个页面里有超过1000次SQL查询的设计也不算合理,所以,我们就编写了测试用例,在PHP中,查询数据库1000次,测试直接连接数据库,和通过Socket代理连接数据库的情况:最后发现直连的速度非常快,但是过代理则慢的不可接受了,很明显是代理的问题。
于是我们尝试在代理机器上抓包分析一下:

tcpdump

其中10.67.15.102是我们Web运行环境的机器IP,10.67.15.212是Socket代理所在的机器10.13.144.139是数据库所在的机器,从id为22953的数据包开始,到22957,是一个SQL查询从Web运行环境到数据库的整个过程:

id为22953:运行环境102发送select语句到Socket代理212。数据包长度为296byte 时间:23.877515
id为22954:Socket代理212发送了一部分select语句128byte到数据库139。       时间:23.877611
id为22955:Socket代理212回运行环境103的ack。                             时间:23.917294
id为22956:数据库139回Socket代理212的ack。                               时间:23.918398
id为22957:Socket代理212发送剩余部分select语句168byte到数据库139。       时间:23.918415
阅读全文 »

一条syslog信息包含三部分,PRI, HEADER和MSG,其中PRI是<>扩起来的一个数字,这个数字就代表了不同Facility和Severity的消息。
其中Facility, Severity的数字代号列表如下:

Facility:

Code Keyword Description
0 kern kernel messages
1 user user-level messages
2 mail mail system
3 daemon system daemons
4 auth security/authorization messages
5 syslog messages generated internally by syslogd
6 lpr line printer subsystem (archaic subsystem)
7 news network news subsystem (archaic subsystem)
8 uucp UUCP subsystem (archaic subsystem)
9 clock daemon
10 authpriv security/authorization messages
11 ftp FTP daemon
12 - NTP subsystem
13 - log audit
14 - log alert
15 cron scheduling daemon
16 local0 local use 0 (local0)
17 local1 local use 1 (local1)
18 local2 local use 2 (local2)
19 local3 local use 3 (local3)
20 local4 local use 4 (local4)
21 local5 local use 5 (local5)
22 local6 local use 6 (local6)
23 local7 local use 7 (local7)

Severity:

Code Keyword Description
0 emerg System is unusable
1 alert Should be corrected immediately
2 crit Critical conditions
3 err Error conditions
4 warning May indicate that an error will occur if action is not taken.
5 notice Events that are unusual, but not error conditions.
6 info Normal operational messages that require no action.
7 debug Information useful to developers for debugging the application.

针对PRI的计算公式:PRI = FacilityCode*8 + SeverityCode
举个例子: local3.info的日志,它的PRI就是19*8+6=158,所以这条消息在传输中的格式为<158> {HEADER} {MEG}
再一个例子,如果看到一条PRI为14的消息,那么它实际的级别就是user.info (1*8+6=14)

参考:

  1. https://tools.ietf.org/html/rfc3164
  2. https://wiki.archlinux.org/index.php/systemd#Journal

这几天翻了翻项目的代码,看到了一个非常简单的代理程序,使用poll实现,可以在代理过程中输出数据流,基本上算是教科书级别的poll使用例子了,分享一下:

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iterator>
#include <iostream>
#include <unistd.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <poll.h>

#define PROXY_PORT  8888
#define BIN_TXT     0
#define RAW_TXT     1

static const char *backendHost = 0;
static int backendPort;
static int oflag = RAW_TXT;

bool proxy(int cfd);

// g++ -o proxy proxy.cpp -I. -Wall
int main(int argc, char *argv[])
{
    if (argc == 3 || argc == 4) {
        backendHost = argv[1];  
        backendPort = atoi(argv[2]);    
        if (argc == 4) oflag = atoi(argv[3]);
    } else {
        fprintf(stderr, "usage: %s BackendHost BackendPort oflag\n", argv[0]);  
        exit(-1);
    }

    int fd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in addr;
    memset(&addr, 0x00, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PROXY_PORT);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int flags = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));

    if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
        fprintf(stderr, "[error] bind error, %s\n", strerror(errno));
        return EXIT_FAILURE;
    }

    listen(fd, 10);

    int cfd;
    while ((cfd = accept(fd, NULL, NULL)) > 0) {
#ifdef USE_FORK
        pid_t pid = fork();
        if (pid == 0) {
#endif
            proxy(cfd);
            close(cfd);
#ifdef USE_FORK
            exit(0);
        } else if (pid == -1) {
            fprintf(stderr, "[error] fork error, %s\n", strerror(errno));   
        } else {
            close(cfd); 
        }
#endif
    }
    close(fd);

    return EXIT_SUCCESS;
}

static int connectBackend()
{
    int sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd == -1) {
        fprintf(stderr, "socket error, %s\n", strerror(errno)); 
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0x00, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(backendPort);
    inet_pton(AF_INET, backendHost, &addr.sin_addr);

    if (connect(sfd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
        fprintf(stderr, "connect backed error, %s\n", strerror(errno));
        close(sfd);
        return -1;
    }

    fprintf(stdout, "[debug %d] connect backend ok\n", getpid());
    return sfd;
}

static void txtPrint(const char *buffer, size_t size)
{
    /* stderr unbuffered */
    if (oflag == BIN_TXT) {
        for (size_t i = 0; i < size; ++i) {
            fprintf(stderr, "%02x", (unsigned char) buffer[i]);
        }
    } else {
        for (size_t i = 0; i < size; ++i) {
            if (isprint((unsigned char) buffer[i])) {
                fprintf(stderr, "%c", buffer[i]);   
            } else {
                fprintf(stderr, "%%%02x", (unsigned char) buffer[i]);
            }
        }
    }
}

bool proxy(int cfd)
{
    const size_t len = 128;
    char buf[len];
    ssize_t n;

    fcntl(cfd, F_SETFL, fcntl(cfd, F_GETFL) | O_NONBLOCK);

    int sfd = connectBackend();
    if (sfd == -1) return false;

    fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK);

    struct pollfd pfd[2] = {
        {cfd, POLLIN, 0},   
        {sfd, POLLIN, 0}
    };

    int nfds = 2;

    bool stop = false;
    while (!stop) {
        int nready = poll(pfd, nfds, -1);
        if (nready == -1) {
            fprintf(stderr, "[error] poll, %s\n", strerror(errno)); 
            close(sfd);
            return false;
        }

        for (int i = 0; i < nfds; ++i) {
            if (pfd[i].revents & POLLIN) {
                while ((n = recv(pfd[i].fd, buf, len, 0)) > 0) {
                    int fd = (pfd[i].fd == cfd) ? sfd : cfd;
                    ssize_t nn;
                    size_t offset = 0;
                    while ((nn = send(fd, buf + offset, n - offset, 0)) > 0) {
                        offset += nn;
                        if (offset == (size_t) n) break;
                    }
                    struct timeval tv;
                    gettimeofday(&tv, 0);
                    if (pfd[i].fd == cfd) {
                        fprintf(stdout, "[debug] %lu:%lu client read\n", (unsigned long) tv.tv_sec, (unsigned long) tv.tv_usec);
                    } else {
                        fprintf(stdout, "[debug] %lu:%lu server read\n", (unsigned long) tv.tv_sec, (unsigned long) tv.tv_usec);
                    }
                    txtPrint(buf, n);
                }
                if (n == 0) {
                    if (pfd[i].fd == cfd) {
                        fprintf(stdout, "[debug] client closed\n");
                    } else {
                        fprintf(stdout, "[debug] server closed\n");
                    }
                    stop = true;
                } else if (n == -1 && errno != EAGAIN) {
                    fprintf(stderr, "[error] read %s error, %s\n", pfd[i].fd == cfd ? "client" : "backend",
                                                                 strerror(errno));  
                    close(sfd);
                    return false;
                }
            }
        }
    }

    fprintf(stdout, "[debug %d] disconnect\n", getpid());

    close(sfd);
    return true;
}

使用g++ -o proxy proxy.cpp -I. -Wall 命令编译,运行时直接指定需要代理的后端IP和Port即可,也支持fork以支持多条链接。

Python中的上下文管理器可以允许精确地分配和释放资源,最常用的就是使用with语句,比如:

with open('/tmp/file_x', 'w') as file_x:
    file_x.write('Hello')

with结束,文件也会被安全的关闭。不用担心回收资源的问题了。

如果一个自定义的类也想支持类似的调用方式,需要实现__enter__(self)__exit__(self, type, value, traceback)这两个方法,具体的:

class File(object):
    def __init__(self, file_name, method):
        self.file_obj = open(file_name, method)
    def __enter__(self):
        return self.file_obj
    def __exit__(self, type, value, traceback):
        self.file_obj.close()

其中__enter__方法将打开的文件返回给with语句。
对于__exit__(self, type, value, traceback)方法,会在with语句退出时调用,如果在执行中发现异常,则异常的type,value和traceback会被传递给__exit__方法,在__exit__中可以对异常进行相应的处理,如果最终
__exit__方法返回None,则认为异常被正确处理了,如果返回的不是None,则这个异常会被with抛出,期待上层进行相应的处理。

除了上面的方法,Python还提供了一个contextlib模块,通过这个模块加上装饰器(decorators)和生成器(generators),也能实现类似的功能:

from contextlib import contextmanager

@contextmanager
def open_file(name):
    f = open(name, 'w')
    yield f
    f.close()

这样在使用中,open_file变成了一个生成器,所以contextmanager可以通过调用这个生成器next()方法控制资源的释放,具体的源代码在这里:

# 代码有所省略,具体可以参考: https://github.com/python/cpython/blob/master/Lib/contextlib.py
class _GeneratorContextManager(_GeneratorContextManagerBase,
                               AbstractContextManager,
                               ContextDecorator):
    """Helper for @contextmanager decorator."""

    def _recreate_cm(self):
        return self.__class__(self.func, self.args, self.kwds)

    def __enter__(self):
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):
        if type is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                value = type()
            try:
                self.gen.throw(type, value, traceback)
            except StopIteration as exc:
                return exc is not value
            except RuntimeError as exc:
                if exc is value:
                    return False
                if type is StopIteration and exc.__cause__ is value:
                    return False
                raise
            except:
                if sys.exc_info()[1] is value:
                    return False
                raise
            raise RuntimeError("generator didn't stop after throw()")

def contextmanager(func):
    @wraps(func)
    def helper(*args, **kwds):
        return _GeneratorContextManager(func, args, kwds)
    return helper

参考:

  1. http://book.pythontips.com/en/latest/context_managers.html
  2. https://github.com/python/cpython/blob/master/Lib/contextlib.py
0%