YoMo's Blog

DEBS Grand Challenge: Real-Time Stream Processing with YoMo

2021-05-13
Ivy Guo (Intern)

Introduction

DEBS, the ACM International Conference on Distributed Event-Based Systems, aims to "provide a forum dedicated to the dissemination of original research, the discussion of practical insights, and the reporting of experiences relevant to event based computing that were previously scattered across several scientific and professional communities" (reference).

The DEBS 2014 Grand Challenge (Smart Grid) was held as part of the 8th ACM International Conference on Distributed Event-Based Systems and focused on two problems relevant to industry: real-time load prediction and anomaly (outlier) detection. The data for the challenge was collected from a number of smart-home installations deployed in Germany.

In traditional processing, data is first stored in a database and only later processed, in a second stage, to extract useful information. The traditional architecture:

[Figure: the traditional architecture, based on Storm]

With stream processing, by contrast, we can run analytics on incoming data streams as the data arrives. Using YoMo, an open-source serverless stream processing framework, we can address the DEBS challenge in real time:

[Figure: the new architecture, based on YoMo]

First, we take a look at the type of data we're dealing with. Next, we introduce the two queries originally proposed by ACM DEBS. Finally, we install YoMo and use it to implement both queries.

Data

From Jerzak and Ziekow (2014):

For the DEBS 2014 Grand Challenge we assume a hierarchical structure with a house, identified by a unique house id, being the topmost entity. Every house contains one or more households, identified by a unique household id (within a house). Every household contains one or more smart plugs, each identified by a unique plug id (within a household). Every smart plug contains exactly two sensors: (1) load sensor measuring current load with Watt as unit (2) work sensor measuring total accumulated work since the start (or reset) of the sensor with kWh as unit.

The input stream is defined as follows:

  • id – a unique identifier of the measurement [32 bit unsigned int]
  • timestamp – timestamp of measurement [32 bit unsigned int]
  • value – the measurement [32 bit float]
  • property – type of the measurement: 0 for work or 1 for load [boolean]
  • plug_id – a unique identifier (within a household) of the smart plug [32 bit unsigned int]
  • household_id – a unique identifier of a household (within a house) where the plug is located [32 bit unsigned int]
  • house_id – a unique identifier of a house where the household with the plug is located [32 bit unsigned int]

The complete data file is available at this link. For demonstration purposes, we generate mock data from this file; in a real deployment, the data would come directly from the sensors.
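As a sketch of how records from the data file could be turned into measurements, assume each line is a comma-separated record with the fields in the schema order listed above (an assumption about the file layout, not code from the repository). The fields Value and Property and the toString method mirror how the handlers later in this post use Measurement; the other field names, the bool encoding of property, and the plug-household-house key order (inferred from log lines such as `0-1-2 load`) are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Measurement mirrors the DEBS 2014 input schema. Hypothetical field names,
// except Value and Property, which the handlers in this post use.
type Measurement struct {
	ID          uint32  // unique identifier of the measurement
	Timestamp   uint32  // timestamp of the measurement
	Value       float32 // Watt for load, kWh for work
	Property    bool    // false = work (0), true = load (1)
	PlugID      uint32  // unique within a household
	HouseholdID uint32  // unique within a house
	HouseID     uint32  // unique identifier of the house
}

// toString builds a key that identifies a plug across all houses, since plug
// and household IDs are only unique within their parent. The
// "plug-household-house" order is an assumption.
func (m Measurement) toString() string {
	return fmt.Sprintf("%d-%d-%d", m.PlugID, m.HouseholdID, m.HouseID)
}

// parseLine converts one CSV record, assumed to be in schema order
// (id,timestamp,value,property,plug_id,household_id,house_id).
func parseLine(line string) (Measurement, error) {
	f := strings.Split(strings.TrimSpace(line), ",")
	if len(f) != 7 {
		return Measurement{}, fmt.Errorf("expected 7 fields, got %d", len(f))
	}
	var u [7]uint32 // parsed unsigned columns, indexed by column number
	for _, col := range []int{0, 1, 4, 5, 6} {
		n, err := strconv.ParseUint(f[col], 10, 32)
		if err != nil {
			return Measurement{}, fmt.Errorf("column %d: %w", col, err)
		}
		u[col] = uint32(n)
	}
	v, err := strconv.ParseFloat(f[2], 32)
	if err != nil {
		return Measurement{}, fmt.Errorf("value: %w", err)
	}
	if f[3] != "0" && f[3] != "1" {
		return Measurement{}, fmt.Errorf("property: unexpected %q", f[3])
	}
	return Measurement{
		ID:          u[0],
		Timestamp:   u[1],
		Value:       float32(v),
		Property:    f[3] == "1",
		PlugID:      u[4],
		HouseholdID: u[5],
		HouseID:     u[6],
	}, nil
}

func main() {
	// An illustrative record, not taken from the dataset.
	m, err := parseLine("1,1377986401,12.5,1,0,1,2")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v key=%s\n", m, m.toString())
}
```

A mock data source could replay such records, for example re-stamping them with the current time, which is what the timestamps in the Results section suggest.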

Queries

  1. Load Prediction

    Let's divide the whole period covered by the dataset, from $t_{start} = 1377986401$ to $t_{end} = 1380578399$, into $N$ equal slices of $|s|$ seconds each, and call them $s_0$, $s_1$, $s_2$, etc.

    The predicted load for slice $s_{i+2}$ is given by

    $$L(s_{i+2}) = \frac{\mathrm{avgLoad}(s_i) + \mathrm{median}(\{\mathrm{avgLoad}(s_j)\})}{2}$$

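    where $\mathrm{avgLoad}(s)$ is the average load during slice $s$, and the median is taken over the nonempty set of slices $s_j$ with $j = i + 2 - n \cdot k$. Here $k$ is the number of slices within a 24-hour period and $n$ is a natural number with $1 \le n \le \lfloor (i+2)/k \rfloor$, so the set contains the slices at the same time of day on previous days and is nonempty only once $i + 2 \ge k$.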

  2. Outliers

    For this query, we will compute the percentage of plugs which have a median load during the last hour greater than the median load of all plugs.
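To make the slice indexing of query #1 concrete, here is a worked example assuming a slice size of one hour, $|s| = 3600$ seconds, so that $k = 24$. Predicting slice $s_{50}$ means $i = 48$:

```latex
\begin{aligned}
\lfloor (i+2)/k \rfloor &= \lfloor 50/24 \rfloor = 2 \quad\Rightarrow\quad n \in \{1, 2\} \\
j &= i + 2 - n k \in \{50 - 24,\ 50 - 48\} = \{26,\ 2\} \\
L(s_{50}) &= \frac{\mathrm{avgLoad}(s_{48}) + \mathrm{median}(\{\mathrm{avgLoad}(s_{26}),\ \mathrm{avgLoad}(s_{2})\})}{2}
\end{aligned}
```

Slices $s_{26}$ and $s_2$ cover the same hour of the day as $s_{50}$, one and two days earlier.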

YoMo – what is it and why do we use it?

YoMo is an open-source serverless streaming framework for building low-latency edge computing applications. Built atop the QUIC transport protocol and a functional reactive programming interface, it makes real-time data processing reliable, secure, and easy.

Getting Started

  1. Prepare servers, for example on AWS EC2. Users from our community have also tested YoMo on AWS Wavelength and AWS IoT Greengrass.

  2. Install the yomo CLI: go install -v github.com/yomorun/yomo/cmd/yomo@latest && yomo --version

  3. Clone the repository: git clone git@github.com:yomorun/debs-2014.git && cd debs-2014

Algorithm Implementation

By default, the Handler function in debs-flow/app.go should look as follows.

func Handler(rxstream rx.RxStream) rx.RxStream {
    stream := rxstream.
        Subscribe(0x10).
        OnObserve(decoder).
        Map(printer).
        Encode(0x11)

    return stream
}
  • Subscribe(0x10): Subscribe to the input stream. 0x10 is the key. It is defined in debs-source/main.go.
  • OnObserve(decoder): Decode []byte to interface{}. Empty interfaces are often used by code that handles values of unknown type.
  • Map(printer): Print out the data.
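The post does not list decoder or printer. Judging from the sample output in the Results section (lines such as `[1620461050] 9.910085 0-1-2 load`), printer logs each measurement as its timestamp, value, plug key, and type. A hypothetical helper producing that format, taking the fields as plain arguments so it does not depend on any particular Measurement type:

```go
package main

import "fmt"

// formatReading is a hypothetical helper reproducing the log format seen in
// the Results section: "[timestamp] value plug-key load|work".
func formatReading(ts uint32, value float32, plug string, isLoad bool) string {
	kind := "work"
	if isLoad {
		kind = "load"
	}
	return fmt.Sprintf("[%d] %v %s %s", ts, value, plug, kind)
}

func main() {
	fmt.Println(formatReading(1620461050, 13.5, "3-1-2", true))
}
```

Because BufferWithTime and average consume whatever Map(printer) emits, printer presumably logs such a line and returns the measurement unchanged.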

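For query #1, we need two functions in addition to what we discussed above: average and predict. The stream is also batched with BufferWithTime so that each batch covers one slice of ss seconds (the argument appears to be in milliseconds, hence ss * 1e3).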

func Handler(rxstream rx.RxStream) rx.RxStream {
    stream := rxstream.
        Subscribe(0x10).
        OnObserve(decoder).
        Map(printer).
        BufferWithTime(ss * 1e3). // ss stands for slice size
        Map(average).
        Map(predict).
        Encode(0x11)

    return stream
}
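// compute the average load for each plug and save the values to db, which is a global variable in this example
// (db maps plug key -> slice index -> average load; idx, the current slice index, is also package-level)
func average(_ context.Context, i interface{}) (interface{}, error) {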
    // convert interface{} to []interface{}
    lst, ok := i.([]interface{})
    if !ok {
        err := fmt.Errorf("expected type '[]interface{}', got '%v' instead",
            reflect.TypeOf(i))
        fmt.Printf("[average] %v\n", err)
        return nil, err
    }

    // plug # -> value
    total := make(map[string]float32)
    count := make(map[string]float32)

    for _, elem := range lst {
        // convert interface{} to measurement
        x, ok := elem.(Measurement)
        if !ok {
            err := fmt.Errorf("expected type 'measurement', got '%v' instead",
                reflect.TypeOf(elem))
            fmt.Printf("[average] %v\n", err)
            return nil, err
        }

        if x.Property { // load
            plug := x.toString()
            total[plug] += x.Value
            count[plug] += 1.0
        }
    }

    // save to db
    fmt.Println("*** average ***")
    for plug, v := range total {
        avg := v / count[plug]
        fmt.Printf("[s_%v] %v %v\n", idx, plug, avg)

        _, ok := db[plug]
        if !ok {
            db[plug] = make(map[uint32]float32)
        }
        db[plug][idx] = avg
    }
    fmt.Println("***************")
    return i, nil
}

// make predictions based on what we have in db
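func predict(_ context.Context, i interface{}) (interface{}, error) {
    k := t / ss // number of slices within a 24-hour period

    fmt.Println("*** predict ***")
    l := (idx + 2) / k // floor((i + 2) / k): the number of values n can take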
    if l == 0 {
        fmt.Println("not enough data")
    } else {
        // possible values for j
        lst := make([]uint32, l)
        for m := range lst {
            n := uint32(m + 1)
            j := idx + 2 - n*k
            lst[m] = j
        }

        for plug := range db {
            // average load for s_j
            data := make([]float32, l)
            for m, j := range lst {
                data[m] = db[plug][j]
            }
            pred := (db[plug][idx] + median(data)) / 2
            fmt.Printf("[s_%v] %v %v\n", idx+2, plug, pred)
        }
    }
    fmt.Println("***************")

    idx += 1 // slice #
    return 0.0, nil
}
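Both predict and the outliers function call a median helper that the post does not show. A minimal sketch, assuming the signature func median(xs []float32) float32 implied by calls such as median(data), and the common convention of averaging the two middle values for an even-length input (the query #2 sample output is consistent with this: all plugs: 7.5043 is the mean of the two middle load readings, 6.921172 and 8.087428). It sorts a copy so the caller's slice is not reordered, and returns 0 for an empty slice, which is an arbitrary choice:

```go
package main

import (
	"fmt"
	"sort"
)

// median returns the median of xs, averaging the two middle values when
// len(xs) is even. It returns 0 for an empty slice and does not modify xs.
func median(xs []float32) float32 {
	n := len(xs)
	if n == 0 {
		return 0
	}
	s := make([]float32, n)
	copy(s, xs)
	sort.Slice(s, func(a, b int) bool { return s[a] < s[b] })
	if n%2 == 1 {
		return s[n/2]
	}
	return (s[n/2-1] + s[n/2]) / 2
}

func main() {
	fmt.Println(median([]float32{3, 1, 2}))    // 2
	fmt.Println(median([]float32{4, 1, 3, 2})) // 2.5
}
```

Sorting is O(n log n); for very large windows a selection algorithm would be cheaper, but per-slice batches here are small.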

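For query #2, we define a function called outliers. It uses the same ss-second batches and prints every plug whose median load in the batch is greater than the median load of all plugs; presumably, setting ss to 3600 matches the one-hour window in the query definition.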

func Handler(rxstream rx.RxStream) rx.RxStream {
    stream := rxstream.
        Subscribe(0x10).
        OnObserve(decoder).
        Map(printer).
        BufferWithTime(ss * 1e3).
        Map(outliers).
        Encode(0x11)

    return stream
}
// which plugs have a median load greater than the median load of all plugs?
func outliers(_ context.Context, i interface{}) (interface{}, error) {
    // convert interface{} to []interface{}
    lst, ok := i.([]interface{})
    if !ok {
        err := fmt.Errorf("expected type '[]interface{}', got '%v' instead",
            reflect.TypeOf(i))
        fmt.Printf("[outliers] %v\n", err)
        return nil, err
    }

    all := make([]float32, 0, len(lst))
    indiv := make(map[string][]float32) // plug # -> values

    for _, elem := range lst {
        // convert interface{} to measurement
        x, ok := elem.(Measurement)
        if !ok {
            err := fmt.Errorf("expected type 'measurement', got '%v' instead",
                reflect.TypeOf(elem))
            fmt.Printf("[outliers] %v\n", err)
            return nil, err
        }

        if x.Property { // load
            all = append(all, x.Value)

            plug := x.toString()
            indiv[plug] = append(indiv[plug], x.Value)
        }
    }

    v := median(all)
    fmt.Printf("all plugs: %v\n", v)

    fmt.Println("*** outliers ***")
    for plug, vs := range indiv {
        m := median(vs)
        if m > v {
            fmt.Printf("[w_%v] %v %v\n", idx, plug, m)
        }
    }
    fmt.Println("****************")

    idx += 1
    return 0.0, nil
}
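The outliers function prints the individual plugs above the global median, while query #2 as stated asks for the percentage of such plugs. A sketch of that final step, assuming it runs over the same plug → load readings map that outliers builds (indiv) and the global median v; the name outlierPercentage is hypothetical, and median is repeated so the block is self-contained:

```go
package main

import (
	"fmt"
	"sort"
)

// median averages the two middle values for even-length input and
// returns 0 for an empty slice; xs is not modified.
func median(xs []float32) float32 {
	n := len(xs)
	if n == 0 {
		return 0
	}
	s := append([]float32(nil), xs...)
	sort.Slice(s, func(a, b int) bool { return s[a] < s[b] })
	if n%2 == 1 {
		return s[n/2]
	}
	return (s[n/2-1] + s[n/2]) / 2
}

// outlierPercentage returns the percentage of plugs whose median load is
// greater than globalMedian. indiv maps a plug key to its load readings.
func outlierPercentage(indiv map[string][]float32, globalMedian float32) float64 {
	if len(indiv) == 0 {
		return 0
	}
	above := 0
	for _, vs := range indiv {
		if median(vs) > globalMedian {
			above++
		}
	}
	return 100 * float64(above) / float64(len(indiv))
}

func main() {
	// Load readings from the query #2 sample window in the Results section.
	indiv := map[string][]float32{
		"0-1-2": {6.921172, 10.758014, 0.11624338, 8.087428, 4.8114495},
		"3-1-2": {17.251171, 5.806175, 17.249205, 4.6709924, 19.199306},
	}
	var all []float32
	for _, vs := range indiv {
		all = append(all, vs...)
	}
	fmt.Printf("%.1f%%\n", outlierPercentage(indiv, median(all))) // 50.0%
}
```

In that sample window, plug 0-1-2 has a median load of 6.921172 and plug 3-1-2 one of 17.249205, against a global median of 7.5043, so one plug out of two is an outlier.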

Now, to run the code, we need to:

  1. Start yomo-zipper: yomo wf run workflow.yaml

  2. Start streaming function load-prediction: go run cmd/stream-fn-realtime-load-prediction/load-prediction.go

  3. Start mock data-source to generate IoT data: go run cmd/mock-data-source/mock-data.go

  4. Start streaming function outliers: go run cmd/stream-fn-realtime-outliers/outliers.go

You might want to try different hyperparameters, such as the slice size ss.
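When changing the slice size ss, note that predict computes k := t / ss with integer division and then divides by k. Assuming t holds the length of a day in seconds (the post does not show its value), an ss larger than a day makes k zero and causes an integer divide-by-zero panic, while an ss that does not divide the day evenly makes slice i + 2 - n*k drift away from the same time of day. A hypothetical validation helper:

```go
package main

import "fmt"

// secondsPerDay is what t is assumed to hold in predict (k := t / ss).
const secondsPerDay uint32 = 24 * 60 * 60

// slicesPerDay returns k for a slice size of ss seconds. It rejects sizes
// that would make k zero (predict would then divide by zero) and sizes that
// do not divide a day evenly (slices would not line up across days).
func slicesPerDay(ss uint32) (uint32, error) {
	if ss == 0 || ss > secondsPerDay {
		return 0, fmt.Errorf("slice size %d s must be in [1, %d]", ss, secondsPerDay)
	}
	if secondsPerDay%ss != 0 {
		return 0, fmt.Errorf("slice size %d s does not divide %d s evenly", ss, secondsPerDay)
	}
	return secondsPerDay / ss, nil
}

func main() {
	for _, ss := range []uint32{3600, 60, 7, 100000} {
		k, err := slicesPerDay(ss)
		fmt.Println(ss, k, err)
	}
}
```

Also note that predict only prints a prediction once idx + 2 >= k, so with ss = 3600 and a source emitting data in real time, the first prediction appears only after roughly a day.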

Results

For query #1, you should see something similar to the following.

...
[1620461050] 9.910085 0-1-2 load
[1620461050] 8.087268 0-1-2 work
[1620461050] 13.468374 3-1-2 load
[1620461050] 7.742124 3-1-2 work
[1620461051] 13.738256 0-1-2 load
[1620461051] 16.59261 0-1-2 work
[1620461051] 12.84997 3-1-2 load
[1620461051] 10.838872 3-1-2 work
*** average ***
[s_18] 0-1-2 11.824171
[s_18] 3-1-2 13.159172
***************
*** predict ***
[s_20] 0-1-2 10.375039
[s_20] 3-1-2 12.0406475
***************
...

For query #2, you should see something like this:

...
[1620461271] 6.921172 0-1-2 load
[1620461271] 1.8683584 0-1-2 work
[1620461271] 17.251171 3-1-2 load
[1620461271] 9.761936 3-1-2 work
[1620461272] 10.758014 0-1-2 load
[1620461272] 18.668419 0-1-2 work
[1620461272] 5.806175 3-1-2 load
[1620461272] 1.8562717 3-1-2 work
[1620461273] 0.11624338 0-1-2 load
[1620461273] 5.579194 0-1-2 work
[1620461273] 17.249205 3-1-2 load
[1620461273] 4.9580107 3-1-2 work
[1620461274] 8.087428 0-1-2 load
[1620461274] 7.49426 0-1-2 work
[1620461274] 4.6709924 3-1-2 load
[1620461274] 1.793222 3-1-2 work
[1620461275] 4.8114495 0-1-2 load
[1620461275] 1.9070174 0-1-2 work
[1620461275] 19.199306 3-1-2 load
[1620461275] 9.054778 3-1-2 work
all plugs: 7.5043
*** outliers ***
[s_5] 3-1-2 17.249205
****************
...

About Author

Ivy Guo is a Computer Science student at the University of Washington. If you have any questions, please email Ivy at zhifeig@cs.washington.edu.

Further Reading

  • Rohit Gupta, Rinku Shah, and Apurva Mhetre. 2014. In-Memory, High Speed Stream Processing. In Proceedings of the 8th ACM International Conference on Distributed Event-Based Systems (DEBS '14). Association for Computing Machinery, New York, NY, USA, 306–309. DOI: https://doi.org/10.1145/2611286.2611332.

  • Abhinav Sunderrajan, Heiko Aydt, and Alois Knoll. 2014. Real-Time Load Prediction and Outliers Detection using STORM. In Proceedings of the 8th ACM International Conference on Distributed Event-Based Systems (DEBS '14). DOI: https://doi.org/10.1145/2611286.2611327.

  • ACM DEBS Grand Challenge 2014 implementation using Apache Flink (GitHub)

  • DEBS '14: Proceedings of the 8th ACM International Conference on Distributed Event-Based Systems (all research papers)

References

  • Zbigniew Jerzak and Holger Ziekow. 2014. The DEBS 2014 grand challenge. In Proceedings of the 8th ACM International Conference on Distributed Event-Based Systems (DEBS '14). Association for Computing Machinery, New York, NY, USA, 266–269. DOI: https://doi.org/10.1145/2611286.2611333.

All the source code can be found at: https://github.com/yomorun/debs-2014

More about YoMo

https://github.com/yomorun/yomo