aboutsummaryrefslogtreecommitdiff
path: root/fgc/fc.py
blob: b7f6ed7fc1f98ae3f1db55a0e0964eb2e8698f56 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import itertools as it, operator as op, functools as ft



class BInt(object):
	def __init__(self, value=0, limit=1):
		self.value, self._liimit = value, abs(limit)

	@property
	def bounded(self):
		if self.value < -self._limit: self.value = -self._limit
		elif self.value > self._limit: self.value = self._limit
		return self
	@property
	def max(self): return self.value == self.limit
	@property
	def mim(self): return self.value == -self.limit

	def __add__(self, val):
		self.value += self.value
		return self.bounded
	def __sub__(self, val):
		self.value -= self.value
		return self.bounded
	def __nonempty__(self):
		return self.value != 0
	__bool__ = __nonempty__
	def __int__(self): return int(self.value)
	def __long__(self): return long(self.value)
	def __float__(self): return float(self.value)



from collections import deque

class RRQ(deque): # round-robin queue
	dropped = 0
	def __init__(self, limit):
		self._limit = limit
		super(RRQ, self).__init__()
	def _trim(self, size=None):
		if size is None: size = self._limit
		while len(self) > size:
			self.popleft()
			self.dropped += 1
	def append(self, *argz):
		self._trim()
		super(RRQ, self).append(*argz)
	def extend(self, *argz):
		self._trim()
		super(RRQ, self).extend(*argz)
	def flush(self):
		self._trim(0)
		self.dropped = 0
	@property
	def is_full(self):
		return len(self) == self._limit




from time import time, sleep

FC_UNDEF = 0
FC_OK = 1
FC_EMPTY = 2
FC_STARVE = 4

class FC_TokenBucket(object):
	'''Token bucket flow control mechanism implementation.

		Essentially it behaves like a bucket of given capacity (burst),
		which fills by fill_rate (flow) tokens per time unit (tick, seconds).
		Every poll / consume call take tokens to execute, and either
		block until theyre available (consume+block) or return False,
		if specified amount of tokens is not available.
		Blocking request for more tokens when bucket capacity raises an
		exception.

		tick_strangle / tick_free is a functions (or values) to set/adjust
		fill_rate coefficient (default: 1) in case of consequent blocks /
		grabs - cases when bucket fill_rate is constantly lower
		(non-blocking requests doesnt counts) / higher than token
		requests.'''

	_tick_mul = 1
	_spree = FC_UNDEF

	def __init__( self, flow=1, burst=5, tick=1,
			tick_strangle=None, tick_free=None, start=None ):
		'''flow: how many tokens are added per tick;
			burst: bucket size;
			tick (seconds): time unit of operation;
			tick_strangle / tick_free:
				hooks for consequent token shortage / availability,
				can be either int/float/long or a function, accepting
				current flow multiplier as a single argument;
			start:
				starting bucket size, either int/float/long or a function
				of bucket capacity.'''
		self.fill_rate = flow
		self.capacity = burst
		self._tokens = burst if start is None else self._mod(start, burst)
		self._tick = tick
		self._tick_strangle = tick_strangle
		self._tick_free = tick_free
		self._synctime = time()

	_mod = lambda s, method, val: \
		method if isinstance(method, (int, float, long)) else method(s._tick_mul)

	def _flow_adjust(self):
		tc = self.tokens # logic-independent update of the bucket
		if self._spree & FC_STARVE or tc == 0:
			if self._spree & FC_EMPTY: self._strangle()
			self._spree = FC_EMPTY
		else:
			if self._spree & FC_OK: self._free()
			self._spree = FC_OK

	def _free(self): self._tick_mul = self._mod(self._tick_free, self._tick_mul)
	def _strangle(self): self._tick_mul = self._mod(self._tick_strangle, self._tick_mul)
	## Above methods should only be called _right_after_ self._synctime update
	##  (like flow_adjust does), otherwise they'll screw up token flow calculations
	free = lambda s: (s.tokens, s._free) and None
	strangle = lambda s: (s.tokens, s._strangle) and None


	@property
	def tick(self):
		'Current time unit, adjusted by strangle/free functions'
		return self._tick * self._tick_mul

	@property
	def tokens(self):
		'Number of tokens in the bucket at the moment'
		ts = time()
		if self._tokens < self.capacity:
			self._tokens = min( self.capacity,
				self._tokens + self.fill_rate *
					(ts // self.tick - self._synctime // self.tick) )
		self._synctime = ts
		return self._tokens

	def get_eta(self, count=1):
		'Return amount of seconds until the given number of tokens will be available'
		if count > self.capacity:
			## TODO: Implement buffered grab for this case?
			raise ValueError, ( 'Token bucket deadlock:'
				' %s tokens requested, while max capacity is %s'%(count, self.capacity) )
		return self.tick - time() % self.tick

	def consume(self, count=1, block=False):
		'Take tokens from the bucket'
		tc = self.tokens

		if count <= tc: # enough tokens are available
			self._tokens -= count
			self._flow_adjust()
			return True

		elif block: # wait for tokens
			sleep(self.get_eta(count))
			self._spree |= FC_STARVE # to ensure the 'empty' set/check
			return self.consume(count=count, block=block)

		else:
			self._spree = FC_EMPTY | FC_STARVE
			return False

	def poll(self, count=1):
		'Check token availability w/o taking any'
		if count <= self.tokens: return True
		else: return False