个性化阅读
专注于IT技术分析

简化软件集成:Apache Camel教程

本文概述

软件很少(如果有的话)存在于信息真空中。至少, 这是我们的软件工程师可以为我们开发的大多数应用程序做出的假设。

无论出于何种原因, 每一种软件都以某种方式与某种其他软件进行通信, 其原因多种多样:从某处获取参考数据, 发送监视信号, 与其他服务保持联系, 同时又是分布式软件的一部分。系统等等。

在本教程中, 你将学习集成大型软件的最大挑战是什么, 以及Apache Camel如何轻松解决它们。

问题:系统集成的体系结构设计

你可能在软件工程生命中至少完成了以下一次:

  • 确定应该启动数据发送的业务逻辑片段。
  • 在同一应用程序层中, 根据接收者的期望编写数据转换。
  • 将数据包装为适合通过网络传输和路由的结构。
  • 使用适当的驱动程序或客户端SDK打开与目标应用程序的连接。
  • 发送数据并处理响应。

为什么这是一个坏举动?

尽管你只有很少的此类连接, 但它仍可管理。随着系统之间关系的不断增加, 应用程序的业务逻辑与集成逻辑混合在一起, 集成逻辑涉及适应数据, 补偿两个系统之间的技术差异以及使用SOAP, REST或更多奇特的请求将数据传输到外部系统。

如果要集成多个应用程序, 将很难在这样的代码中回溯整个依赖关系:数据在哪里产生, 什么服务在使用它?你将在很多地方复制集成逻辑以进行引导。

通过这种方法, 尽管任务在技术上已完成, 但最终导致集成的可维护性和可伸缩性出现了巨大问题。在这个系统中快速重组数据流几乎是不可能的, 更不用说更深层次的问题了, 例如缺乏监控, 电路中断, 费力的数据恢复等。

当在相当大的企业范围内集成软件时, 这尤其重要。处理企业集成意味着要使用一组应用程序, 这些应用程序可以在广泛的平台上运行并且存在于不同的位置。在这样的软件环境中, 数据交换是非常苛刻的。它必须符合行业的高安全性标准, 并提供可靠的数据传输方式。在企业环境中, 系统集成需要单独的, 详尽阐述的体系结构设计。

本文将向你介绍软件集成面临的独特困难, 并为集成任务提供一些经验驱动的解决方案。我们将熟悉Apache Camel, 这是一个有用的框架, 可以减轻集成开发人员最头痛的事情。我们将以一个示例为例, 说明Camel如何帮助在由Kubernetes支持的微服务集群中建立通信。

整合困难

解决此问题的一种广泛使用的方法是在应用程序中分离集成层。它可以存在于同一应用程序中, 也可以作为独立运行的专用软件(在后一种情况下称为中间件)存在。

在开发和支持中间件时, 你通常会遇到什么问题?通常, 你具有以下关键项目:

  • 所有数据通道在某种程度上都不可靠。当数据强度从低到中等时, 可能不会发生由这种不可靠性引起的问题。从应用程序内存到其下层的较低级缓存和设备的每个存储级别都可能发生故障。一些罕见的错误仅在海量数据中出现。甚至成熟的可立即投入生产的供应商产品也都解决了与数据丢失有关的错误跟踪器问题。中间件系统应该能够将这些数据伤亡情况通知你, 并及时提供重新发送消息的信息。
  • 应用程序使用不同的协议和数据格式。这意味着集成系统是数据转换和其他参与者的适配器的帷幕, 并且利用了多种技术。这些可以包括普通的REST API调用, 但也可以访问队列代理, 通过FTP发送CSV订单或将数据批量提取到数据库表中。这是一个很长的清单, 而且永远不会缩短。
  • 数据格式和路由规则的改变是不可避免的。应用程序开发过程中的每个步骤都会更改数据结构, 通常会导致集成数据格式和转换发生变化。有时, 必须通过重组的企业数据流来更改基础架构。例如, 当引入参考数据的单点验证时, 这些变化可能会发生, 该参考数据必须处理整个公司的所有主数据条目。对于N个系统, 我们最终可能会在它们之间最多有近N ^ 2个连接, 因此必须应用更改的地方数量增长很快。就像雪崩一样。为了维持可维护性, 中间件层必须通过通用的路由和数据转换提供清晰的依赖关系图。

