0%

最近在处理项目中,处理关于大模型流式接口的对接,并将处理后的数据,按照流式方式发送给前端。在了解实现方案中,发现基于 OkHttp Sse + Spring SseEmitter 的技术方案。OkHttp Sse 实现对大模型输出的流式结果进行监控处理,并通过 Spring SseEmitter 将处理后的结果输出到前端。

Spring SseEmitter

服务端主动推送:SSE (Server Send Event)。html5新标准,用来从服务端实时推送数据到浏览器端,直接建立在当前http连接上,本质上是保持一个http长连接,轻量协议简单的服务器数据推送的场景,使用服务器推送事件, SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。

SseEmitter 是SpringMVC(4.2+)提供的一种技术,它是基于Http协议的,相比WebSocket,它更轻量,但是它只能从服务端向客户端单向发送信息。

案例

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 模拟流式结果输出
*/
@GetMapping(value = "stream_query", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter streamQuery() {
SseEmitter emitter = new SseEmitter(60000L);
Flux.interval(Duration.ofSeconds(1))
.map(data -> "Server Message # " + data)
.doOnCancel(emitter::complete)
.subscribe(data -> {
try {
emitter.send(SseEmitter.event().data(data));
} catch (Exception e) {
log.error("发送数据异常", e);
}
});
return emitter;
}

与 WebSocket 比较

Spring SseEmitter 和 WebSocket 都实现了服务端向客户端推送数据,不过 WebSocket 是 全双工通信,客户端和服务端可以互相通信。而 SseEmitter 只能从服务端向 客户端推送(该功能以及包含在 spring boot jar 中)。

Read more »

前几天在项目中使用 fastjson,对对象进行修改,发现我修改了新创建的对象,但是原始的 fastjson 的内容也发生了改变。当时是在一个方法中,有个 JSONObject 对象传递进来,然后在 for 循环中重新创建一个对象,塞进数组中,下面模拟一个简单事例:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
JSONObject data = new JSONObject();
data.put("a", "a");
data.put("b", "b");
JSONObject after = new JSONObject(data);
after.put("c", "c");
System.out.println("原始数据:" + data);
System.out.println("修改后:" + after);
}

输出结果:

1
2
原始数据:{"a":"a","b":"b","c":"c"}
修改后:{"a":"a","b":"b","c":"c"}

在印象中的Java中,只要我通过 new 创建一个新的对象,那么和之前的对象数据应该就没关系了,但是我们会发现上面的代码中,我修改了新创建的对象,但是原始数据还是跟着修改了。

翻了下代码,发现了 fastjson 中这个问题的奥秘。通过下面的代码,会发现,在创建一个新的 JSONObject 对象时,赋值的对象 map 是通过 final 进行修饰的,这时想起来,final 有个特性:如果引用为引用类型数据,比如对象、数组,则该对象、数组本身可以修改,但指向该对象或数组的地址引用不能修改。

1
2
3
4
5
6
7
8
9
10
private static final int DEFAULT_INITIAL_CAPACITY = 16;
private final Map<String, Object> map;

public JSONObject(Map<String, Object> map) {
if (map == null) {
throw new IllegalArgumentException("map is null.");
} else {
this.map = map;
}
}

通过上面的例子,虽然我通过 new 创建了一个新的对象,但是新对象的地址还是指向了最原始的那个地址。

PS:不知道这个点是不是 fastjson 的开发者考虑性能问题。在我尝试使用 fastjson2时,发现已经不存在这个问题了

如果想使用 fastjson 进行拷贝数据,可以先将源对象转化成对象,再转成 JSONObject,比如下面的例子:

Read more »

最近在项目中使用 mongo,ORM 使用的是 spring-boot-starter-data-mongo。在使用过程中,对于数据库字段,每次都要写,觉得麻烦,然后想起以前使用 mybatis-plus,只需要使用 lambda 获取字段名称即可,如下:

