HeLei Blog

不要因为走得太远,就忘记为什么而出发


  • 首页

  • 分类

  • 归档

  • 标签

  • 搜索
close
HeLei Blog

Redis源码剖析和注释(二)---简单动态字符串

发表于 2018-02-04

1.介绍

Redis兼容传统的C语言字符串类型,但没有直接使用C语言的传统的字符串(以’\0’结尾的字符数组)表示,而是自己构建了一种名为简单动态字符串(simple dynamic string,SDS)的对象。简单动态字符串在Redis数据库中应用很广泛,例如:键值对在底层就是由SDS实现的。

在redis种,有一种数据类型叫string类型,而string类型简单的说就是SDS实现的(简单理解),先通过几个命令来感受一下string类型:

1
2
3
4
5
6
7
8
127.0.0.1:6379> SET str1 Redis //设置key:value = str1:Redis
OK
127.0.0.1:6379> GET str1 //获取str1的value
"Redis"
127.0.0.1:6379> TYPE str1 //获取key的存储类型 string类型
string
127.0.0.1:6379> STRLEN str1 //str1的长度为5字节
(integer) 5

2.SDS的定义

SDS定义在redis源码根目录下的sds.h/sdshdr

1
2
typedef char *sds;
//sds兼容传统C风格字符串,所以起了个别名叫sds,并且可以存放sdshdr结构buf成员的地址

SDS也有一个表头(header)用来存放sds的信息。

1
2
3
4
5
struct sdshdr {
int len; //buf中已占用空间的长度
int free; //buf中剩余可用空间的长度
char buf[]; //初始化sds分配的数据空间,而且是柔性数组(Flexible array member)
};

根据这个结构体,我们用图大概表示一下str1,如下图:
sdshdr

这里写图片描述

  • len为5,表示这个sds长度为5字节。
  • free为2,表示这个sds还有2个字节未使用的空间。
  • buf是一个char[]的数组,分配了(len+1+free)个字节的长度,前len个字节保存着’R’、’e’、’d’、’i’、’s’这5个字符,接下来的1个字节保存着’\0’,剩下的free个字节未使用

3. SDS的优点

SDS本质上就是char *,因为有了表头sdshdr结构的存在,所以SDS比传统C字符串在某些方面更加优秀,并且能够兼容传统C字符串。

3.1 兼容C的部分函数

因为SDS兼容传统的C字符串,采用以’\0’作为结尾,所以SDS就能够使用一部分

3.2 二进制安全(Binary Safe)

因为传统C字符串符合ASCII编码,这种编码的操作的特点就是:遇零则止 。即,当读一个字符串时,只要遇到’\0’结尾,就认为到达末尾,就忽略’\0’结尾以后的所有字符。因此,如果传统字符串保存图片,视频等二进制文件,操作文件时就被截断了。

而SDS表头的buf被定义为字节数组,因为判断是否到达字符串结尾的依据则是表头的len成员,这意味着它可以存放任何二进制的数据和文本数据,包括’\0’,如下图:
buf

3.3 获得字符串长度的操作复杂度为O(1)

传统的C字符串获得长度时的做法:遍历字符串的长度,遇零则止,复杂度为O(n)。

而SDS表头的len成员就保存着字符串长度,所以获得字符串长度的操作复杂度为O(1)。

3.4 杜绝缓冲区溢出

因为SDS表头的free成员记录着buf字符数组中未使用空间的字节数,所以,在进行APPEND命令向字符串后追加字符串时,如果不够用会先进行内存扩展,在进行追加。

总之,正是因为表头的存在,使得redis的字符串有这么多优点

4. SDS源码剖析

4.1 SDS内存分配策略—空间预分配

空间预分配策略用于优化SDS的字符串增长操作。

如果对SDS进行修改后,SDS表头的len成员小于1MB,那么就会分配和len长度相同的未使用空间。free和len成员大小相等。
如果对SDS进行修改后,SDS的长度大于等于1MB,那么就会分配1MB的未使用空间。
通过空间预分配策略,Redis可以减少连续执行字符串增长操作所需的内存重分配次数。

源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
sds sdsMakeRoomFor(sds s, size_t addlen) { //对 sds 中 buf 的长度进行扩展
struct sdshdr *sh, *newsh;
size_t free = sdsavail(s); //获得s的未使用空间长度
size_t len, newlen;
//free的长度够用不用扩展直接返回
if (free >= addlen) return s;
//free长度不够用,需要扩展
len = sdslen(s); //获得s字符串的长度
sh = (void*) (s-(sizeof(struct sdshdr))); //获取表头地址
newlen = (len+addlen); //扩展后的新长度
//空间预分配
//#define SDS_MAX_PREALLOC (1024*1024)
//预先分配内存的最大长度为 1MB
if (newlen < SDS_MAX_PREALLOC) //新长度小于“最大预分配长度”,就直接将扩展的新长度乘2
newlen *= 2;
else
newlen += SDS_MAX_PREALLOC; //新长度大于“最大预分配长度”,就在加上一个“最大预分配长度”
newsh = zrealloc(sh, sizeof(struct sdshdr)+newlen+1); //获得新的扩展空间的地址
if (newsh == NULL) return NULL;
newsh->free = newlen - len; //更新新空间的未使用的空间free
return newsh->buf;
}

4.2 SDS内存释放策略—惰性空间释放