在设计集成和选择最合适的中间件解决方案时, 应牢记这些想法。解决该问题的一种可能方法是利用企业服务总线(ESB)。但是, 主要供应商提供的ESB通常太重, 通常比他们值得的麻烦更多:几乎不可能快速开始使用ESB, 学习曲线非常陡峭, 其灵活性被牺牲了很多。功能和内置工具。我认为, 轻量级开源集成解决方案要优越得多, 它们更具弹性, 易于部署到云中并且易于扩展。

软件集成并不容易。如今, 当我们构建微服务架构并处理大量小服务时, 我们也对微服务的通信效率寄予厚望。

企业整合模式

可以预期, 就像一般的软件开发一样, 数据路由和转换的开发涉及重复的操作。处理集成问题相当长时间的专业人员已经总结并系统化了该领域的经验。结果, 有一组提取的模板, 称为企业集成模式, 用于设计数据流。这些集成方法在Gregor Hophe和Bobby Wolfe的同名书中进行了描述, 这与《 Gang of Four》一书很相似, 但在胶合软件领域非常相似。

举个例子, 规范化模式引入了一个组件, 该组件将具有不同数据格式的语义相等的消息映射到单个规范模型, 或者聚合器是将消息序列组合为一个的EIP。

由于EIP是用于解决架构问题的已建立的, 与技术无关的抽象概念, 因此EIP有助于编写架构设计, 该设计不会深入到代码级别, 但会足够详细地描述数据流。这种用于描述集成路线的符号不仅使设计简洁明了, 而且设置了通用的命名法和通用的语言, 这对于与来自各个业务领域的团队成员解决集成任务非常重要。

介绍Apache Camel

几年前, 我在庞大的杂货零售网络中建立了企业集成, 并且在广泛分布的地点设有商店。我从专有的ESB解决方案开始, 事实证明维护起来非常麻烦。然后, 我们的团队遇到了Apache Camel, 在完成了一些”概念验证”工作之后, 我们迅速重写了Camel路由中的所有数据流。

Apache Camel可以被描述为”中介路由器”, 这是一个实现EIP列表的面向消息的中间件框架, 我对此很熟悉。它利用这些模式, 支持所有常见的传输协议, 并包含大量有用的适配器。 Camel可以处理许多集成例程, 而无需编写你自己的代码。

除此之外, 我还将挑选出以下Apache Camel功能:

  • 集成路由写为由块组成的管道。它创建了一个完全透明的图片, 以帮助跟踪数据流。
  • Camel具有许多流行API的适配器。例如, 从Apache Kafka获取数据, 监视AWS EC2实例, 与Salesforce集成—所有这些任务都可以使用现成的可用组件来解决。

Apache Camel路由可以用Java或Scala DSL编写。 (XML配置也是可用的, 但变得太冗长, 调试功能也更差。)它并没有对通信服务的技术堆栈施加限制, 但是如果你使用Java或Scala编写, 则可以将Camel嵌入到应用程序中独立运行。

骆驼使用的路由符号可以用下面的简单伪代码描述:

from(Source)
   .transform(Transformer)
   .to(Destination)

Source, Transformer和Destination是通过URI引用实现组件的端点。

是什么让Camel解决了我之前描述的集成问题?我们来看一下。首先, 路由和转换逻辑现在仅存在于专用的Apache Camel配置中。其次, 通过简洁自然的DSL结合EIP的使用, 出现了系统之间的依存关系图。它由可理解的抽象构成, 并且路由逻辑易于调整。最后, 我们不必编写大量的转换代码, 因为可能已经包含了适当的适配器。

整合方式

我应该补充一点, Apache Camel是一个成熟的框架, 并且会定期进行更新。它拥有一个强大的社区和相当多的累积知识库。

它确实有其自身的缺点。骆驼不应被视为复杂的集成套件。它是一个没有高级功能(例如业务流程管理工具或活动监视器)的工具箱, 但可用于创建此类软件。

