Как да направите просто приложение с Akka Cluster

Ако сте прочели предишната ми история за Scalachain, вероятно сте забелязали, че тя далеч не е разпределена система. Липсват му всички функции за правилна работа с други възли. Добавете към него, че блокчейн, съставен от един възел, е безполезен. Поради тази причина реших, че е време да се работи по въпроса.

Тъй като Scalachain се захранва от Akka, защо не се възползвате от шанса да играете с Akka Cluster? Създадох прост проект, за да поработя малко с Akka Cluster и в тази история ще споделя моите знания. Ще създадем клъстер от три възли, като използваме Cluster Aware Routers, за да балансира товара между тях. Всичко ще се изпълнява в контейнер на Docker и ние ще използваме docker-compose за лесно внедряване.

Добре, да се търкаляме! ?

Бързо въведение в Akka Cluster

Akka Cluster предоставя голяма подкрепа за създаването на разпределени приложения. Най-добрият случай е когато имате възел, който искате да репликирате N пъти в разпределена среда. Това означава, че всички N възли са връстници с един и същ код. Akka Cluster ви предоставя незабавно откриването на членове в същия клъстер. Използвайки Cluster Aware Routers е възможно да се балансират съобщенията между участниците в различни възли. Също така е възможно да изберете политиката за балансиране, като направите балансиране на товара парче торта!

Всъщност можете да избирате между два вида рутери:

Групов маршрутизатор - Актьорите, на които да се изпращат съобщенията - наречени маршрути - се определят, използвайки техния път на актьора. Рутерите споделят маршрутите, създадени в клъстера. В този пример ще използваме групов рутер.

Pool Router - Маршрутите се създават и разгръщат от рутера, така че те са негови деца в йерархията на актьора. Рутерите не се споделят между рутери. Това е идеално за сценарий с първична реплика, където всеки рутер е основният, а маршрутите му - репликите.

Това е само върхът на айсберга, затова ви каня да прочетете официалната документация за повече информация.

Клъстер за математически изчисления

Нека си представим сценарий на случай на употреба. Да предположим да проектираме система за изпълнение на математически изчисления при поискване. Системата е внедрена онлайн, така че се нуждае от REST API, за да получава заявките за изчисление. Вътрешен процесор обработва тези заявки, изпълнява изчислението и връща резултата.

В момента процесорът може да изчисли само числото на Фибоначи. Решаваме да използваме клъстер от възли, за да разпределим товара между възлите и да подобрим производителността. Akka Cluster ще се справи с клъстерната динамика и балансирането на натоварването между възлите. ОК, звучи добре!

Йерархия на актьора

Първо, първо: трябва да дефинираме йерархията на актьорите. Системата може да бъде разделена на три функционални части: бизнес логика , управление на клъстера и самия възел . Има и сървър, но той не е актьор и ще работим по-късно.

Бизнес логика

Приложението трябва да прави математически изчисления. Можем да определим прост Processorактьор, който да управлява всички изчислителни задачи. Всяко изчисление, което поддържаме, може да бъде приложено в конкретен участник, който ще бъде дете на Processorтози. По този начин приложението е модулно и е по-лесно да се разширява и поддържа. В момента единственото дете на Processorще бъде ProcessorFibonacciактьорът. Предполагам, че можете да познаете каква е неговата задача. Това трябва да е достатъчно, за да започнете.

Управление на клъстери

За да управляваме клъстера, ни е необходим a ClusterManager. Звучи просто, нали? Този актьор се справя с всичко, свързано с клъстера, като връщане на членовете му, когато бъде попитан. Би било полезно да регистрираме какво се случва вътре в клъстера, затова дефинираме ClusterListenerактьор. Това е дъщерно на ClusterManagerи се абонира за клъстерни събития, които ги регистрират.

Възел

На Nodeактьора е коренът на нашата йерархия. Това е входната точка на нашата система, която комуникира с API. The Processorи the ClusterManagerса децата му, заедно с ProcessorRouterактьора. Това е балансиращото натоварване на системата, разпределяйки товара между Processors. Ще го конфигурираме като рутер с клъстер, така че всеки ProcessorRouterможе да изпраща съобщения до Processors на всеки възел.

Изпълнение на актьора

Време е да внедрим нашите актьори! Първо прилагаме действащите лица, свързани с бизнес логиката на системата. След това преминаваме към актьорите за управление на клъстера и Nodeнакрая основния актьор ( ).

Процесор Fibonacci

Този актьор изпълнява изчислението на числото на Фибоначи. Той получава Computeсъобщение, съдържащо номера за изчисляване и референцията на актьора, на който трябва да отговори. Препратката е важна, тъй като може да има различни искащи участници. Не забравяйте, че ние работим в разпределена среда!

След като Computeсъобщението е получено, fibonacciфункцията изчислява резултата. Увиваме го в ProcessorResponseобект, за да предоставим информация за възела, изпълнил изчислението. Това ще бъде полезно по-късно, за да се види действието на политиката за кръг.

След това резултатът се изпраща на актьора, на когото трябва да отговорим. Лесна работа.

object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }

Процесор

В Processorактьор успява специфичните под-процесори, като този на Фибоначи. Той трябва да създаде екземпляр на подпроцесорите и да препрати заявките към тях. В момента имаме само един подизпълнител, така че Processorполучава един вид съобщение: ComputeFibonacci. Това съобщение съдържа числото на Фибоначи за изчисляване. След като бъде получен, номерът за изчисляване се изпраща на a FibonacciProcessor, заедно с референцията на sender().

object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }

ClusterListener

Бихме искали да регистрираме полезна информация за това, което се случва в клъстера. Това може да ни помогне да отстраним грешките на системата, ако има нужда. Това е целта на ClusterListenerактьора. Преди да започне, той се абонира за съобщенията за събития на клъстера. Реагира Актьорът към съобщенията обичат MemberUp, UnreachableMemberили MemberRemoved, като влезете в съответния случай. Когато ClusterListenerбъде спрян, той се отписва от клъстерните събития.

object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }

ClusterManager

Актьорът, отговорен за управлението на клъстера, е ClusterManager. Той създава ClusterListenerактьора и предоставя списък на членовете на клъстера при поискване. Може да се разшири, за да добави повече функционалности, но в момента това е достатъчно.

object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }

ProcessorRouter

Балансирането на натоварването между процесорите се обработва от ProcessorRouter. Създава се от Nodeактьора, но този път цялата необходима информация е предоставена в конфигурацията на системата.

class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }

Нека анализираме съответната част във application.confфайла.

akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }

Първото нещо е да укажете пътя към рутера актьор, който е /node/processorRouter. Вътре в това свойство можем да конфигурираме поведението на рутера:

  • router: това е политиката за балансиране на натоварването на съобщенията. Избрах round-robin-group, но има много други.
  • routees.paths: these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
  • cluster.enabled: are we operating in a cluster? The answer is on, of course!
  • cluster.allow-local-routees: here we are allowing the router to choose a routee in its node.

Using this configuration we can create a router to load balance the work among our processors.

Node

The root of our actor hierarchy is the Node. It creates the children actors — ClusterManager, Processor, and ProcessorRouter — and forwards the messages to the right one. Nothing complex here.

object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }

Server and API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running

lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }

Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.

akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }

Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster". Then we bind cluster.ip and cluster.port to the hostname and port of the netty web framework.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. Right now we have one seed node, but we may add more later.

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")

Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal. Run the command and taste the magic… ?

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at port :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like! ?

See you! ?