惰性空间释放用于优化SDS的字符串缩短操作。

  • 当要缩短SDS保存的字符串时,程序并不立即使用内存充分配来回收缩短后多出来的字节,而是使用表头的free成员将这些字节记录起来,并等待将来使用。
    源代码如下:
    1
    2
    3
    4
    5
    6
    void sdsclear(sds s) { //重置sds的buf空间,懒惰释放
    struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));
    sh->free += sh->len; //表头free成员+已使用空间的长度len = 新的free
    sh->len = 0; //已使用空间变为0
    sh->buf[0] = '\0'; //字符串置空
    }

4.3 Redis源码注释

  1. 在sds.h文件中,有两个static inline的函数,分别是sdslen和sdsavail函数,你可以把它认为是一个static的函数,加上了inline的属性。而inline关键字仅仅是建议编译器做内联展开处理,而不是强制。

  2. 在sds.c中,几乎所有的函数所传的参数都是sds类型,而非表头sdshdr的地址,但是使用了通过sds指针运算从而求得表头的地址的技巧,因为sds是指向sdshdr结构buf成员的。通过sds.h/sdslen函数,来分析:

这里的关键就是sds类型是指向sdshdr结构buf成员。

  • struct sdshdr结构共有三个变量,其中sds指向的buf成员是一个柔性数组,它仅仅起到占位符的作用,并不占用该结构体的大小,因此sizeof(sizeof(struct sdshdr))大小为8字节。
  • 由于一个SDS类型的内存是通过动态内存分配的,所以它的内存在堆区,堆由下往上增长,因此sds指针减区sizeof(struct sdshdr)的大小就得到了表头的地址,然后就可以通过”->”访问表头的成员。如下图:
    sds&sdshdr
    1
    2
    3
    4
    5
    6
    static inline size_t sdslen(const sds s) { //计算buf中字符串的长度
    struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr))); //s指针地址减去结构体大小就是结构体的地址
    return sh->len;
    }

通过这种技巧,将表头结构隐藏起来,只对外公开sds类型。

  • sds.h文件注释
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    #ifndef __SDS_H
    #define __SDS_H
    #define SDS_MAX_PREALLOC (1024*1024) //预先分配内存的最大长度为1MB
    #include <sys/types.h>
    #include <stdarg.h>
    typedef char *sds; //sds兼容传统C风格字符串,所以起了个别名叫sds,并且可以存放sdshdr结构buf成员的地址
    struct sdshdr {
    unsigned int len; //buf中已占用空间的长度
    unsigned int free; //buf中剩余可用空间的长度
    char buf[]; //初始化sds分配的数据空间,而且是柔性数组(Flexible array member)
    };
    static inline size_t sdslen(const sds s) { //计算buf中字符串的长度
    struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
    return sh->len;
    }
    static inline size_t sdsavail(const sds s) { //计算buf中的未使用空间的长度
    struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
    return sh->free;
    }
    sds sdsnewlen(const void *init, size_t initlen); //创建一个长度为initlen的字符串,并保存init字符串中的值
    sds sdsnew(const char *init); //创建一个默认长度的字符串
    sds sdsempty(void); //建立一个只有表头,字符串为空"\0"的sds
    size_t sdslen(const sds s); //计算buf中字符串的长度
    sds sdsdup(const sds s); //拷贝一份s的副本
    void sdsfree(sds s); //释放s字符串和表头
    size_t sdsavail(const sds s); //计算buf中的未使用空间的长度
    sds sdsgrowzero(sds s, size_t len); //将sds扩展制定长度并赋值为0
    sds sdscatlen(sds s, const void *t, size_t len); //将字符串t追加到s表头的buf末尾,追加len个字节
    sds sdscat(sds s, const char *t); //将t字符串拼接到s的末尾
    sds sdscatsds(sds s, const sds t); //将sds追加到s末尾
    sds sdscpylen(sds s, const char *t, size_t len); //将字符串t覆盖到s表头的buf中,拷贝len个字节
    sds sdscpy(sds s, const char *t); //将字符串覆盖到s表头的buf中
    sds sdscatvprintf(sds s, const char *fmt, va_list ap); //打印函数,被 sdscatprintf 所调用
    #ifdef __GNUC__
    sds sdscatprintf(sds s, const char *fmt, ...) //打印任意数量个字符串,并将这些字符串追加到给定 sds 的末尾
    __attribute__((format(printf, 2, 3)));
    #else
    sds sdscatprintf(sds s, const char *fmt, ...); //打印任意数量个字符串,并将这些字符串追加到给定 sds 的末尾
    #endif
    sds sdscatfmt(sds s, char const *fmt, ...); //格式化打印多个字符串,并将这些字符串追加到给定 sds 的末尾
    sds sdstrim(sds s, const char *cset); //去除sds中包含有 cset字符串出现字符 的字符
    void sdsrange(sds s, int start, int end); //根据start和end区间截取字符串
    void sdsupdatelen(sds s); //更新字符串s的长度
    void sdsclear(sds s); //将字符串重置保存空间,懒惰释放
    int sdscmp(const sds s1, const sds s2); //比较两个sds的大小,相等返回0
    sds *sdssplitlen(const char *s, int len, const char *sep, int seplen, int *count); //使用长度为seplen的sep分隔符对长度为len的s进行分割,返回一个sds数组的地址,*count被设置为数组元素数量
    void sdsfreesplitres(sds *tokens, int count); //释放tokens中的count个sds元素
    void sdstolower(sds s); //将sds字符串所有字符转换为小写
    void sdstoupper(sds s); //将sds字符串所有字符转换为大写
    sds sdsfromlonglong(long long value); //根据long long value创建一个SDS
    sds sdscatrepr(sds s, const char *p, size_t len); //将长度为len的字符串p以带引号""的格式追加到s末尾
    sds *sdssplitargs(const char *line, int *argc); //参数拆分,主要用于 config.c 中对配置文件进行分析。
    sds sdsmapchars(sds s, const char *from, const char *to, size_t setlen); //将s中所有在 from 中的字符串,替换成 to 中的字符串
    sds sdsjoin(char **argv, int argc, char *sep); //以分隔符连接字符串子数组构成新的字符串
    /* Low level functions exposed to the user API */
    sds sdsMakeRoomFor(sds s, size_t addlen); //对 sds 中 buf 的长度进行扩展
    void sdsIncrLen(sds s, int incr); //根据incr的正负,移动字符串末尾的'\0'标志
    sds sdsRemoveFreeSpace(sds s); //回收sds中的未使用空间
    size_t sdsAllocSize(sds s); //获得sds所有分配的空间
    #endif