替代系统可能是, 例如, Spring Integration或Mule ESB。对于Spring Integration, 尽管它被认为是轻量级的, 但根据我的经验, 将其放在一起并编写许多XML配置文件可能会出乎意料地复杂, 而且也不容易。 Mule ESB是一个功能强大且功能强大的工具集, 但顾名思义, 它是企业服务总线, 因此属于不同的重量类别。可以将Mule与Fuse ESB进行比较, Fuse ESB是基于Apache Camel的具有丰富功能的类似产品。对我而言, 如今使用Apache Camel进行胶合服务已变得轻而易举。它易于使用, 并且可以清晰地描述行进路线, 同时, 它的功能足以构建复杂的集成。

编写示例路线

让我们开始编写代码。我们将从同步数据流开始, 该数据流将邮件从单一来源路由到收件人列表。路由规则将用Java DSL编写。

我们将使用Maven构建项目。首先将以下依赖项添加到pom.xml中:

<dependencies>
  ...
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>2.20.0</version>
  </dependency>
</dependencies>

或者, 可以在camel-archetype-java原型的顶部构建应用程序。

骆驼路线定义在RouteBuilder.configure方法中声明。

public void configure() {

    errorHandler(defaultErrorHandler().maximumRedeliveries(0));

    from("file:orders?noop=true").routeId("main")
        .log("Incoming File: ${file:onlyname}")
        .unmarshal().json(JsonLibrary.Jackson, Order.class)     // unmarshal JSON to Order class containing List<OrderItem>
        .split().simple("body.items")   // split list to process one by one
        .to("log:inputOrderItem")
    .choice()
        .when().simple("${body.type} == 'Drink'")
            .to("direct:bar")
        .when().simple("${body.type} == 'Dessert'")
            .to("direct:dessertStation")
        .when().simple("${body.type} == 'Hot Meal'")
            .to("direct:hotMealStation")
        .when().simple("${body.type} == 'Cold Meal'")
            .to("direct:coldMealStation")
        .otherwise()
            .to("direct:others");

    from("direct:bar").routeId("bar").log("Handling Drink");
    from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert");
    from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal");
    from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal");
    from("direct:others").routeId("others").log("Handling Something Other");
}

在此定义中, 我们创建了一条路由, 该路由从JSON文件中获取记录, 将其拆分为多个项目, 然后根据消息内容路由至一组处理程序。

让我们在准备好的测试数据上运行它。我们将得到输出:

INFO | Total 6 routes, of which 6 are started
INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds
INFO | Incoming File: order1.json
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='1', type='Drink', name='Americano', qty='1'}]
INFO | Handling Drink
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='2', type='Hot Meal', name='French Omelette', qty='1'}]
INFO | Handling Hot Meal
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='3', type='Hot Meal', name='Lasagna', qty='1'}]
INFO | Handling Hot Meal
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='4', type='Hot Meal', name='Rice Balls', qty='1'}]
INFO | Handling Hot Meal
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='5', type='Dessert', name='Blueberry Pie', qty='1'}]
INFO | Handling Dessert

不出所料, 骆驼将邮件路由到目的地。

数据传输选择

在上面的示例中, 组件之间的交互是同步的, 并通过应用程序内存执行。但是, 当我们处理不共享内存的单独应用程序时, 还有许多通信方式:

  • 文件交换。一个应用程序生成共享数据文件, 供另一个应用程序使用。这是守旧派精神的住所。这种通信方法具有很多后果:缺乏事务和一致性, 性能差以及系统之间的孤立协调。许多开发人员最终编写了自制的集成解决方案, 以使该过程或多或少易于管理。
  • 通用数据库。让应用程序将它们希望共享的数据存储在单个数据库的通用模式中。设计统一模式并处理对表的并发访问是此方法的最大挑战。与文件交换一样, 这很容易成为永久性的瓶颈。
  • 远程API调用。提供一个接口, 使一个应用程序可以与另一个正在运行的应用程序进行交互, 例如典型的方法调用。应用程序通过API调用共享功能, 但是在过程中将它们紧密耦合在一起。
  • 消息传递。使每个应用程序连接到通用消息系统, 并使用消息异步交换数据和调用行为。发送者和接收者都不必同时启动并运行才能传递消息。

