首页 > 代码库 > Coherence代理节点在离开集群时的恢复

Coherence代理节点在离开集群时的恢复

Coherence的架构参考

技术分享

 

在极端压力之下,有时候代理节点会忙于处理请求而不响应其他的心跳,同步,导致其他节点传输的报文没有回应,而被认为是离开集群,从而影响业务。

写了一段代码,能让进程在监听到有节点离开时关闭节点,同时通过命令自动重起,实现恢复功能。

 

其中有几个要点问题解决如下:

1.Coherence Server可能是多台机器,这样任何proxy离开都会发送消息到监听程序,监听程序需要判断是否是本地进程才能操作。

2.如果通过ip来判断,java在获取本地ip时更多时候是一个list列表,所以程序中通过hostname进行判断

3.hostname和ip的映射,因为只涉及2台coherence server,所以直接把这种关系固定在程序中,当然也可以放到数据库,coherence或者文件。

4.通过proxy离开的消息能够获取processid,但通过pid如何可以获取proxy监听的端口,这里是通过

netstat -nap |grep "+processid +" | grep tcp | grep "+ ip+ ":9"的操作,也就是约定监听在以9开头的端口,然后把特定的行取出来后再进行解析。

5.获取port后再根据port和程序的映射关系去运行相关的启动命令。

6.测试可以分开测试,比如先是获取进程号的测试,然后再进行有进程号后如何kill和重新启动的测试. main下面好多都是调试的脚本。

代码如下:

 

package coherencetest;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.MemberEvent;
import com.tangosol.net.MemberListener;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;

import java.io.InputStreamReader;

import java.io.LineNumberReader;

import java.net.InetAddress;

import java.util.ArrayList;
import java.util.List;


class ProxyListenerNotification implements MemberListener {
static String IP1 = "192.168.0.150";
String ip;


public void memberJoined (MemberEvent e) {
System.out.println("====== Member Join ");
}
public void memberLeaving(MemberEvent e) {}
public void memberLeft(MemberEvent e) {
System.out.println("************ Member Left");
System.out.println("====================");
// System.out.println(e.toString());

System.out.println(e.getMember());

String processId = e.getMember().getProcessName();
String totalstring = e.getMember().getAddress().getHostName();
String localhostname ="";

try {

InetAddress addr = InetAddress.getLocalHost();
localhostname=addr.getHostName().toString();

System.out.println("=============*======="+ip);
} catch (Exception e1) {
System.out.println(e1.getMessage());
}

if (totalstring.indexOf(localhostname) != 0) {
if (localhostname == "ocp") {
ip=IP1;
}

String shStr;
String port = getPort(Integer.parseInt(processId),ip);


if ( port =="9099") {
shStr = "/home/oracle/Middleware/coherence/bin/proxy-cache.sh";
} else {
shStr = "/home/oracle/Middleware/coherence/bin/proxy-cache1.sh";
}
Process process;

String killStr = "kill -9 "+processId;


try {

process = Runtime.getRuntime().exec(killStr);
System.out.println("kill successful");

process = Runtime.getRuntime().exec(new String[]{"/bin/sh","-c",shStr},null,null);

} catch(Exception e1) {
System.out.println(e1.getMessage());
}

}
}

public String getPort(int processid,String ip) {

Process process;
String line="";
try {
String shStr = "netstat -nap |grep "+processid +" | grep tcp | grep "+ ip+ ":9";
System.out.println(shStr);
process = Runtime.getRuntime().exec(new String[]{"/bin/sh","-c",shStr},null,null);
InputStreamReader ir = new InputStreamReader(process.getInputStream());
LineNumberReader input = new LineNumberReader(ir);
process.waitFor();
line = input.readLine();
/*while ((line = input.readLine()) != null){
strList.add(line);
}*/
}catch (Exception e) {
System.out.println(e.getMessage());
}

int index=line.indexOf(":9");
String port = line.substring(index+1, index+5);
System.out.println(" port ="+port);
return port;
}

}

 

public class ProxyListener {

public static void main(String[] args) {
try {
NamedCache cache = CacheFactory.getCache("POFSample");
/*
cache.addMapListener(new MapListener() {

public void entryUpdated(MapEvent arg0) {
System.out.println(arg0);
}

public void entryInserted(MapEvent arg0) {
System.out.println(arg0);

}
public void entryDeleted(MapEvent arg0) {
System.out.println(arg0);
}});
*/

String processId = "6458";
String localhostname ="";

try {

InetAddress addr = InetAddress.getLocalHost();
localhostname=addr.getHostName().toString();

System.out.println("=============*======="+localhostname);
} catch (Exception e1) {
System.out.println(e1.getMessage());
}

String ip = "192.168.0.150";

String shStr;

ProxyListenerNotification pl = new ProxyListenerNotification();
String port = pl.getPort(Integer.parseInt(processId),ip);

//cache.getCacheService().addMemberListener(new ProxyListenerNotification());

if ( port.equals("9099")) {
System.out.println("1");
shStr = "/home/oracle/Middleware/coherence/bin/proxy-cache.sh";
} else {
System.out.println("2");
shStr = "/home/oracle/Middleware/coherence/bin/proxy-cache1.sh";
}
Process process;

String killStr = "kill -9 "+processId;

try {
process = Runtime.getRuntime().exec(killStr);
System.out.println("kill successful");

process = Runtime.getRuntime().exec(new String[]{"/bin/sh","-c",shStr},null,null);
} catch(Exception e1) {
System.out.println(e1.getMessage());
}

while(true){
// Thread.sleep(100000);
}
} catch (Exception e) {
System.out.println(e);
}
}
}

Coherence代理节点在离开集群时的恢复