Background

Introduction: Aggregate Programming

Aggregate Programming is a paradigm for the development of collective adaptive systems (CAS). It provides a compositional, functional programming model for expressing the self-organising behaviour of a CAS from a global perspective. Aggregate Computing (AC) is formally grounded in the field calculus (FC), a minimal core language that captures the key mechanisms for bridging local and global behaviour. FC is based on the notion of a (computational) field: a (possibly dynamic) map from a (possibly dynamic) domain of devices to computational values.

Aggregate Computing is based on a logical model that can be mapped onto physical infrastructure in different ways.

  • From a structural point of view, an aggregate system is merely a graph or network of devices (also called nodes, agents, or robots). The edges connecting nodes represent logical communication channels that are set up by the aggregate computing platform according to an application-specific neighbouring relationship (which, for situated systems, is typically based on communication range).
  • From a behavioural point of view, any device continuously interprets the aggregate program against its local context.
  • From an interactional point of view, any device continuously interacts with its neighbours to acquire and propagate context. This is what enables local activity to influence global activity and vice versa.
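
The structural view above can be illustrated with a minimal plain-Scala sketch that derives a neighbouring relationship from a communication range. It does not use the ScaFi API: the names Device, Neighbourhood, distance, and neighbours are hypothetical and introduced only for this illustration, and the model assumes devices are points in a 2D plane.

```scala
// Hypothetical model (not the ScaFi API): a device is a point in a 2D plane.
final case class Device(id: Int, x: Double, y: Double)

object Neighbourhood {
  // Euclidean distance between two devices.
  def distance(a: Device, b: Device): Double =
    math.hypot(a.x - b.x, a.y - b.y)

  // Maps each device ID to the IDs of the *other* devices within `range`.
  // The resulting map is the edge set of the logical communication graph.
  def neighbours(devices: Seq[Device], range: Double): Map[Int, Set[Int]] =
    devices.map { d =>
      d.id -> devices
        .filter(other => other.id != d.id && distance(d, other) <= range)
        .map(_.id)
        .toSet
    }.toMap
}
```

In this sketch a device is not listed among its own neighbours; whether the device itself belongs to its neighbourhood is a modelling choice that a concrete platform has to make.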

Execution model

In practice, devices sustain the aggregate computation through asynchronous rounds which conceptually consist of the following steps:

  1. Context update: the device retrieves previous state, environment data (through sensors), and messages from neighbours.
  2. Aggregate program execution: the field computation is executed against the local context; this yields an output.
  3. Export broadcasting to neighbours: from the output, a subset of data (called an export) for neighbour coordination can be automatically derived; the export has to be broadcast to the entire neighbourhood.
  4. Execute actuators: the output of the program can describe a set of actuations to be performed on the environment.
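
The four steps above can be sketched in plain Scala as follows. This is only an illustrative model under simplifying assumptions, not ScaFi's implementation: the types Context and RoundSketch, the "value" sensor name, and the toy program (which outputs the largest value known so far) are all hypothetical, and the export is assumed to coincide with the output.

```scala
// Hypothetical types for illustration; ScaFi's own Context and Export are richer.
final case class Context(
  selfId: Int,
  previousOutput: Option[Int],      // state from the previous round
  sensors: Map[String, Int],        // environment data read through sensors
  neighbourExports: Map[Int, Int]   // latest export received from each neighbour
)

object RoundSketch {
  // Step 2: the "program" evaluated against the local context.
  // This toy program outputs the largest value the device knows about.
  def program(ctx: Context): Int = {
    val known = Seq(ctx.sensors.getOrElse("value", 0)) ++
      ctx.previousOutput.toSeq ++
      ctx.neighbourExports.values
    known.max
  }

  // One round. Step 1 (context update) is assumed to be done by the caller,
  // who builds `ctx` from stored state, sensors, and received messages.
  def round(ctx: Context, broadcast: Int => Unit, actuate: Int => Unit): Int = {
    val output = program(ctx) // step 2: aggregate program execution
    broadcast(output)         // step 3: export broadcasting (export == output here)
    actuate(output)           // step 4: actuation on the environment
    output
  }
}
```

Passing broadcast and actuate as functions keeps the round logic independent of any specific network or hardware, which reflects the idea that the same logical model can be mapped onto different infrastructures.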

A code example of round execution in ScaFi is shown in Building Aggregate Systems.

ScaFi User Manual

Hello, ScaFi

As another example, consider the following steps.

Step 1: Add the dependency to scafi in your project (e.g., via sbt)

Step 1-A: SBT

val scafi_core  = "it.unibo.scafi" %% "scafi-core"  % "0.3.3"
val scafi_simulator_gui  = "it.unibo.scafi" %% "scafi-simulator-gui"  % "0.3.3"

libraryDependencies ++= Seq(scafi_core, scafi_simulator_gui)

Step 1-B: GRADLE (build.gradle.kts)

plugins {
    java
    scala
}

dependencies {
    implementation("org.scala-lang:scala-library:2.12.2")
    implementation("it.unibo.scafi:scafi-core_2.12:0.3.3")
    implementation("it.unibo.scafi:scafi-simulator-gui_2.12:0.3.3")
    // Note: before ScaFi 0.3.3, the group ID was 'it.unibo.apice.scafiteam'
}

// the following may be needed when running using Java 11
tasks.withType<ScalaCompile> {
    sourceCompatibility = "1.8"
    targetCompatibility = "1.8"
}

Step 2: Use the API (e.g., to set up a simple simulation)

Step 2-1: Import or define an incarnation (a family of types), from which you can import types like AggregateProgram

package experiments

// Method #1: Use an incarnation which is already defined
import it.unibo.scafi.incarnations.BasicSimulationIncarnation.AggregateProgram

// Method #2: Define a custom incarnation and import stuff from it
object MyIncarnation extends it.unibo.scafi.incarnations.BasicAbstractIncarnation
import MyIncarnation._

Step 2-2: Define an AggregateProgram which expresses the global behaviour of an ensemble.

// An "aggregate program" can be seen as a function from a Context to an Export
// The Context is the input for a local computation: includes state
//  from previous computations, sensor data, and exports from neighbours.
// The export is a tree-like data structure that contains all the information needed
//  for coordinating with neighbours. It also contains the output of the computation.
object MyAggregateProgram extends AggregateProgram {
  // Main program expression driving the ensemble
  // This is run in a loop for each agent
  // According to this expression, coordination messages are automatically generated
  // The platform/middleware/simulator is responsible for coordination
  override def main() = gradient(isSource)

  // The gradient is the (self-adaptive) field of the minimum distances from source nodes
  // `rep` is the construct for state transformation (remember the round-by-round loop behaviour)
  // `mux` is a purely functional multiplexer (selects the first or second branch according to condition)
  // `foldhoodPlus` folds over the neighbourhood (think like Scala's fold)
  // (`Plus` means "without self"--with plain `foldhood`, the device itself is folded)
  // `nbr(e)` denotes the values to be locally computed and shared with neighbours
  // `nbrRange` is a sensor that, when folding, returns the distance wrt each neighbour
  def gradient(source: Boolean): Double =
    rep(Double.PositiveInfinity){ distance =>
      mux(source) { 0.0 } {
        foldhoodPlus(Double.PositiveInfinity)(Math.min)(nbr{distance}+nbrRange)
      }
    }

  // A custom local sensor
  def isSource = sense[Boolean]("source")
  // A custom "neighbouring sensor"
  def nbrRange = nbrvar[Double](NBR_RANGE_NAME)
}
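
To see how the gradient expression above converges round by round, the following plain-Scala sketch emulates it outside ScaFi. It is a simplified model under stated assumptions: all devices fire synchronously, every message is delivered, and the names GradientSketch, round, and run are hypothetical. The neighbour lists exclude the device itself, mirroring foldhoodPlus.

```scala
object GradientSketch {
  // One synchronous round over all devices.
  // `neighbours(id)` lists (neighbourId, distanceToNeighbour) pairs and
  // excludes `id` itself, mirroring the "without self" fold of foldhoodPlus.
  def round(
    distances: Map[Int, Double],
    sources: Set[Int],
    neighbours: Map[Int, Seq[(Int, Double)]]
  ): Map[Int, Double] =
    distances.map { case (id, _) =>
      // Counterpart of foldhoodPlus(PositiveInfinity)(min)(nbr{distance} + nbrRange)
      val viaNeighbours = neighbours.getOrElse(id, Seq.empty)
        .map { case (nbrId, range) => distances(nbrId) + range }
        .foldLeft(Double.PositiveInfinity)((a, b) => math.min(a, b))
      // Counterpart of mux(source) { 0.0 } { ... }
      id -> (if (sources(id)) 0.0 else viaNeighbours)
    }

  // Counterpart of rep(PositiveInfinity) { ... }: start from infinity everywhere
  // and feed each round's result into the next one.
  def run(
    rounds: Int,
    ids: Seq[Int],
    sources: Set[Int],
    neighbours: Map[Int, Seq[(Int, Double)]]
  ): Map[Int, Double] = {
    val initial = ids.map(id => id -> Double.PositiveInfinity).toMap
    (1 to rounds).foldLeft(initial)((d, _) => round(d, sources, neighbours))
  }
}
```

On a line of four devices at unit spacing with device 0 as the source, the values settle after four rounds: each round pushes the known distance one hop further from the source.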

Step 2-3: Use the ScaFi internal simulator to run the program on a predefined network of devices.

import it.unibo.scafi.simulation.gui.{Launcher, Settings}

object SimulationRunner extends Launcher {
  Settings.Sim_ProgramClass = "experiments.MyAggregateProgram"
  Settings.ShowConfigPanel = true
  launch()
}

Alternatively, you can use the approach taken in Building Aggregate Systems. Indeed, an AggregateProgram object can be seen as a function from Context to Export: you give it a certain context, and get some export back. The export must be passed to neighbours so that they can build their own context and re-interpret the aggregate program.

Step 2-4: After a simple investigation, you may want to switch to a more sophisticated simulator, like Alchemist. Take a look at Alchemist simulator for details about the use of ScaFi within Alchemist.

ScaFi Architecture

From a deployment perspective, ScaFi consists of the following modules:

  • scafi-commons: provides basic entities (e.g., spatial and temporal abstractions)
  • scafi-core: represents the core of the project and provides an implementation of the ScaFi aggregate programming DSL, together with its standard library
  • scafi-simulator: provides basic support for simulating aggregate systems
  • scafi-simulator-gui: provides a GUI for visualising simulations of aggregate systems
  • spala: provides an actor-based aggregate computing middleware
  • scafi-distributed: ScaFi integration-layer for spala

The modules to be imported (e.g., via sbt or Gradle) depend on the use case:

  • Development of a real-world aggregate application. Bring scafi-core in for a fine-grained integration. For more straightforward distributed system setup, take a look at scafi-distributed.
  • Play, exercise, and experiment with aggregate programming. Bring scafi-core in for writing aggregate programs as well as scafi-simulator-gui to quickly render an executing system.
  • Set up sophisticated simulations. Bring scafi-core in for writing aggregate programs and either (A) leverage the basic machinery provided by scafi-simulator, or (B) leverage the ScaFi support provided by Alchemist.

Aggregate Programming

Here, we explain the basic constructs of the field calculus, which are the core of the aggregate programming paradigm. By combining these constructs, higher-level functions can be defined to capture increasingly complex collective behaviour.

Consider the Constructs trait.

trait Constructs {
  def rep[A](init: => A)(fun: A => A): A
  def nbr[A](expr: => A): A
  def foldhood[A](init: => A)(acc: (A, A) => A)(expr: => A): A
  def aggregate[A](f: => A): A

  // the following (aggregate IF construct) can be defined upon AGGREGATE()
  def branch[A](cond: => Boolean)(th: => A)(el: => A): A
  // the following is a variant of REP()
  def share[A](init: => A)(fun: (A, () => A) => A): A

  def mid: ID
  def sense[A](sensorName: String): A
  def nbrvar[A](name: NSNS): A
}

TBD

ScaFi Standard Library

The ScaFi standard library is currently organised into the following modules:

  • FieldUtils: defines functionality to simplify aggregation of values from neighbours, accessible through two objects, includingSelf and excludingSelf, which respectively include and exclude the device itself: sumHood(e), unionHood(e), unionHoodSet(e), mergeHood(e)(overwritePolicy), anyHood(e), everyHood(e)
  • Gradients: defines gradient functions, such as: classicGradient(src,metric)
  • BlockG: defines the gradient-cast (G) building block for propagating information, and related functionality such as: distanceTo(src,metric), broadcast(src,x,metric), channel(src,target,width)
  • BlockC: defines the converge-cast (C) building block for collecting information along a spanning tree
  • BlockS: defines the sparse-choice (S) building block for leader election
  • BlocksWithGC: defines functionality that leverages C and G, such as: summarize(sink,acc,local,Null), average(sink,x)
  • StateManagement: provides utility functions over rep, such as: roundCounter(), remember(x), delay(x), captureChange(x,initially), countChanges(x,initially), goesUp(x), goesDown(x)
  • TimeUtils: provides time-related functionality, such as: T(init,floor,decay) and variants, timer(length), limitedMemory(x,y,timeout), clock(len,decay), sharedTimeWithDecay(period,dt), cyclicTimerWithDecay(len,decay)
  • CustomSpawn: provides support for aggregate processes through spawn function

To use a library component, mix it into your aggregate program class using the with keyword:

class MyProgram extends AggregateProgram with BlockG with BlockS { /*...*/ }

Parts of the ScaFi library are described in the following papers:

TBD

Basic API

TBD

Field-operation utilities

Gradients

Gradient-cast (G)

Converge-cast (C)

Time-decay (T)

Leader election (S)

Block S (Sparse-choice) is used to perform decentralised leader election on a spatial basis.

val leader = S(grain = 10, metric = () => 1)

The previous snippet elects leaders with a mean distance of 10 hops between any two of them. The output is a boolean field that is true on the devices that are currently leaders.

Process API

For an introduction to the aggregate process concept and its API, consider reading the following paper:

TBD

Domain-specific APIs

TBD

Flocking

Crowd management

Simulating Aggregate Systems

Simulation of aggregate systems involves the following:

  1. Defining the structure of an aggregate system: in terms of devices (including their sensors and actuators), connectivity between devices (neighbouring relationship), and the environment in which devices are situated.
  2. Defining the behaviour of an aggregate system: through an aggregate program expressed in the ScaFi DSL.
  3. Defining the simulation setup: including simulation parameters, data to be exported, environment dynamics, and scheduling of computation rounds.

ScaFi simulator

The ScaFi simulator consists of multiple modules:

  • scafi-simulator: provides basic support for simulating aggregate systems
  • scafi-simulator-gui: provides a Swing GUI for visualising simulations of aggregate systems
  • scafi-simulator-gui-new: provides a JavaFX GUI for visualising simulations of aggregate systems
  • NOTE: there is also ongoing work on adding support for 3D simulations, see PR#38

ScaFi simulator engine

The ScaFi simulator engine is quite basic.

The idea for its usage is to leverage a factory, object simulatorFactory, to build a simulation NETWORK object, upon which various exec methods are available for scheduling computation rounds on the devices. On a network, there are also methods like addSensor and chgSensorValue for programming the sensors of devices.

Consider the following example:

object DemoSequenceLauncher extends App {

  val net = simulatorFactory.gridLike(GridSettings(6, 4, stepx = 1, stepy = 1), rng = 1.1)

  net.addSensor(name = "sensor", value = 0)
  net.chgSensorValue(name = "sensor", ids = Set(1), value = 1)
  net.addSensor(name = "source", value = false)
  net.chgSensorValue(name = "source", ids = Set(3), value = true)
  net.addSensor(name = "sensor2", value = 0)
  net.chgSensorValue(name = "sensor2", ids = Set(98), value = 1)
  net.addSensor(name = "obstacle", value = false)
  net.chgSensorValue(name = "obstacle", ids = Set(44,45,46,54,55,56,64,65,66), value = true)
  net.addSensor(name = "label", value = "no")
  net.chgSensorValue(name = "label", ids = Set(1), value = "go")

  var v = java.lang.System.currentTimeMillis()

  net.executeMany(
    node = DemoSequence, // the aggregate program to execute (its definition is not shown here)
    size = 1000000,
    action = (n,i) => {
      if (i % 1000 == 0) {
        println(net)
        val newv = java.lang.System.currentTimeMillis()
        println(newv-v)
        println(net.context(4))
        v=newv
      }
    })
}

Graphical simulator

For the usage of the graphical simulator, consider the following example:

package experiments

import it.unibo.scafi.incarnations.BasicSimulationIncarnation.AggregateProgram

object MyAggregateProgram extends AggregateProgram {

  override def main() = gradient(isSource)

  def gradient(source: Boolean): Double =
    rep(Double.PositiveInfinity){ distance =>
      mux(source) { 0.0 } {
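        // foldhoodPlus excludes the device itself, so the estimate can rise again if a source disappears
        foldhoodPlus(Double.PositiveInfinity)(Math.min)(nbr{distance}+nbrRange)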
      }
    }

  def isSource = sense[Boolean]("source")
  def nbrRange = nbrvar[Double](NBR_RANGE_NAME)
}

import it.unibo.scafi.simulation.gui.{Launcher, Settings}

object SimulationRunner extends Launcher {
  Settings.Sim_ProgramClass = "experiments.MyAggregateProgram"
  Settings.ShowConfigPanel = true
  launch()
}

Alchemist simulator

The ScaFi-specific part of an Alchemist simulation descriptor is as follows:

incarnation: scafi

pools:
  - pool: &program
      - time-distribution: 1
        type: Event
        actions:
          - type: RunScafiProgram
            parameters: [it.unibo.casestudy.MyProgram, 5.0]
            # 1st argument is the fully-qualified classname of your program (class extending AggregateProgram)
            # 2nd argument is retention time (i.e., amount of simulated time for which exports by neighbours are kept)
      - program: send # this is needed for broadcasting export to neighbours after program execution

NOTE: the send program is needed, otherwise there would be no communication among devices, preventing the unfolding of the aggregate logic.

NOTE: the ScaFi incarnation does not attempt to create defaults for undeclared molecules. You therefore need to declare all your molecules before accessing them (e.g., be sure to perform a node.put() before the corresponding node.get()). This includes exported molecules; otherwise, the exporter component will try to export a yet-to-be-created molecule. If a molecule is not declared, the following error might occur: "The molecule does not exist and cannot create empty concentration".

A full Alchemist simulation descriptor is as follows (note: refer to the Alchemist Documentation for more up-to-date information):

variables:
  random: &random
    min: 0
    max: 29
    step: 1
    default: 2

export:
  - time

seeds:
  scenario: *random
  simulation: *random

incarnation: scafi

environment:
  type: Continuous2DEnvironment
  parameters: []

network-model:
  type: ConnectWithinDistance #*connectionType
  parameters: [*range] # requires a &range anchor to be defined elsewhere in the descriptor (not shown here)

pools:
  - pool: &program
      - time-distribution:
          type: ExponentialTime
          parameters: [1]
        type: Event
        actions:
          - type: RunScafiProgram
            parameters: [it.unibo.casestudy.MyProgram, 5.0]
      - program: send
  - pool: &move
      - time-distribution: 1
        type: Event
        actions: []

displacements:
  - in:
      type: Grid
      parameters: [0, 0, 200, 80, 10, 10, 1, 1]
    programs:
      - *move
      - *program
    contents:
      - molecule: test
        concentration: true

Building Aggregate Systems

The major challenge in building aggregate systems is dealing with distribution.

Executing individual computational rounds is very easy, as demonstrated by the following snippet (standalone setup):

import it.unibo.scafi.incarnations.BasicAbstractIncarnation

object MyIncarnation extends BasicAbstractIncarnation

import MyIncarnation._

class BasicUsageProgram extends AggregateProgram  {
  override def main(): Any = rep(0)(_ + 1)
}

object BasicUsage extends App {
  val program = new BasicUsageProgram()
  val c1 = factory.context(selfId = 0, exports = Map(), lsens = Map(), nbsens = Map())
  val e1 = program.round(c1)

  val c2 = factory.context(0, Map(0 -> e1))
  val e2 = program.round(c2)

  println(s"c1=$c1\ne1=$e1\n\nc2=$c2\ne2=$e2")
}

To obtain a working distributed system, the following activities must also be implemented:

  • execution of a reactive or temporally-delayed loop for executing computation rounds
  • discovery of neighbours, and corresponding sending of exports to, and reception of exports from, neighbours
  • de/serialisation of exports (definition of the format, and transparent de/serialisation)
  • implementation of a logical neighbouring relationship
  • using a time window for retention of exports which also takes into account failure in export delivery
  • implementation of an interface to sensors and actuators

Many of these activities are application- and deployment-specific, so it is not easy to come up with a general middleware solution. However, we are working on it.

Take a look at the following paper for middleware- and deployment-level considerations:

ScaFi Developer Manual

For project contributors

Contributions to this project are welcome. Some advice:

  • As we use git flow, use feature-xxx branches.
  • We recommend forking the project, developing your stuff, then contributing back via pull requests (PRs) directly from the Web interface.
  • Stay in sync with the develop branch: pull often from develop, so that you don’t diverge too much from the main development line.
  • Avoid introducing technical debt. In any case, merge requests will be reviewed before merge.

Contributing process

Follow these steps to contribute.

  1. Fork the official ScaFi repository.
  2. Prepare your environment.
    1. Clone your own fork.
      $ git clone git@github.com:<username>/scafi.git
      $ cd scafi
      $ git checkout -b develop origin/develop
      
    2. Open your project with your favourite IDE, e.g., IntelliJ IDEA Community Edition (note: it needs the Scala plugin installed)
      • Open as sbt project
  3. Determine what your contribution will focus on. For instance, look at open issues.
  4. Develop.
    • As we use git flow, use feature-xxx branches.
  5. Merge your contribution.
    • From your fork, open a pull request (PR)
    • Wait for the project maintainers to perform a series of checks and merge your branch into the official repository.
  6. Congratulations! Go back to step 3.

Building the project

Once you have cloned your ScaFi repository, you can build the project using sbt.

You can run tests:

sbt test

Generate the docs:

sbt unidoc
