Monthly Archives: September 2012

Readiness Selection 倒底有什么好处?

Readiness Selection倒底好在哪里? 有人说它可以实现一个线程服务多个并发的请求,可以省线程云云。

这种囫囵吞枣的说法让人非常迷惑。在经过较为细致的学习之后,我试着来澄清一下。

首先,凡是对性能有点高的应用,想通过一个线程来服务多个并发请求分明就是天方夜谈。即使你用了Readiness Selection,你的Selector也来不及处理那么多Readiness通知。
正确的做法是,使用单线程来做监听Readiness通知,然后把事件委派给子线程处理

到这里,你也许会像我当时一样迷惑:
普通的I/O模式已经可以实现监听连接和处理数据的分离了,主线程accept(), 子线程再对刚建立起的socket读写数据;
有必要再用Readiness Selection么? 

答案是:
虽然新旧模式都实现了监听和处理的分离,但是旧模式中分离的并不彻底,从而产生一些阻塞。 在旧模式中,子线程拿到socket后调用socket.getInputStream()读数据时很有可能阻塞,因为此时socket可能还没数据让你读;在没让你读之前你就读,结果你只能等待。线程是宝贵的资源,让它去干等而不去干该干的事情(比如另外一个socket准备好了可以读了),不是浪费是什么?

而在新模式中,主线程收到acceptable通知时并没有必要立即启动子线程,它完全可以等到收到readable通知时才启动子线程或者把任务分配到子线程池中;被指派的子线程可以立即干活,不必再等待了。 线程的利用率很高,不存在浪费,系统总体的吞吐率也会比较好。

总结一下, 高并发下,使用Readiness Selection仍需要配置使用多线程来处理数据;而Readniess Selection之所以高效,是因为它直到数据真正可读或可写时才会将任务指派给处理线程将让后者立即处理,而不必像旧模式那样早早地把子线程阻塞了。

读了一小段Netty Discard Server Demo背后的源代码

Netty的官方文档使用Discard Server Demo作为第一个入门的例子。我跟了一下这段代码后面的Netty框架代码,以了解Netty是怎么玩NIO的。 我看的代码版本是Netty 3.2.7

一些关键步骤如下

1.      Bootstrap.bind()时会最终调用到 Boss的构造方法,在这里会创建selector,并注册OP_ACCEPT

2.      然后进入Boss这个runnable的run方法,在这个方法里它会轮询连接。奇怪的是,即使select()没有找到key, 它也会用accept()方法试图获得连接;而且如果找到key了,它也会将所有key清空。也就是说,它对key根本不感兴趣。

3.      建立连接后,Boss. registerAcceptedChannel()的方法会调用woker.register()方法。在这个方法里,会新建一个worker级别的selector。但这时Netty并不会立即注册worker channel,而是生成一个RegisterTask,置入某个队列里

4.      而NioWorker这个Runnable的run方法已经跑起来了,它会把RegisterTask从队列里取出来,并同步地调用RegisterTask.run()方法(很怪)。在这个方法里,会设置configureBlocking(false), 并暂时把OP_READ作为interest将channel注入到这个worker自己的selector中

5.      同时NioWorker的run()方法里会自已通过selector.select(500)不停地轮询,等待read readiness.默认情况下的NioWorker个数是cpu个数的两倍(对我的机器来说,就是4个),加上Boss,共有5个线程在侦听

6.      客户发送一段数据后,NioWorker会走到processSelectedKeys()这个方法。在这里面的迭代逻辑是:

    a)        取一个key,然后立即将其从selectKeys()里移除。

    b)        调用read(key)方法,把数据读出来。整个过程是同步的。

        i.             读出ByteBuffer

        ii.             把buffer包装成Event对象通过fireMessageReceived()方法传给Handler

        iii.             Handler处理Event

7.      如果连接数超过4个,worker数并不会跟着增长,相反worker会被共用. 也就是说,一个worker负责超过一个连接,这个worker里的selector上登记了超过一个channel. 而总的工作线程数仍等于worker数。