有更多的交互方式, 但是应该记住, 从广义上讲, 交互有两种类型:同步和异步。第一个就像在代码中调用一个函数一样, 执行流程将一直等到它执行并返回一个值。使用异步方法, 可以通过中间消息队列或订阅主题发送相同的数据。异步远程功能调用可以实现为请求-应答EIP。

但是, 异步消息传递并不是万能的。它涉及某些限制。你很少在网络上看到消息传递API。同步REST服务更受欢迎。但是, 消息传递中间件广泛用于企业内部网或分布式系统后端基础结构中。

使用消息队列

让我们将示例异步化。管理队列和订阅主题的软件系统称为消息代理。就像表格和列的RDBMS。队列用作点对点集成, 而主题用于与许多收件人进行发布-订阅通信。我们将Apache ActiveMQ用作可靠且可嵌入的JMS消息代理。

添加以下依赖项。有时向项目中添加包含所有ActiveMQ jar的activemq-all过多, 但是我们将保持应用程序的依赖关系不复杂。

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.2</version>
</dependency>

然后以编程方式启动代理。在Spring Boot中, 我们通过插入spring-boot-starter-activemq Maven依赖项来对此进行自动配置。

使用以下命令运行新的消息代理, 仅指定连接器的端点:

BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616"); 
broker.start();

并将以下配置代码段添加到configure方法主体:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

现在, 我们可以使用消息队列更新前面的示例。队列将在邮件传递时自动创建。

public void configure() {

    errorHandler(defaultErrorHandler().maximumRedeliveries(0));
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

    from("file:orders?noop=true").routeId("main")
            .log("Incoming File: ${file:onlyname}")
            .unmarshal().json(JsonLibrary.Jackson, Order.class)     // unmarshal JSON to Order class containing List<OrderItem>
            .split().simple("body.items")   // split list to process one by one
            .to("log:inputOrderItem")
            .choice()
        .when().simple("${body.type} == 'Drink'")
            .to("activemq:queue:bar")
        .when().simple("${body.type} == 'Dessert'")
            .to("activemq:queue:dessertStation")
        .when().simple("${body.type} == 'Hot Meal'")
            .to("activemq:queue:hotMealStation")
        .when().simple("${body.type} == 'Cold Meal'")
            .to("activemq:queue:coldMealStation")
        .otherwise()
            .to("activemq:queue:others");

    from("activemq:queue:bar").routeId("barAsync").log("Drinks");
    from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert");
    from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals");
    from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals");
    from("activemq:queue:others").routeId("othersAsync").log("Others");
}

好的, 现在交互已经变得异步了。这些数据的潜在使用者可以随时使用。这是松耦合的示例, 我们尝试在反应式体系结构中实现这种松耦合。其中一项服务的不可用不会阻止其他服务。而且, 消费者可以并行缩放并从队列中读取。队列本身可以扩展和分区。持久队列可以将数据存储在磁盘上, 等待所有参与者都掉下来时等待处理。因此, 该系统具有更高的容错能力。

令人惊讶的事实是CERN使用Apache Camel和ActiveMQ来监视大型强子对撞机(LHC)的系统。还有一篇有趣的硕士论文, 解释了为此任务选择合适的中间件解决方案。因此, 正如他们在主题演讲中所说的那样:”没有JMS, 就没有粒子物理学!”

监控方式

在前面的示例中, 我们在两个服务之间创建了数据通道。这是架构中另一个潜在的故障点, 因此我们必须加以照顾。让我们看一下Apache Camel提供哪些监视功能。基本上, 它通过JMX公开有关通过MBean的路由的统计信息。 ActiveMQ以相同的方式公开队列统计信息。

让我们在应用程序中打开JMX服务器, 以使其能够通过命令行选项运行:

-Dorg.apache.camel.jmx.createRmiConnector=true
-Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel
-Dorg.apache.camel.jmx.rmiConnector.registryPort=1099
-Dorg.apache.camel.jmx.serviceUrlPath=camel

现在运行应用程序, 以使路由完成其工作。打开标准的jconsole工具并连接到应用程序过程。连接到URL服务:jmx:rmi:/// jndi / rmi:// localhost:1099 / camel。转到MBeans树中的org.apache.camel域。

屏幕截图1

