相信负责过“搜索服务”的伙伴,最害怕的一句话就是:“数据怎么又搜索不出来了!!!”。每当收到这句话,都会心中一颤,因为面对几千万甚至几亿的索引数据,我真的无从下手,不知道业务要搜索什么,也不知道是哪些数据出了问题….
目前,“搜索”已经成为后端管理平台的必备功能,在这个业务场景中,很多人都会基于 elasticsearch 强大的检索能力构建自己的搜索服务。但实际开发中,elasticsearch 的引入是非常小的一部分,往往大头是索引模型的数据管理,在整个过程中,我们
如此繁琐的事情,哪一环出现问题都会收到业务的投诉。
对搜索场景中的最佳实践进行封装,从而:
首先,增加对 spring data elasticsearch 的支持,具体 maven 坐标如下:
org.springframework.boot
spring-boot-starter-data-elasticsearch
在 application.yml 中添加 es 的配置信息,具体如下:
spring:
elasticsearch:
uris: http://localhost:9200
connection-timeout: 10s
socket-timeout: 30s
新建 SpringESConfiguration 配置信息,指定 ES Repository 的包信息,居然如下:
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.geekhalo.lego.wide.es")
public class SpringESConfiguration {
}
最后,引入 lego-starter,具体如下:
com.geekhalo.lego
lego-starter
0.1.14-wide-SNAPSHOT
至此,就完成了项目的准备工具,可以着手构建索引模型。
构造模型之前,需要构建一个 Enum 用以管理模型中所有关联数据,具体如下:
public enum WideOrderType implements WideItemType {
ORDER, // 订单主数据
USER, // 用户数据
ADDRESS, // 用户地址数据
PRODUCT // 购买商品数据
}
WideOrderType 枚举实现 WideItemType 接口,用于与框架进行集成。
接下来,构建待索引的宽表模型,具体如下:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(indexName = "wide_order")
public class WideOrder extends BindFromBasedWide {
@org.springframework.data.annotation.Id
private Long id;
@BindFrom(sourceClass = Order.class, field = "userId")
private Long userId;
@BindFrom(sourceClass = Order.class, field = "addressId")
private Long addressId;
@BindFrom(sourceClass = Order.class, field = "productId")
private Long productId;
@BindFrom(sourceClass = Order.class, field = "descr")
private String orderDescr;
@BindFrom(sourceClass = User.class, field = "name")
private String userName;
@BindFrom(sourceClass = Address.class, field = "detail")
private String addressDetail;
@BindFrom(sourceClass = Product.class, field = "name")
private String productName;
@BindFrom(sourceClass = Product.class, field = "price")
private Integer productPrice;
public WideOrder(Long orderId){
setId(orderId);
}
@Override
public Long getId() {
return id;
}
@Override
public boolean isValidate() {
return userId != null && addressId != null && productId != null;
}
@Override
public List getItemsKeyByType(WideOrderType wideOrderType) {
switch (wideOrderType){
case ORDER:
return Collections.singletonList(new WideItemKey(wideOrderType, getId()));
case USER:
return Collections.singletonList(new WideItemKey(wideOrderType, getUserId()));
case ADDRESS:
return Collections.singletonList(new WideItemKey(wideOrderType, getAddressId()));
case PRODUCT:
return Collections.singletonList(new WideItemKey(wideOrderType, getProductId()));
}
return Collections.emptyList();
}
}
该模型有如下几个特点:
至此,模型就建立完毕。
有了模型后,我们需要构建一些组件用于为“宽表”提供数据,这就是 WideItemDataProvider 体系。
我们以 OrderProvider 为例,具体如下:
@Component
@org.springframework.core.annotation.Order(value = Ordered.HIGHEST_PRECEDENCE)
public class OrderProvider implements WideItemDataProvider {
@Autowired
private OrderDao orderDao;
@Override
public List apply(List key) {
return orderDao.findAllById(key);
}
@Override
public WideOrderType getSupportType() {
return WideOrderType.ORDER;
}
}
该类有如下特点:
每一类关联数据都会提供自己的数据提供器,简单看下 UserProvider 实现,具体如下:
@Component
public class UserProvider implements WideItemDataProvider {
@Autowired
private UserDao userDao;
@Override
public List apply(List key) {
return userDao.findAllById(key);
}
@Override
public WideOrderType getSupportType() {
return WideOrderType.USER;
}
}
和 OrderProvider 没有本质区别,当然,demo 中还提供了多种实现,如:
数据都准备好了,需要将 “宽表” 进行持久化,将其放入最合适的存储引擎,以便更好的处理查询请求。
基于 ElasticsearchRepository 的 WideOrderRepository 具体如下:
@Repository
public class WideOrderRepository implements WideCommandRepository {
@Autowired
private WideOrderESDao wideOrderDao;
@Override
public void save(List wides) {
wideOrderDao.saveAll(wides);
}
@Override
public List findByIds(List masterIds) {
return Lists.newArrayList(wideOrderDao.findAllById(masterIds));
}
@Override
public void consumeByItem(WideOrderType wideOrderType, KEY key, Consumer wideConsumer) {
switch (wideOrderType){
case PRODUCT:
this.wideOrderDao.findByProductId((Long) key).forEach(wideConsumer);
case ADDRESS:
this.wideOrderDao.findByAddressId((Long) key).forEach(wideConsumer);
case ORDER:
this.wideOrderDao.findById((Long) key).ifPresent(wideConsumer);
case USER:
this.wideOrderDao.findByUserId((Long) key).forEach(wideConsumer);
}
}
@Override
public boolean supportUpdateFor(WideOrderType wideOrderType) {
return false;
}
@Override
public void updateByItem(WideOrderType wideOrderType, KEY key, Consumer wideConsumer) {
Consumer updateAndSave = wideConsumer.andThen(wideOrder -> wideOrderDao.save(wideOrder));
switch (wideOrderType){
case PRODUCT:
this.wideOrderDao.findByProductId((Long) key).forEach(updateAndSave);
case ADDRESS:
this.wideOrderDao.findByAddressId((Long) key).forEach(updateAndSave);
case ORDER:
this.wideOrderDao.findById((Long) key).ifPresent(updateAndSave);
case USER:
this.wideOrderDao.findByUserId((Long) key).forEach(updateAndSave);
}
}
@Override
public void updateByItem(WideOrderType wideOrderType, KEY key, WideItemData item) {
}
}
仓库具有如下特征:
所依赖的 WideOrderESDao 基于 ElasticsearchRepository 实现,具体如下:
public interface WideOrderESDao extends ElasticsearchRepository {
List findByProductId(Long productId);
List findByAddressId(Long addressId);
List findByUserId(Long userId);
}
所有组件都已准备好,现在需要将他们整合在一起。
@Configuration
public class WideOrderConfiguration extends WideConfigurationSupport {
@Autowired
private WideOrderRepository wideOrderRepository;
@Autowired
private List>> wideItemDataProviders;
@Bean
public WideIndexService createWideIndexService(){
return super.createWideIndexService();
}
@Bean
public WideOrderPatrolService wideOrderPatrolService(){
return new WideOrderPatrolService(createWidePatrolService());
}
@Bean
protected WideService createWideService(
WideIndexService wideIndexService,
WideOrderPatrolService wideOrderPatrolService){
return super.createWideService(wideIndexService, wideOrderPatrolService);
}
@Override
protected WideFactory getWideFactory() {
return WideOrder::new;
}
@Override
protected WideCommandRepository getWideCommandRepository() {
return this.wideOrderRepository;
}
@Override
protected List>> getWideItemProviders() {
return this.wideItemDataProviders;
}
}
WideOrderConfiguration 具有如下特点:
其中自定义巡检 WideOrderPatrolService 代码如下:
public class WideOrderPatrolService implements WidePatrolService {
private final WidePatrolService widePatrolService;
public WideOrderPatrolService(WidePatrolService widePatrolService) {
this.widePatrolService = widePatrolService;
}
@Override
@DelayBasedRocketMQ(topic = "wide_order_patrol", tag = "SingleIndex", consumerGroup = "order_patrol_group", delayLevel = 2)
public void index(Long aLong) {
this.widePatrolService.index(aLong);
}
@Override
public void index(List longs) {
WideOrderPatrolService wideOrderPatrolService = ((WideOrderPatrolService)AopContext.currentProxy());
longs.forEach(id -> wideOrderPatrolService.index(id));
}
@Override
public void updateItem(WideOrderType wideOrderType, KEY key) {
((WideOrderPatrolService)AopContext.currentProxy()).updateItem(wideOrderType, (Long) key);
}
@DelayBasedRocketMQ(topic = "wide_order_patrol", tag = "UpdateByItem", consumerGroup = "order_patrol_group", delayLevel = 2)
public void updateItem(WideOrderType wideOrderType, Long id){
this.widePatrolService.updateItem(wideOrderType, id);
}
@Override
public void setReindexConsumer(Consumer> consumer) {
this.widePatrolService.setReindexConsumer(consumer);
}
}
WideOrderPatrolService 具体实现如下:
万事具备只欠东风,写个测试用例测试下功能。
首先,对数据进行索引,示例如下:
// 保存 User
this.user = new User();
this.user.setName("测试");
this.userDao.save(this.user);
// 保存 Address
this.address = new Address();
this.address.setDetail("详细地址");
this.address.setUserId(this.user.getId());
this.addressDao.save(this.address);
// 保存 Product
this.product = new Product();
this.product.setName("商品名称");
this.product.setPrice(100);
this.productDao.save(this.product);
// 保存 Order
this.order = new Order();
this.order.setUserId(this.user.getId());
this.order.setAddressId(this.address.getId());
this.order.setProductId(this.product.getId());
this.order.setDescr("我的订单");
this.orderDao.save(this.order);
// 进行索引
this.wideOrderService.index(this.order.getId());
// 比对数据
Optional optional = wideOrderDao.findById(this.order.getId());
Assertions.assertTrue(optional.isPresent());
WideOrder wideOrder = optional.get();
Assertions.assertEquals(order.getId(), wideOrder.getId());
Assertions.assertEquals(order.getAddressId(), wideOrder.getAddressId());
Assertions.assertEquals(order.getProductId(), wideOrder.getProductId());
Assertions.assertEquals(order.getUserId(), wideOrder.getUserId());
Assertions.assertEquals(order.getDescr(), wideOrder.getOrderDescr());
Assertions.assertEquals(user.getName(), wideOrder.getUserName());
Assertions.assertEquals(address.getDetail(), wideOrder.getAddressDetail());
Assertions.assertEquals(product.getName(), wideOrder.getProductName());
Assertions.assertEquals(product.getPrice(), wideOrder.getProductPrice());
单测成功运行后,数据已经成功写入到 ES,具体如下:
image
更新操作,具体单测如下:
// 更新订单描述
this.order.setDescr("订单详情");
this.orderDao.save(this.order);
// 触发索引更新
this.wideOrderService.updateOrder(this.order.getId());
// 验证更新结果
Optional optional = wideOrderDao.findById(this.order.getId());
Assertions.assertTrue(optional.isPresent());
WideOrder wideOrder = optional.get();
Assertions.assertEquals(order.getId(), wideOrder.getId());
Assertions.assertEquals(order.getDescr(), wideOrder.getOrderDescr());
单测成功运行后,数据已经完成更新,ES 数据具体如下:
image
仔细观察日志,会发现存在一组 Delay Task 日志,具体如下:
[ main] c.g.l.core.delay.DelayMethodInterceptor : success to sent Delay Task to RocketMQ for [126]
[MessageThread_2] c.g.l.c.w.s.SimpleWidePatrolService : id 126 is same
整体架构设计如下:
image
从功能角度,整体可分为如下几部分:
wide 为宽表提供了索引和巡检能力支持,但在实际业务中需要处理多种情况,常见如下:
项目仓库地址:https://gitee.com/litao851025/lego
项目文档地址:https://gitee.com/litao851025/lego/wikis/support/Wide%20%E5%AE%BD%E8%A1%A8
页面更新:2024-02-22
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号