总结

     Netty会使用一个Boss线程通过轮询来侦听新连接,一旦有新连接,就把它丢到一个线程数固定的worker线程池里去。池里的worker线程会接收、登记新连接并轮询当前是否有read readiness,如果有,则同步地处理数据,除非你的handler里自己实现了异步处理。

     Boss和worker的轮询都是通过selector.select()来实现的,并且都使用了超时参数,对worker来说,使用超时机制可以避免让自己阻塞过长时间,它可以阻塞一小段时间后做点别的事情,这个事情就是接收、登记新连接。 

    另外还可以看出,Worker既做select(),又亲自处理数据。

例示java线程池coreSize、maxSize、queueCapacity和timeout的作用

java线程池的coreSize、maxSize、queueCapacity和timeout倒底有什么用?它们如何影响当前池内的线程数和队列里的任务数? 

网上可以很容易搜到答案,
但为了增加感性认识,我们可以做个实验

1. 令coreSize = 2, maxSize = 4

2. 令queue最大容量 = 6

3. 令超时时间 = 1 分钟

在看代码之前,可以先看下执行结果:

引用

线程池已创建,但还没添加任务。当前池内线程数:0; 当前队列中元素数:0/6

添加了第1个任务。 当前池内线程数:1; 当前队列中元素数:0/6

添加了第2个任务。 当前池内线程数:2; 当前队列中元素数:0/6

添加了第3个任务。 当前池内线程数:2; 当前队列中元素数:1/6

添加了第4个任务。 当前池内线程数:2; 当前队列中元素数:2/6

添加了第5个任务。 当前池内线程数:2; 当前队列中元素数:3/6

添加了第6个任务。 当前池内线程数:2; 当前队列中元素数:4/6

添加了第7个任务。 当前池内线程数:2; 当前队列中元素数:5/6

添加了第8个任务。 当前池内线程数:2; 当前队列中元素数:6/6

添加了第9个任务。 当前池内线程数:3; 当前队列中元素数:6/6

添加了第10个任务。 当前池内线程数:4; 当前队列中元素数:6/6

过了30秒后,当前池内线程数:4; 当前队列中元素数:6/6

过了60秒后,当前池内线程数:4; 当前队列中元素数:2/6

过了90秒后,当前池内线程数:4; 当前队列中元素数:2/6

过了120秒后,当前池内线程数:4; 当前队列中元素数:0/6

过了150秒后,当前池内线程数:4; 当前队列中元素数:0/6

过了180秒后,当前池内线程数:2; 当前队列中元素数:0/6

过了210秒后,当前池内线程数:2; 当前队列中元素数:0/6

过了240秒后,当前池内线程数:0; 当前队列中元素数:0/6

可得结论

1. 线程池刚创建、还没有任务进来时,线程数为0

2. 任务进来时,如果当前线程数未及coreSize,系统会创建新线程来处理

3. 任务进来时,如果当前线程数已经达到或超过了coreSize,系统会把任务置入队列

4. 如果队列里已经塞不下了,则系统会看一下当前线程数是否已达maxSize,如果还没达到,则系统创建新线程来处理(否则,会拒绝任务)

5. 如果当前线程数已经达到maxSize, 那么在队列清空以前,仍会有maxSize的线程在工作; 即使队列里只剩很少的任务

6. 如果当前已经不需要maxSize数的线程数,会有线程被杀死直到线程数=coreSize

7. coreSize数的线程空闲一段时间后会被杀死。

最后给出这个实验的
代码