HeLei Blog

谈谈Java并发(二)

发表于 2018-01-03

http://www.cnblogs.com/shamo89/p/6684960.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class Concurrent {
public final static int THREAD_POOL_SIZE = 5;
public static Map<String, Integer> crunchifyHashTableObject = null;
public static Map<String, Integer> crunchifySynchronizedMapObject = null;
public static Map<String, Integer> crunchifyConcurrentHashMapObject = null;
public static void main(String[] args) throws InterruptedException {
// Test with Hashtable Object
crunchifyHashTableObject = new Hashtable<String, Integer>();
crunchifyPerformTest(crunchifyHashTableObject);
// Test with synchronizedMap Object
crunchifySynchronizedMapObject = Collections.synchronizedMap(new HashMap<String, Integer>());
crunchifyPerformTest(crunchifySynchronizedMapObject);
// Test with ConcurrentHashMap Object
crunchifyConcurrentHashMapObject = new ConcurrentHashMap<String, Integer>();
crunchifyPerformTest(crunchifyConcurrentHashMapObject);
}
public static void crunchifyPerformTest(final Map<String, Integer> crunchifyThreads) throws InterruptedException {
System.out.println("Test started for: " + crunchifyThreads.getClass());
long averageTime = 0;
for (int i = 0; i < 5; i++) {
long startTime = System.nanoTime();
ExecutorService crunchifyExServer = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
for (int j = 0; j < THREAD_POOL_SIZE; j++) {
crunchifyExServer.execute(new Runnable() {
@SuppressWarnings("unused")
public void run() {
for (int i = 0; i < 500000; i++) {
Integer crunchifyRandomNumber = (int) Math.ceil(Math.random() * 550000);
// Retrieve value. We are not using it anywhere
Integer crunchifyValue = crunchifyThreads.get(String.valueOf(crunchifyRandomNumber));
// Put value
crunchifyThreads.put(String.valueOf(crunchifyRandomNumber), crunchifyRandomNumber);
}
}
});
}
// Make sure executor stops
crunchifyExServer.shutdown();
// Blocks until all tasks have completed execution after a shutdown request
crunchifyExServer.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
long entTime = System.nanoTime();
long totalTime = (entTime - startTime) / 1000000L;
averageTime += totalTime;
System.out.println("2500K entried added/retrieved in " + totalTime + " ms");
}
System.out.println("For " + crunchifyThreads.getClass() + " the average time is " + averageTime / 5 + " ms\n");
}
}

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Test started for: class java.util.Hashtable
2500K entried added/retrieved in 2585 ms
2500K entried added/retrieved in 1645 ms
2500K entried added/retrieved in 1612 ms
2500K entried added/retrieved in 1615 ms
2500K entried added/retrieved in 1611 ms
For class java.util.Hashtable the average time is 1813 ms
Test started for: class java.util.Collections$SynchronizedMap
2500K entried added/retrieved in 2186 ms
2500K entried added/retrieved in 1730 ms
2500K entried added/retrieved in 1735 ms
2500K entried added/retrieved in 1693 ms
2500K entried added/retrieved in 1435 ms
For class java.util.Collections$SynchronizedMap the average time is 1755 ms
Test started for: class java.util.concurrent.ConcurrentHashMap
2500K entried added/retrieved in 941 ms
2500K entried added/retrieved in 2475 ms
2500K entried added/retrieved in 768 ms
2500K entried added/retrieved in 641 ms
2500K entried added/retrieved in 770 ms
For class java.util.concurrent.ConcurrentHashMap the average time is 1119 ms

性能表现:ConcurrentHashMap > SynchronizedMap > Hashtable

HeLei Blog

谈谈Java并发(一)

发表于 2017-12-15

SimpleDateFormat是一个常用的类日期格式化类,这个类不是线程安全的,在多线程环境下调用 format() 和 parse() 方法应该使用同步代码来避免问题。也借由这个问题,进一步来探讨下如何在多线程并发执行的程序中写出安全、高效的代码。

##发现问题
例如我们要把时间格式化后再使用,很容易我们可以写出这样一个DateUtil,每次处理一个请求的时候,就需要创建一个SimpleDateFormat实例对象,然后再丢弃这个对象。大量的对象就这样被创建出来,占用大量的内存和jvm空间。

