SOFABolt basic communication model

Blocking and synchronous, revisited

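The four-thread model (very important, very simple, and the key to everything that follows):

  1. A given call does not always have to be labelled synchronous or asynchronous.
  2. On the client side, the sync/async distinction is called blocking vs. non-blocking; on the server side it is called synchronous vs. asynchronous.
  3. If any single hop in the call chain is asynchronous, we call the whole chain asynchronous.
  4. Split the client into a client thread and a client connection thread, and the server into a server thread and a server connection thread.
    1. If the client thread waits for the connection thread to return the result, the call is blocking. If it does not wait for the connection thread's result, the call is non-blocking.
    2. If the server connection thread waits for the server thread's result, the processing is synchronous. If it does not wait for the server processing thread's result, the processing is asynchronous.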
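The definitions above can be sketched with plain JDK executors. This is only an illustration under stated assumptions, not SOFABolt code: the class name, the two executors and the "echo:" reply are all hypothetical, and the client connection thread is collapsed into the CompletableFuture handle that delivers the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FourThreadModel {
    // Hypothetical stand-ins for the two server-side roles in the model.
    static final ExecutorService SERVER_CONNECTION = daemonThread("server-connection");
    static final ExecutorService SERVER_WORKER = daemonThread("server-worker");

    static ExecutorService daemonThread(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true); // lets the JVM exit without an explicit shutdown
            return t;
        });
    }

    /** Synchronous server: the connection thread waits for the worker's result, then replies. */
    static CompletableFuture<String> syncServer(String request) {
        return CompletableFuture.supplyAsync(
            () -> CompletableFuture.supplyAsync(() -> "echo:" + request, SERVER_WORKER).join(),
            SERVER_CONNECTION);
    }

    /** Asynchronous server: the connection thread hands the request off and does not wait. */
    static CompletableFuture<String> asyncServer(String request) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        SERVER_CONNECTION.execute(
            () -> SERVER_WORKER.execute(() -> reply.complete("echo:" + request)));
        return reply;
    }

    /** Blocking client: the client thread waits for the result. */
    static String blockingCall(String request) {
        return syncServer(request).join();
    }

    /** Non-blocking client: the client thread gets a handle back and moves on. */
    static CompletableFuture<String> nonBlockingCall(String request) {
        return asyncServer(request);
    }

    public static void main(String[] args) {
        System.out.println(blockingCall("a"));
        CompletableFuture<String> pending = nonBlockingCall("b");
        // The client thread is free to do other work here before collecting the result.
        System.out.println(pending.join());
    }
}
```

The client and server choices are independent: a blocking client can talk to an asynchronous server and vice versa, which is why the two sides use different names for the distinction.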

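Four client models

We provide four communication models, all of which are client-side invocation modes.

  1. Oneway invocation (client non-blocking)

The calling thread issues the request and does not care about the result. There is no timeout control: the call is complete as soon as the request has been sent. Note that a Oneway invocation does not guarantee success, and the caller has no way to learn the outcome. It is therefore typically used for operations that can be retried, or for periodic notifications. The request may fail because of network problems, machine failures and similar causes, so use this mode only if the business scenario can tolerate such failures. See testOneway in the example code.

  2. Sync invocation (client blocking)

The calling thread issues the request and must receive the response within the specified timeout for the call to complete. If no result arrives within the timeout, a timeout exception is thrown. This is the most commonly used mode. Set the timeout sensibly, according to the processing capacity of the remote peer. See testSync in the example code.

  3. Future invocation (client semi-blocking)

The calling thread issues the request and gets back an RpcResponseFuture object, after which it can go on to make the next call. At any later point it can call get() on the RpcResponseFuture object to fetch the result. If the response has already arrived, the result is returned immediately; if not, the calling thread blocks until the response arrives or the timeout expires. See testFuture in the example code.

  4. Callback invocation (client non-blocking)

The calling thread issues the request and the call returns immediately, so the next call can be made right away. A callback must be registered when the call is issued, and the callback must be given an asynchronous thread pool. When the response arrives, the callback logic runs in that thread pool. See testCallback in the example code.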
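To make the differences between the four modes concrete without depending on the framework, here is a minimal JDK-only analogue. It is a sketch under assumptions, not the SOFABolt implementation: ClientModes, REMOTE, send and the "reply:" prefix are hypothetical names, and a CompletableFuture stands in for RpcResponseFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class ClientModes {
    // Hypothetical remote peer: replies on its own daemon thread.
    static final ExecutorService REMOTE = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "remote-peer");
        t.setDaemon(true);
        return t;
    });

    static CompletableFuture<String> send(String msg) {
        return CompletableFuture.supplyAsync(() -> "reply:" + msg, REMOTE);
    }

    /** Oneway: send and forget; the caller never sees a result or a failure. */
    static void oneway(String msg) {
        send(msg);
    }

    /** Sync: wait up to timeoutMs for the reply, otherwise fail. */
    static String sync(String msg, long timeoutMs) {
        try {
            return send(msg).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed or timed out", e);
        }
    }

    /** Future: return a handle; the caller decides when to block on it. */
    static CompletableFuture<String> future(String msg) {
        return send(msg);
    }

    /** Callback: run onResponse on callbackPool once the reply arrives. */
    static void callback(String msg, Executor callbackPool, Consumer<String> onResponse) {
        send(msg).thenAcceptAsync(onResponse, callbackPool);
    }

    public static void main(String[] args) {
        oneway("ping");
        System.out.println(sync("a", 3000));
        CompletableFuture<String> f1 = future("b");
        CompletableFuture<String> f2 = future("c"); // issued before f1 is collected
        System.out.println(f1.join() + " " + f2.join());
        CompletableFuture<String> done = new CompletableFuture<>();
        callback("d", Runnable::run, done::complete);
        System.out.println(done.join());
    }
}
```

The Future mode only pays off when several calls are issued before any result is collected, as with f1 and f2 in main; calling get() immediately after each call behaves like the Sync mode.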

package com.alipay.remoting.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.InvokeCallback;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.remoting.rpc.RpcResponseFuture;
import com.alipay.remoting.rpc.common.BoltServer;
import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
import com.alipay.remoting.rpc.common.DISCONNECTEventProcessor;
import com.alipay.remoting.rpc.common.PortScan;
import com.alipay.remoting.rpc.common.RequestBody;
import com.alipay.remoting.rpc.common.SimpleClientUserProcessor;
import com.alipay.remoting.rpc.common.SimpleServerUserProcessor;
import com.alipay.remoting.util.RemotingUtil;

/**
* basic usage demo
*
* basic usage of rpc client and rpc server
*
* @author xiaomin.cxm
* @version $Id: BasicUsageDemo.java, v 0.1 Apr 6, 2016 8:58:36 PM xiaomin.cxm Exp $
*/
public class BasicUsageDemoByJunit {
static Logger logger = LoggerFactory
.getLogger(BasicUsageDemoByJunit.class);

BoltServer server;
RpcClient client;

int port = PortScan.select();
String ip = "127.0.0.1";
String addr = "127.0.0.1:" + port;

int invokeTimes = 5;

SimpleServerUserProcessor serverUserProcessor = new SimpleServerUserProcessor();
SimpleClientUserProcessor clientUserProcessor = new SimpleClientUserProcessor();
CONNECTEventProcessor clientConnectProcessor = new CONNECTEventProcessor();
CONNECTEventProcessor serverConnectProcessor = new CONNECTEventProcessor();
DISCONNECTEventProcessor clientDisConnectProcessor = new DISCONNECTEventProcessor();
DISCONNECTEventProcessor serverDisConnectProcessor = new DISCONNECTEventProcessor();

@Before
public void init() {
server = new BoltServer(port, true);
server.start();
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
server.addConnectionEventProcessor(ConnectionEventType.CLOSE, serverDisConnectProcessor);
server.registerUserProcessor(serverUserProcessor);

client = new RpcClient();
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
client.registerUserProcessor(clientUserProcessor);
client.init();
}

@After
public void stop() {
try {
server.stop();
Thread.sleep(100);
} catch (InterruptedException e) {
logger.error("Stop server failed!", e);
}
}

@Test
public void testOneway() throws InterruptedException {
RequestBody req = new RequestBody(2, "hello world oneway");
for (int i = 0; i < invokeTimes; i++) {
try {
client.oneway(addr, req);
Thread.sleep(100);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in oneway!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}

Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}

@Test
public void testSync() throws InterruptedException {
RequestBody req = new RequestBody(1, "hello world sync");
for (int i = 0; i < invokeTimes; i++) {
try {
String res = (String) client.invokeSync(addr, req, 3000);
logger.warn("Result received in sync: " + res);
Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, res);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
String errMsg = "InterruptedException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}

Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}

@Test
public void testFuture() throws InterruptedException {
RequestBody req = new RequestBody(2, "hello world future");
for (int i = 0; i < invokeTimes; i++) {
try {
RpcResponseFuture future = client.invokeWithFuture(addr, req, 3000);
String res = (String) future.get();
Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, res);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in future!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
String errMsg = "InterruptedException caught in future!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}

Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}

@Test
public void testCallback() throws InterruptedException {
RequestBody req = new RequestBody(1, "hello world callback");
final List<String> rets = new ArrayList<String>(1);
for (int i = 0; i < invokeTimes; i++) {
final CountDownLatch latch = new CountDownLatch(1);
try {
client.invokeWithCallback(addr, req, new InvokeCallback() {
Executor executor = Executors.newCachedThreadPool();

@Override
public void onResponse(Object result) {
logger.warn("Result received in callback: " + result);
rets.add((String) result);
latch.countDown();
}

@Override
public void onException(Throwable e) {
logger.error("Process exception in callback.", e);
latch.countDown();
}

@Override
public Executor getExecutor() {
return executor;
}

}, 1000);

} catch (RemotingException e) {
latch.countDown();
String errMsg = "RemotingException caught in callback!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
try {
latch.await();
} catch (InterruptedException e) {
String errMsg = "InterruptedException caught in callback!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
if (rets.size() == 0) {
Assert.fail("No result! Maybe exception caught!");
}
Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, rets.get(0));
rets.clear();
}

Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
}

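Two server models

Two models are provided, both on the server side.

  1. Synchronous processor
    Processes the request synchronously and returns the result.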
package com.alipay.remoting.rpc.common;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.remoting.BizContext;
import com.alipay.remoting.InvokeContext;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.rpc.protocol.SyncUserProcessor;

/**
* a demo user processor for rpc server
*
* @author xiaomin.cxm
* @version $Id: SimpleServerUserProcessor.java, v 0.1 Jan 7, 2016 3:01:49 PM xiaomin.cxm Exp $
*/
public class SimpleServerUserProcessor extends SyncUserProcessor<RequestBody> {

/** logger */
private static final Logger logger = LoggerFactory
.getLogger(SimpleServerUserProcessor.class);

/** delay milliseconds */
private long delayMs;

/** whether delay or not */
private boolean delaySwitch;

/** executor */
private ThreadPoolExecutor executor;

/** default is true */
private boolean timeoutDiscard = true;

private AtomicInteger invokeTimes = new AtomicInteger();

private AtomicInteger onewayTimes = new AtomicInteger();
private AtomicInteger syncTimes = new AtomicInteger();
private AtomicInteger futureTimes = new AtomicInteger();
private AtomicInteger callbackTimes = new AtomicInteger();

private String remoteAddr;
private CountDownLatch latch = new CountDownLatch(1);

public SimpleServerUserProcessor() {
this.delaySwitch = false;
this.delayMs = 0;
this.executor = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), new NamedThreadFactory("Request-process-pool"));
}

public SimpleServerUserProcessor(long delay) {
this();
if (delay < 0) {
throw new IllegalArgumentException("delay time illegal!");
}
this.delaySwitch = true;
this.delayMs = delay;
}

public SimpleServerUserProcessor(long delay, int core, int max, int keepaliveSeconds,
int workQueue) {
this(delay);
this.executor = new ThreadPoolExecutor(core, max, keepaliveSeconds, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(workQueue), new NamedThreadFactory(
"Request-process-pool"));
}

// ~~~ override methods

@Override
public Object handleRequest(BizContext bizCtx, RequestBody request) throws Exception {
logger.warn("Request received:" + request + ", timeout:" + bizCtx.getClientTimeout()
+ ", arriveTimestamp:" + bizCtx.getArriveTimestamp());

if (bizCtx.isRequestTimeout()) {
String errMsg = "Stop process in server biz thread, already timeout!";
processTimes(request);
logger.warn(errMsg);
throw new Exception(errMsg);
}

this.remoteAddr = bizCtx.getRemoteAddress();

//test biz context get connection
Assert.assertNotNull(bizCtx.getConnection());
Assert.assertTrue(bizCtx.getConnection().isFine());

Long waittime = (Long) bizCtx.getInvokeContext().get(InvokeContext.BOLT_PROCESS_WAIT_TIME);
Assert.assertNotNull(waittime);
if (logger.isInfoEnabled()) {
logger.info("Server User processor process wait time {}", waittime);
}

latch.countDown();
logger.warn("Server User processor say, remote address is [" + this.remoteAddr + "].");
Assert.assertEquals(RequestBody.class, request.getClass());
processTimes(request);
if (!delaySwitch) {
return RequestBody.DEFAULT_SERVER_RETURN_STR;
}
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
e.printStackTrace();
}
return RequestBody.DEFAULT_SERVER_RETURN_STR;
}

@Override
public String interest() {
return RequestBody.class.getName();
}

@Override
public Executor getExecutor() {
return executor;
}

@Override
public boolean timeoutDiscard() {
return this.timeoutDiscard;
}

// ~~~ public methods
public int getInvokeTimes() {
return this.invokeTimes.get();
}

public int getInvokeTimesEachCallType(RequestBody.InvokeType type) {
return new int[] { this.onewayTimes.get(), this.syncTimes.get(), this.futureTimes.get(),
this.callbackTimes.get() }[type.ordinal()];
}

public String getRemoteAddr() throws InterruptedException {
latch.await(100, TimeUnit.MILLISECONDS);
return this.remoteAddr;
}

// ~~~ private methods
private void processTimes(RequestBody req) {
this.invokeTimes.incrementAndGet();
if (req.getMsg().equals(RequestBody.DEFAULT_ONEWAY_STR)) {
this.onewayTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_SYNC_STR)) {
this.syncTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_FUTURE_STR)) {
this.futureTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_CALLBACK_STR)) {
this.callbackTimes.incrementAndGet();
}
}

// ~~~ getters and setters
/**
* Getter method for property <tt>timeoutDiscard</tt>.
*
* @return property value of timeoutDiscard
*/
public boolean isTimeoutDiscard() {
return timeoutDiscard;
}

/**
* Setter method for property <tt>timeoutDiscard</tt>.
*
* @param timeoutDiscard value to be assigned to property timeoutDiscard
*/
public void setTimeoutDiscard(boolean timeoutDiscard) {
this.timeoutDiscard = timeoutDiscard;
}
}
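  2. Asynchronous processor
    Returns immediately and processes the request asynchronously. This really comes down to whether the server hands the work off to another thread, and has little to do with the SOFABolt framework itself.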
package com.alipay.remoting.rpc.common;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.rpc.protocol.AsyncUserProcessor;

/**
* a demo async user processor for rpc server
*
* @author xiaomin.cxm
* @version $Id: SimpleServerUserProcessor.java, v 0.1 Jan 7, 2016 3:01:49 PM xiaomin.cxm Exp $
*/
public class AsyncServerUserProcessor extends AsyncUserProcessor<RequestBody> {

/** logger */
private static final Logger logger = LoggerFactory
.getLogger(AsyncServerUserProcessor.class);

/** delay milliseconds */
private long delayMs;

/** whether delay or not */
private boolean delaySwitch;

/** whether exception */
private boolean isException;

/** whether null */
private boolean isNull;

/** executor */
private ThreadPoolExecutor executor;

private ThreadPoolExecutor asyncExecutor;

private AtomicInteger invokeTimes = new AtomicInteger();

private AtomicInteger onewayTimes = new AtomicInteger();
private AtomicInteger syncTimes = new AtomicInteger();
private AtomicInteger futureTimes = new AtomicInteger();
private AtomicInteger callbackTimes = new AtomicInteger();

private String remoteAddr;
private CountDownLatch latch = new CountDownLatch(1);

public AsyncServerUserProcessor() {
this.delaySwitch = false;
this.isException = false;
this.delayMs = 0;
this.executor = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), new NamedThreadFactory("Request-process-pool"));
this.asyncExecutor = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), new NamedThreadFactory(
"Another-aysnc-process-pool"));
}

public AsyncServerUserProcessor(boolean isException, boolean isNull) {
this();
this.isException = isException;
this.isNull = isNull;
}

public AsyncServerUserProcessor(long delay) {
this();
if (delay < 0) {
throw new IllegalArgumentException("delay time illegal!");
}
this.delaySwitch = true;
this.delayMs = delay;
}

public AsyncServerUserProcessor(long delay, int core, int max, int keepaliveSeconds,
int workQueue) {
this(delay);
this.executor = new ThreadPoolExecutor(core, max, keepaliveSeconds, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(workQueue), new NamedThreadFactory(
"Request-process-pool"));
}

@Override
public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, RequestBody request) {
this.asyncExecutor.execute(new InnerTask(bizCtx, asyncCtx, request));
}

class InnerTask implements Runnable {
private BizContext bizCtx;
private AsyncContext asyncCtx;
private RequestBody request;

public InnerTask(BizContext bizCtx, AsyncContext asyncCtx, RequestBody request) {
this.bizCtx = bizCtx;
this.asyncCtx = asyncCtx;
this.request = request;
}

public void run() {
logger.warn("Request received:" + request);
remoteAddr = bizCtx.getRemoteAddress();
latch.countDown();
logger.warn("Server User processor say, remote address is [" + remoteAddr + "].");
Assert.assertEquals(RequestBody.class, request.getClass());
processTimes(request);
if (isException) {
this.asyncCtx.sendResponse(new IllegalArgumentException("Exception test"));
} else if (isNull) {
this.asyncCtx.sendResponse(null);
} else {
if (!delaySwitch) {
this.asyncCtx.sendResponse(RequestBody.DEFAULT_SERVER_RETURN_STR);
return;
}
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.asyncCtx.sendResponse(RequestBody.DEFAULT_SERVER_RETURN_STR);
}
}
}

private void processTimes(RequestBody req) {
this.invokeTimes.incrementAndGet();
if (req.getMsg().equals(RequestBody.DEFAULT_ONEWAY_STR)) {
this.onewayTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_SYNC_STR)) {
this.syncTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_FUTURE_STR)) {
this.futureTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_CALLBACK_STR)) {
this.callbackTimes.incrementAndGet();
}
}

@Override
public String interest() {
return RequestBody.class.getName();
}

@Override
public Executor getExecutor() {
return executor;
}

public int getInvokeTimes() {
return this.invokeTimes.get();
}

public int getInvokeTimesEachCallType(RequestBody.InvokeType type) {
return new int[] { this.onewayTimes.get(), this.syncTimes.get(), this.futureTimes.get(),
this.callbackTimes.get() }[type.ordinal()];
}

public String getRemoteAddr() throws InterruptedException {
latch.await(100, TimeUnit.MILLISECONDS);
return this.remoteAddr;
}
}

Client and server initialization

  • Client initialization
package com.alipay.remoting.demo;

import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
import com.alipay.remoting.rpc.common.DISCONNECTEventProcessor;
import com.alipay.remoting.rpc.common.RequestBody;
import com.alipay.remoting.rpc.common.SimpleClientUserProcessor;

/**
* a demo for rpc client, you can just run the main method after started rpc server of {@link RpcServerDemoByMain}
*
* @author tsui
* @version $Id: RpcClientDemoByMain.java, v 0.1 2018-04-10 10:39 tsui Exp $
*/
public class RpcClientDemoByMain {
static Logger logger = LoggerFactory
.getLogger(RpcClientDemoByMain.class);

static RpcClient client;

static String addr = "127.0.0.1:8999";

SimpleClientUserProcessor clientUserProcessor = new SimpleClientUserProcessor();
CONNECTEventProcessor clientConnectProcessor = new CONNECTEventProcessor();
DISCONNECTEventProcessor clientDisConnectProcessor = new DISCONNECTEventProcessor();

public RpcClientDemoByMain() {
// 1. create a rpc client
client = new RpcClient();
// 2. add processor for connect and close event if you need
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
// 3. do init
client.init();
}

public static void main(String[] args) {
new RpcClientDemoByMain();
RequestBody req = new RequestBody(2, "hello world sync");
try {
String res = (String) client.invokeSync(addr, req, 3000);
System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
String errMsg = "RemotingException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
logger.error("interrupted!");
}
client.shutdown();
}
}
  • Server initialization
package com.alipay.remoting.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.rpc.common.BoltServer;
import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
import com.alipay.remoting.rpc.common.DISCONNECTEventProcessor;
import com.alipay.remoting.rpc.common.SimpleServerUserProcessor;

/**
* a demo for rpc server, you can just run the main method to start a server
*
* @author tsui
* @version $Id: RpcServerDemoByMain.java, v 0.1 2018-04-10 10:37 tsui Exp $
*/
public class RpcServerDemoByMain {
static Logger logger = LoggerFactory
.getLogger(RpcServerDemoByMain.class);

BoltServer server;

int port = 8999;

SimpleServerUserProcessor serverUserProcessor = new SimpleServerUserProcessor();
CONNECTEventProcessor serverConnectProcessor = new CONNECTEventProcessor();
DISCONNECTEventProcessor serverDisConnectProcessor = new DISCONNECTEventProcessor();

public RpcServerDemoByMain() {
// 1. create a Rpc server with port assigned
server = new BoltServer(port);
// 2. add processor for connect and close event if you need
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
server.addConnectionEventProcessor(ConnectionEventType.CLOSE, serverDisConnectProcessor);
// 3. register user processor for client request
server.registerUserProcessor(serverUserProcessor);
// 4. server start
if (server.start()) {
System.out.println("server start ok!");
} else {
System.out.println("server start failed!");
}
// server.getRpcServer().stop();
}

public static void main(String[] args) {
new RpcServerDemoByMain();
}
}

2 Advanced features

Request context

public void testOneway() throws InterruptedException {
RequestBody req = new RequestBody(2, "hello world oneway");
for (int i = 0; i < invokeTimes; i++) {
try {
InvokeContext invokeContext = new InvokeContext();
client.oneway(addr, req, invokeContext);
Assert.assertEquals("127.0.0.1", invokeContext.get(InvokeContext.CLIENT_LOCAL_IP));
Assert.assertEquals("127.0.0.1", invokeContext.get(InvokeContext.CLIENT_REMOTE_IP));
Assert.assertNotNull(invokeContext.get(InvokeContext.CLIENT_LOCAL_PORT));
Assert.assertNotNull(invokeContext.get(InvokeContext.CLIENT_REMOTE_PORT));
Assert.assertNotNull(invokeContext.get(InvokeContext.CLIENT_CONN_CREATETIME));
logger.warn("CLIENT_CONN_CREATETIME:"
+ invokeContext.get(InvokeContext.CLIENT_CONN_CREATETIME));
Thread.sleep(100);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in oneway!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}

Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}

Duplex communication

@Test
public void testServerSyncUsingConnection1() throws Exception {
for (int i = 0; i < invokeTimes; i++) {
RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
String serverres = (String) client.invokeSync(addr, req1, 1000);
Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);

Assert.assertNotNull(serverConnectProcessor.getConnection());
Connection serverConn = serverConnectProcessor.getConnection();
RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
String clientres = (String) server.getRpcServer().invokeSync(serverConn, req, 1000);
Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
}

Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}

@Test
public void testServerSyncUsingConnection() throws Exception {
Connection clientConn = client.createStandaloneConnection(ip, port, 1000);

for (int i = 0; i < invokeTimes; i++) {
RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
String serverres = (String) client.invokeSync(clientConn, req1, 1000);
Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);

Assert.assertNotNull(serverConnectProcessor.getConnection());
Connection serverConn = serverConnectProcessor.getConnection();
RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
String clientres = (String) server.getRpcServer().invokeSync(serverConn, req, 1000);
Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
}

Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}

@Test
public void testServerSyncUsingAddress() throws Exception {
Connection clientConn = client.createStandaloneConnection(ip, port, 1000);
String remote = RemotingUtil.parseRemoteAddress(clientConn.getChannel());
String local = RemotingUtil.parseLocalAddress(clientConn.getChannel());
logger.warn("Client say local:" + local);
logger.warn("Client say remote:" + remote);

for (int i = 0; i < invokeTimes; i++) {
RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
String serverres = (String) client.invokeSync(clientConn, req1, 1000);
Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);

Assert.assertNotNull(serverConnectProcessor.getConnection());
// only when client invoked, the remote address can be get by UserProcessor
// otherwise, please use ConnectionEventProcessor
String remoteAddr = serverUserProcessor.getRemoteAddr();
RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
String clientres = (String) server.getRpcServer().invokeSync(remoteAddr, req, 1000);
Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
}

Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}

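Multiple connections and connection warm-up

Generally, for point-to-point direct communication between a client and a server, one connection object per IP is enough; both throughput and concurrency meet the needs of typical business traffic. In some scenarios, however, the communication is not a direct point-to-point link but goes through an LVS VIP or an F5 device. In that case, multiple connections are established for one URL address for the sake of load balancing and fault tolerance. To create multiple connections, append the following parameters to the URL passed in when making a call: 127.0.0.1:12200?_CONNECTIONNUM=30&_CONNECTIONWARMUP=true. This means that 30 connections should be established for this IP address and that the connections should be warmed up. The difference between warming up and not warming up is:

Warm-up: all 30 connections are established on the first call (for example, a Sync invocation).
No warm-up: each call creates one connection, until 30 connections have been created.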
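The two behaviours can be illustrated with a toy pool. This is a sketch of the semantics described above, not SOFABolt's connection manager: ConnectionPoolSketch, address and acquire are hypothetical names, connections are represented by plain strings, and the round-robin selection is an assumption that the text does not confirm. Only the parameter names _CONNECTIONNUM and _CONNECTIONWARMUP come from the text.

```java
import java.util.ArrayList;
import java.util.List;

class ConnectionPoolSketch {
    private final int limit;
    private final boolean warmup;
    private final List<String> connections = new ArrayList<>();
    private int next = 0;

    public ConnectionPoolSketch(int limit, boolean warmup) {
        this.limit = limit;
        this.warmup = warmup;
    }

    /** Builds the address string in the format quoted in the text. */
    public static String address(String hostPort, int connectionNum, boolean warmup) {
        return hostPort + "?_CONNECTIONNUM=" + connectionNum + "&_CONNECTIONWARMUP=" + warmup;
    }

    /** Called once per invocation; returns the connection chosen for that call. */
    public String acquire() {
        if (warmup) {
            // Warm-up: the first call fills the pool completely.
            while (connections.size() < limit) {
                connections.add("conn-" + connections.size());
            }
        } else if (connections.size() < limit) {
            // No warm-up: each call adds one connection until the limit is reached.
            connections.add("conn-" + connections.size());
        }
        // Assumption: pick connections round-robin among those that exist.
        String chosen = connections.get(next % connections.size());
        next++;
        return chosen;
    }

    public int size() {
        return connections.size();
    }

    public static void main(String[] args) {
        System.out.println(address("127.0.0.1:12200", 30, true));
        ConnectionPoolSketch warm = new ConnectionPoolSketch(30, true);
        warm.acquire();
        ConnectionPoolSketch lazy = new ConnectionPoolSketch(30, false);
        lazy.acquire();
        lazy.acquire();
        System.out.println("warm=" + warm.size() + " lazy=" + lazy.size());
    }
}
```

Warm-up trades a slower first call for predictable latency afterwards; without it, the cost of connection setup is spread over the first 30 calls.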

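  • A client can create multiple connections and keep them in a connection pool.
  • Both the client and the server can register multiple Processors.

Connection and Processor are in a many-to-many relationship.

Automatic disconnection and reconnection

Serializers and deserializers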