个性化阅读
专注于IT技术分析

微服务通信:Redis的Spring集成教程

本文概述

微服务架构是设计和实现高度可扩展的Web应用程序中非常流行的方法。组件之间的整体应用程序中的通信通常基于同一过程中的方法或函数调用。另一方面, 基于微服务的应用程序是在多台计算机上运行的分布式系统。

这些微服务之间的通信对于拥有稳定且可扩展的系统很重要。有多种方法可以做到这一点。基于消息的通信是可靠地执行此操作的一种方法。

使用消息传递时, 组件通过异步交换消息相互交互。消息通过渠道交换。

便于服务A和服务B之间进行通信的消息传递系统的图形表示

当服务A要与服务B进行通信时, 不是直接发送它, 而是将它发送到特定的通道。当服务B要读取消息时, 它会从特定的消息通道中提取消息。

在本Spring Integration教程中, 你将学习如何使用Redis在Spring应用程序中实现消息传递。你将看到一个示例应用程序, 其中一个服务正在推送队列中的事件, 而另一服务正在逐个处理这些事件。

Spring Integration

Spring Integration项目扩展了Spring框架, 以支持基于Spring的应用程序之间或内部的消息传递。组件通过消息传递范例连接在一起。各个组件可能不知道应用程序中的其他组件。

Spring Integration提供了多种与外部系统进行通信的机制。通道适配器是一种用于单向集成(发送或接收)的机制。网关用于请求/答复方案(入站或出站)。

Apache Camel是广泛使用的替代方法。在现有的基于Spring的服务中, 通常首选Spring集成, 因为它是Spring生态系统的一部分。

Redis

Redis是一个非常快的内存数据存储。它也可以选择持久化到磁盘上。它支持不同的数据结构, 例如简单的键值对, 集合, 队列等。

将Redis用作队列可以使组件之间的数据共享和水平缩放变得更加容易。一个生产者或多个生产者可以将数据推送到队列, 而一个消费者或多个消费者可以拉出数据并处理事件。

多个使用者不能使用同一事件-这确保一个事件被处理一次。

该图显示了生产者/消费者的体系结构

使用Redis作为消息队列的好处:

  • 以非阻塞方式并行执行离散任务
  • 很棒的演出
  • 稳定性
  • 易于监控和调试
  • 易于实施和使用

规则:

  • 将任务添加到队列应该比处理任务本身更快。
  • 消费任务应该比生产任务快(如果没有, 则增加更多的消费者)。

Redis与Spring Integration

下面逐步创建一个示例应用程序, 以解释如何将Spring Integration与Redis结合使用。

假设你有一个允许用户发布帖子的应用程序。你想构建一个关注功能。另一个要求是, 每次有人发布帖子时, 都应该通过某种交流渠道(例如, 电子邮件或推送通知)通知所有关注者。

一种实现方法是在用户发布内容后向每个关注者发送电子邮件。但是, 当用户有1000个关注者时会发生什么?当有1000位用户在10秒内发布内容时, 每个人都有1000位关注者?另外, 发布者的帖子会等到所有电子邮件都发送完之后吗?

分布式系统解决了此问题。

通过使用队列可以解决此特定问题。负责发布帖子的服务A(生产者)将做到这一点。它将发布帖子并推送事件, 其中包含需要接收电子邮件的用户列表以及帖子本身。可以在服务B中获取用户列表, 但是为了简化本示例, 我们将从服务A发送用户列表。

这是一个异步操作。这意味着正在发布的服务将不必等待发送电子邮件。

服务B(使用者)将从队列中提取事件并进行处理。这样, 我们可以轻松地扩展我们的服务, 并且可以让n个消费者发送电子邮件(处理事件)。

因此, 让我们从生产者服务中的实现开始。必需的依赖项是:

<dependency>
   <groupId>redis.clients</groupId>
   <artifactId>jedis</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.data</groupId>
   <artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-redis</artifactId>
</dependency>

这三个Maven依赖项是必需的:

  • Jedis是Redis的客户。
  • Spring Data Redis依赖性使在Java中使用Redis更加容易。它提供了熟悉的Spring概念, 例如用于核心API使用的模板类和轻量级存储库样式的数据访问。
  • Spring Integration Redis提供了Spring编程模型的扩展, 以支持著名的Enterprise Integration Patterns。

接下来, 我们需要配置Jedis客户端:

@Configuration
public class RedisConfig {

   @Value("${redis.host}")
   private String redisHost;

   @Value("${redis.port:6379}")
   private int redisPort;

   @Bean
   public JedisPoolConfig poolConfig() {
       JedisPoolConfig poolConfig = new JedisPoolConfig();
       poolConfig.setMaxTotal(128);
       return poolConfig;
   }

   @Bean
   public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) {
       final JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
       connectionFactory.setHostName(redisHost);
       connectionFactory.setPort(redisPort);
       connectionFactory.setPoolConfig(poolConfig);
       connectionFactory.setUsePool(true);
       return connectionFactory;
   }
}