package player.kent.chen.learn.poolsize;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HelloPoolSize {

    private static final class OneMinuteTask implements Callable<Void> {
        public Void call() throws Exception {
            //            System.err.println("\t" + Thread.currentThread().getName()
            //                    + " 's call() method has been invoked");
            doSleep(1 * 60 * 1000);
            //            System.err.println("\t" + Thread.currentThread().getName()
            //                    + " 's call() method invocation done");
            return null;
        }
    };

    public static void main(String[] args) throws IOException {

        int queueCapacity = 6;
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
        ExecutorService executor = new ThreadPoolExecutor(2, 4, 1, TimeUnit.MINUTES, workQueue);
        System.out.print("线程池已创建,但还没添加任务。");
        printThreadAndQueueCount(workQueue, queueCapacity);

        for (int i = 1; i <= 10; i++) {
            executor.submit(new OneMinuteTask());
            System.out.print("添加了第" + i + "个任务。 ");
            doSleep(1 * 1000);
            printThreadAndQueueCount(workQueue, queueCapacity);

        }

        for (int i = 1; i < 1000000; i++) {
            int timePeriod = 30;
            doSleep(timePeriod * 1000);
            System.out.print("过了" + (i * timePeriod) + "秒后,");
            printThreadAndQueueCount(workQueue, queueCapacity);
        }

    }

    private static void printThreadAndQueueCount(LinkedBlockingQueue<Runnable> workQueue,
                                                 int queueCapacity) throws IOException {
        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();

        int threadCount = 0;
        for (Thread thread : threadSet) {
            if (thread.isDaemon() || thread.getName().equals("main")) {
                continue;
            }
            threadCount++;
        }
        System.out.println(MessageFormat.format("当前池内线程数:{0}; 当前队列中元素数:{1}/{2}", threadCount,
                workQueue.size(), queueCapacity));

    }

    private static void doSleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

补充说明: 按照javadoc的描述,如果线程池的allowCoreThreadTimeOut == false, 则核心线程是不会被杀死的;但由于java 1.6实现上的bug,这个契约没有得到保障,核心线程一般还是会终结;据说java 1.7修正了这个问题。 

为了避免空闲线程,以后我们把 allowCoreThreadTimeOut总是设成true好了;这样在1.6和1.7下都能得到我们想要的结果。

基于Maven编写命令行应用:最佳实践

java命令行应用 =
一些*.jar文件   + 
*.sh可执行脚本

如果你的应用是基于maven的, 可以参考以下实践,避免走弯路。

以下所有资料从网上汇集而来

使用assembly插件将jar和可执行脚本打包成 *.zip


<!--pom.xml-->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.3</version>
				<configuration>
					<descriptors>
						<descriptor>src/main/assembly/assembly.xml</descriptor>
					</descriptors>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>

<!--src/main/assembly/assembly.xml -->
<!--jar文件将打在zipFile/jars目录下 -->
<!--脚本文件可以src/main/bin下,打包后在zipFile/bin目录下-->
<assembly>
	<id>jarset</id>
	<formats>
		<format>zip</format>
	</formats>
	<includeBaseDirectory>true</includeBaseDirectory>
	<fileSets>
		<fileSet>
			<directory>src/main/bin</directory>
			<outputDirectory>/bin</outputDirectory>
			<fileMode>755</fileMode>
			<directoryMode>755</directoryMode>
		</fileSet>
	</fileSets>
	<dependencySets>
		<dependencySet>
			<outputDirectory>/jars</outputDirectory>
			<scope>runtime</scope>
		</dependencySet>
	</dependencySets>
</assembly> 

编写*nix下的shell文件

#这里的关键是要把jars/*.jar加入classpath,可以这样写
#!/bin/sh

PRG="$0"  
  
# resolve links - $0 may be a softlink  
while [ -h "$PRG" ]; do  
  ls=`ls -ld "$PRG"`  
  link=`expr "$ls" : '.*-> \(.*\)$'`  
  if expr "$link" : '/.*' > /dev/null; then  
    PRG="$link"  
  else  
    PRG=`dirname "$PRG"`/"$link"  
  fi  
done  
  
  
PRGDIR=`dirname "$PRG"`  
PRGDIR_PARENT=`cd "$PRGDIR/.." >/dev/null; pwd`


java -cp $(echo $PRGDIR_PARENT/jars/*.jar | tr ' ' ':') somepackage.SomeClass $*

(可选)使用timestamp作为程序的版本号

频繁变更、不太严谨的命令行应用可以使用timestamp作为程序的版本号

<!--pom.xml-->
	<groupId>some.group</groupId>
	<artifactId>some.artifact</artifactId>
	<version>${maven.build.timestamp}</version>
	<properties>
		<maven.build.timestamp.format>yyyyMMddHHmmss</maven.build.timestamp.format>
	</properties>

Runtime.getRuntime().exec()对流重定向的命令无效怎么办?

Runtime.getRuntime().exec("ls > ~/1.txt") 

执行后,1.txt还是空的。

为了解决这个问题,有人想出了一个很轻巧的办法:

http://www.codefutures.com/weblog/andygrove/2008/06/generated-script-approach-to-running.html

        // generate a script file containg the command to run
        final File scriptFile = new File("/tmp/runcommand.sh");
        PrintWriter w = new PrintWriter(scriptFile);
        w.println( "#!/bin/sh" );
        w.println( cmd  ); 
        w.close(); 
 
        // make the script executable
        Process p = Runtime.getRuntime().exec( "chmod +x " + scriptFile.getAbsolutePath() );
        p.waitFor(); 
 
        // execute the script
        p = Runtime.getRuntime().exec( scriptFile.getAbsolutePath() ); 
        p.waitFor(); 
    }


一种健壮性测试的思路:用爬虫扫描全站,然后查看系统日志

以前一直有这样一个想法,今天终于付诸实践了。

为了查看web应用有后台错误日志,最基本的测试办法是手工点页面。

但要手工点全站,会比较慢。 最理想的办法是建立一套自动化测试脚本。

但如果没有一套这样的脚本,或者说没有这个脚本还不够完备时,就可以考虑使用爬虫了。 爬虫的好处是它可以递归地找出一个站点的所有链接并跟踪,也就是说能够完整遍历你的网站提供的服务。

所以,你可以一边用爬虫扫,一边盯着你后台的日志文件,看看有没有ERROR Log.

这里推荐一个轻巧的爬虫:skipfish. 它是一个命令行程序,主要用于安全性扫描,不过当然也可以用于健壮性测试:

./skipfish  -D my-friend.com -g 1 -m 1  -r 100  -o  ~/report-dir http://my.com/index  #这是linux版的,windows下也有skipfish.exe
#你还可以使用-C参数注入cookie, 方便扫描登录后的页面

    

扫完之后找出所有错误日志:

cat my.log|grep ERROR|awk '{for (i=5;i<=NF;i++ )printf( "%s ",$i); printf("\n")}' |sort|uniq  # 这里从第5列开始看日志,请按你的日志pattern来调整这个值

不过skipfish有个缺点,它对javascript的处理能力;如果你的网站有大量的ajax请求,那skipfish是不够用的,需要另找其它爬虫。

java线程池的maxSize应该设多大?

今天听高人透露了一个公式:

引用
maxSize =  逻辑cpu数/(1 – I/O等待时间占比)

也就是说,

1. 如果你的任务是纯CPU操作,则maxSize = cpu数,一个CPU服务一个线程。如果线程数<cpu数,则cpu的并行能力未得到充分利用;如果线程数>cpu数,吞吐率也不会变高,反而会因为过多context switch而损害性能。

2. 如果你的任务I/O等待占了一半时间,则maxSize = cpu * 2; 这时,用于cpu操作的线程数是maxSize的一半,恰好=cpu数。如果你的cpu是双核,则maxSize=4, 当4个中的2个线程因为I/O而阻塞时,另外两个非I/O线程可以被调度进来,刚好占满CPU的坑位。

synchronized锁机制详解

synchronized关键字用在方法签名上或者代码块上,用于给资源加锁。不过这个锁的粒度并不限于这个方法或者代码块。它是“对象级别的锁”。

这个所谓的“对象级别的锁”那底是什么意思呢? 也就是说,当一个线程持有这个锁时,访问同一个对象的另一个线程倒底会受到什么影响?

经实验,结论如下:

引用

1. 当一个线程在执行一个synchronized方法时,另一个线程无法立即执行同一个对象的相同方法,也不能执行这个对象的其他任意一个synchronized方法。这就是“对象级别锁”的意义。

2. 当一个线程在执行一个synchronized方法时,另一个线程可以立即执行同一个对象的任意一个
synchronized方法

3. 当一个线程在执行一个synchronized代码块时,另一个线程无法立即执行同一个对象的任意一个synchronized方法,但可以立即执行同一个对象的任意一个
synchronized方法

4. 当一个线程在执行一个synchronized代码块时,另一个线程无法立即执行施加在同一个对象的任意一个synchronized代码块

5. 当一个线程在执行一个synchronized代码块时,另一个线程可以立即执行施加在同一个对象的任意一个
synchronized代码块,即使后者与前者调用了相同的方法

可以看出,synchronized代码块和synchronized方法基本上是对等的。那么,当一个线程在执行一个synchronized方法时,另一个线程是否能够立即执行施加在同一个对象的一个syncrhonized代码块? 答案是显然的。

总之,在同一个对象内,synchronized和非synchronized的资源不会彼此阻塞访问,只有synchronized和synchronized的资源会互相阻塞访问