1
2
3
4
5
public List<UserInfo> getListByName(String name) {
LambdaQueryWrapper<UserInfo> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(UserInfo::getName, name);
return list(queryWrapper);
}

下面利用Lambda 函数式接口,通过 SerializedLambda 原理来获取 Java Bean 的实例。

定义函数接口

1
2
3
4
@FunctionalInterface
public interface Func<E, R> extends Function<E, R>, Serializable {

}

这里函数式接口继承 Serializable 接口,在 JDK 1.8 之后,JDK提供了一个新的类,凡是继承了 Serializable 的函数式接口的实例,都可以获取一个属于它的 SerializedLambda 实例,并通过它获取实例的信息。所以,我们可以通过这个原理来实现如何通过 Lambda 来获取 Java Bean 的属性名称。

实现获取方法字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ReflectLambdaUtils {

private static final Logger logger = LoggerFactory.getLogger(ReflectLambdaUtils.class);

/**
* 根据参数获取字段名称
*
* @param func
* @return
*/
public static <E, R> String getFieldName(Func<E, R> func) {
Field field = getField(func);
return field.getName();
}

/**
* 获取表达式的字段
*
* @param func
* @return
*/
public static <E, R> Field getField(Func<E, R> func) {
try {
Method method = func.getClass().getDeclaredMethod("writeReplace");
method.setAccessible(Boolean.TRUE);
// 1.调用 writeReplace 方法,返回一个 writeReplace 对象
SerializedLambda serializedLambda = (SerializedLambda) method.invoke(func);
String getterMethod = serializedLambda.getImplMethodName();
String fieldName = Introspector.decapitalize(getterMethod.replace("get", ""));
// 2. 获取的Class是字符串,并且包名是“/”分割,需要替换成“.”,才能获取到对应的Class对象
String declaredClass = serializedLambda.getImplClass().replace("/", ".");
Class<?> aClass = Class.forName(declaredClass, false, ClassUtils.getDefaultClassLoader());
// 3.通过Spring 中的反射工具类获取Class中定义的Field
return ReflectionUtils.findField(aClass, fieldName);
} catch (ReflectiveOperationException e) {
logger.error("解析类字段出现异常", e);
throw new MongoPlusException("解析字段时出现异常");
}
}
}

其中 writeReplace() 这个方法,是虚拟机加上去的,虚拟机会自动给实现 Serializable 接口的 Lambda 表达式生成 writeReplace() 方法。如果是被序列化后,实体对象就会有 writeReplace() 方法,调用该方法,会返回 SerializedLambda 对象去做序列化,即被序列化的对象被替换了。

这个时候,返回的 SerializedLambda 对象中包含了 Lambda 表达式中所有的信息,比如函数名implMethodName、函数签名 implMethodSignature 等。我们就可以根据这些信息,获取我们所需要的属性字段了。

Read more »

概述

工具类就是封装平时常用的方法,不需要重复造轮子,今天介绍下谷歌的 guava。

guava 的优点:

  • 高效设计良好的API,被 Google 的开发者设计,实现和使用
  • 遵循高效的Java的语法实践
  • 使代码更刻度,简洁,简单
  • 节约时间,资源,提高生产力

Guava 工具包包含若干个 Google Java项目:

  • 集合
  • 缓存
  • 原生类型支持
  • 并发库
  • 通用注解
  • 字符串处理
  • I/O 等

在使用时,只需要在项目依赖中加入 Google Guava:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>

集合

Google Guava 工具包提供了大量创建和使用集合的方法。

Read more »

最近在项目中碰到一些问题,一个定时任务在运行过程中崩溃,导致定时任务没有执行完成,业务那边想能够在不重启任务的情况下,重启任务继续跑,而且希望在不重启服务的情况下,能够动态的修改定时任务时间。所以有了这一篇文章。

