1.问题
单机应用内,在进程内部,我们可以使用ThreadLocal传递应用上下文的方式. 当前的 Spring Secrucity , Spring TransactionManager, Log4J MDC, Struts2 ActionContext等等应用场景随处可见.
但在是分布式系统下,由于不是在同一个进程内,所以无法使用ThreadLocal. 那么什么是分布式ThreadLocal呢?就是将一个系统中的ThreadLocal信息可以传递至下一个系统,将两者的调用可以关联起来。如对应用有一个调用,我们生成一个请求ID (traceId),在后面所有分布式系统调用中,可以通过这个traceId将所有调用关联起来,这样查找调用日志都将十分方便.
2.实现方式
我们现在使用的通讯协议,一般都包含两部分:Header,Body. 如 Soap Header,Http Header. 通过自定义Header,可以带上我们的自定义信息。 然后在服务器端解析Header,再得到自定义信息。那么就可以完成Distributed ThreadLocal的功能。
如上图,通过两个拦截器,client在调用之前,将DistrbiutedThreadLocal中的信息放在soap header中,在服务端方法调用之前,从soap header中取回 DistrbiutedThreadLocal信息。
3. 实现代码.
以下为CXF webservice的实现代码,一个DistributedThreadLocal及增加了两个拦截器. hessian 也可以自定义Header,完成传递.
DistributedThreadLocal
/**
* 分布式 ThreadLocal, 存放在ThreadLocal中的数据可以传输至另外一台机器上
* @author badqiu
*/
public class DistributedThreadLocal {
public static String DISTRIBUTED_THREAD_LOCAL_KEY_PREFIX = "tl_";
public static ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>();
public static void putAll(Map<String, String> map) {
getMap().putAll(map);
}
public static void put(String key, String value) {
getMap().put(key, value);
}
public static String get(String key) {
Map<String, String> map = threadLocal.get();
if (map == null)
return null;
return (String) map.get(key);
}
public static Map<String, String> getMap() {
Map<String, String> map = threadLocal.get();
if (map == null) {
map = new HashMap();
threadLocal.set(map);
}
return map;
}
public static void clear() {
threadLocal.set(null);
}
}
DistributedThreadLocalInSOAPHeaderInterceptor
/**
* 输入(In)拦截器,用于从 WebService SOAP 的Header中取回DistributedThreadLocal中的信息,并存放在DistributedThreadLocal中
*
* @author badqiu
*/
public class DistributedThreadLocalInSOAPHeaderInterceptor extends AbstractSoapInterceptor {
private SAAJInInterceptor saajIn = new SAAJInInterceptor();
public DistributedThreadLocalInSOAPHeaderInterceptor() {
super(Phase.PRE_PROTOCOL);
getAfter().add(SAAJInInterceptor.class.getName());
}
public void handleMessage(SoapMessage message) throws Fault {
SOAPMessage doc = message.getContent(SOAPMessage.class);
if (doc == null) {
saajIn.handleMessage(message);
doc = message.getContent(SOAPMessage.class);
}
Map<String,String> headers = toHeadersMap(doc);
DistributedThreadLocal.putAll(headers);
}
private Map toHeadersMap(SOAPMessage doc) {
SOAPHeader header = getSOAPHeader(doc);
if (header == null) {
return new HashMap(0);
}
Map<String,String> headersMap = new HashMap();
NodeList nodes = header.getChildNodes();
for(int i=0; i<nodes.getLength(); i++) {
Node item = nodes.item(i);
if(item.hasChildNodes()) {
headersMap.put(item.getLocalName(), item.getFirstChild().getNodeValue());
}
}
return headersMap;
}
private SOAPHeader getSOAPHeader(SOAPMessage doc) {
SOAPHeader header;
try {
header = doc.getSOAPHeader();
} catch (SOAPException e) {
throw new RuntimeException(e);
}
return header;
}
}
DistributedThreadLocalOutSOAPHeaderInterceptor
/**
* 输出(Out)拦截器,用于将DistributedThreadLocal中的信息存放在 WebService SOAP 的Header中
*
* @author badqiu
*/
public class DistributedThreadLocalOutSOAPHeaderInterceptor extends AbstractSoapInterceptor {
public DistributedThreadLocalOutSOAPHeaderInterceptor() {
super(Phase.WRITE);
}
public void handleMessage(SoapMessage message) throws Fault {
List<Header> headers = message.getHeaders();
Map<String,String> threadlocalMap = DistributedThreadLocal.getMap();
for(Map.Entry<String, String> entry : threadlocalMap.entrySet()) {
headers.add(getHeader(entry.getKey(), entry.getValue()));
}
}
private Header getHeader(String key, String value) {
QName qName = new QName(key);
Document document = DOMUtils.createDocument();
Element element = document.createElement(key);
element.appendChild(document.createTextNode(value));
SoapHeader header = new SoapHeader(qName, element);
return (header);
}
}
CXF spring配置文件:
server端:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
xsi:schemaLocation="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
default-lazy-init="true">
<description>Apache CXF的Web Service配置</description>
<import resource="classpath:META-INF/cxf/cxf.xml" />
<import resource="classpath:META-INF/cxf/cxf-servlet.xml" />
<import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
<!-- jax-ws endpoint定义 -->
<jaxws:endpoint address="/hello" >
<jaxws:implementor ref="hello" />
<jaxws:inInterceptors>
<bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdInSOAPHeaderInterceptor"/>
</jaxws:inInterceptors>
</jaxws:endpoint>
<!-- WebService的实现Bean定义 -->
<bean id="hello" class="cn.org.rapid_framework.hessian.HessianTest.HelloImpl" />
</beans>
client端:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
xsi:schemaLocation="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
default-lazy-init="true">
<description>Apache CXF Web Service Client端配置</description>
<jaxws:client id="hello" serviceClass="cn.org.rapid_framework.hessian.HessianTest.Hello"
address="http://localhost:8080/service/hello" >
<jaxws:outInterceptors>
<bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdOutSOAPHeaderInterceptor"/>
</jaxws:outInterceptors>
</jaxws:client>
</beans>
4. 应用场景.
通过分布式应用上下文,暂时想到的几个应用场景.
1. Log4j MDC traceId传递. 通过一个traceId,将所有相关的 操作所有的日志信息关联起来。
2. sessionId 传递, 让我们的应用也有状态,可以使用session什么的
3. Security(username,password)传递. 在需要安全调用的地方,避免污染接口,需要显式的在接口传递username,password. 相对应的 WSSecurity也可以走这个通道
分布式应用上下文的概念,全球首创,欢迎转载(因为google 搜索不到相关文章,或许早已经有相同的概念了,欢迎提醒我)。
分享到:
相关推荐
分布式定时任务库 distributed-cron
分布式 Java(Distributed Java)
分布式事务演示-distributed-transaction-demo
基于mpi,linux平台,c++分布式数据挖掘算法。
1. 基础知识 3 2. 环的原子管理算法 4 3. 路由算法 10 4. 组通信算法 15 5. 副本管理算法 17 6. 分布式哈希表的应用 21 1. 基
[分布式算法].(Distributed.Algorithms).Lynch,.扫描版.pdf
企业级分布式应用服务EDAS(Enterprise Distributed Application Service) 期望做一个基于go-micro + casbin + jwt 的用户认证和权限的微服务
.Net分布式应用程序开发 包括Webservice,.Net Remoting的开发.
openGauss_3.0.0 分布式镜像(openGauss-distributed-CentOS-x86_64.tar.gz)适用于centos_x86_64
分布式事务服务 (Distributed Transaction Service, DTS) 是一个分布式事务框架,用来保障在大规模分布式环境下事务的最终一致性。DTS 从架构上分为 dts-core 、dts-schedule、 dts-server 两部分,dts-core是一个...
这项工作改进了经典的认证信誉(CR)模型,并为LBSR提出了一种新颖的完全分布式的上下文感知信任(FCT)模型。 推荐操作由服务提供商直接进行,而我们的FCT模型中不再需要受信任的第三方。 此外,由于我们的FCT模型...
Distributed Systems: Principles and Paradigms 3,第三版,分布式 by Andrew S. Tanenbaum and Maarten van Steen 《分布式系统原理与范型》第三版,英文原版
云计算(Cloud Computing ):是分布式处理(Distributed Computing)、并行处理(Parallel Computing)和网格计算(Grid Computing)的发展,或者说是这些计算机科学概念的商业实现。是指基于互联网的超级计算模式--即把...
分布式系统具有高度的内聚性和透明性,分布式软件系统(Distributed Software Systems)是支持分布式处理的软件系统
本书为英文版 共19章 1. 简介 Introduction 2. 准备 Preliminaries 3. 快照 Snapshots 4. 波 Waves 5. 死锁 Deadlock Detection 6. 终止 Termination Detection 7. 垃圾回收 Garbage Collection ...
The purpose of a distributed file system (DFS) is to allow users of physically distributed computers to share data and storage resources by using a common file system. A typical configuration for a ...
distributed-java, Distributed Java.《分布式 Java》
应用层分布式拒绝服务攻击检测模型,谢逸,余顺争,随着攻击者所拥有的网络资源与技术的不断增长, 分布式拒绝服务(Distributed Denial of Service, DDoS)攻击对现代网络安全形成了新的挑战. 传统�
进入2000年以来,网络遭受攻击事件不断发生,全球许多著名网站如...取而代之的是,在一定时间内,彻底使被攻击的网络丧失正常服务功能,这种攻击手法为 DDoS,即分布式拒绝服务攻击(Distributed denial of service )。
分布式应用程序描述安装构建状态用法 // you need a service host firstvar serviceHost = new ServiceHost();// register servicesserviceHost.registerService({ name: 'distributed-relational-crud-service' , ...