0%

消息中间件的作用

消息中间件的作用可以概括如下:

  • 解耦:消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要事先这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束即可。
  • 冗余(储存):有些情况下,处理数据的过程会失败。消息中间件可以把数据持久化直到它们已经完全处理,通过这一范式规避了数据丢失风险。
  • 扩展性:因为消息中间件解耦了应用的处理过程,所以提高了消息入队和处理的效率和容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
  • 消峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
  • 可恢复性:即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
  • 顺序保证:消息中间件支持一定程度上的顺序性
  • 缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
  • 异步通信:消息中间件提供了异步处理机制。

AMQP

AMQP,即 Advanced Message Queuing Protocol,一种提供统一消息服务的应用层标准高级
消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端和消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开放语言等条件的限制。基于此协议的消息中间件有RabbitMQ。

各种MQ的比较

目前业界有很多MQ产品,比如 RabbitMQ、RocketMQ、Kafka、ActivceMQ等。其中ActiceMQ现在社区活跃度不是很高,已经被很多人弃用了。其中RabbitMQ 是基于erlang语言的,虽然其实开源的,但是如果需要定制化的话,维护是一件很麻烦的事情。Kafka 和 RocketMQ ,前者是Scala,后者是阿里出品的,基于Java开发的。所以,我们一般选择MQ的时候,主要从RabbitMQ、Kafka、RocketMQ这几个主流MQ中选择一个适合的。

Kafka

Kafka 主要定位在日志等方面,因为Kafka 设计的初衷就是为了处理日志。

Read more »

最近一段时间在了解领域模型,之前拜读了下《领域驱动设计——软件核心复杂性应对之道》,结果看的云里雾里,晦涩的语句,不明所以的专业术语,加上翻译导致的语句流畅性,可以说观看体验并不是很好。然后同事推荐我先看《实现领域驱动设计》这本书,但是对于这种软件设计的书,稍微之前那本好点了。以前都是“talk is cheap, show me the code”,加上自己在这方面没啥经验积累,看的过程中,没啥共鸣。

接下来主要是自己在看的过程中的一些笔记和理解。

领域模型之贫血模型和充血模型

贫血模型:Model中,仅包含状态属性,不包含个行为,采用这种设计时,需要分离出DB曾,左门用语数据库操作。现在的web软件开发主要使用的就是贫血模式。
优点:系统层次结构清楚,各层之间单向依赖,缺点是不够面向对象

充血模型:Model中即包含状态,也包含行为,是最符合面向对象的设计方式。
优点面向对象,缺点比较复杂,对技术要求更高。
Spring data 的 Repository 是对充血模型的最佳实践

领域、子域和限界上下文

在DDD领域中,一个领域被分为若干个子域,领域模型在界限上下文中进行开发。

限界上下文是一个显示边界,灵越模型便存在边界之内。在边界内,通用语言中的所有术语和词组都有特定的含义,而模型需要准确的反映通用语言。

架构风格

在选择使用框架时,需要明确其使用目的:建立一种可以表达领域模型的实现并且用它来解决重要问题。

Read more »

数据库是一个多用户共享的资源,这样的话对于多个用户在存取同一数据的时候,就会出现问题,举个最经典的问题—-票务系统,如何保证数据的正确性。当只剩下最后1张票的时候,两个用户同时取到数据并更新,那么最后是谁买到票了呢?

数据库事务

数据库事务是指单个逻辑工作单元执行一系列操作,要么完成执行,要么完成不执行。数据库事务必须满足ACID(原子性、一致性、隔离性、持久性)。

  • 原子性:对于其数据的修改,要么全部执行,要么完全不执行。原子性消除了系统处理操作子集的可能。
  • 一致性:事务完成时,必须是所有的数据都保持一致
  • 隔离性:由并发事务所做的修改必须与任何其他事务并发事务所做的修改隔离
  • 持久性:完成事务后,对系统的修改时永久性的

事务隔离级别

  • 未提交读:当前事务未提交、其他事务也能读到
  • 提交读:当前事务提交之后,其他事务才能看到
  • 可重复读:该级别解决了同一事务多次读取同样记录的结果是一致的。但是理论上,还是无法解决另一个幻读问题。幻读 是指当前事务读取某个范围内的记录时,其他事务在该范围又加入新的纪录,之前的事务再次从该范围读取数据时,会产生幻行。这个的换行是对于 insert 操作来说的,对于 update 操作能保证没有幻行问题。
  • 可串行化:最高的事务隔离级别。强制事务串行化执行,避免了幻读问题,这种级别会在数据的每一行都加锁,会产生大量的超时和锁竞争,实际中很少用到,除非要确保数据的一致性且没有并发问题

今天我们所讲的数据库是MySQL。InnoDB支持行/表级锁,默认行级锁。

共享锁

Read more »

现在国内对于数据库分库分表的开源方案,主要是mycat和sharding-sphere,本文主要是自己对于sharding-sphere使用的一些记录。

sharding-sphere简单介绍