一般情况下,在Spring Boot 项目中,想使用定时任务,只需要使用 @EnableScheduling 注解开启定时任务即可,然后在定时任务调度的任务上,添加@Scheduled,修改自己需要的任务周期。

普通定时任务

下面是一般情况下,在开启@EnableScheduling注解情况下,一个简单的示例:

1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class SampleJob {

@Scheduled(cron = "* * * * * ?")
public void printLog() {
long time = System.currentTimeMillis();
log.info("当前时间:{}", time);
}
}

动态修改定时任务

方法一:仅修改任务周期

实现 SchedulingConfigurer 方法,重写 configureTasks 方法,重新制定 Trigger。下面上代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
@Component
public class DynamicCronJob implements SchedulingConfigurer {

/**
* 默认定时任务执行时间
*/
private String taskCron = GlobalConstants.TASK_DEFAULT_CRON;

@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.addTriggerTask(() -> {
long time = System.currentTimeMillis();
log.info("执行任务,当前时间:{}", time);
}, triggerContext -> {
// 刷新cron时间
CronTrigger cronTrigger = new CronTrigger(taskCron);
Date nextExecuteTime = cronTrigger.nextExecutionTime(triggerContext);
return nextExecuteTime;
});
}

/**
* 修改默认的定时任务时间
* */
public void setTaskCron(String taskCron) {
this.taskCron = taskCron;
}
}

这里的核心方法是 scheduledTaskRegistrar.addTriggerTask,它只接收两个参数,分别是调度任务实例(Runable实例),Trigger实例。如果想修改定时任务的时间,其实修改的就是这里的nextExecutionTime,返回下次执行时间。

Read more »

之前写过一篇线程池,可以大致了解关于线程池的概念以及运行过程。 Java 线程池

线程池常量

1
2
3
4
5
6
7
8
9
10
11
12
13
// 标记线程池昨天,默认RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程个数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

ThreadPoolExecutor#execute

execute(Runnable command) 方法是提交任务到线程池执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取当前线程池的状态+线程个数遍历组合值
int c = ctl.get();
// 当前线程池小于核心线程池,开启新线程执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池处于RUNNING,将任务添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列满了,创建非核心线程执行任务,如果失败,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

这块代码的逻辑就是线程池如何处理新任务的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 检测队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 循环CAS 增加线程数
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 调用 w.run,实际调用 runWorker方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker() 方法中参数 core 表明当前任务是绑定核心线程还是非核心线程。

非核心线程池回收

Read more »

spring boot 源码版本 2.0.9.RELEASE。spring boot 高版本,没有走AutoConfigurationImportSelector#selectImports方法。

Spring Boot 自动装配

先看下Spring Boot 启动类注释 @SpringBootApplication。在这些注解中,重要的就是两个 @SpringBootConfiguration@EnableAutoConfiguration。从下图可以看到该注解是个组合注解,本文关于自动装配,用了到里面的 @EnableAutoConfiguration 里面的 @Import(AutoConfigurationImportSelector.class)

@Import({AutoConfigurationImportSelector.class}) 主要功能就是自动配置导入选择器。

AutoConfigurationImportSelector#selectImports

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
if (!isEnabled(annotationMetadata)) {
return NO_IMPORTS;
}
AutoConfigurationMetadata autoConfigurationMetadata = AutoConfigurationMetadataLoader
.loadMetadata(this.beanClassLoader);
AnnotationAttributes attributes = getAttributes(annotationMetadata);
// 所以的配置存放在configuration中,获取所有依赖的配置
List<String> configurations = getCandidateConfigurations(annotationMetadata,
attributes);
configurations = removeDuplicates(configurations);
Set<String> exclusions = getExclusions(annotationMetadata, attributes);
checkExcludedClasses(configurations, exclusions);
configurations.removeAll(exclusions);
configurations = filter(configurations, autoConfigurationMetadata);
fireAutoConfigurationImportEvents(configurations, exclusions);
return StringUtils.toStringArray(configurations);
}