1
2
3
4
5
6
7
8
9
10
11
12
public class DateUtil {
public static String formatDate(Date date)throws ParseException{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(date);
}
public static Date parse(String strDate) throws ParseException{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.parse(strDate);
}
}

你也许会说,OK,那我就创建一个静态的simpleDateFormat实例,然后放到一个DateUtil类(如下)
中,在使用时直接使用这个实例进行操作,这样问题就解决了。改进后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.peidasoft.dateformat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DateUtil {
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static String formatDate(Date date)throws ParseException{
return sdf.format(date);
}
public static Date parse(String strDate) throws ParseException{
return sdf.parse(strDate);
}
}

当然,这个方法的确很不错,在大部分的时间里面都会工作得很好。但当你在生产环境中使用一段时间之后,你就会发现这么一个事实:它不是线程安全的。在正常的测试情况之下,都没有问题,但一旦在生产环境中一定负载情况下时,这个问题就出来了。他会出现各种不同的情况,比如转化的时间不正确,比如报错,比如线程被挂死等等。我们看下面的测试用例,那事实说话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class DateUtilTest {
public static class TestSimpleDateFormatThreadSafe extends Thread {
@Override
public void run() {
while(true) {
try {
this.join(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
System.out.println(this.getName()+":"+DateUtil.parse("2013-05-24 06:02:20"));
} catch (ParseException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
for(int i = 0; i < 3; i++){
new TestSimpleDateFormatThreadSafe().start();
}
}
}

执行输出如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Exception in thread "Thread-1" java.lang.NumberFormatException: multiple points
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1082)
at java.lang.Double.parseDouble(Double.java:510)
at java.text.DigitList.getDouble(DigitList.java:151)
at java.text.DecimalFormat.parse(DecimalFormat.java:1302)
at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1589)
at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1311)
at java.text.DateFormat.parse(DateFormat.java:335)
at com.peidasoft.orm.dateformat.DateNoStaticUtil.parse(DateNoStaticUtil.java:17)
at com.peidasoft.orm.dateformat.DateUtilTest$TestSimpleDateFormatThreadSafe.run(DateUtilTest.java:20)
Exception in thread "Thread-0" java.lang.NumberFormatException: multiple points
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1082)
at java.lang.Double.parseDouble(Double.java:510)
at java.text.DigitList.getDouble(DigitList.java:151)
at java.text.DecimalFormat.parse(DecimalFormat.java:1302)
at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1589)
at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1311)
at java.text.DateFormat.parse(DateFormat.java:335)
at com.peidasoft.orm.dateformat.DateNoStaticUtil.parse(DateNoStaticUtil.java:17)
at com.peidasoft.orm.dateformat.DateUtilTest$TestSimpleDateFormatThreadSafe.run(DateUtilTest.java:20)
Thread-2:Mon May 24 06:02:20 CST 2021
Thread-2:Fri May 24 06:02:20 CST 2013
Thread-2:Fri May 24 06:02:20 CST 2013
Thread-2:Fri May 24 06:02:20 CST 2013

##分析问题
SimpleDateFormat继承了DateFormat,在DateFormat中定义了一个protected属性的 Calendar类的对象:calendar。只是因为Calendar累的概念复杂,牵扯到时区与本地化等等,Jdk的实现中使用了成员变量来传递参数,这就造成在多线程的时候会出现错误。

##解决问题

  1. 需要的时候创建新实例:
1
2
3
4
5
6
7
8
9
10
11
12
public class DateUtil {
public static String formatDate(Date date)throws ParseException{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(date);
}
public static Date parse(String strDate) throws ParseException{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.parse(strDate);
}
}

说明:在需要用到SimpleDateFormat 的地方新建一个实例,不管什么时候,将有线程安全问题的对象由共享变为局部私有都能避免多线程问题,不过也加重了创建对象的负担。在一般情况下,这样其实对性能影响比不是很明显的。

  1. 使用同步:同步SimpleDateFormat对象
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class DateSyncUtil {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static String formatDate(Date date)throws ParseException{
    synchronized(sdf){
    return sdf.format(date);
    }
    }
    public static Date parse(String strDate) throws ParseException{
    synchronized(sdf){
    return sdf.parse(strDate);
    }
    }
    }

说明:当线程较多时,当一个线程调用该方法时,其他想要调用此方法的线程就要block,多线程并发量大的时候会对性能有一定的影响。

  1. 使用ThreadLocal: 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class ConcurrentDateUtil {
    private static ThreadLocal<DateFormat> threadLocal = new ThreadLocal<DateFormat>() {
    @Override
    protected DateFormat initialValue() {
    return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }
    };
    public static Date parse(String dateStr) throws ParseException {
    return threadLocal.get().parse(dateStr);
    }
    public static String format(Date date) {
    return threadLocal.get().format(date);
    }
    }

    另外一种写法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class ThreadLocalDateUtil {
    private static final String date_format = "yyyy-MM-dd HH:mm:ss";
    private static ThreadLocal<DateFormat> threadLocal = new ThreadLocal<DateFormat>();
    public static DateFormat getDateFormat()
    {
    DateFormat df = threadLocal.get();
    if(df==null){
    df = new SimpleDateFormat(date_format);
    threadLocal.set(df);
    }
    return df;
    }
    public static String formatDate(Date date) throws ParseException {
    return getDateFormat().format(date);
    }
    public static Date parse(String strDate) throws ParseException {
    return getDateFormat().parse(strDate);
    }
    }