我们可以看到有关路由的所有内容都处于受控状态。我们在队列中有运行中消息的数量, 错误计数和消息计数。该信息可以通过管道传输到具有丰富功能的某些监视工具集, 例如Graphana或Kibana。你可以通过实现众所周知的ELK堆栈来实现。

还有一个可插拔和可扩展的网络控制台, 该控制台提供用于管理骆驼, ActiveMQ等的界面, 称为hawt.io。

屏幕截图2

测试路线

Apache Camel具有使用模拟组件编写测试路由的相当广泛的功能。它是一个功能强大的工具, 但是为测试而编写单独的路由是一个耗时的过程。在生产路线上运行测试而不修改其管道会更有效。 Camel具有此功能, 可以使用AdviceWith组件来实现。

让我们在示例中启用测试逻辑并运行示例测试。

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-test</artifactId>
    <version>2.20.0</version>
    <scope>test</scope>
</dependency>

测试类是:

public class AsyncRouteTest extends CamelTestSupport {

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new AsyncRouteBuilder();
    }

    @Before
    public void mockEndpoints() throws Exception {
        context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() {
            @Override
            public void configure() throws Exception {
                // we substitute all actual queues with mock endpoints
                mockEndpointsAndSkip("activemq:queue:bar");
                mockEndpointsAndSkip("activemq:queue:dessertStation");
                mockEndpointsAndSkip("activemq:queue:hotMealStation");
                mockEndpointsAndSkip("activemq:queue:coldMealStation");
                mockEndpointsAndSkip("activemq:queue:others");
                // and replace the route's source with test endpoint
                replaceFromWith("file://testInbox");
            }
        });
    }

    @Test
    public void testSyncInteraction() throws InterruptedException {
        String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}";
        // get mocked endpoint and set an expectation
        MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation");
        mockEndpoint.expectedMessageCount(3);
        // simulate putting file in the inbox folder
        template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json");
        //checks that expectations were met
        assertMockEndpointsSatisfied();
    }

}

现在使用mvn test对应用程序运行测试。我们可以看到我们的路线已成功通过测试建议执行。没有消息通过实际队列传递, 并且测试已通过。

INFO | Route: main started and consuming from: file://testInbox
<...>
INFO | Incoming File: test.json
<...>
 INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied

将Apache Camel与Kubernetes集群一起使用

当今的集成问题之一是应用程序不再是静态的。在云基础架构中, 我们处理同时在多个节点上运行的虚拟服务。它通过微小型轻量级服务之间的交互来支持微服务体系结构。这些服务的寿命不可靠, 我们必须动态地发现它们。

将云服务融合在一起是可以使用Apache Camel解决的任务。尤其有趣的是, 它具有EIP风格, 并且Camel具有大量适配器并支持多种协议。最新的2.18版本增加了ServiceCall组件, 该组件引入了调用API并通过集群发现机制解析其地址的功能。当前, 它支持Consul, Kubernetes, Ribbon等。可以轻松找到一些使用Consul配置ServiceCall的代码示例。我们将在这里使用Kubernetes, 因为它是我最喜欢的集群解决方案。

集成方案如下所示:

架构图

Order服务和Inventory服务将是几个简单的Spring Boot应用程序, 它们返回静态数据。我们在这里不局限于特定的技术堆栈。这些服务正在产生我们要处理的数据。

订单服务控制器:

@RestController
public class OrderController {

    private final OrderStorage orderStorage;

    @Autowired
    public OrderController(OrderStorage orderStorage) {
        this.orderStorage = orderStorage;
    }

    @RequestMapping("/info")
    public String info() {
        return "Order Service UUID = " + OrderApplication.serviceID;
    }

    @RequestMapping("/orders")
    public List<Order> getAll() {
        return orderStorage.getAll();
    }

    @RequestMapping("/orders/{id}")
    public Order getOne(@PathVariable Integer id) {
        return orderStorage.getOne(id);
    }
}

它产生以下格式的数据:

[{"id":1, "items":[2, 3, 4]}, {"id":2, "items":[5, 3]}]

库存服务控制器与订购服务的绝对相似:

@RestController
public class InventoryController {

    private final InventoryStorage inventoryStorage;

