Write your own analysis

In the previous entry, you learnt how to write your own spout and router to ingest data. Here, you will learn how to write an analyser that runs the algorithms you want to test on your data. Depending on the type of algorithm you want to implement, you define some or all of these modules: setup, analyse, returnResults, processResults and defineMaxSteps. Briefly:

setup – Runs only in the first superstep and generally sets the initial conditions and states of the graph entities;
analyse – Implements the main block of your algorithm;
returnResults – Returns the result of each partition and formats it for the final stage;
processResults – Processes the results returned by all partitions so they can be displayed, stored or transferred;
defineMaxSteps – Defines the maximum number of supersteps the algorithm iterates over.
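To make the order of these modules concrete, here is a minimal, self-contained sketch of the superstep loop described above. It is not the Raphtory API: the trait SketchAnalyser, the driver SuperstepDriver, the messagesPending flag and the toy Countdown class are all hypothetical names invented for illustration. It assumes only what this page states, namely that setup runs once and analyse repeats until no messages remain or defineMaxSteps is reached.

```scala
// Hypothetical stand-in for an analyser; NOT the Raphtory API.
trait SketchAnalyser {
  def setup(): Unit
  def analyse(): Unit
  def returnResults(): Any
  def defineMaxSteps(): Int
  // Assumption of this sketch: the engine can tell whether messages are still in flight.
  def messagesPending: Boolean
}

object SuperstepDriver {
  // Runs setup once, then analyse until no messages remain or the step cap is hit.
  // Returns the number of analyse supersteps that were executed.
  def run(a: SketchAnalyser): Int = {
    a.setup()
    var steps = 0
    while (a.messagesPending && steps < a.defineMaxSteps()) {
      a.analyse()
      steps += 1
    }
    steps
  }
}

// Toy analyser: each analyse step consumes one unit of "pending messages".
class Countdown(var pending: Int, maxSteps: Int) extends SketchAnalyser {
  def setup(): Unit = ()
  def analyse(): Unit = pending -= 1
  def returnResults(): Any = pending
  def defineMaxSteps(): Int = maxSteps
  def messagesPending: Boolean = pending > 0
}
```

In this sketch, an analyser that runs out of messages after three steps stops early, while one with plenty of messages is cut off at the cap given by defineMaxSteps.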

Since Raphtory is vertex-centric, a good intuition to keep when building algorithms in Raphtory is to view the process from a vertex perspective.

Six Degrees of Gandalf

To continue with the previous example, we’re going to go over how to write an analyser for the LOTR data that gets the size of the six degrees of separation network for a character; in this case, Gandalf.

Pre-step

First, we need to create a property to store the state of separation and initialize it in setup.

override def setup(): Unit = {
    view.getVertices().foreach { vertex =>
      val name = vertex.getPropertyValue("name").getOrElse("")
      // SEP is a user-defined parameter that determines the degree of separation.
      // The source starts with SEP; every other vertex starts at 0.
      val sep_state = if (name == "Gandalf") SEP else 0
      vertex.setState("separation", sep_state)
      vertex.messageAllNeighbours(sep_state)
    }
}

view represents the graph that is currently in view, meaning the graph that incorporates all updates within the time period of the specified analysis window. Hence, if a character appears only outside this window, the function view.getVertices() will exclude it. Once a vertex's state is initialized, the vertex sends that state to its neighbours through messageAllNeighbours.
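The effect of the analysis window can be illustrated with a small standalone sketch. Nothing here is Raphtory code: VertexHistory and inView are hypothetical names, and treating the window as the half-open interval (time - window, time] is an assumption made for the example, since the exact boundary handling is not specified on this page.

```scala
object WindowSketch {
  // Hypothetical record: a character and the times at which it was updated.
  final case class VertexHistory(name: String, updateTimes: Seq[Long])

  // Keeps the vertices that have at least one update in (time - window, time].
  // The boundary handling is an assumption of this sketch.
  def inView(all: Seq[VertexHistory], time: Long, window: Long): Seq[String] =
    all
      .filter(_.updateTimes.exists(t => t > time - window && t <= time))
      .map(_.name)
}
```

With a character updated at times 10 and 900 and another updated only at time 20, a view at time 1000 with a window of 500 would keep only the first, whereas a window of 1000 would keep both.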

The bulk

As mentioned before, the analyse module implements the bulk of the algorithm. Here, the state of the source is viewed as a resource that spreads through the network and depletes by one each time it reaches a node, until it vanishes (state = 0). The process starts by selecting the vertices that received messages through getMessagedVertices(). Each of these vertices then reads its messages with vertex.messageQueue[Int] to get the states of its neighbours. It compares the best incoming state, reduced by one, with its own state; if the new value is larger, it updates its state and sends the update to its neighbours.

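override def analyse(): Unit = {
    view.getMessagedVertices().foreach { vertex =>
      // Best incoming state, depleted by one hop.
      val sep_state = vertex.messageQueue[Int].max - 1
      // Update and propagate only if the resource has not run out
      // and improves on the vertex's current state.
      if (sep_state > 0 && sep_state > vertex.getState[Int]("separation")) {
        vertex.setState("separation", sep_state)
        vertex.messageAllNeighbours(sep_state)
      }
    }
}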

The above code runs until no more messages are sent (or the number of steps reaches defineMaxSteps). Once no messages remain, every node in the six degrees of separation network holds a state that reflects how far it is from the source.
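To see the depletion at work outside Raphtory, the following is a rough simulation of setup and analyse on a plain adjacency map. It is a sketch under stated assumptions, not the platform's implementation: SeparationSketch, spread and deliver are hypothetical names, the graph is assumed to be undirected with every vertex present as a key, and message passing is modelled as a simple inbox map rebuilt each superstep.

```scala
object SeparationSketch {
  // Undirected toy graph as an adjacency map; every vertex must appear as a key.
  type Graph = Map[String, Set[String]]

  // Mimics setup + analyse: the source starts with `sep`, everyone else with 0.
  // Each superstep, a messaged vertex adopts (max incoming - 1) if that beats its state.
  def spread(graph: Graph, source: String, sep: Int, maxSteps: Int): Map[String, Int] = {
    var state: Map[String, Int] =
      graph.keys.map(v => v -> (if (v == source) sep else 0)).toMap
    // setup: every vertex messages its neighbours with its initial state
    var inbox: Map[String, Seq[Int]] = deliver(graph, state.toSeq)
    var step = 0
    while (inbox.nonEmpty && step < maxSteps) {
      val updates = inbox.toSeq
        .map { case (v, msgs) => v -> (msgs.max - 1) }
        .filter { case (v, candidate) => candidate > 0 && candidate > state(v) }
      state = state ++ updates
      inbox = deliver(graph, updates) // only updated vertices send messages
      step += 1
    }
    state
  }

  // Groups outgoing messages by recipient: each sender sends its value to all neighbours.
  private def deliver(graph: Graph, senders: Seq[(String, Int)]): Map[String, Seq[Int]] =
    senders
      .flatMap { case (v, s) => graph(v).toSeq.map(n => n -> s) }
      .groupBy(_._1)
      .map { case (n, ms) => n -> ms.map(_._2) }
}
```

On a small chain such as Gandalf, Frodo, Sam, Rosie with a separation parameter of 3, the states in this sketch would fall by one per hop and reach 0 at Rosie, which is where the spread stops.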

The Return of The King

Now that the algorithm has converged, we need to get the results back and process them if necessary. The following keeps only the vertices whose state is greater than 0, that is, the source Gandalf and the vertices the spread reached before it was depleted. It then groups these vertices by their separation state and returns the size of each group.

override def returnResults(): Any =
    view.getVertices()
      .filter(vertex => vertex.getState[Int]("separation") > 0)
      .map(v => (v.ID(), v.getState[Int]("separation")))
      .groupBy(f => f._2)
      .map(f => (f._1, f._2.size))

It is worth remembering here that Raphtory is a distributed platform and runs algorithms in parallel across multiple workers (see the Raphtory concept page for more details). This means that on each partition/worker, returnResults returns results only for the vertices stored there.
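The per-partition nature of returnResults can be mimicked with ordinary collections. In the sketch below, PartitionSketch, LocalStates and localHistogram are hypothetical names, and a partition is simply modelled as a list of (vertex ID, separation state) pairs; the real storage layout in Raphtory is not shown on this page.

```scala
object PartitionSketch {
  // (vertexId, separation state) pairs held by one hypothetical partition.
  type LocalStates = Seq[(Long, Int)]

  // Same shape as returnResults: drop untouched vertices, then count vertices per state.
  def localHistogram(states: LocalStates): Map[Int, Int] =
    states
      .filter { case (_, sep) => sep > 0 }
      .groupBy { case (_, sep) => sep }
      .map { case (sep, vs) => sep -> vs.size }
}
```

Each partition produces only a partial count in this model, so two partitions may both report vertices with the same separation state; those partial counts still have to be added together in a later stage.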

To put it all together, the final module, processResults, combines the results of all partitions and performs any extra processing that is needed. In this case, we merge the per-partition counts to get the size of the network as well as the number of characters that have interacted directly with Gandalf.

override def processResults(results: ArrayBuffer[Any], timestamp: Long, viewCompleteTime: Long): Unit = {
    val endResults = results.asInstanceOf[ArrayBuffer[immutable.ParHashMap[Int, Int]]]
    try {
      val grouped = endResults.flatten.groupBy(f => f._1).mapValues(x => x.map(_._2).sum)
      val direct = grouped.getOrElse(SEP - 1, 0) // vertices one hop from the source; 0 if there are none
      val total = grouped.values.sum
      val text = s"""{"time":$timestamp,"total":${total},"direct":${direct},"viewTime":$viewCompleteTime}"""
      println(text)
    } catch {
      case e: UnsupportedOperationException => println("null")
    }
  }
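The merging step can be tried in isolation with plain Scala maps. This is only a sketch of the idea: MergeSketch, merge and summary are hypothetical names, the parameter sep stands in for the tutorial's SEP, and ordinary immutable maps replace the parallel maps used in the module above.

```scala
object MergeSketch {
  // Sums per-partition histograms key by key, as processResults does after flattening.
  def merge(partials: Seq[Map[Int, Int]]): Map[Int, Int] =
    partials.flatten
      .groupBy(_._1)
      .map { case (sep, counts) => sep -> counts.map(_._2).sum }

  // Returns (total, direct); `sep` is a stand-in for the user-defined SEP parameter.
  def summary(partials: Seq[Map[Int, Int]], sep: Int): (Int, Int) = {
    val grouped = merge(partials)
    // Direct neighbours of the source hold the state sep - 1; default to 0 if none exist.
    (grouped.values.sum, grouped.getOrElse(sep - 1, 0))
  }
}
```

Note that the total counts every vertex with a positive state, which includes the source itself. Looking up the direct count with a default keeps the summary well defined even when no partition reports a vertex one hop from the source.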

Running this on the first 5000 sentences of the books returns the following for a separation degree of 3:

{"time":5000,"total":24,"direct":9,"viewTime":316}

You might be asking how exactly we run an analysis. That is the topic of the next entry! Head on to How to deploy Raphtory locally to learn how to do that on your local machine.