说明:使用ThreadLocal, 也是将共享变量变为独享,线程独享肯定能比方法独享在并发环境中能减少不少创建对象的开销。如果对性能要求比较高的情况下,一般推荐使用这种方法。

4.抛弃JDK,使用其他类库中的时间格式化类:
a. 使用Apache commons 里的FastDateFormat,宣称是既快又线程安全的SimpleDateFormat, 可惜它只能对日期进行format, 不能对日期串进行解析。
b. 使用Joda-Time类库来处理时间相关问题

##总结

1
2
3
4
5
由SimpleDateFormat的线程安全问题,我们在遇到多线程并发问题是可以得到如下的解决方案:
1. 函数内单独创建对象
2. 采用同步代码
3. 用threadlocal变量
4. 采用线程安全的第三方库

HeLei Blog

比较read/write & fread/fwrite

发表于 2017-12-07 | 分类于 Linux

UNIX环境下的C对二进制流文件的读写有两套:

  1. fopen,fread,fwrite,fprintf
  2. open, read, write

##区别

  1. fopen 系列是标准的C库函数;open系列是 POSIX 定义的,是UNIX系统里的system call。
    也就是说,fopen系列更具有可移植性;而open系列只能用在 POSIX 的操作系统上。
  2. 使用fopen 系列函数时要定义一个指代文件的对象,被称为“文件句柄”(file handler),是一个结构体;而open系列使用的是一个被称为“文件描述符” (file descriptor)的int型整数。
  3. fopen 系列是级别较高的I/O,读写时使用缓冲;而open系列相对低层,更接近操作系统,读写时没有缓冲。由于能更多地与操作系统打交道,open系列可以访问更改一些fopen系列无法访问的信息,如查看文件的读写权限。这些额外的功能通常因系统而异。
  4. 使用fopen系列函数需要”#include “;使用open系列函数需要”#include “ ,链接时要之用libc(-lc)

##在开源项目中的运用
redis中写日志使用的是第一套,下面是redis的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void serverLogRaw(int level, const char *msg) {
const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
const char *c = ".-*#";
FILE *fp;
char buf[64];
int rawmode = (level & LL_RAW);
int log_to_stdout = server.logfile[0] == '\0';
level &= 0xff; /* clear flags */
if (level < server.verbosity) return;
fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
if (!fp) return;
if (rawmode) {
fprintf(fp,"%s",msg);
} else {
int off;
struct timeval tv;
int role_char;
pid_t pid = getpid();
gettimeofday(&tv,NULL);
off = strftime(buf,sizeof(buf),"%d %b %H:%M:%S.",localtime(&tv.tv_sec));
snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
if (server.sentinel_mode) {
role_char = 'X'; /* Sentinel. */
} else if (pid != server.pid) {
role_char = 'C'; /* RDB / AOF writing child. */
} else {
role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
}
fprintf(fp,"%d:%c %s %c %s\n",
(int)getpid(),role_char, buf,c[level],msg);
}
fflush(fp);
if (!log_to_stdout) fclose(fp);
if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
}

google开源的日志系统glog写文件操作也采用第一套io函数,在函数LogFileObject::Write

HeLei Blog

TCP状态机分析(二)CLOSE_WAIT&TIME_WAIT

发表于 2017-11-29 | 分类于 network

CLOSE_WAIT

在被动关闭连接(对方关闭)情况下,在已经接收到FIN,但是还没有发送自己的FIN的时刻,连接处于CLOSE_WAIT状态。出现大量close_wait的现象,主要原因是某种情况下对方关闭了socket链接,但是我方忙与读或者写,没有关闭连接。代码需要判断socket,一旦读到0,断开连接,read返回负,检查一下errno,如果不是AGAIN,就断开连接。

解决方法:
基本的思想就是要检测出对方已经关闭的socket,然后关闭它。

  1. 代码需要判断socket,一旦read返回0,断开连接,read返回负,检查一下errno,如果不是AGAIN,也断开连接。(注:在UNP 7.5节的图7.6中,可以看到使用select能够检测出对方发送了FIN,再根据这条规则就可以处理CLOSE_WAIT的连接)
  2. 给每一个socket设置一个时间戳last_update,每接收或者是发送成功数据,就用当前时间更新这个时间戳。定期检查所有的时间戳,如果时间戳与当前时间差值超过一定的阈值,就关闭这个socket。
  3. 使用一个Heart-Beat线程,定期向socket发送指定格式的心跳数据包,如果接收到对方的RST报文,说明对方已经关闭了socket,那么我们也关闭这个socket。
  4. 设置SO_KEEPALIVE选项,并修改内核参数,下面会详细介绍
    1
    2
    a. 参数设置
    查看相关的参数,这是当前线上环境的配置

sysctl -a|grep tcp_keepalive
net.ipv4.tcp_keepalive_intvl = 300
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_time = 75

1
2
参数的含义

tcp_keepalive_intvl: 在TCP保活打开的情况下,最后一次数据交换到TCP发送第一个保活探测包的间隔,即允许的持续空闲时长,或者说每次正常发送心跳的周期。
tcp_keepalive_probes: 在tcp_keepalive_time之后,没有接收到对方确认,继续发送保活探测包次数,默认值为9(次)
tcp_keepalive_time: tcp_keepalive_time之后,没有接收到对方确认,继续发送保活探测包的发送频率,默认值为75s。

1
2
设置相关的参数

sysctl -w net.ipv4.tcp_keepalive_time = 75
也可以直接打开/etc/sysctl.conf
加入net.ipv4.tcp_keepalive_time = 75
``