    @Autowired
    public InventoryController(InventoryStorage inventoryStorage) {
        this.inventoryStorage = inventoryStorage;
    }

    @RequestMapping("/info")
    public String info() {
        return "Inventory Service UUID = " + InventoryApplication.serviceID;
    }

    @RequestMapping("/items")
    public List<InventoryItem> getAll() {
        return inventoryStorage.getAll();
    }

    @RequestMapping("/items/{id}")
    public InventoryItem getOne(@PathVariable Integer id) {
        return inventoryStorage.getOne(id);
    }

}

InventoryStorage是用于存储数据的通用存储库。在此示例中, 它返回静态的预定义对象, 这些对象将编组为以下格式。

[{"id":1, "name":"Laptop", "description":"Up to 12-hours battery life", "price":499.9}, {"id":2, "name":"Monitor", "description":"27-inch, response time: 7ms", "price":200.0}, {"id":3, "name":"Headphones", "description":"Soft leather ear-cups", "price":29.9}, {"id":4, "name":"Mouse", "description":"Designed for comfort and portability", "price":19.0}, {"id":5, "name":"Keyboard", "description":"Layout: US", "price":10.5}]

让我们写一个连接它们的网关路由, 但在此步骤中没有ServiceCall:

rest("/orders")
        .get("/").description("Get all orders with details").outType(TestResponse.class)
        .route()
        .setHeader("Content-Type", constant("application/json"))
        .setHeader("Accept", constant("application/json"))
        .setHeader(Exchange.HTTP_METHOD, constant("GET"))
        .removeHeaders("CamelHttp*")
        .to("http4://localhost:8082/orders?bridgeEndpoint=true")
        .unmarshal(formatOrder)
        .enrich("direct:enrichFromInventory", new OrderAggregationStrategy())
        .to("log:result")
        .endRest();

from("direct:enrichFromInventory")
        .transform().simple("${null}")
        .setHeader("Content-Type", constant("application/json"))
        .setHeader("Accept", constant("application/json"))
        .setHeader(Exchange.HTTP_METHOD, constant("GET"))
        .removeHeaders("CamelHttp*")
        .to("http4://localhost:8081/items?bridgeEndpoint=true")
        .unmarshal(formatInventory);

现在想象一下, 每个服务不再是特定的实例, 而是作为一个实例运行的实例云。我们将使用Minikube在本地尝试Kubernetes集群。

配置网络路由以在本地查看Kubernetes节点(给定的示例适用于Mac / Linux环境):

# remove existing routes
sudo route -n delete 10/24 > /dev/null 2>&1
# add routes
sudo route -n add 10.0.0.0/24 $(minikube ip)  
# 172.17.0.0/16 ip range is used by docker in minikube                                                                                                        
sudo route -n add 172.17.0.0/16 $(minikube ip)                                                                                                            
ifconfig 'bridge100' | grep member | awk '{print $2}’ 
# use interface name from the output of the previous command 
# needed for xhyve driver, which I'm using for testing
sudo ifconfig bridge100 -hostfilter en5

使用以下Dockerfile配置将服务包装在Docker容器中:

FROM openjdk:8-jdk-alpine
VOLUME /tmp
ADD target/order-srv-1.0-SNAPSHOT.jar app.jar
ADD target/lib lib
ENV JAVA_OPTS=""
ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar

构建服务映像并将其推送到Docker注册表。现在运行本地Kubernetes集群中的节点。

Kubernetes.yaml部署配置:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: inventory
spec:
  replicas: 3
  selector:
    matchLabels:
      app: inventory
  template:
    metadata:
      labels:
        app: inventory
    spec:
      containers:
      - name: inventory
        image: inventory-srv:latest
        imagePullPolicy: Never
        ports:
        - containerPort: 8081

将这些部署作为服务公开在集群中:

kubectl expose deployment order-srv --type=NodePort
kubectl expose deployment inventory-srv --type=NodePort

现在我们可以检查请求是否由集群中随机选择的节点处理。依次多次运行curl -X http://192.168.99.100:30517/info以访问minikube NodePort以获得公开服务(使用主机和端口)。在输出中, 我们看到已经实现了请求平衡。

Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5
Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda
Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda
Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5
Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4