Sharding-JDBC 采用在 JDBC 层扩展分库分表,支持读写分离,是一个以 jar 形式提供服务的轻量级组件,其核心思路是小而美地完成最核心的事情,基于 JDBC 层进行分片的好处是轻量、简单、兼容性好以及无需额外的运维工作。缺点是无法跨语言,目前仅支持 Java。

Sharding-Sphere的3个产品的数据分片主要流程是一致的。核心思想是:SQL解析 => 执行器优化 => SQL路由 => SQL改写 => SQL执行 => 结果归并的流程组成。(详情查看官网文档)

分库分表

开发环境: 项目搭建使用的是 Spring Boot + Sharding-Sphere + MyBatis。

注意:Sharding-Sphere 对于Spring-Boot 好像还不支持2.x版本以上,请选择1.x版本,详细源码,请查看相应的demo。

分别创建2个数据库,然后创建相应的表,创建数据库DDL如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE `t_order` (
`order_id` bigint(20) NOT NULL AUTO_INCREMENT,
`status` varchar(255) DEFAULT NULL,
`user_id` int(11) DEFAULT NULL,
PRIMARY KEY (`order_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=344805296301932545 DEFAULT CHARSET=utf8;

CREATE TABLE `t_order_item` (
`order_item_id` bigint(20) NOT NULL,
`order_id` bigint(20) DEFAULT NULL,
`status` varchar(255) DEFAULT NULL,
`user_id` int(11) DEFAULT NULL,
PRIMARY KEY (`order_item_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Read more »

今天在看dubbo源码的时候,看到大量的SPI,对于SPI不是很明白,于是网上看资料和例子,有了这篇文章。

SPI(Service Provider Interface),是Java提供的一套用来被第三方实现或者扩展的API,可以用来启动框架扩展和替换组件。

使用场景

  • 数据库驱动加载
  • dubbo
  • 日志门面模式实现不同日志

SPI 的使用

定义接口并实现接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface Spi {

/**
* spi接口
* */
void sayHello();
}
public class Cat implements Spi {

@Override
public void sayHello() {
System.out.println("Hello World! This is a Cat");
}
}

public class Dog implements Spi {

@Override
public void sayHello() {
System.out.println("Hello World! This is a Dog");
}
}

src/main/resources/ 创建文件

在src/main/resources/ 目录下创建 /META-INF/services文件(关于services文件夹,如果使用Java自带的需要使用这个名字,如果自己实现可以自定义),并在文件夹中创建与接口同名的文件——com.example.spi.Spi

Read more »

最近在看与RPC相关的东西,在GitHub上看到一个使用Java实现的简单RPC框架,于是自己也想用Java实现一个简单的RPC,以便加深对于RPC框架的理解。本篇文章主要是记录如何使用ZooKeeper作为RPC框架的注册中心,实现服务的注册和发现。

什么是RPC?

RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。正式的描述是:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

基于ZooKeeper实现的服务注册中心

如果对于dubbo这款国产RPC框架有一定的了解,就知道最开始它是基于ZooKeeper实现服务的注册和发现的。关于服务的注册和发现,主要是把服务名以及服务相关的服务器IP地址注册到注册中心,在使用服务的时候,只需要根据服务名,就可以得到所有服务地址IP,然后根据一定的负载均衡策略来选择IP地址。

下图是服务的注册和发现接口:

服务的注册

在ZooKeeper的节点概念中,Znode有四种类型,PERSISTENT(持久节点)、PERSISTENT_SEQUENTIAL(持久的连续节点)、EPHEMERAL(临时节点)、EPHEMERAL_SEQUENTIAL(临时的连续节点)。Znode的类型在创建时确定并且之后不能再修改。

关于服务的注册,其实就是把服务和IP注册到ZooKeeper的节点中。

Read more »

如果有什么错误的地方,希望指出。

前几天有个面试,在面试最后的时候,面试官说问个比较偏僻的知识点,问了关于Java引用的。于是我就把四种引用说了下。然后又问,你知道引用队列嘛?然后我懵逼了,只能说我不知道。

关于Java中的引用,可以看上面的链接,引用主要用于GC中的。

引用队列 ReferenceQueue 是用来配合引用工作的,没有 ReferenceQueue 一样可以运行。创建引用的时候可以指定关联的队列,当GC释放对象内存的时候,会将引用加入到引用队列的队列末尾,这相当于是一种通知机制。当关联的引用队列中有数据的时候,意味着引用指向的堆内存中的对象被回收。通过这种方式,JVM允许我们在对象被销毁后,做一些我们自己想做的事情。JVM提供了一个ReferenceHandler线程,将引用加入到注册的引用队列中。

关于引用队列,其类位于ref中,如图所示:

1
2
3
4
5
6
7
8
9
10
11
// 引用队列
ReferenceQueue<String> rq = newReferenceQueue<String>();
// 软引用
SoftReference<String> sr = newSoftReference<String>(new String("Soft"),rq);
// 弱引用
WeakReference<String> wr = newWeakReference<String>(new String("Weak"),rq);
// 幽灵引用
PhantomReference<String> pr = newPhantomReference<String>(new String("Phantom"),rq);

// 从引用队列中弹出一个对象引用
Reference<? extends String> ref = rq.poll();

ReferenceQueue 提供了三种方法来移除队列:

  • poll():用于移除并返回该队列中的下一个引用对象,如果队列为空,则返回null
  • remove():用于移除并返回该队列中的下一个引用对象,该方法会在队列返回可用引用对象之前一直阻塞
  • remove (long timeout):用于移除并返回队列中的下一个引用对象。该方法会在队列返回可用引用对象之前一直阻塞,或者在超出指定超时后结束。如果超出指定超时,则返回null。如果指定超时为0,意味着将无限期地等待。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ReferenceQueueDemo {

private static ReferenceQueue<byte[]> referenceQueue = new ReferenceQueue<>();
private static int _1M = 1024 * 1024;

public static void main(String[] args) {

Object object = new Object();
Map<Object, Object> map = new HashMap<>();

Thread thread = new Thread(() -> {
try {
int cnt = 0;
WeakReference<byte[]> k;
while ((k = (WeakReference) referenceQueue.remove()) != null) {
System.out.println((cnt++) + "回收了:" + k);
}
} catch (InterruptedException e) {
//结束循环
}
});
thread.setDaemon(true);
thread.start();

for (int i = 0; i < 10000; i++) {
byte[] bytes = new byte[_1M];
WeakReference<byte[]> weakReference = new WeakReference<>(bytes, referenceQueue);
map.put(weakReference, object);
}
System.out.println("map.size->" + map.size());

}
}
Read more »

在对于MySQL的优化,网上有很多小技巧,比如加索引。不过前几天在极客时间上买了门《MySQL实战45讲》。这篇文章主要是在学习过程中关于MySQL原理的一些笔记。

在学习如何优化的过程中,最好对于MySQL查询的过程有一定的理解,这样有利于如何进行优化。下面这张图片是MySQL的逻辑框架:

MySQL从图中可以看出,一般分为三部分:客户端、核心服务、存储引擎。客户端这个就不说了,主要是Java这些客户端;而关于存储引擎的,在之前整理的一篇文章有简绍——MySQL的存储引擎 —— InnoDB和MyIsAM。所以今天主要是讲解下关于核心服务。

MySQL优化原理

MySQL查询过程

mysql> select * from T where ID=10;

当我们输入上面这一条SQL查询语句的时候,发生了什么?

这里面主要涉及的是核心服务中的模块:连接器、查询缓存、分析器、优化器、执行器等,以及所有的内置函数(如日期、时间、数学和加密函数等),所有跨存储引擎的功能都在这一层实现,比如存储过程、触发器、视图等。

连接器

Read more »

如果本文有错,希望在下面的留言区指正。

在开篇,先提出一个问题,在Java中,通过继承 Thread 或者实现 Runable 创建一个线程的时候,如何获取该线程的返回结果呢?

在并发编程中,使用非阻塞模式的时候,就是出现上面的问题。这个时候就需要用到这次所讲的内容了——Future。

Future 主要功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Future<V> {

//使用该方法来取消一个任务,若取消成功,则返回true,否则返回false
boolean cancel(boolean mayInterruptIfRunning);

//判断任务是否已经取消
boolean isCancelled();

//判断任务是否已经完成
boolean isDone();

//当任务结束返回一个结果,如果调用时,为返回结果,则阻塞
V get() throws InterruptedException, ExecutionException;

//在指定时间内获取指定结果,如果没有获取,则返回null
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Future 例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class FutureTest {

public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();

try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}

System.out.println("主线程在执行任务");

try {
System.out.println("task运行结果" + result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

System.out.println("所有任务执行完毕");
}

}
public class Task implements Callable<Integer> {

@Override
public Integer call() throws Exception {

System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for (int i = 0; i < 100; i++)
sum += i;
return sum;
}
}

Future 适用场景

在之前的一篇关于线程池中,详细介绍了Java的一些线程池知识点。那么对于使用线程池,除了管理线程资源外,如何能够实现节约时间呢?

比如现在一个请求中,给前端的返回结果,需要通过查询A、B、C,最后返回给前端,这三个查询分别耗时 10ms、20ms、10ms。如果正常的查询需要耗时40ms(忽略别的影响查询时间的因素)。但是如果把这三个查询交给线程池进行异步查询,那么,它的最终耗时是由最大耗时的那个查询决定的,这时就会发现查询变快了,只耗时20ms。

Read more »

如果有错希望指出。本文是在看到一些关于JVM参数调优文章后的一些内容摘要。

堆大小设置

-Xms

设置JVM 初始内存,即JVM启动时分配的内存。此值可以设置与 -Xmx 相同,以避免每次垃圾回收完成后 JVM 重新分配内存。

-Xmx

设置JVM 运行过程中分配的最大可用内存。

-Xss

-Xss128k:设置每个线程的堆栈大小。

-Xmn

设置年轻代大小。

Read more »