让参数生效
sysctl -p

(tcp_keepalive_time + tcp_keepalive_intvl * tcp_keepalive_probes)时间内可检测到连接失效与否。

b. 开启keepalive属性
int keepAlive = 1;
setsockopt(client_fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&keepAlive, sizeof(keepAlive));

c. 系统调用设置
这样只会影响单个连接,上面修改内核参数会影响所有设置keepalive属性的连接

#include

#include

#include

int keepAlive = 1; // 开启keepalive属性
int keepIdle = 1800; // 如该连接在1800秒内没有任何数据往来,则进行探测
int keepInterval = 3; // 探测时发包的时间间隔为3秒
int keepCount = 2; // 探测尝试的次数.如果第1次探测包就收到响应了,则后几次的不再发.
setsockopt(client_fd, SOL_SOCKET, SO_KEEPALIVE, (void)&keepAlive, sizeof(keepAlive));
setsockopt(client_fd, SOL_TCP, TCP_KEEPIDLE, (void
)&keepIdle, sizeof(keepIdle));
setsockopt(client_fd, SOL_TCP,TCP_KEEPINTVL, (void )&keepInterval, sizeof(keepInterval));
setsockopt(client_fd, SOL_TCP, TCP_KEEPCNT, (void
)&keepCount, sizeof(keepCount));

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
这篇文章对于一次生产环境遇到的CLOSE_WAIT问题分析很到位
转载:https://mp.weixin.qq.com/s?__biz=MzI4MjA4ODU0Ng==&mid=402163560&idx=1&sn=5269044286ce1d142cca1b5fed3efab1&3rd=MzA3MDU4NTYzMw==&scene=6#rd
### 总结
1. 默认情况下使用keepalive周期为2个小时,如不选择更改,属于误用范畴,造成资源浪费:内核会为每一个连接都打开一个保活计时器,N个连接会打开N个保活计时器。 优势很明显:
* TCP协议层面保活探测机制,系统内核完全替上层应用自动给做好了
* 内核层面计时器相比上层应用,更为高效
* 上层应用只需要处理数据收发、连接异常通知即可
* 数据包将更为紧凑
2. 关闭TCP的keepalive,完全使用业务层面心跳保活机制 完全应用掌管心跳,灵活和可控,比如每一个连接心跳周期的可根据需要减少或延长
业务心跳 + TCP keepalive一起使用,互相作为补充,但TCP保活探测周期和应用的心跳周期要协调,以互补方可,不能够差距过大,否则将达不到设想的效果。朋友的公司所做IM平台业务心跳2-5分钟智能调整 + tcp keepalive 300秒,组合协作,据说效果也不错。
虽然说没有固定的模式可遵循,那么有以下原则可以参考:
1. 不想折腾,那就弃用TCP Keepalive吧,完全依赖应用层心跳机制,灵活可控性强
2. 除非可以很好把控TCP Keepalive机制,那就可以根据需要自由使用吧
### http keep-alive与tcp keep-alive
http keep-alive与tcp keep-alive,不是同一回事,意图不一样。http keep-alive是为了让tcp活得更久一点,以便在同一个连接上传送多个http,提高socket的效率。而tcp keep-alive是TCP的一种检测TCP连接状况的保活机制。
## TIME_WAIT
主动发起close的一方会出现TIME_WAIT,比如nginx到web server之间,TIME_WAIT状态是为了保护TCP协议的正确性,避免端口发生复用后老的TCP连接残留在网络上的报文进入新的连接里。但这也引入了一个问题,临时端口数量有限,耗尽后,新建连接就会报错EADDRNOTAVAIL
我们来看下,为什么这个状态能影响到一个处理大量连接的服务器,从下面三个方面来说:
* 新老连接(相同四元组)在TCP连接表中的slot复用避免
* 内核中,socket结构体的内存占用
* 额外的CPU开销
### 解决方法
* 首先要增加临时端口的数量,增加可被消耗的临时端口资源

sysctl -w “net.ipv4.ip_local_port_range=1024 65535”

1
2
3
* 然后要加速临时端口回收,可以对内核参数做优化(/etc/sysctl.conf)
1. 启用tcp_tw_reuse

sysctl -w net.ipv4.tcp_timestamps=1
sysctl -w net.ipv4.tcp_tw_reuse=1
sysctl -p

1
2
3
4
5
它定义了一个新的TCP选项–两个四字节的timestamp fields时间戳字段,第一个是TCP发送方的当前时钟时间戳,而第二个是从远程主机接收到的最新时间戳。
启用net.ipv4.tcp_tw_reuse后,如果新的时间戳,比以前存储的时间戳更大,那么linux将会从TIME-WAIT状态的存活连接中,选取一个,重新分配给新的连接出去的TCP连接。
连出的TIME-WAIT状态连接,仅仅1秒后就可以被重用了
2. 方法更激进些,启用tw_recycle,tw_recycle允许在两个RTT。当多个客户端处于NAT后时,在服务器端开启tw_recycle会引起丢包问题,如果丢SYN包,就会造成新建连接失败

sysctl -w net.ipv4.tcp_timestamps=1
sysctl -w net.ipv4.tcp_tw_recycle=1
sysctl -p

