spark basics
from pyspark import SparkContext
sc = SparkContext(appName = "inf551")
linesn = sc.textFile("hello.txt")
Data = sc.parallelize([1,2,3,4,5],2)
Create initial rdd
1. From an external file -- textFile(<path to file>, <optional no. of partitions>)
E.g. lines = sc.textFile("./hello.txt", 2)
2. From a Python collection (list, tuple, dictionary)
Data = sc.parallelize([1,2,3,4,5], 2)
Note: if using a SparkSession, the file path is checked immediately (sc.textFile is lazy)
lines = spark.read.text("hello.txt").rdd.map(lambda x: x[0])
Actions
getNumPartitions()
foreachPartition(func)
collect() take(n) count()
sum(), max(), min() , mean()
countByKey()
reduce(func)
aggregate(zeroVal,seqOp,combOp)
takeSample(withReplacement,num,[seed])
1. foreachPartition()
def printf(iterator):
    par = list(iterator)
    print('partition:', par)
sc.parallelize([1, 2, 3, 4, 5], 2).foreachPartition(printf)
output:
partition: [3, 4, 5]
partition: [1, 2]
2. collect()--Show entire content of RDD
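A minimal collect() sketch on the small RDD used throughout these notes:
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
rdd.collect()   # => [1, 2, 3, 4, 5]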
3.take(n)--first n elements
l = [1,2,3,4,5]
rdd = sc.parallelize(l, 2)
rdd.take(3)
=> [1,2,3]
4. count()--no of elements
l = [1,2,3,4,5]
rdd = sc.parallelize(l, 2)
rdd.count()
=> 5
5. sum() -- sum of the elements
l = [1,2,3,4,5]
rdd = sc.parallelize(l)
rdd.sum()
=> 15
6. countByKey() -- returns a hashmap (Python dictionary, not an RDD) of (K, Int) pairs with the count of each key
d = [('hello', 1), ('world', 1), ('hello', 2), ('this', 1), ('world',0)]
data = sc.parallelize(d)
data.countByKey()
=> {'this': 1, 'world': 2, 'hello': 2}
7.1 reduce(func) -- func is used to aggregate the elements
func(a, b): must be commutative & associative
Takes 2 inputs, a & b, outputs 1 value, e.g. a + b
reduce(add, [1, 2, 3]) => 6
reduce(lambda a, b: a + b, [1, 2, 3]) => 6
7.2 reduce(func, list, initialValue) = reduce(func, [initialValue] + list)
7.3 reduce(func, list):
    x = list[0]
    for item in list[1:]:
        x = func(x, item)
    return x
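A minimal sketch of 7.1-7.3 using Python's functools.reduce, plus the RDD action for comparison:
from functools import reduce
from operator import add
reduce(add, [1, 2, 3])                  # => 6
reduce(add, [1, 2, 3], 10)              # => 16, same as reduce(add, [10, 1, 2, 3])
sc.parallelize([1, 2, 3]).reduce(add)   # RDD action => 6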
8. aggregate(zeroValue, seqOp, combOp)
8.1 Step 1: for each partition p
– reduce(seqOp, p, zeroValue); if p is empty, return zeroValue
Step 2: for the list vals of per-partition results:
– reduce(combOp, vals, zeroValue)
8.2 seqOp(), combOp() need not be commutative & associative (unlike reduce's func)
seqOp(U, v):
– how to add value v of input RDD into U
– U: accumulator, initial U = zeroValue
– U & v may be different data-type
combOp(U, P):
– how to combine results from multi-partitions
– U: accumulator, initial U = zeroValue
– P: result from a partition
from operator import add
data = sc.parallelize([1], 2)
data.foreachPartition(printf)
P1: []
P2: [1]
data.aggregate(1, add, add)
Step 1: reduce(add, p, 1) per partition
    P1 = []  => returns zeroValue 1 (empty partition)
    P2 = [1] => 1 + 1 = 2
Step 2: reduce(add, [1, 2], 1) => 1 + 1 + 2 = 4
data.aggregate(2, add, lambda U1, U2: U1 * U2)
Step 1 gives 2 (P1 is empty) and 2 + 1 = 3; Step 2 (combOp multiplies): 2 * 2 * 3 => 12
Compute average using aggregate()?
data = sc.parallelize([1, 2, 3, 4, 5])
# accumulator U = (sum, count); seqOp folds in one element, combOp merges partition results
sums, count = data.aggregate((0, 0),
                             lambda U, v: (U[0] + v, U[1] + 1),
                             lambda U1, U2: (U1[0] + U2[0], U1[1] + U2[1]))
avg = float(sums) / count   # => 3.0
9. takeSample(withReplacement, num, [seed]) -- random sample of elements in the RDD
withReplacement: True/False -- sample with or without replacement
num: sample size
optional seed for the random number generator
l = list(range(100))
sc.parallelize(l).takeSample(False, 5)
– [31, 51, 15, 36, 0]
max() / min() -- largest / smallest element; can also be computed with reduce():
data=[5,4,4,1,2,3,3,1,2,5,4,5]
pdata = sc.parallelize(data)
pdata.reduce(lambda x, y: max(x, y)) => 5
Transformations
• map(func) (see the map/mapValues sketch after this list)
• filter(func): a boolean function
• flatMap(func)
• reduceByKey(func, [numTasks])
• groupByKey([numTasks])
• sortByKey([asc], [numTasks])
• distinct([numTasks])
• mapPartitions(func)
• join(rdd,[numTasks]) – leftOuterJoin, rightOuterJoin, fullOuterJoin
• aggregateByKey(zeroValue,seqOp,combOp, [numTasks])
• mapValues(func)
• flatMapValues(func)
• union/intersection/subtract
• subtractByKey
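map() and mapValues() are not demonstrated below, so here is a minimal sketch on hypothetical (k, v) pairs (the kv name is just for illustration):
kv = sc.parallelize([(1, 2), (1, 3), (2, 2)])
kv.map(lambda p: (p[0], p[1] * 10)).collect()   # => [(1, 20), (1, 30), (2, 20)]
kv.mapValues(lambda v: v * 10).collect()        # => [(1, 20), (1, 30), (2, 20)]  (key kept, only the value is mapped)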
hello.txt
"hello world"
"hello this world"
lines1 = lines.filter(lambda line: "this" in line)
=>['hello this world']
flatMap(func)
func must return a list of elements; flatMap merges the lists into a single list
lines.flatMap(lambda x: x.split())
=>rdd: ['hello', 'world', 'hello', 'this', 'world']
reduceByKey(func) -- analogous to Hadoop's combiner: job.setCombinerClass(IntSumReducer);
Input: a collection of (k, v) pairs
Output: a collection of (k, v’) pairs (same k)
• v’:aggregated value of vs in all (k,v)s with same k by applying func
• aggregation function
reduceByKey() returns an RDD– Reduce values per key
reduce() returns a non-RDD value – Reduce all values!
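A word-count sketch combining flatMap, map and reduceByKey on the lines RDD from hello.txt above (output order may vary):
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda w: (w, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.collect()   # => [('hello', 2), ('world', 2), ('this', 1)]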
groupByKey() -- takes no func; returns (k, Iterable(v)) pairs
rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4), (3,5), (2, 4), (1, 5), (2, 6)], 2)
rddp.groupByKey()
=> [(2, <iterable>), (1, ...), (3, ...)]
rddp.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
=> [(2, [2, 4, 6]), (1, [2, 3, 4, 5]), (3, [5])]
More expensive than reduceByKey(): no local (map-side) reduction
sortByKey(ascending=True/False)
distinct() -- can be implemented with reduceByKey() (see the sketch below)
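Minimal sketches for the two lines above; the distinct() idea assumes keeping one (x, None) pair per key via reduceByKey:
rddp.sortByKey(False).collect()   # (k, v) pairs sorted by key, descending
rdd = sc.parallelize([1, 1, 2, 3, 3, 3])
rdd.map(lambda x: (x, None)) \
   .reduceByKey(lambda a, b: a) \
   .map(lambda kv: kv[0]) \
   .collect()   # => [1, 2, 3]  (order may vary)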
rdd1.join(rdd2)--Joining tuples with same key
ds1 = sc.parallelize([(1,2), (2,3)])
ds2 = sc.parallelize([(2,4), (3,5)])
ds1.join(ds2) = [(2, (3, 4))]
ds1.leftOuterJoin(ds2)– [(1, (2, None)), (2, (3, 4))]
ds1.rightOuterJoin(ds2) – [(2, (3, 4)), (3, (None, 5))]
ds1.fullOuterJoin(ds2)– [(1, (2, None)), (2, (3, 4)), (3, (None, 5))]
mapPartitions(func)
func's input: one iterator (over the elements of a partition)
func must return an iterable: either return a list or yield values (generator)
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
def sumf(iterator):
    sum, count = 0, 0
    for v in iterator:
        sum += v
        count += 1
    yield (sum, count)          # or: return [(sum, count)]
rdd.mapPartitions(sumf)
=> rdd: [(3, 2), (12, 3)]
def aggr(x, y):
    return (x[0] + y[0], x[1] + y[1])
rdd.mapPartitions(sumf).reduce(aggr)
=> (15, 5)
A different rdd, e.g. rdd = sc.parallelize([1, 2, 4, 5, 6, 3, 5], 2):
rdd.foreachPartition(printf)
output:
partition: [5, 6, 3, 5]
partition: [1, 2, 4]
def sumf(iterator):
    sum, count = 0, 0
    for v in iterator:
        sum += v
        count += 1
        yield (sum, count)      # yield inside the loop: emits a running (sum, count) per element
rdd.mapPartitions(sumf).foreachPartition(printf)
output:
partition: [(1, 1), (3, 2), (7, 3)]
partition: [(5, 1), (11, 2), (14, 3), (19, 4)]
aggregateByKey(zeroValue,seqOp,combOp)
– Input RDD: a list of (k, v) pairs
– Aggregate values for each key
• Return a value U for each key
– U may be a tuple
– zeroValue = initial value for U
– seqOp(U, v): how to add value v of input RDD into U
– combOp(U1, U2): how to combine 2 Us (created by different partitions); zeroValue is not used in combOp
# per-key average: select k, avg(v) from R group by k
rdd1 = rddp.aggregateByKey((0,0), lambda U,v: (U[0] + v, U[1] + 1), lambda U1,U2: (U1[0] + U2[0], U1[1] + U2[1]))
– [(2, (12, 3)), (1, (14, 4)), (3, (5, 1))]
rdd1.map(lambda kv: (kv[0], float(kv[1][0]) / kv[1][1]))
– [(2, 4.0), (1, 3.5), (3, 5.0)]
data = sc.parallelize([1, 2], 2)
datakv = data.map(lambda x: (1, x))
datakv.foreachPartition(printf)
– [(1, 1)]
– [(1, 2)]
datakv.aggregateByKey(2, add, add).collect()
– [(1, 7)]
zeroValue in aggregateByKey() -- used only in seqOp (i.e. reduction within a partition)
zeroValue in aggregate() -- used in both seqOp and combOp (e.g. data.aggregate(2, add, add) => 9)
aggregateByKey vs. reduceByKey:
aggregated value may have different type than input RDD
– v is an integer, while U is a tuple <sum, count>
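For comparison, a sketch of the same per-key average using mapValues + reduceByKey (each step keeps a single value type):
rddp.mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda s: float(s[0]) / s[1]) \
    .collect()   # => [(2, 4.0), (1, 3.5), (3, 5.0)]  (order may vary)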
flatMapValues(func)
1. mapValues part
– For each key k, apply func to its value, return a list [i1, i2, ...]
2. flatMap part
– flatten lists into a single list, retain the key, [(k, i1), (k, i2), ..., (k', i1'), (k', i2'), ...]
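A minimal flatMapValues() sketch on hypothetical data:
kv = sc.parallelize([('a', 'x y'), ('b', 'z')])
kv.flatMapValues(lambda v: v.split()).collect()
# => [('a', 'x'), ('a', 'y'), ('b', 'z')]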
rdd1.union(rdd2)--Does not remove duplicates (bag union)
rdd1.intersection(rdd2)--Duplicates will be removed!!
rdd1.subtract(rdd2)--neither set nor bag
rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3])
rdd2 = sc.parallelize([1, 2, 2, 5])
rdd1.subtract(rdd2)
– [3, 3, 3]
– 1 not included in result (unlike bag difference)
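With the same rdd1 and rdd2, a sketch of union vs. intersection:
rdd1.union(rdd2).collect()          # => [1, 1, 2, 3, 3, 3, 1, 2, 2, 5]  (bag union, duplicates kept)
rdd1.intersection(rdd2).collect()   # => [1, 2]  (duplicates removed; order may vary)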
rdd1.subtractByKey(rdd2)
rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3]).map(lambda x: (x, 1))
rdd2 = sc.parallelize([1, 2, 2, 5]).map(lambda x: (x, 1))
rdd1.subtractByKey(rdd2)
– [(3, 1), (3, 1), (3, 1)]
rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4), (3,5), (2, 4), (1, 5), (2, 6)], 2)
rddp.countByKey() => {1: 4, 2: 3, 3: 1}
ratings.txt:
(patrick, 4)
(matei, 3)
(patrick, 1)
(aaron, 2)
(aaron, 2)
(reynold, 1)
(aaron, 5)
sc.textFile("ratings.txt").map(lambda s: s[1:-1].split(",")).collect()   # strip "(" and ")", then split on ","
Shuffle methods in Spark
• hash-based: each mapper opens R files locally, one for each reducer
• partition sort-based
• full sort-based