在项目的pom.xml中添加camel-kubernetes和camel-netty4-http依赖项。然后将ServiceCall组件配置为使用Kubernetes主节点发现, 该发现为路由定义之间的所有服务调用共享:

KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration();
kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443");
kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt");
kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key");
kubernetesConfiguration.setNamespace("default");

ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition();
config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration));
context.setServiceCallConfiguration(config);

ServiceCall EIP很好地补充了Spring Boot。大多数选项可以直接在application.properties文件中进行配置。

使用ServiceCall组件为Camel路由赋权:

rest("/orders")
        .get("/").description("Get all orders with details").outType(TestResponse.class)
        .route()
        .hystrix()
        .setHeader("Content-Type", constant("application/json"))
        .setHeader("Accept", constant("application/json"))
        .setHeader(Exchange.HTTP_METHOD, constant("GET"))
        .removeHeaders("CamelHttp*")
        .serviceCall("customer-srv", "http4:customer-deployment?bridgeEndpoint=true")
        .unmarshal(formatOrder)
        .enrich("direct:enrichFromInventory", new OrderAggregationStrategy())
        .to("log:result")
        .endRest();

from("direct:enrichFromInventory")
        .transform().simple("${null}")
        .setHeader("Content-Type", constant("application/json"))
        .setHeader("Accept", constant("application/json"))
        .setHeader(Exchange.HTTP_METHOD, constant("GET"))
        .removeHeaders("CamelHttp*")
        .serviceCall("order-srv", "http4:order-srv?bridgeEndpoint=true")
        .unmarshal(formatInventory);

我们还在该路线中激活了断路器。这是一个集成挂钩, 可以在发生发送错误或收件人不可用时暂停远程系统调用。这样做是为了避免级联系统故障。 Hystrix组件通过实现断路器模式来帮助实现这一目标。

让我们运行它并发送测试请求;我们将从两种服务中汇总响应。

[{"id":1, "items":[{"id":2, "name":"Monitor", "description":"27-inch, response time: 7ms", "price":200.0}, {"id":3, "name":"Headphones", "description":"Soft leather ear-cups", "price":29.9}, {"id":4, "name":"Mouse", "description":"Designed for comfort and portability", "price":19.0}]}, {"id":2, "items":[{"id":5, "name":"Keyboard", "description":"Layout: US", "price":10.5}, {"id":3, "name":"Headphones", "description":"Soft leather ear-cups", "price":29.9}]}]

结果是预期的。

其他用例

我展示了Apache Camel如何将微服务集成到集群中。该框架还有哪些其他用途?通常, 它在可能基于规则的路由解决方案的任何地方都非常有用。例如, Apache Camel可以通过Eclipse Kura适配器成为物联网的中间件。它可以通过传送来自各种组件和服务的日志信号来处理监视, 例如在CERN系统中。它也可以是企业SOA的集成框架, 也可以是批处理数据的管道, 尽管它在该领域与Apache Spark竞争不佳。

总结

你会发现系统集成不是一个容易的过程。我们很幸运, 因为已经积累了很多经验。正确应用它以构建灵活且容错的解决方案很重要。

为了确保正确应用, 我建议准备一份重要集成方面的清单。必备物品包括:

  • 是否有单独的集成层?
  • 是否有集成测试?
  • 我们知道预期的峰值数据强度吗?
  • 我们知道预期的数据交付时间吗?
  • 消息关联重要吗?如果序列中断怎么办?
  • 我们应该以同步还是异步的方式来做?
  • 格式和路由规则在哪里更频繁地更改?
  • 我们是否有办法监控流程?

在本文中, 我们尝试了Apache Camel, 这是一个轻量级的集成框架, 可帮助你节省解决集成问题的时间和精力。正如我们所展示的, 它可以充当工具, 通过全面负责微服务之间的数据交换来支持相关的微服务架构。

如果你有兴趣了解有关Apache Camel的更多信息, 我强烈建议该框架的创建者Claus Ibsen写一本书” Camel in Action”。官方文档位于camel.apache.org。

赞(0)
未经允许不得转载:srcmini » 简化软件集成:Apache Camel教程

评论 抢沙发

评论前必须登录!