gitbook/Java业务开发常见错误100例/docs/263776.md
2022-09-03 22:05:03 +08:00

339 lines
25 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 答疑篇:设计篇思考题答案合集
你好,我是朱晔。
今天我们继续一起分析这门课“设计篇”模块的第21~26讲的课后思考题。这些题目涉及了代码重复、接口设计、缓存设计、生产就绪、异步处理和数据存储这6大知识点。
接下来,我们就一一具体分析吧。
### [21 | 代码重复:搞定代码重复的三个绝招](https://time.geekbang.org/column/article/228964)
**问题1**除了模板方法设计模式是减少重复代码的一把好手观察者模式也常用于减少代码重复并且是松耦合方式Spring 也提供了类似工具(点击[这里](https://docs.spring.io/spring/docs/5.2.3.RELEASE/spring-framework-reference/core.html#context-functionality-events-annotation)查看),你能想到观察者模式有哪些应用场景吗?
其实和使用MQ来解耦系统和系统的调用类似应用内各个组件之间的调用我们也可以使用观察者模式来解耦特别是当你的应用是一个大单体的时候。观察者模式除了是让组件之间可以更松耦合还能更有利于消除重复代码。
其原因是,对于一个复杂的业务逻辑,里面必然涉及到大量其它组件的调用,虽然我们没有重复写这些组件内部处理逻辑的代码,但是这些复杂调用本身就构成了重复代码。
我们可以考虑把代码逻辑抽象一下,抽象出许多事件,围绕这些事件来展开处理,那么这种处理模式就从“命令式”变为了“环境感知式”,每一个组件就好像活在一个场景中,感知场景中的各种事件,然后又把发出处理结果作为另一个事件。
经过这种抽象,复杂组件之间的调用逻辑就变成了“事件抽象+事件发布+事件订阅”,整个代码就会更简化。
补充说明一下,除了观察者模式我们还经常听到发布订阅模式,那么它们有什么区别呢?
其实观察者模式也可以叫做发布订阅模式。不过在严格定义上前者属于松耦合后者必须要MQ Broker的介入实现发布者订阅者的完全解耦。
**问题2**关于 Bean 属性复制工具,除了最简单的 Spring 的 BeanUtils 工具类的使用,你还知道哪些对象映射类库吗?它们又有什么功能呢?
答:在众多对象映射工具中,[MapStruct](https://github.com/mapstruct/mapstruct)更具特色一点。它基于JSR 269的Java注解处理器实现你可以理解为它是编译时的代码生成器使用的是纯Java方法而不是反射进行属性赋值并且做到了编译时类型安全。
如果你使用IDEA的话可以进一步安装 [IDEA MapStruct Support插件](https://plugins.jetbrains.com/plugin/10036-mapstruct-support),实现映射配置的自动完成、跳转到定义等功能。关于这个插件的具体功能,你可以参考[这里](https://mapstruct.org/news/2017-09-19-announcing-mapstruct-idea/)。
### [22 | 接口设计:系统间对话的语言,一定要统一](https://time.geekbang.org/column/article/228968)
**问题1**在“接口的响应要明确表示接口的处理结果”这一节的例子中接口响应结构体中的code字段代表执行结果的错误码对于业务特别复杂的接口可能会有很多错误情况code可能会有几十甚至几百个。客户端开发人员需要根据每一种错误情况逐一写if-else进行不同交互处理会非常麻烦你觉得有什么办法来改进吗作为服务端是否有必要告知客户端接口执行的错误码呢
答:服务端把错误码反馈给客户端有两个目的,一是客户端可以展示错误码方便排查问题,二是客户端可以根据不同的错误码来做交互区分。
**对于第一点方便客户端排查问题**,服务端应该进行适当的收敛和规整错误码,而不是把服务内可能遇到的、来自各个系统各个层次的错误码,一股脑地扔给客户端提示给用户。
我的建议是,开发一个错误码服务来专门治理错误码,实现错误码的转码、分类和收敛逻辑,甚至可以开发后台,让产品来录入需要的错误码提示消息。
此外我还建议错误码由一定的规则构成比如错误码第一位可以是错误类型比如A表示错误来源于用户B表示错误来源于当前系统往往是业务逻辑出错或程序健壮性差等问题C表示错误来源于第三方服务第二、第三位可以是错误来自的系统编号比如01来自用户服务02来自商户服务等等后面三位是自增错误码ID。
**对于第二点对不同错误码的交互区分**,我觉得更好的做法是服务端驱动模式,让服务端告知客户端如何处理,说白了就是客户端只需要照做即可,不需要感知错误码的含义(即便客户端显示错误码,也只是用于排错)。
比如服务端的返回可以包含actionType和actionInfo两个字段前者代表客户端应该做的交互动作后者代表客户端完成这个交互动作需要的信息。其中actionType可以是toast无需确认的消息提示、alert需要确认的弹框提示、redirectView转到另一个视图、redirectWebView打开Web视图actionInfo就是toast的信息、alert的信息、redirect的URL等。
由服务端来明确客户端在请求API后的交互行为主要的好处是灵活和统一两个方面。
* 灵活在于两个方面第一在紧急的时候还可以通过redirect方式进行救急。比如遇到特殊情况需要紧急进行逻辑修改的情况时我们可以直接在不发版的情况下切换到H5实现。第二是我们可以提供后台让产品或运营来配置交互的方式和信息而不是改交互改提示还需要客户端发版
* 统一有的时候会遇到不同的客户端比如iOS、Android、前端对于交互的实现不统一的情况如果API结果可以规定这部分内容那就可以彻底避免这个问题。
**问题2**在“要考虑接口变迁的版本控制策略”这一节的例子中,我们在类或方法上标记@APIVersion自定义注解实现了URL方式统一的接口版本定义。你可以用类似的方式也就是自定义RequestMappingHandlerMapping来实现一套统一的基于请求头方式的版本控制吗
我在GitHub上第21讲的源码中更新了我的实现你可以点击[这里](https://github.com/JosephZhu1983/java-common-mistakes/tree/master/src/main/java/org/geekbang/time/commonmistakes/apidesign/headerapiversion)查看。主要原理是定义自己的RequestCondition来做请求头的匹配
```
public class APIVersionCondition implements RequestCondition<APIVersionCondition> {
@Getter
private String apiVersion;
@Getter
private String headerKey;
public APIVersionCondition(String apiVersion, String headerKey) {
this.apiVersion = apiVersion;
this.headerKey = headerKey;
}
@Override
public APIVersionCondition combine(APIVersionCondition other) {
return new APIVersionCondition(other.getApiVersion(), other.getHeaderKey());
}
@Override
public APIVersionCondition getMatchingCondition(HttpServletRequest request) {
String version = request.getHeader(headerKey);
return apiVersion.equals(version) ? this : null;
}
@Override
public int compareTo(APIVersionCondition other, HttpServletRequest request) {
return 0;
}
}
```
并且自定义RequestMappingHandlerMapping来把方法关联到自定义的RequestCondition
```
public class APIVersionHandlerMapping extends RequestMappingHandlerMapping {
@Override
protected boolean isHandler(Class<?> beanType) {
return AnnotatedElementUtils.hasAnnotation(beanType, Controller.class);
}
@Override
protected RequestCondition<APIVersionCondition> getCustomTypeCondition(Class<?> handlerType) {
APIVersion apiVersion = AnnotationUtils.findAnnotation(handlerType, APIVersion.class);
return createCondition(apiVersion);
}
@Override
protected RequestCondition<APIVersionCondition> getCustomMethodCondition(Method method) {
APIVersion apiVersion = AnnotationUtils.findAnnotation(method, APIVersion.class);
return createCondition(apiVersion);
}
private RequestCondition<APIVersionCondition> createCondition(APIVersion apiVersion) {
return apiVersion == null ? null : new APIVersionCondition(apiVersion.value(), apiVersion.headerKey());
}
}
```
### [23 | 缓存设计:缓存可以锦上添花也可以落井下石](https://time.geekbang.org/column/article/231501)
**问题1**在聊到缓存并发问题时,我们说到热点 Key 回源会对数据库产生的压力问题,如果 Key 特别热的话,可能缓存系统也无法承受,毕竟所有的访问都集中打到了一台缓存服务器。如果我们使用 Redis 来做缓存,那可以把一个热点 Key 的缓存查询压力,分散到多个 Redis 节点上吗?
Redis 4.0以上如果开启了LFU算法作为maxmemory-policy那么可以使用hotkeys配合redis-cli命令行工具来探查热点Key。此外我们还可以通过MONITOR命令来收集Redis执行的所有命令然后配合[redis-faina工具](https://github.com/facebookarchive/redis-faina)来分析热点Key、热点前缀等信息。
对于如何分散热点Key对于Redis单节点的压力的问题我们可以考虑为Key加上一定范围的随机数作为后缀让一个Key变为多个Key相当于对热点Key进行分区操作。
当然除了分散Redis压力之外我们也可以考虑再做一层短时间的本地缓存结合Redis的Keyspace通知功能来处理本地缓存的数据同步。
**问题2**大 Key 也是数据缓存容易出现的一个问题。如果一个 Key 的 Value 特别大,那么可能会对 Redis 产生巨大的性能影响,因为 Redis 是单线程模型,对大 Key 进行查询或删除等操作,可能会引起 Redis 阻塞甚至是高可用切换。你知道怎么查询 Redis 中的大 Key以及如何在设计上实现大 Key 的拆分吗?
Redis的大Key可能会导致集群内存分布不均问题并且大Key的操作可能也会产生阻塞。
关于查询Redis中的大Key我们可以使用redis-cli --bigkeys命令来实时探查大Key。此外我们还可以使用redis-rdb-tools工具来分析Redis的RDB快照得到包含Key的字节数、元素个数、最大元素长度等信息的CSV文件。然后我们可以把这个CSV文件导入MySQL中写SQL去分析。
针对大Key我们可以考虑两方面的优化
* 第一是否有必要在Redis保存这么多数据。一般情况下我们在缓存系统中保存面向呈现的数据而不是原始数据对于原始数据的计算我们可以考虑其它文档型或搜索型的NoSQL数据库。
* 第二考虑把具有二级结构的Key比如List、Set、Hash拆分成多个小Key来独立获取或是用MGET获取
此外值得一提的是大Key的删除操作可能会产生较大性能问题。从Redis 4.0开始我们可以使用UNLINK命令而不是DEL命令在后台删除大Key而对于4.0之前的版本我们可以考虑使用游标删除大Key中的数据而不是直接使用DEL命令比如对于Hash使用HSCAN+HDEL结合管道功能来删除。
### [24 | 业务代码写完,就意味着生产就绪了?](https://time.geekbang.org/column/article/231568)
**问题1**Spring Boot Actuator提供了大量内置端点你觉得端点和自定义一个@RestController有什么区别呢你能否根据[官方文档](https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-features.html#production-ready-endpoints-custom),开发一个自定义端点呢?
Endpoint是Spring Boot Actuator抽象出来的一个概念主要用于监控和配置。使用@Endpoint注解自定义端点配合方法上的@ReadOperation、@WriteOperation、@DeleteOperation注解分分钟就可以开发出自动通过HTTP或JMX进行暴露的监控点。
如果只希望通过HTTP暴露的话可以使用@WebEndpoint注解如果只希望通过JMX暴露的话可以使用@JmxEndpoint注解。
而使用@RestController一般用于定义业务接口如果数据需要暴露到JMX的话需要手动开发。
比如,下面这段代码展示了如何定义一个累加器端点,提供了读取操作和累加两个操作:
```
@Endpoint(id = "adder")
@Component
public class TestEndpoint {
private static AtomicLong atomicLong = new AtomicLong();
//读取值
@ReadOperation
public String get() {
return String.valueOf(atomicLong.get());
}
//累加值
@WriteOperation
public String increment() {
return String.valueOf(atomicLong.incrementAndGet());
}
}
```
然后我们可以通过HTTP或JMX来操作这个累加器。这样我们就实现了一个自定义端点并且可以通过JMX来操作
![](https://static001.geekbang.org/resource/image/c4/0a/c46526acec7d7b72714b73073ee42f0a.png)
**问题2**在介绍指标Metrics时我们看到InfluxDB中保存了由Micrometer框架自动帮我们收集的一些应用指标。你能否参考源码中两个Grafana配置的JSON文件把这些指标在Grafana中配置出一个完整的应用监控面板呢
答:我们可以参考[Micrometer源码中的binder包](https://github.com/micrometer-metrics/micrometer/tree/master/micrometer-core/src/main/java/io/micrometer/core/instrument/binder)下面的类来了解Micrometer帮我们自动做的一些指标。
* JVM在线时间process.uptime
* 系统CPU使用system.cpu.usage
* JVM进程CPU使用process.cpu.usage
* 系统1分钟负载system.load.average.1m
* JVM使用内存jvm.memory.used
* JVM提交内存jvm.memory.committed
* JVM最大内存jvm.memory.max
* JVM线程情况jvm.threads.states
* JVM GC暂停jvm.gc.pause、jvm.gc.concurrent.phase.time
* 剩余磁盘disk.free
* Logback日志数量logback.events
* Tomcat线程情况最大、繁忙、当前tomcat.threads.config.max、tomcat.threads.busy、tomcat.threads.current
具体的面板配置方式,[第24讲](https://time.geekbang.org/column/article/231568)中已有说明。这里,我只和你分享在配置时会用到的两个小技巧。
第一个小技巧是把公共的标签配置为下拉框固定在页头显示一般来说我们会配置一个面板给所有的应用使用每一个指标中我们都会保存应用名称、IP地址等信息这个功能可以使用Micrometer的CommonTags实现参考[文档](http://micrometer.io/docs/concepts)的5.2节我们可以利用Grafana的[Variables](https://grafana.com/docs/grafana/latest/variables/templates-and-variables/)功能把应用名称和IP展示为两个下拉框显示同时提供一个adhoc筛选器自由增加筛选条件
![](https://static001.geekbang.org/resource/image/4e/d0/4e6255c68aeecd241cd7629321c5e2d0.png)
来到Variables面板可以看到我配置的三个变量
![](https://static001.geekbang.org/resource/image/49/29/493492d36405c8f9ed31eb2924276729.png)
Application和IP两个变量的查询语句如下
```
SHOW TAG VALUES FROM jvm_memory_used WITH KEY = "application_name"
SHOW TAG VALUES FROM jvm_memory_used WITH KEY = "ip" WHERE application_name=~ /^$Application$/
```
第二个小技巧是利用GROUP BY功能展示一些明细的曲线类似jvm\_threads\_states、jvm.gc.pause等指标中包含了更细节的一些状态区分标签比如jvm\_threads\_states中的state标签代表了线程状态。一般而言我们在展现图表的时候需要按照线程状态分组分曲线显示
![](https://static001.geekbang.org/resource/image/bc/62/bc74c6yy84d233c429258406794a5262.png)
配置的InfluxDB查询语句是
```
SELECT max("value") FROM "jvm_threads_states" WHERE ("application_name" =~ /^$Application$/ AND "ip" =~ /^$IP$/) AND $timeFilter GROUP BY time($__interval), "state" fill(none)
```
这里可以看到application\_name和ip两个条件的值是关联到刚才我们配置的两个变量的在GROUP BY中增加了按照state的分组。
### [25 | 异步处理好用,但非常容易用错](https://time.geekbang.org/column/article/234928)
**问题1**在用户注册后发送消息到MQ然后会员服务监听消息进行异步处理的场景下有些时候我们会发现虽然用户服务先保存数据再发送MQ但会员服务收到消息后去查询数据库却发现数据库中还没有新用户的信息。你觉得这可能是什么问题呢又该如何解决呢
答:我先来分享下,我遇到这个问题的真实情况。
当时我们是因为业务代码把保存数据和发MQ消息放在了一个事务中收到消息的时候有可能事务还没有提交完成。为了解决这个问题开发同学当时的处理方式是收MQ消息的时候Sleep 1秒再去处理。这样虽然解决了问题但却大大降低了消息处理的吞吐量。
更好的做法是先提交事务完成后再发MQ消息。但是这又引申出来一个问题MQ消息发送失败怎么办如何确保发送消息和本地事务有整体事务性这就需要进一步考虑建立本地消息表来确保MQ消息可补偿把业务处理和保存MQ消息到本地消息表的操作放在相同事务内处理然后异步发送和补偿消息表中的消息到MQ。
**问题2**除了使用Spring AMQP实现死信消息的重投递外RabbitMQ 2.8.0 后支持的死信交换器DLX也可以实现类似功能。你能尝试用DLX实现吗并比较下这两种处理机制
其实RabbitMQ的[DLX死信交换器](https://www.rabbitmq.com/dlx.html)和普通交换器没有什么区别只不过它有一个特点是可以把其它队列关联到这个DLX交换器上然后消息过期后自动会转发到DLX交换器。那么我们就可以利用这个特点来实现延迟消息重投递经过一定次数之后还是处理失败则作为死信处理。
实现结构如下图所示:
![](https://static001.geekbang.org/resource/image/41/36/4139d9cbefdabbc793340ddec182a936.png)
关于这个实现架构图,我需要说明的是:
* 为了简单起见,图中圆柱体代表交换器+队列并省去了RoutingKey。
* WORKER作为DLX用于处理消息BUFFER用于临时存放需要延迟重试的消息WORKER和BUFFER绑定在一起。
* DEAD用于存放超过重试次数的死信。
* 在这里WORKER其实是一个DLX我们把它绑定到BUFFER实现延迟重试。
通过RabbitMQ实现具有延迟重试功能的消息重试以及最后进入死信队列的整个流程如下
1. 客户端发送记录到WORKER
2. Handler收到消息后处理失败
3. 第一次重试发送消息到BUFFER
4. 3秒后消息过期自动转发到WORKER
5. Handler再次收到消息后处理失败
6. 第二次重试发送消息到BUFFER
7. 3秒后消息过期还是自动转发到WORKER
8. Handler再次收到消息后处理失败达到最大重试次数
9. 发送消息到DEAD作为死信消息
10. DeadHandler收到死信处理比如进行人工处理
整个程序的日志输出如下,可以看到输出日志和我们前面贴出的结构图、详细解释的流程一致:
```
[21:59:48.625] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.r.DeadLetterController:24 ] - Client 发送消息 msg1
[21:59:48.640] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.rabbitmqdlx.MQListener:27 ] - Handler 收到消息msg1
[21:59:48.641] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.rabbitmqdlx.MQListener:33 ] - Handler 消费消息msg1 异常准备重试第1次
[21:59:51.643] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.rabbitmqdlx.MQListener:27 ] - Handler 收到消息msg1
[21:59:51.644] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.rabbitmqdlx.MQListener:33 ] - Handler 消费消息msg1 异常准备重试第2次
[21:59:54.646] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.rabbitmqdlx.MQListener:27 ] - Handler 收到消息msg1
[21:59:54.646] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.rabbitmqdlx.MQListener:40 ] - Handler 消费消息msg1 异常,已重试 2 次,发送到死信队列处理!
[21:59:54.649] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] [ERROR] [o.g.t.c.a.rabbitmqdlx.MQListener:62 ] - DeadHandler 收到死信消息: msg1
```
接下来,我们再对比下这种实现方式和[第25讲](https://time.geekbang.org/column/article/234928)中Spring重试的区别。其实这两种实现方式的差别很大体现在下面两点。
第一点Spring的重试是在处理的时候在线程内休眠进行延迟重试消息不会重发到MQ我们这个方案中处理失败的消息会发送到RMQ由RMQ做延迟处理。
第二点Spring的重试方案只涉及普通队列和死信队列两个队列或者说交换器我们这个方案的实现中涉及工作队列、缓冲队列用于存放等待延迟重试的消息和死信队列真正需要人工处理的消息三个队列。
当然了如果你希望把存放正常消息的队列和把存放需要重试处理消息的队列区分开的话可以把我们这个方案中的队列再拆分下变为四个队列也就是工作队列、重试队列、缓冲队列关联到重试队列作为DLX和死信队列。
这里我再强调一下虽然说我们利用了RMQ的DLX死信交换器的功能但是我们把DLX当做了工作队列来使用因为我们利用的是其能自动从BUFFER缓冲队列接收过期消息的特性。
这部分源码比较长我直接放在GitHub上了。感兴趣的话你可以点击[这里的链接](https://github.com/JosephZhu1983/java-common-mistakes/tree/master/src/main/java/org/geekbang/time/commonmistakes/asyncprocess/rabbitmqdlx)查看。
### [26 | 数据存储NoSQL与RDBMS如何取长补短、相辅相成](https://time.geekbang.org/column/article/234930)
**问题1**我们提到InfluxDB不能包含太多tag。你能写一段测试代码来模拟这个问题并观察下InfluxDB的内存使用情况吗
我们写一段如下的测试代码向InfluxDB写入大量指标每一条指标关联10个Tag每一个Tag都是100000以内的随机数这种方式会造成[high series cardinality问题](https://docs.influxdata.com/influxdb/v1.7/concepts/schema_and_data_layout/#don-t-have-too-many-serieshigh%20series%20cardinality)从而大量占用InfluxDB的内存。
```
@GetMapping("influxdbwrong")
public void influxdbwrong() {
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder()
.connectTimeout(1, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.SECONDS)
.writeTimeout(60, TimeUnit.SECONDS);
try (InfluxDB influxDB = InfluxDBFactory.connect("http://127.0.0.1:8086", "root", "root", okHttpClientBuilder)) {
influxDB.setDatabase("performance");
//插入100000条记录
IntStream.rangeClosed(1, 100000).forEach(i -> {
Map<String, String> tags = new HashMap<>();
//每条记录10个tagtag的值是100000以内随机值
IntStream.rangeClosed(1, 10).forEach(j -> tags.put("tagkey" + i, "tagvalue" + ThreadLocalRandom.current().nextInt(100000)));
Point point = Point.measurement("bad")
.tag(tags)
.addField("value", ThreadLocalRandom.current().nextInt(10000))
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.build();
influxDB.write(point);
});
}
}
```
不过因为InfluxDB的默认参数配置限制了Tag的值数量以及数据库Series数量
```
max-values-per-tag = 100000
max-series-per-database = 1000000
```
所以这个程序很快就会出错无法形成OOM你可以把这两个参数改为0来解除这个限制。
继续运行程序我们可以发现InfluxDB占用大量内存最终出现OOM。
**问题2**文档数据库MongoDB也是一种常用的NoSQL。你觉得MongoDB的优势和劣势是什么呢它适合用在什么场景下呢
MongoDB是目前比较火的文档型NoSQL。虽然MongoDB 在4.0版本后具有了事务功能但是它整体的稳定性相比MySQL还是有些差距。因此MongoDB不太适合作为重要数据的主数据库但可以用来存储日志、爬虫等数据重要程度不那么高但写入并发量又很大的场景。
虽然MongoDB的写入性能较高但复杂查询性能却相比Elasticsearch来说没啥优势虽然MongoDB有Sharding功能但是还不太稳定。因此我个人建议在数据写入量不大、更新不频繁并且不需要考虑事务的情况下使用Elasticsearch来替换MongoDB。
以上就是咱们这门课的第21~26讲的思考题答案了。
关于这些题目,以及背后涉及的知识点,如果你还有哪里感觉不清楚的,欢迎在评论区与我留言,也欢迎你把今天的内容分享给你的朋友或同事,一起交流。