1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
import itertools as it, operator as op, functools as ft
class BInt(object):
def __init__(self, value=0, limit=1):
self.value, self._liimit = value, abs(limit)
@property
def bounded(self):
if self.value < -self._limit: self.value = -self._limit
elif self.value > self._limit: self.value = self._limit
return self
@property
def max(self): return self.value == self.limit
@property
def mim(self): return self.value == -self.limit
def __add__(self, val):
self.value += self.value
return self.bounded
def __sub__(self, val):
self.value -= self.value
return self.bounded
def __nonempty__(self):
return self.value != 0
__bool__ = __nonempty__
def __int__(self): return int(self.value)
def __long__(self): return long(self.value)
def __float__(self): return float(self.value)
from collections import deque
class RRQ(deque): # round-robin queue
dropped = 0
def __init__(self, limit):
self._limit = limit
super(RRQ, self).__init__()
def _trim(self, size=None):
if size is None: size = self._limit
while len(self) > size:
self.popleft()
self.dropped += 1
def append(self, *argz):
self._trim()
super(RRQ, self).append(*argz)
def extend(self, *argz):
self._trim()
super(RRQ, self).extend(*argz)
def flush(self):
self._trim(0)
self.dropped = 0
@property
def is_full(self):
return len(self) == self._limit
from time import time, sleep
FC_UNDEF = 0
FC_OK = 1
FC_EMPTY = 2
FC_STARVE = 4
class FC_TokenBucket(object):
'''Token bucket flow control mechanism implementation.
Essentially it behaves like a bucket of given capacity (burst),
which fills by fill_rate (flow) tokens per time unit (tick, seconds).
Every poll / consume call take tokens to execute, and either
block until theyre available (consume+block) or return False,
if specified amount of tokens is not available.
Blocking request for more tokens when bucket capacity raises an
exception.
tick_strangle / tick_free is a functions (or values) to set/adjust
fill_rate coefficient (default: 1) in case of consequent blocks /
grabs - cases when bucket fill_rate is constantly lower
(non-blocking requests doesnt counts) / higher than token
requests.'''
_tick_mul = 1
_spree = FC_UNDEF
def __init__( self, flow=1, burst=5, tick=1,
tick_strangle=None, tick_free=None, start=None ):
'''flow: how many tokens are added per tick;
burst: bucket size;
tick (seconds): time unit of operation;
tick_strangle / tick_free:
hooks for consequent token shortage / availability,
can be either int/float/long or a function, accepting
current flow multiplier as a single argument;
start:
starting bucket size, either int/float/long or a function
of bucket capacity.'''
self.fill_rate = flow
self.capacity = burst
self._tokens = burst if start is None else self._mod(start, burst)
self._tick = tick
self._tick_strangle = tick_strangle
self._tick_free = tick_free
self._synctime = time()
_mod = lambda s, method, val: \
method if isinstance(method, (int, float, long)) else method(s._tick_mul)
def _flow_adjust(self):
tc = self.tokens # logic-independent update of the bucket
if self._spree & FC_STARVE or tc == 0:
if self._spree & FC_EMPTY: self._strangle()
self._spree = FC_EMPTY
else:
if self._spree & FC_OK: self._free()
self._spree = FC_OK
def _free(self): self._tick_mul = self._mod(self._tick_free, self._tick_mul)
def _strangle(self): self._tick_mul = self._mod(self._tick_strangle, self._tick_mul)
## Above methods should only be called _right_after_ self._synctime update
## (like flow_adjust does), otherwise they'll screw up token flow calculations
free = lambda s: (s.tokens, s._free) and None
strangle = lambda s: (s.tokens, s._strangle) and None
@property
def tick(self):
'Current time unit, adjusted by strangle/free functions'
return self._tick * self._tick_mul
@property
def tokens(self):
'Number of tokens in the bucket at the moment'
ts = time()
if self._tokens < self.capacity:
self._tokens = min( self.capacity,
self._tokens + self.fill_rate *
(ts // self.tick - self._synctime // self.tick) )
self._synctime = ts
return self._tokens
def get_eta(self, count=1):
'Return amount of seconds until the given number of tokens will be available'
if count > self.capacity:
## TODO: Implement buffered grab for this case?
raise ValueError, ( 'Token bucket deadlock:'
' %s tokens requested, while max capacity is %s'%(count, self.capacity) )
return self.tick - time() % self.tick
def consume(self, count=1, block=False):
'Take tokens from the bucket'
tc = self.tokens
if count <= tc: # enough tokens are available
self._tokens -= count
self._flow_adjust()
return True
elif block: # wait for tokens
sleep(self.get_eta(count))
self._spree |= FC_STARVE # to ensure the 'empty' set/check
return self.consume(count=count, block=block)
else:
self._spree = FC_EMPTY | FC_STARVE
return False
def poll(self, count=1):
'Check token availability w/o taking any'
if count <= self.tokens: return True
else: return False
|