
Parallelized sampling using exponential variates


As part of our recent efforts to support weighted sampling of Spark data frames in sparklyr, we embarked on a journey searching for algorithms that can perform weighted sampling, especially sampling without replacement, in efficient and scalable ways within a distributed cluster-computing framework, such as Apache Spark.

In the interest of brevity, “weighted sampling without replacement” will be shortened to SWoR for the remainder of this blog post.

In the following sections, we will explain and illustrate what SWoR means probability-wise, briefly outline some alternative solutions we considered but were not completely satisfied with, and then deep-dive into exponential variates, a simple mathematical construct that made the best solution to this problem possible.

If you can’t wait to jump into action, there is also a section in which we showcase example usages of sdf_weighted_sample() in sparklyr. In addition, you can examine the implementation detail of sparklyr::sdf_weighted_sample() in this pull request.

How it began

Our journey started from a GitHub issue inquiring about the possibility of supporting the equivalent of dplyr::sample_frac(..., weight = <weight_column>) for Spark data frames in sparklyr. For example,

dplyr::sample_frac(mtcars, 0.25, weight = gear, replace = FALSE)
##                    mpg cyl  disp  hp drat    wt  qsec vs am gear carb
## Merc 280C         17.8   6 167.6 123 3.92 3.440 18.90  1  0    4    4
## Chrysler Imperial 14.7   8 440.0 230 3.23 5.345 17.42  0  0    3    4
## Fiat X1-9         27.3   4  79.0  66 4.08 1.935 18.90  1  1    4    1
## Hornet Sportabout 18.7   8 360.0 175 3.15 3.440 17.02  0  0    3    2
## Valiant           18.1   6 225.0 105 2.76 3.460 20.22  1  0    3    1
## Porsche 914-2     26.0   4 120.3  91 4.43 2.140 16.70  0  1    5    2
## Maserati Bora     15.0   8 301.0 335 3.54 3.570 14.60  0  1    5    8
## Ferrari Dino      19.7   6 145.0 175 3.62 2.770 15.50  0  1    5    6

will randomly select one-fourth of all rows from an R data frame named “mtcars” without replacement, using mtcars$gear as weights. We were unable to find any function implementing the weighted version of dplyr::sample_frac among Spark SQL built-in functions in Spark 3.0 or in earlier versions, which means a future version of sparklyr will need to run its own weighted sampling algorithm to support such use cases.

What exactly is SWoR

The purpose of this section is to mathematically describe the probability distribution generated by SWoR in terms of \(w_1, \dotsc, w_N\), so that readers can clearly see that the exponential-variate based algorithm presented in a subsequent section in fact samples from exactly the same probability distribution. Readers already having a crystal-clear mental picture of what SWoR entails should probably skip most of this section. The key take-away here is: given \(N\) rows \(r_1, \dotsc, r_N\) with weights \(w_1, \dotsc, w_N\) and a desired sample size \(n\), the probability of SWoR selecting \((r_1, \dotsc, r_n)\) is \(\prod\limits_{j = 1}^{n} \left( {w_j} \middle/ {\sum\limits_{k = j}^{N}{w_k}} \right)\).

SWoR is conceptually equivalent to an \(n\)-step process of selecting 1 out of the \(N - j + 1\) remaining rows in the \(j\)-th step for \(j \in \{1, \dotsc, n\}\), with each remaining row’s chance of being selected being linearly proportional to its weight in any of the steps, i.e.,

samples := {}
population := {r[1], ..., r[N]}

for j = 1 to n
  select r[x] from population with probability
    (w[x] / TotalWeight(population))
  samples := samples + {r[x]}
  population := population - {r[x]}

Notice the outcome of a SWoR process is in fact order-significant, which is why in this post it will always be represented as an ordered tuple of elements.
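To make this \(n\)-step definition concrete, here is a minimal, purely local R sketch of the pseudocode above (an illustration only, not the distributed algorithm discussed later; swor_stepwise is a hypothetical helper name, and weights is assumed to be a vector of positive numbers):

swor_stepwise <- function(weights, n) {
  remaining <- seq_along(weights)
  chosen <- integer(0)
  for (j in seq_len(n)) {
    # select one remaining row with probability proportional to its weight
    pick <- remaining[sample.int(length(remaining), 1L, prob = weights[remaining])]
    chosen <- c(chosen, pick)
    remaining <- setdiff(remaining, pick)
  }
  chosen  # an ordered tuple of selected row indices
}

swor_stepwise(weights = c(1, 4, 2, 8, 5), n = 3)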

Intuitively, SWoR is analogous to throwing darts at a bunch of tiles. For example, let’s say the size of our sample space is 5:

  • Picture \(r_1, r_2, \dotsc, r_5\) as 5 rectangular tiles laid out contiguously on a wall with widths \(w_1, w_2, \dotsc, w_5\), with \(r_1\) covering \([0, w_1)\), \(r_2\) covering \([w_1, w_1 + w_2)\), …, and \(r_5\) covering \(\left[\sum\limits_{j = 1}^{4} w_j, \sum\limits_{j = 1}^{5} w_j\right)\)

  • Equate drawing a random sample in each step to throwing a dart uniformly randomly within the interval covered by all tiles that are not hit yet

  • After a tile is hit, it gets taken out and remaining tiles are re-arranged so that they continue to cover a contiguous interval without overlapping

If our sample size is 3, then we shall ask ourselves what is the probability of the dart hitting \((r_1, r_2, r_3)\) in that order?

In step \(j = 1\), the dart will hit \(r_1\) with probability \(\left. w_1 \middle/ \left(\sum\limits_{k = 1}^{N}w_k\right) \right.\)

[figure: step 1]

After deleting \(r_1\) from the sample space once it is hit, step \(j = 2\) will look like this:

[figure: step 2]

and the probability of the dart hitting \(r_2\) in step 2 is \(\left. w_2 \middle/ \left(\sum\limits_{k = 2}^{N}w_k\right) \right.\).

Finally, moving on to step \(j = 3\), we have:

[figure: step 3]

with the probability of the dart hitting \(r_3\) being \(\left. w_3 \middle/ \left(\sum\limits_{k = 3}^{N}w_k\right) \right.\).

So, combining all of the above, the overall probability of selecting \((r_1, r_2, r_3)\) is \(\prod\limits_{j = 1}^{3} \left( {w_j} \middle/ {\sum\limits_{k = j}^{N}{w_k}} \right)\).

Naive approaches for implementing SWoR

This section outlines some possible approaches that were briefly under consideration. Because none of these approaches scales well to a large number of rows or a non-trivial number of partitions in a Spark data frame, we decided to avoid all of them in sparklyr.

A tree-based approach

One possible way to accomplish SWoR is to have a mutable data structure keeping track of the sample space at each step.

Continuing with the dart-throwing analogy from the previous section, let us say initially, none of the tiles has been taken out yet, and a dart has landed at some point \(x \in \left[0, \sum\limits_{k = 1}^{N} w_k\right)\). Which tile did it hit? This can be answered efficiently if we have a binary tree, pictured as the following (or in general, some \(b\)-ary tree for integer \(b \ge 2\))

[figure: a binary tree over the tile widths]

To find the tile that was hit given the dart’s position \(x\), we simply need to traverse down the tree, going through the box containing \(x\) in each level, incurring a \(O(\log(N))\) cost in time complexity for each sample. To take a tile out of the picture, we update the width of the tile to \(0\) and propagate this change upwards from the leaf level to the root of the tree, again incurring a \(O(\log(N))\) cost in time complexity, making the overall time complexity of selecting \(n\) samples \(O(n \cdot \log(N))\), which is not so great for large data sets, and also, not parallelizable across multiple partitions of a Spark data frame.

Rejection sampling

Another possible approach is to use rejection sampling. In terms of the previously mentioned dart-throwing analogy, that means not removing any tile that is hit, hence avoiding the performance cost of keeping the sample space up-to-date, but then having to re-throw the dart in each of the subsequent rounds until the dart lands on a tile that was not hit previously. This approach, just like the previous one, would not be performant, and would not be parallelizable across multiple partitions of a Spark data frame either.

A solution that has proven to be much better than any of the naive approaches turns out to be a numerically stable variant of the algorithm described in “Weighted Random Sampling” (Efraimidis and Spirakis 2016) by Pavlos S. Efraimidis and Paul G. Spirakis.

A version of this sampling algorithm implemented by sparklyr does the following to sample \(n\) out of \(N\) rows from a Spark data frame \(X\):

  • For each row \(r_j \in X\), draw a random number \(u_j\) independently and uniformly from \((0, 1)\) and compute the key of \(r_j\) as \(k_j = \ln(u_j) / w_j\), where \(w_j\) is the weight of \(r_j\). Perform this calculation in parallel across all partitions of \(X\).
  • Select the \(n\) rows with the largest keys and return them as the result. This step is also mostly parallelizable: for each partition of \(X\), one can select up to \(n\) rows having the largest keys within that partition as candidates, and after selecting candidates from all partitions in parallel, simply extract the top \(n\) rows among all candidates and return them as the \(n\) chosen samples (a minimal non-distributed sketch of both steps is shown right after this list).
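The following is a minimal, non-distributed R sketch of those two steps (an illustration only; swor_exp_variates, df, and weight_col are hypothetical names, and sparklyr actually computes keys per partition before merging per-partition candidates):

swor_exp_variates <- function(df, weight_col, n) {
  u <- runif(nrow(df))                # u_j drawn uniformly from (0, 1)
  keys <- log(u) / df[[weight_col]]   # k_j = ln(u_j) / w_j
  df[order(keys, decreasing = TRUE)[seq_len(n)], ]  # n rows with the largest keys
}

# e.g., draw 4 rows from mtcars without replacement, weighted by number of gears
swor_exp_variates(mtcars, "gear", 4)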

There are at least 4 reasons why this solution is highly appealing and was chosen to be implemented in sparklyr:

  • It is a one-pass algorithm (i.e., only need to iterate through all rows of a data frame exactly once).
  • Its computational overhead is quite low (as selecting top \(n\) rows at any stage only requires a bounded priority queue of max size \(n\), which costs \(O(\log(n))\) per update in time complexity).
  • More importantly, most of its required computations can be performed in parallel. In fact, the only non-parallelizable step is the very last stage of combining top candidates from all partitions and choosing the top (n) rows among those candidates. So, it fits very well into the world of Spark / MapReduce, and has drastically better horizontal scalability compared to the naive approaches.
  • Bonus: It is also suitable for weighted reservoir sampling (i.e., it can sample \(n\) out of a possibly infinite stream of rows according to their weights such that at any moment the \(n\) samples will be a weighted representation of all rows that have been processed so far); a small local sketch of this streaming variant follows this list.
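To illustrate that last point, here is a small, local R sketch of the streaming (reservoir) variant under the same key definition (weighted_reservoir_sample is a hypothetical helper name, rows and weights are assumed to be plain vectors, and a real implementation would use a bounded priority queue instead of repeatedly calling which.min()):

weighted_reservoir_sample <- function(rows, weights, n) {
  reservoir_rows <- rows[0]
  reservoir_keys <- numeric(0)
  for (i in seq_along(rows)) {
    key <- log(runif(1)) / weights[i]   # k_i = ln(u_i) / w_i
    if (length(reservoir_keys) < n) {
      # the reservoir is not full yet: always keep the row
      reservoir_rows <- c(reservoir_rows, rows[i])
      reservoir_keys <- c(reservoir_keys, key)
    } else if (key > min(reservoir_keys)) {
      # evict the row with the smallest key in favor of the new row
      min_idx <- which.min(reservoir_keys)
      reservoir_rows[min_idx] <- rows[i]
      reservoir_keys[min_idx] <- key
    }
  }
  reservoir_rows[order(reservoir_keys, decreasing = TRUE)]
}

weighted_reservoir_sample(rows = 0:7, weights = c(1, 4, 2, 8, 5, 7, 1, 4), n = 2)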

Why does this algorithm work

As an interesting aside, some readers have probably seen this technique presented in a slightly different form under another name. It is in fact equivalent to a generalized version of the Gumbel-max trick which is commonly referred to as the Gumbel-top-k trick. Readers familiar with properties of the Gumbel distribution will no doubt have an easy time convincing themselves the algorithm above works as expected.

In this section, we will also present a proof of correctness for this algorithm based on elementary properties of probability density function (shortened as PDF from now on), cumulative distribution function (shortened as CDF from now on), and basic calculus.

First of all, to make sense of all the \(\ln(u_j) / w_j\) calculations in this algorithm, one has to understand inverse transform sampling. For each \(j \in \{1, \dotsc, N\}\), consider the probability distribution defined on \((-\infty, 0)\) with CDF \(F_j(x) = e^{w_j \cdot x}\). In order to pluck out a value \(y\) from this distribution, we first sample a value \(u_j\) uniformly randomly out of \((0, 1)\) that determines the percentile of \(y\) (i.e., how our \(y\) value ranks relative to all possible \(y\) values, a.k.a, the “overall population,” from this distribution), and then apply \(F_j^{-1}\) to \(u_j\) to find \(y\), so, \(y = F_j^{-1}(u_j) = \ln(u_j) / w_j\).
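Spelled out, the inversion is simply a matter of solving \(u_j = F_j(y)\) for \(y\), consistent with the CDF defined above:

\[
u_j = e^{w_j \cdot y}
\;\Longrightarrow\;
\ln(u_j) = w_j \cdot y
\;\Longrightarrow\;
y = \ln(u_j) / w_j = F_j^{-1}(u_j).
\]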

Secondly, after defining all the required CDF functions \(F_j(x) = e^{w_j \cdot x}\) for \(j \in \{1, \dotsc, N\}\), we can also easily derive their corresponding PDF functions \(f_j\): \[f_j(x) = \frac{d F_j(x)}{dx} = w_j e^{w_j \cdot x}.\]
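As a quick sanity check (our own illustration here, not part of the deferred proof), consider the simplest case \(N = 2\), \(n = 1\): the algorithm selects \(r_1\) exactly when \(k_1 > k_2\), and

\[
P(k_1 > k_2)
= \int_{-\infty}^{0} f_1(x)\, F_2(x)\, dx
= \int_{-\infty}^{0} w_1 e^{w_1 x} e^{w_2 x}\, dx
= \frac{w_1}{w_1 + w_2},
\]

which is exactly the probability of a 1-step SWoR selecting \(r_1\) out of \(\{r_1, r_2\}\).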

Lastly, with a clear understanding of the family of probability distributions involved, one can show the probability of this algorithm selecting a given sequence of rows \((r_1, \dotsc, r_n)\) is equal to \(\prod\limits_{j = 1}^{n} \left( {w_j} \middle/ {\sum\limits_{k = j}^{N}{w_k}} \right)\), identical to the probability previously mentioned in the “What exactly is SWoR” section, which means the possible outcomes of this algorithm will follow exactly the same probability distribution as that of an \(n\)-step SWoR.

In order not to deprive our dear readers of the pleasure of completing this proof by themselves, we have decided not to inline the rest of the proof (which boils down to a calculus exercise) within this blog post, but it is available in this file.

While all previous sections focused solely on weighted sampling without replacement, this section will briefly discuss how the exponential-variate approach can also benefit the weighted-sampling-with-replacement use case (which will be shortened to SWR from now on).

Although SWR with sample size \(n\) can be carried out by \(n\) independent processes each selecting \(1\) sample, parallelizing a SWR workload across all partitions of a Spark data frame (let’s call it \(X\)) will still be more performant if the number of partitions is much larger than \(n\) and more than \(n\) executors are available in a Spark cluster.

An initial solution we had in mind was to run SWR with sample size \(n\) in parallel on each partition of \(X\), and then re-sample the results based on the relative total weights of the partitions. Despite sounding deceptively simple when summarized in words, implementing such a solution in practice would be a fairly complicated task. First, one has to apply the alias method or similar in order to perform weighted sampling efficiently on each partition of \(X\), and on top of that, implementing the re-sampling logic across all partitions correctly and verifying the correctness of such a procedure would also require considerable effort.

In comparison, with the help of exponential variates, a SWR carried out as \(n\) independent SWoR processes each selecting \(1\) sample is much simpler to implement, while still being comparable to our initial solution in terms of efficiency and scalability. An example implementation of it (which takes fewer than 60 lines of Scala) is presented in samplingutils.scala.
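For illustration, a local, non-distributed R sketch of that idea (swr_exp_variates is a hypothetical helper name, not the Scala implementation linked above) could look like this:

# SWR with sample size n, expressed as n independent one-sample SWoR draws,
# each draw keeping the single row with the largest key ln(u) / w
swr_exp_variates <- function(df, weight_col, n) {
  idx <- vapply(
    seq_len(n),
    function(i) which.max(log(runif(nrow(df))) / df[[weight_col]]),
    integer(1L)
  )
  df[idx, ]  # the same row may appear more than once
}

swr_exp_variates(mtcars, "gear", 5)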

How do we know sparklyr::sdf_weighted_sample() is working as expected? While the rigorous answer to this question is presented in full in the testing section, we thought it would also be useful to first show some histograms that can help readers visualize what that test plan is. Therefore, in this section, we will do the following:

  • Run dplyr::slice_sample() a number of times on a small sample space, with each run using a different PRNG seed (the sample size will be reduced to \(2\) here so that there will be fewer than 100 possible outcomes and visualization will be easier)
  • Do the same for sdf_weighted_sample()
  • Use histograms to visualize the distribution of sampling outcomes

Throughout this section, we will sample \(2\) elements out of \(\{0, \dotsc, 7\}\) without replacement according to some weights, so the first step is to set up the following in R:

library(sparklyr)

sc <- spark_connect(master = "local")

# `octs` will be our sample space
octs <- data.frame(
  x = seq(0, 7),
  weight = c(1, 4, 2, 8, 5, 7, 1, 4)
)
# `octs_sdf` will be our sample space copied into a Spark data frame
octs_sdf <- copy_to(sc, octs)

sample_size <- 2

In order to tally up and visualize the sampling outcomes efficiently, we can map each possible outcome to an octal number (e.g., (6, 7) gets mapped to \(6 \cdot 8^0 + 7 \cdot 8^1\)) using a helper function to_oct in R:

to_oct <- function(sample) sum(8 ^ seq(0, sample_size - 1) * sample$x)

We also need to tally up sampling outcomes from dplyr::slice_sample() and sparklyr::sdf_weighted_sample() in 2 separate arrays:

max_possible_outcome <- to_oct(list(x = seq(8 - sample_size, 7)))

sdf_weighted_sample_outcomes <- rep(0, max_possible_outcome)
dplyr_slice_sample_outcomes <- rep(0, max_possible_outcome)

Finally, we can run both dplyr::slice_sample() and sparklyr::sdf_weighted_sample() for an arbitrary number of iterations and compare the tallied outcomes from both:

num_sampling_iters <- 1000  # in practice we will vary this value from 500 to 5000

for (x in seq(num_sampling_iters)) {
  seed <- x * 97

  sample1 <- octs_sdf %>%
    sdf_weighted_sample(
      k = sample_size, weight_col = "weight", replacement = FALSE, seed = seed
    ) %>%
    collect() %>%
    to_oct()
  sdf_weighted_sample_outcomes[[sample1]] <-
      sdf_weighted_sample_outcomes[[sample1]] + 1

  set.seed(seed) # set the random seed for dplyr::slice_sample()
  sample2 <- octs %>%
    dplyr::slice_sample(
      n = sample_size, weight_by = weight, replace = FALSE
    ) %>%
    to_oct()
  dplyr_slice_sample_outcomes[[sample2]] <-
      dplyr_slice_sample_outcomes[[sample2]] + 1
}

After all the hard work above, we can now enjoy plotting the sampling outcomes from dplyr::slice_sample() and those from sparklyr::sdf_weighted_sample() after 500, 1000, and 5000 iterations, and observe how the distributions of both start converging after a large number of iterations.
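The plotting code itself is not essential to the comparison, but for completeness, here is one possible way to draw such histograms (assuming ggplot2 is available; this is our own sketch, not the plotting code used for the original figures):

library(ggplot2)

tallies <- rbind(
  data.frame(
    outcome = seq_along(sdf_weighted_sample_outcomes),
    count = sdf_weighted_sample_outcomes,
    method = "sparklyr::sdf_weighted_sample"
  ),
  data.frame(
    outcome = seq_along(dplyr_slice_sample_outcomes),
    count = dplyr_slice_sample_outcomes,
    method = "dplyr::slice_sample"
  )
)

ggplot(tallies, aes(x = outcome, y = count, fill = method)) +
  geom_col(position = "dodge") +
  labs(x = "sampling outcome (octal encoding)", y = "number of occurrences")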

[figure: sampling outcomes after 500, 1000, and 5000 iterations, shown in 3 histograms (best viewed in a separate tab to see everything clearly)]

While parallelized sampling based on exponential variates looks fantastic on paper, there are still plenty of potential pitfalls when it comes to translating such an idea into code, and as usual, a testing plan is essential to ensure implementation correctness.

For instance, numerical instability issues due to floating point limitations arise if \(\ln(u_j) / w_j\) were replaced by \(u_j ^ {1 / w_j}\) in the aforementioned computations.
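As a tiny illustration of that pitfall (with a deliberately exaggerated weight), notice how keys of the form \(u_j ^ {1 / w_j}\) crowd toward 1 and can collapse into exact ties in double precision, while \(\ln(u_j) / w_j\) keeps the ordering information:

u <- c(0.2, 0.8)
w <- 1e17            # an exaggerated weight, purely for illustration

u ^ (1 / w)          # both values evaluate to exactly 1 -- a tie, ordering lost
log(u) / w           # two distinct negative numbers -- ordering preserved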

Another more subtle source of error is the usage of PRNG seeds. For example, consider the following:

  def sampleWithoutReplacement(
    rdd: RDD[Row],
    weightColumn: String,
    sampleSize: Int,
    seed: Long
  ): RDD[Row] = {
    val sc = rdd.context
    if (0 == sampleSize) {
      sc.emptyRDD
    } else {
      val random = new Random(seed)
      val mapRDDs = rdd.mapPartitions { iter =>
        for (row <- iter) {
          val weight = row.getAs[Double](weightColumn)
          val key = scala.math.log(random.nextDouble) / weight
          <and then make sampling decision for `row` based on its `key`,
           as described in the previous section>
        }
        ...
      }
      ...
    }
  }

Even though it might look OK at first glance, rdd.mapPartitions(...) from the above will cause the same sequence of pseudorandom numbers to be applied to multiple partitions of the input Spark data frame, which will cause undesired bias (i.e., sampling outcomes from one partition will have non-trivial correlation with those from another partition, when such correlation should be negligible in a correct implementation).

The code snippet below is an example implementation in which each partition of the input Spark data frame is sampled using a different sequence of pseudorandom numbers:

  def sampleWithoutReplacement(
    rdd: RDD[Row],
    weightColumn: String,
    sampleSize: Int,
    seed: Long
  ): RDD[Row] = {
    val sc = rdd.context
    if (0 == sampleSize) {
      sc.emptyRDD
    } else {
      val mapRDDs = rdd.mapPartitionsWithIndex { (index, iter) =>
        val random = new Random(seed + index)

        for (row <- iter) {
          val weight = row.getAs[Double](weightColumn)
          val key = scala.math.log(random.nextDouble) / weight
          <and then make sampling decision for `row` based on its `key`,
           as described in the previous section>
        }

        ...
      }
    ...
  }
}

An example test case in which a two-sided Kolmogorov-Smirnov test is used to compare the distribution of sampling outcomes from dplyr::slice_sample() with that from sparklyr::sdf_weighted_sample() is shown in this file. Such tests have proven to be effective in surfacing non-obvious implementation errors such as the ones mentioned above.
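A rough sketch of that kind of check, expressed against the tallies from the previous section (an illustration only, not the exact test case in sparklyr’s test suite), might look as follows:

# expand the tallied counts back into two vectors of observed outcomes
sdf_samples <- rep(
  seq_along(sdf_weighted_sample_outcomes), sdf_weighted_sample_outcomes
)
dplyr_samples <- rep(
  seq_along(dplyr_slice_sample_outcomes), dplyr_slice_sample_outcomes
)

# a large p-value means the two empirical distributions are indistinguishable
# (ks.test() will warn about ties because the outcomes are discrete)
ks.test(sdf_samples, dplyr_samples, alternative = "two.sided")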

Please note the sparklyr::sdf_weighted_sample() functionality is not included in any official release of sparklyr yet. We are aiming to ship it as part of sparklyr 1.4 in about 2 to 3 months from now.

In the meantime, you can try it out with the following steps:

First, make sure remotes is installed, and then run the following command to install sparklyr from source.
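(The exact command was not preserved in this copy of the post; the snippet below is our assumption and may need to point at a specific branch or ref.)

remotes::install_github("sparklyr/sparklyr")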

Next, create a test data frame with a numeric weight column containing a non-negative weight for each row, and then copy it to Spark (see the code snippet below for an example):

library(sparklyr)

sc <- spark_connect(master = "local")

example_df <- data.frame(
  x = seq(100),
  weight = c(
    rep(1, 50),
    rep(2, 25),
    rep(4, 10),
    rep(8, 10),
    rep(16, 5)
  )
)
example_sdf <- copy_to(sc, example_df, repartition = 5, overwrite = TRUE)

Finally, run sparklyr::sdf_weighted_sample() on example_sdf:

sample_size <- 5

samples_without_replacement <- example_sdf %>%
  sdf_weighted_sample(
    weight_col = "weight",
    k = sample_size,
    replacement = FALSE
  )

samples_without_replacement %>% print(n = sample_size)
## # Source: spark<?> [?? x 2]
##       x weight
##   <int>  <dbl>
## 1    48      1
## 2    22      1
## 3    78      4
## 4    56      2
## 5   100     16
samples_with_replacement <- example_sdf %>%
  sdf_weighted_sample(
    weight_col = "weight",
    k = sample_size,
    replacement = TRUE
  )

samples_with_replacement %>% print(n = sample_size)
## # Source: spark<?> [?? x 2]
##       x weight
##   <int>  <dbl>
## 1    86      8
## 2    97     16
## 3    91      8
## 4   100     16
## 5    65      2

First and foremost, the author wishes to thank @ajing for reporting that weighted sampling use cases were not properly supported yet in sparklyr 1.3 and for suggesting it should be part of some future version of sparklyr in this GitHub issue.

Special thanks also goes to Javier (@javierluraschi) for reviewing the implementation of all exponential-variate based sampling algorithms in sparklyr, and to Mara (@batpigandme), Sigrid (@Sigrid), and Javier (@javierluraschi) for their invaluable editorial suggestions.

We hope you have enjoyed reading this blog post! If you wish to learn more about sparklyr, we recommend visiting sparklyr.ai, spark.rstudio.com, and some of the previous release posts such as sparklyr 1.3 and sparklyr 1.2. Also, your contributions to sparklyr are more than welcome. Please send your pull requests through here and file any bug report or feature request here.

Thanks for reading!

Efraimidis, Pavlos, and Paul (Pavlos) Spirakis. 2016. “Weighted Random Sampling.” In Encyclopedia of Algorithms, edited by Ming-Yang Kao, 2365–67. New York, NY: Springer New York. https://doi.org/10.1007/978-1-4939-2864-4_478.