1
2
3
4
5
6
7
8
3. 给socket配置SO_LINGER,on设为1,linger设为0,这样关闭连接后TCP状态从ESTAB直接进入CLOSED,向服务器发rst包而不是fin包来关闭连接。这种方法风险最高,会丢弃buffer里未发送完的数据,不过通过设计协议(客户端和服务器协议上协商后再关闭TCP连接)可以规避这个问题,使用需要小心,选择合适的场景。
### 代码验证
"纸上得来终觉浅,绝知此事要躬行", 下面我们来验证这样的方法是否真的可行
1. TIME_WAIT过多会耗尽端口,为了模拟端口耗尽的情况,先修改本地临时端口只有一个可用

sysctl -w “net.ipv4.ip_local_port_range=20000 20000”

1
2
2. 启动一个server进程

./tcp_server05 127.0.0.1

1
2
3. 第一个客户端连接

./tcp_client01 127.0.0.1

1
2
3
4
5
关闭client,查看连接状态
![img01](/2017/11/29/network-tcp-machine-state/tcp_machine_state.png)
4. 第二个客户端连接

chris@ubuntu:~/myspace/test/network$ ./tcp_client01 127.0.0.1
connect: Cannot assign requested address

1
2
5. 修改系统参数

sysctl -w net.ipv4.tcp_tw_reuse=1
```

  1. 重复3、4,可以看到可以成功建立连接

参数说明

  • tcp_tw_recycle (Boolean; default: disabled; since Linux 2.4)
    Enable fast recycling of TIME_WAIT sockets. Enabling this option is not recommended since this causes problems when working with NAT (Network Address Translation).
  • tcp_tw_reuse (Boolean; default: disabled; since Linux 2.4.19/2.6)
    Allow to reuse TIME_WAIT sockets for new connections when it is safe from protocol viewpoint. It should not be changed without advice/request of technical experts.

参考文献

  • http://www.cnxct.com/coping-with-the-tcp-time_wait-state-on-busy-linux-servers-in-chinese-and-dont-enable-tcp_tw_recycle/
HeLei Blog

<>笔记(一)

发表于 2017-10-30

##c++主要内存问题及解决方法
1、缓冲区溢出
solution:使用vector、string或自己编写的BufferClass来管理缓冲区,记录缓冲区的长度,并通过成员函数而不是裸指针修改缓冲区。

2、空悬指针/野指针
solution:shared_ptr/weak_ptr

3、重复释放
solution:scoped_ptr,只在对象析构时候释放一次

4、内存泄漏
solution:scoped_ptr,对象析构时候自动释放内存

5、不配对的new[]/delete
solution:把new[]替换为vector/scoped_array

6、内存碎片
solution:todo

##shared_ptr是否线程安全
shared_ptr计数操作是线程安全的,release 1.33.0后在大多数系统中采用无锁的原子操作实现;但对于对象本身的访问不是线程安全的。对于shared_ptr的线程安全问题,boost官方文档中作了详细说明, http://www.boost.org/doc/libs/1_65_1/libs/smart_ptr/doc/html/smart_ptr.html#shared_ptr
这里作了下总结:

  1. 多个线程可以同时读一个shared_ptr实例
  2. 不同的线程中可以对不同的shared_ptr实例进行“写操作”(包括operator=、reset、析构)
  3. 一个shared_ptr实例被不同的线程同时读写是不安全的

代码例子:
Reading a shared_ptr from two threads

1
2
3
4
5
6
7
shared_ptr<int> p(new int(42));
// thread A
shared_ptr<int> p2(p); // reads p
// thread B
shared_ptr<int> p3(p); // OK, multiple reads are safe

Writing different shared_ptr instances from two threads

1
2
3
4
5
// thread A
p.reset(new int(1912)); // writes p
// thread B
p2.reset(); // OK, writes p2

Reading and writing a shared_ptr from two threads

1
2
3
4
5
// thread A
p = p3; // reads p3, writes p
// thread B
p3.reset(); // writes p3; undefined, simultaneous read/write

Reading and destroying a shared_ptr from two threads

1
2
3
4
5
// thread A
p3 = p2; // reads p2, writes p3
// thread B
// p2 goes out of scope: undefined, the destructor is considered a "write access"

Writing a shared_ptr from two threads

1
2
3
4
5
// thread A
p3.reset(new int(1));
// thread B
p3.reset(new int(2)); // undefined, multiple writes

HeLei Blog

redis-rdb

发表于 2017-10-17 | 分类于 Redis

Reids是一个内存型数据库,所有的数据都存放在内存中。这种模式的缺点就是一旦服务器关闭后会立刻丢失所有存储的数据,Redis当然要避免这种情况的发生,于是其提供了两种持久化机制:RDB和AOF。它们的功能都是将内存中存放的数据保存到磁盘文件上,等到服务器下次开启时能重载数据,以免数据丢失。今天,我们先来剖析一下RDB持久化机制。

##RDB概述
开启一个redis-cli,执行添加数据操作如下

1
2
3
4
5
6
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> set name chris
OK
127.0.0.1:6379> save
OK

我先开启了一个Redis客户端,清空数据,然后依次添加了一个键值对到数据库,最后通过SAVE文件将数据库中的数据保存到rdb文件中,实现数据的持久化,服务器会显示数据已经存放在磁盘文件上。

1
6228:M 17 Oct 05:15:16.846 * DB saved on disk

保存到磁盘的文件名为dump.rdb,利用od命令就能查看里面的数据

1
2
3
4
5
6
7
8
9
chris@ubuntu:~/software/redis-3.2.6/src$ od -c dump.rdb
0000000 R E D I S 0 0 0 7 372 \t r e d i s
0000020 - v e r 005 3 . 2 . 6 372 \n r e d i
0000040 s - b i t s 300 @ 372 005 c t i m e 302
0000060 T 364 345 Y 372 \b u s e d - m e m 302 350
0000100 H \b \0 376 \0 373 001 \0 \0 004 n a m e 005 c
0000120 h r i s 376 002 373 001 \0 \0 004 n a m e 005
0000140 c h r i s 377 326 354 a i 227 . 350 %
0000156

RDB文件标识和版本号:REDIS0007 Redis版本:redis-ver 3.2.3
Redis系统位数(32位或64位):redis-bits 系统时间:ctime
内存使用量:used-mem 一组键值对:name-chris

##RDB文件结构
| REDIS | db_version | databases | EOF | checksum |

HeLei Blog

jvm04

发表于 2017-10-11

jstat -gcutil 25354 500 3
S0 S1 E O P YGC YGCT FGC FGCT GCT
96.48 0.00 29.45 57.62 89.07 17974 409.657 18 7.916 417.572
96.48 0.00 30.32 57.62 89.07 17974 409.657 18 7.916 417.572
96.48 0.00 31.14 57.62 89.07 17974 409.657 18 7.916 417.572

after restart:
3733 work 20 0 6525m 1.4g 18m S 9.3 2.3 1:45.08 java

jstat -gcutil 3733 500 3
S0 S1 E O P YGC YGCT FGC FGCT GCT
0.00 21.84 13.14 29.65 97.65 19 0.403 2 0.162 0.565
0.00 21.84 14.02 29.65 97.65 19 0.403 2 0.162 0.565
0.00 21.84 14.42 29.65 97.65 19 0.403 2 0.162 0.565

HeLei Blog

jvm03

发表于 2017-10-11

Sun JVM参数选项的官方文档
http://www.oracle.com/technetwork/java/javase/tech/vmoptions-jsp-140102.html

HeLei Blog

docker(一)

发表于 2017-10-11

简介

Docker是一个开源的引擎,可以轻松的为任何应用创建一个轻量级的、可移植的、自给自足的容器。开发者在笔记本上编译测试通过的容器可以批量地在生产环境中部署,包括VMs(虚拟机)、bare metal、OpenStack 集群和其他的基础应用平台。
Docker通常用于如下场景:

  1. web应用的自动化打包和发布;
  2. 自动化测试和持续集成、发布;
  3. 在服务型环境中部署和调整数据库或其他的后台应用;
  4. 从头编译或者扩展现有的OpenShift或Cloud Foundry平台来搭建自己的PaaS环境。

docker vs 虚拟机

容器与虚拟机拥有着类似的使命:对应用程序及其关联性进行隔离,从而构建起一套能够随处运

行的自容纳单元。此外,容器与虚拟机还摆脱了对物理硬件的需求,允许我们更为高效地使用计算资源,从而提升能源效率与成本效益。
  
  虚拟机会将虚拟硬件、内核(即操作系统)以及用户空间打包在新虚拟机当中,虚拟机能够利用“虚拟机管理程序”运行在物理设备之上。虚拟机依赖于hypervisor,其通常被安装在“裸金属”系统硬件之上,这导致hypervisor在某些方面被认为是一种操作系统。一旦 hypervisor安装完成, 就可以从系统可用计算资源当中分配虚拟机实例了,每台虚拟机都能够获得唯一的操作系统和负载(应用程序)。简言之,虚拟机先需要虚拟一个物理环境,然后构建一个完整的操作系统,再搭建一层Runtime,然后供应用程序运行。
  
   对于容器环境来说,不需要安装主机操作系统,直接将容器层(比如LXC或libcontainer)安装在主机操作系统(通常是Linux变种)之上。在安装完容器层之后,就可以从系统可用计算资源当中分配容器实例了,并且企业应用可以被部署在容器当中。但是,每个容器化应用都会共享相同的操作系统(单个主机操作系统)。容器可以看成一个装好了一组特定应用的虚拟机,它直接利用了宿主机的内核,抽象层比虚拟机更少,更加轻量化,启动速度极快。

  相比于虚拟机,容器拥有更高的资源使用效率,因为它并不需要为每个应用分配单独的操作系统——实例规模更小、创建和迁移速度也更快。这意味相比于虚拟机,单个操作系统能够承载更多的容器。云提供商十分热衷于容器技术,因为在相同的硬件设备当中,可以部署数量更多的容器实例。此外,容器易于迁移,但是只能被迁移到具有兼容操作系统内核的其他服务器当中,这样就会给迁移选择带来限制。
  
  因为容器不像虚拟机那样同样对内核或者虚拟硬件进行打包,所以每套容器都拥有自己的隔离化用户空间,从而使得多套容器能够运行在同一主机系统之上。我们可以看到全部操作系统层级的架构都可实现跨容器共享,惟一需要独立构建的就是二进制文件与库。正因为如此,容器才拥有极为出色的轻量化特性。
  
  对Docker稍有接触的人应该都见过下图,无需更多解释,Docker减少Guest OS这一层级,所以更轻量和更高性能。

docker架构图
virtual machine diagram

虚拟机架构图
docker diagram

1…456…8
He Lei

He Lei

c/c++/python | redis | recommend algorithm | search engine

75 日志
16 分类
3 标签
GitHub Weibo
© 2018 He Lei
由 Hexo 强力驱动
主题 - NexT.Pisces