参数含义:

  • AnnotationMetadata:SpringBoot启动类上的注释的全限定名,比如 @com.springboot.sample.starter.annotation.EnableSampleServer()
  • AutoConfigurationMetadata:依赖的AutoConfiguration类的元数据

AutoConfigurationImportSelector#getCandidateConfigurations

Read more »

源码版本是 2.7.8

Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑。整个逻辑大致可分为三个部分,第一部分是前置工作,主要用于检查参数,组装 URL。第二部分是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。第三部分是向注册中心注册服务,用于服务发现。本篇文章将会对这三个部分代码进行详细的分析。

在 2.7.8 版本中,服务导出的入口已经在 DubboBootstrapApplicationListener onApplicationContextEvent方法是在 onApplicationEvent中引用的,onApplicationEvent 是一个事件响应方法,该方法会在收到 Spring 上下文刷事件后执行服务导出操作。

在DubboBootstrap.start调用中,会调用一个exportServices方法,这个方法中,会调用 export 方法,这是就开始服务导出流程了。

前置工作

export

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public synchronized void export() {
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.initialize();
}

checkAndUpdateSubConfigs();

//init serviceMetadata 初始化服务元数据
serviceMetadata.setVersion(getVersion());
serviceMetadata.setGroup(getGroup());
serviceMetadata.setDefaultGroup(getGroup());
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());

//如果不允许暴露,直接放过 <dubbo:provider export="false" />
if (!shouldExport()) {
return;
}
// 如果需要延迟,则延迟暴露
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 直接暴露
doExport();
}
exported();
}

ServiceConfig.export是继承重写了父类ServiceConfigBase.export方法。在export方法中,主要是以下逻辑:

  • 检测 dubbo:service 标签的 interface 属性合法性,不合法则抛出异常
  • 检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其他配置类对象中获取相应的实例。
  • 检测并处理泛化服务和普通服务类
  • 检测本地存根配置,并进行相应的处理
  • 对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试创建,若无法创建则抛出异常
  • 主要是初始化服务元数据
  • 对export配置项进行检查,判断是否需要导出服务
  • 如果需要延迟,则延迟导出,否则直接导出服务
Read more »

源码版本是 2.7.8

在以前的文章中,介绍过 Java SPI机制,想了解的可以进去了解下。今天我们要讲的是 Dubbo SPI机制。

Dubbo SPI 实例

本实例参考的是dubbo官方给的官方实例。

首先定义一个接口,名称Robot。

1
2
3
public interface Robot {
void sayHello();
}

接下来定义两个实现类,分别是 OptimusPrime 和 Bumblebee。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class OptimusPrime implements Robot {

@Override
public void sayHello() {
System.out.println("Hello, I am Optimus Prime.");
}
}

public class Bumblebee implements Robot {

@Override
public void sayHello() {
System.out.println("Hello, I am Bumblebee.");
}
}

接下来在META-INF/dubbo文件夹下创建一个文件,名称为Robot 的全限定名 com.dubbo.provider.demo.Robot (根据项目实际的全限定名),文件配置内容:

1
2
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee
Read more »

源码版本是 2.7.8

什么是注册中心?

服务治理框架中大致分为服务通信和服务管理两部分,服务管理可以分为服务注册、服务发现以及服务被热加工介入,服务提供者Provider会往注册中心注册服务,而消费者Consumer会从注册中心订阅相关服务,并不会订阅全部的服务。

dubbo-registry 模块

在dubbo中,注册中心相关的代码在dubbo-registry模块下,子模块dubbo-registry-api中定义了注册中心相关的基础代码,而在dubbo-registry-xxx模块中则定义了具体的注册中心类型实现代码,例如dubbo-registry-zookeeper模块则存放了zookeeper注册中心的实现代码。

类关系图:

dubbo-registry-api 相关实现

Read more »