注释@Value表示Spring将在应用程序属性中定义的值注入到字段中。这意味着应在应用程序属性中定义redis.host和redis.port值。

现在, 我们需要定义要发送到队列的消息。一个简单的示例消息如下所示:

@Getter
@Setter
@Builder
public class PostPublishedEvent {
   
   private String postUrl;
   private String postTitle;    
   private List<String> emails;

}

注意:Project Lombok(https://projectlombok.org/)提供了@ Getter, @ Setter, @ Builder和许多其他注释, 以避免使用getter, setter和其他琐碎的东西使代码混乱。你可以从此srcmini文章中了解更多信息。

消息本身将以JSON格式保存在队列中。每次将事件发布到队列时, 消息都会序列化为JSON。当从队列中消费时, 该消息将被反序列化。

定义完消息后, 我们需要定义队列本身。在Spring Integration中, 可以通过.xml配置轻松完成此操作。该配置应放在resources / WEB-INF目录中。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:int="http://www.springframework.org/schema/integration"
      xmlns:int-redis="http://www.springframework.org/schema/integration/redis"
      xsi:schemaLocation="http://www.springframework.org/schema/integration/redis
      http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd
     http://www.springframework.org/schema/integration
     http://www.springframework.org/schema/integration/spring-integration.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans.xsd">

   <int-redis:queue-outbound-channel-adapter
           id="event-outbound-channel-adapter"
           channel="eventChannelJson"
           serializer="serializer"
           auto-startup="true" connection-factory="redisConnectionFactory"
           queue="my-event-queue" />

   <int:gateway id="eventChannelGateway"
                service-interface="org.srcmini.queue.RedisChannelGateway"
                error-channel="errorChannel" default-request-channel="eventChannel">
       <int:default-header name="topic" value="queue"/>
   </int:gateway>

   <int:channel id="eventChannelJson"/>
   <int:channel id="eventChannel"/>
   

   <bean id="serializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>

   <int:object-to-json-transformer input-channel="eventChannel"
                                   output-channel="eventChannelJson"/>

</beans>

在配置中, 你可以看到” int-redis:queue-outbound-channel-adapter”部分。其属性是:

  • id:组件的bean名称。
  • channel:此终结点从其接收消息的MessageChannel。
  • connection-factory:对RedisConnectionFactory bean的引用。
  • queue:Redis列表的名称, 在该列表上执行基于队列的推送操作以发送Redis消息。此属性与队列表达式互斥。
  • queue-expression:一个SpEL表达式, 用于在运行时使用传入消息作为#root变量来确定Redis列表的名称。此属性与队列互斥。
  • serializer:RedisSerializer bean引用。默认情况下, 它是一个JdkSerializationRedisSerializer。但是, 对于String有效负载, 如果未提供序列化程序引用, 则使用StringRedisSerializer。
  • extract-payload:指定此端点是否应仅将负载发送到Redis队列或整个消息。其默认值为true。
  • left-push:指定此端点应使用向左推送(为true时)还是向右推送(为false时)将消息写入Redis列表。如果为true, 则与默认Redis队列入站通道适配器一起使用时, Redis列表将充当FIFO队列。设置为false可与从列表中读取并弹出左键的软件一起使用, 或实现类似堆栈的消息顺序。其默认值为true。

下一步是定义网关, 这在.xml配置中提到。对于网关, 我们使用org.srcmini.queue包中的RedisChannelGateway类。

StringRedisSerializer用于在Redis中保存之前对消息进行序列化。同样在.xml配置中, 我们定义了网关并将RedisChannelGateway设置为网关服务。这意味着可以将RedisChannelGateway Bean注入其他Bean中。我们定义了属性default-request-channel, 因为还可以通过使用@Gateway批注提供按方法的频道引用。类定义:

public interface RedisChannelGateway {
   void enqueue(PostPublishedEvent event);
}

要将配置连接到我们的应用程序中, 我们必须将其导入。这是在SpringIntegrationConfig类中实现的。

@ImportResource("classpath:WEB-INF/event-queue-config.xml")
@AutoConfigureAfter(RedisConfig.class)
@Configuration
public class SpringIntegrationConfig {
}

@ImportResource批注用于将Spring .xml配置文件导入@Configuration。 @AutoConfigureAfter注释用于提示应在其他指定的自动配置类之后应用自动配置。

现在, 我们将创建一个服务并实现将事件加入Redis队列的方法。

public interface QueueService {

   void enqueue(PostPublishedEvent event);
}
@Service
public class RedisQueueService implements QueueService {

   private RedisChannelGateway channelGateway;

   @Autowired
   public RedisQueueService(RedisChannelGateway channelGateway) {
       this.channelGateway = channelGateway;
   }

   @Override
   public void enqueue(PostPublishedEvent event) {
       channelGateway.enqueue(event);
   }
}

现在, 你可以使用QueueService中的enqueue方法轻松地将消息发送到队列。

Redis队列只是具有一个或多个生产者和消费者的列表。要将消息发布到队列, 生产者可以使用LPUSH Redis命令。并且, 如果你监视Redis(提示:输入redis-cli monitor), 则可以看到消息已添加到队列中:

"LPUSH" "my-event-queue" "{\"postUrl\":\"test\", \"postTitle\":\"test\", \"emails\":[\"test\"]}"

现在, 我们需要创建一个使用者应用程序, 它将从队列中提取这些事件并进行处理。消费者服务需要与生产者服务相同的依赖关系。

现在, 我们可以重用PostPublishedEvent类来反序列化消息。

我们需要创建队列配置, 同样, 它必须放置在resources / WEB-INF目录中。队列配置的内容是:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:int="http://www.springframework.org/schema/integration"
      xmlns:int-redis="http://www.springframework.org/schema/integration/redis"
      xsi:schemaLocation="http://www.springframework.org/schema/integration/redis
      http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd
     http://www.springframework.org/schema/integration
     http://www.springframework.org/schema/integration/spring-integration.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans.xsd">

   <int-redis:queue-inbound-channel-adapter id="event-inbound-channel-adapter"
                                            channel="eventChannelJson" queue="my-event-queue"
                                            serializer="serializer" auto-startup="true"
                                            connection-factory="redisConnectionFactory"/>

   <int:channel id="eventChannelJson"/>

   <int:channel id="eventChannel">
       <int:queue/>
   </int:channel>

   <bean id="serializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>

   <int:json-to-object-transformer input-channel="eventChannelJson"
                                   output-channel="eventChannel"
                                   type="com.srcmini.integration.spring.model.PostPublishedEvent"/>

   <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService"
                          method="process">
       <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/>
   </int:service-activator>

</beans>

在.xml配置中, int-redis:queue-inbound-channel-adapter可以具有以下属性:

  • id:组件的bean名称。
  • channel:我们从此端点向其发送消息的MessageChannel。
  • 自动启动:SmartLifecycle属性, 用于指定此端点是否应在应用程序上下文启动后自动启动。其默认值为true。
  • 阶段:SmartLifecycle属性, 用于指定将启动此端点的阶段。其默认值为0。
  • connection-factory:对RedisConnectionFactory bean的引用。
  • queue:Redis列表的名称, 在该列表上执行基于队列的弹出操作以获取Redis消息。
  • 错误通道:我们将从端点的侦听任务向其发送带有异常的ErrorMessages的MessageChannel。默认情况下, 基础MessagePublishingErrorHandler使用应用程序上下文中的默认errorChannel。
  • serializer:RedisSerializer Bean参考。它可以是一个空字符串, 表示没有序列化器。在这种情况下, 来自入站Redis消息的原始byte []将作为消息有效负载发送到通道。默认情况下, 它是一个JdkSerializationRedisSerializer。
  • receive-timeout:弹出操作等待队列中的Redis消息的超时时间(以毫秒为单位)。其默认值为1秒。
  • recovery-interval:以毫秒为单位的时间, 在pop操作发生异常之后, 侦听器任务应在重新启动侦听器任务之前进入休眠状态。
  • 期望消息:指定此端点是否希望Redis队列中的数据包含整个消息。如果将此属性设置为true, 则序列化器不能为空字符串, 因为消息需要某种形式的反序列化(默认情况下为JDK序列化)。其默认值为false。
  • task-executor:对Spring TaskExecutor(或标准JDK 1.5+ Executor)bean的引用。它用于基础侦听任务。默认情况下, 使用SimpleAsyncTaskExecutor。
  • right-pop:指定此端点应使用向右弹出(当为true时)还是向左弹出(当为false时)从Redis列表中读取消息。如果为true, 则与默认的Redis队列出站通道适配器一起使用时, Redis列表将充当FIFO队列。设置为false可与通过右推写入列表或实现类似堆栈的消息顺序的软件一起使用。其默认值为true。

重要的部分是”服务激活器”, 它定义了应使用哪种服务和方法来处理事件。”

此外, json-to-object-transformer需要一个type属性才能将JSON转换为对象, 在上面设置为type =” com.srcmini.integration.spring.model.PostPublishedEvent”。

同样, 要连接此配置, 我们将需要SpringIntegrationConfig类, 该类可以与之前相同。最后, 我们需要可以实际处理事件的服务。

public interface EventProcessingService {
   void process(PostPublishedEvent event);
}

@Service("RedisEventProcessingService")
public class RedisEventProcessingService implements EventProcessingService {

   @Override
   public void process(PostPublishedEvent event) {
       // TODO: Send emails here, retry strategy, etc :)
   }

}

运行应用程序后, 你可以在Redis中看到:

"BRPOP" "my-event-queue" "1"

总结

使用Spring Integration和Redis, 构建Spring微服务应用程序并不像往常那样令人生畏。只需少量配置和少量样板代码, 你就可以立即构建微服务架构的基础。

即使你不打算完全擦除当前的Spring项目, 也不打算在Redis的帮助下切换到新的体系结构, 通过队列获得巨大的性能改进也是非常简单的。

赞(0)
未经允许不得转载:srcmini » 微服务通信:Redis的Spring集成教程
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!