Ограничение количества событий за период времени

Мне нужно ограничить количество событий n, разрешенных в течение периода времени deltaT. При любом подходе, который я могу придумать, пространство равно O(m), где m — максимальное количество событийных запросов, отправляемых на deltaT, или O(deltaT/r), где r — приемлемое разрешение.

Редактировать: deltaT - это скользящее временное окно относительно метки времени.

Например: Сохраняйте круговой буфер временных меток событий. При событии обрезаются все временные метки, предшествующие t-deltaT. Отклонить событие, если количество меток времени превышает n. Добавьте метку времени в буфер.

Или создайте кольцевой буфер из целых чисел размером deltaT/r, индексированных по времени относительно текущего с разрешением r. Поддерживать указатель i. По событию увеличивайте i на время, прошедшее с момента последнего события, деленное на r. Обнулить буфер между исходным i и новым. Увеличение на i. Отклонить, если сумма ошибок превышает n.

Какой способ лучше?


Я только что реализовал свое второе предложение выше на С# с фиксированным значением deltaT в 1 с и фиксированным разрешением в 10 мс.

public class EventCap
{
    private const int RES = 10; //resolution in ms

    private int _max;
    private readonly int[] _tsBuffer;
    private int p = 0;
    private DateTime? _lastEventTime;
    private int _length = 1000 / RES;

    public EventCap(int max)
    {
        _max = max;

        _tsBuffer = new int[_length];
    }

    public EventCap()
    {
    }

    public bool Request(DateTime timeStamp)
    {
        if (_max <= 0)
            return true;

        if (!_lastEventTime.HasValue)
        {
            _lastEventTime = timeStamp;
            _tsBuffer[0] = 1;
            return true;
        }

        //A
        //Mutually redundant with B
        if (timeStamp - _lastEventTime >= TimeSpan.FromSeconds(1))
        {
            _lastEventTime = timeStamp;
            Array.Clear(_tsBuffer, 0, _length);
            _tsBuffer[0] = 1;
            p = 0;
            return true;
        }

        var newP = (timeStamp - _lastEventTime.Value).Milliseconds / RES + p;

        if (newP < _length)
            Array.Clear(_tsBuffer, p + 1, newP - p);

        else if (newP > p + _length)
        {
            //B
            //Mutually redundant with A
            Array.Clear(_tsBuffer, 0, _length);
        }
        else
        {
            Array.Clear(_tsBuffer, p + 1, _length - p - 1);
            Array.Clear(_tsBuffer, 0, newP % _length);
        }

        p = newP % _length;
        _tsBuffer[p]++;
        _lastEventTime = timeStamp;

        var sum = _tsBuffer.Sum();

        return sum <= 10;
    }
}

person Martin    schedule 21.09.2012    source источник
comment
я тоже хочу этого .. спасибо за вопрос :)   -  person FUD    schedule 21.09.2012
comment
deltaT — скользящее окно? это означает, что для каждого выбранного deltaT вам нужно максимум n событий, или deltaT идет одно за другим.   -  person Yarneo    schedule 21.09.2012
comment
да, скольжение. @FUD, пока мы не получим ответ, проверьте ниже, может сработать для вас.   -  person Martin    schedule 21.09.2012


Ответы (4)


Как насчет этих переменных: num_events_allowed, time_before, time_now, time_passed

Во время инициализации вы сделаете: time_before = system.timer(), num_events_allowed = n

Когда событие получено, вы делаете следующее:

  time_now = system.timer()
  time_passed = time_now - time_before
  time_before = time_now

  num_events_allowed += time_passed * (n / deltaT);

  if num_events_allowed > n 
      num_events_allowed = n

  if num_events_allowed >= 1
      let event through, num_events_allowed -= 1
  else
      ignore event

Что хорошо в этом алгоритме, так это то, что num_events_allowed фактически увеличивается на время, прошедшее с момента последнего события, и на скорость получения событий, таким образом, вы получаете приращение количества событий, которые вы можете отправить за это time_passed в порядке оставаться в пределах n. Поэтому, если вы получите событие слишком рано, вы увеличите его менее чем на 1, если через слишком много времени вы увеличите его более чем на единицу. Конечно, если событие проходит, вы уменьшаете допуск на 1, так как вы только что получили событие. Если допуск проходит максимальное количество событий, равное n , вы возвращаете его обратно к n , поскольку вы не можете разрешить более n в любой временной фазе. Если припуск меньше 1, вы не можете отправить целое событие, не пропускайте его!

Это алгоритм дырявого ведра: https://en.wikipedia.org/wiki/Leaky_bucket

person Yarneo    schedule 21.09.2012
comment
На практике это будет отлично работать в большинстве случаев. Это будет слишком строго для взрывных событий, не так ли? - person Martin; 22.09.2012
comment
Эй, @Martin, ну, если вы хотите ровно максимум n событий, то это будет хороший алгоритм, если вы хотите примерно n, то он будет слишком строгим, так как вы, вероятно, могли бы найти что-то более свободное, например, периодическая проверка функции, а не для каждое входящее событие. Но в целом это хорошо, потому что это делается за O (1) времени и O (1) пространства. Важно только поддерживать синхронизацию функции, потому что, если 2 события случайно входят в одно и то же время, time_passed и num_events_allowed могут использоваться обоими событиями и вызывать проблемы. - person Yarneo; 22.09.2012
comment
Обнаружил, что я решил это почти точно так же два года назад. Я тогда был умнее. :( - person Martin; 27.09.2012
comment
Привет, @Martin, не беспокойся, я все время забываю, как что-то делать, поэтому я иду и смотрю на то, что сделал несколько лет назад, для справки, это случается со всеми нами :) - person Yarneo; 27.09.2012
comment
Я знаю, что это старо, но я только что наткнулся на него, и он удовлетворил мои потребности, но с одним исключением. Переменная num_events_allowed должна быть инициализирована до 0, иначе вы превысите свою ставку на один период времени. То есть, если вы работаете в течение 10 секунд со скоростью 10 в секунду, вы будете запускать 110, если только оно не будет инициализировано до 0. Значение должно накапливаться. Также num_allowed_events › n должен следовать за num_allowed_event ›= 1, чтобы поддерживать дробные значения n. - person AaronM; 04.04.2014
comment
Даже если вы инициализируете его с 0, ничего не делая в течение периода, он перейдет к 100. Затем в коротком пакете используйте все 100 и через несколько мгновений после того, как он позволит использовать еще несколько. ну вот.. Свыше лимита за один период. Этот ответ плохой. - person insp; 07.12.2014
comment
@insp блестящая мысль. Я вижу единственный способ исправить это — использовать n/2 вместо n. Тогда это хороший приблизительный алгоритм. Это позволяет вам запускать максимум n/2 за раз, но вы гарантируете, что окно будет разрешать только n событий в худшем случае. - person ubershmekel; 24.03.2017

Один из способов сохранить скользящее окно и по-прежнему иметь его O (1) + очень маленькое O (n) для каждого входящего запроса — создать массив целых чисел подходящего размера и сохранить его в виде циклического буфера и дискретизировать входящие запросы (запросы как интегрируется с уровнями дискретизации, как в аналого-цифровом преобразователе, или в виде гистограммы, если вы статистик) и следите за суммой кольцевого буфера, как здесь

assumptions: 
"there can be no more than 1000 request per minute" and 
"we discretize on every second"

int[] buffer = new zeroed int-array with 60 zeroes
int request-integrator = 0 (transactional)
int discretizer-integrator = 0 (transactional)

for each request:
    check if request-integrator < 1000 then
         // the following incs could be placed outside 
         // the if statement for saturation on to many
         // requests (punishment)
         request-integrator++                     // important
         discretizer-integrator++
         proceed with request

once every second:                    // in a transactional memory transaction, for God's saké 
    buffer-index++
    if (buffer-index = 60) then buffer-index=0    // for that circular buffer feeling!
    request-integrator -= buffer[buffer-index]    // clean for things happening one minute ago
    buffer[buffer-index] = discretizer-integrator // save this times value
    discretizer-integrator = 0                    // resetting for next sampling period

Обратите внимание, что увеличение интегратора запросов «можно» делать только один раз в секунду, но это оставляет дыру для насыщения его 1000 запросами или хуже в одну секунду примерно раз в минуту в зависимости от поведения наказания.

person claj    schedule 26.09.2012
comment
и! вы, конечно, также можете установить ограничение на основе секунд (т.е. 50 запросов в секунду), просто добавьте блок или в оператор if для запроса. - person claj; 27.09.2012
comment
Интересно, это комбинация моего кода выше (дискретизация) и буфера токенов, предложенного FUD, не так ли? - person Martin; 27.09.2012
comment
Я думаю, этот вдохновлен реализацией цифрового фильтра нижних частот. Я также думаю, что вы могли бы сделать 1 / dT-суммирование, как Yameo выше, что является моим решением, но для событий, происходящих в особое время - это на самом деле лучшее, поскольку оно меньше и точно. Перейдите к решению Yameos, мое менее элегантно. - person claj; 27.09.2012

Пока читал о различных возможных решениях проблемы. Я наткнулся на алгоритм ведра токенов ( http://en.wikipedia.org/wiki/Token_bucket) . Если я полностью понимаю ваш вопрос, вы можете реализовать алгоритм корзины токенов, фактически не имея корзины с N токенами, вместо этого используя счетчик, который можно соответственно увеличивать и уменьшать. нравится

syncronized def get_token = 
    if count>0 
       { --count, return true }
    else return false

syncronized def add_token = 
    if count==N
       return;
    else ++count

Теперь осталось повторно вызвать add_token во времени deltaT/r.

Чтобы сделать его полностью потокобезопасным, нам потребуется атомарная ссылка для подсчета. Но приведенный выше код должен показать основную идею выполнения этого в памяти O (1).

person FUD    schedule 21.09.2012
comment
Насколько я могу судить, это та же идея, что и у Ярнео, за исключением того, что его реализация добавляет соответствующее количество токенов на событие. - person Martin; 24.09.2012

Я написал приведенный ниже класс (ActionQueue), чтобы ограничить частоту вызовов функций. Одна из приятных вещей заключается в том, что он использует таймер для извлечения объектов из очереди... поэтому ЦП используется минимально (или даже не используется вообще, если очередь пуста)... в отличие от любого метода опроса. .

Пример...

    // limit to two call every five seconds
    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(5), 2);
    public void Test()
    {
        for (var i = 0; i < 10; i++)
        {
            _actionQueue.Enqueue((i2) =>
            {
                Console.WriteLineAction " + i2 + ": " + DateTime.UtcNow);
            }, i);
        }
    }

Пример реального мира...

    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(1), 10);

    public override void SendOrderCancelRequest(Order order, SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var parms = (Tuple<Order, SessionID>)state;
            base.SendOrderCancelRequest(parms.Item1, parms.Item2);
        }, new Tuple<Order, SessionID>(order, sessionID));
    }
    public override void SendOrderMassStatusRequest(SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var sessionID2 = (SessionID)state;
            base.SendOrderMassStatusRequest(sessionID2);
        }, sessionID);
    }

Настоящий класс...

public class ActionQueue
{
    private class ActionState
    {
        public Action<object> Action;
        public object State;
        public ActionState(Action<object> action, object state)
        {
            Action = action;
            State = state;
        }
    }
    Queue<ActionState> _actions = new Queue<ActionState>();
    Queue<DateTime> _times = new Queue<DateTime>();

    TimeSpan _timeSpan;
    int _maxActions;
    public ActionQueue(TimeSpan timeSpan, int maxActions)
    {
        _timeSpan = timeSpan;
        _maxActions = maxActions;           
    }
    public void Enqueue(Action<object> action, object state)
    {
        lock (_times)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);

            if (_times.Count <= _maxActions)
                action(state);
            else
                _actions.Enqueue(new ActionState(action, state));

            CreateDequeueTimerIfNeeded();
        }
    }

    System.Threading.Timer _dequeueTimer;
    protected void CreateDequeueTimerIfNeeded()
    {
        // if we have no timer and we do have times, create a timer
        if (_dequeueTimer == null && _times.Count > 0) 
        {
            var timeSpan = _times.Peek() - DateTime.UtcNow;
            if (timeSpan.TotalSeconds <= 0)
            {
                HandleTimesQueueChange();
            }
            else
            {
                _dequeueTimer = new System.Threading.Timer((obj) =>
                {
                    lock (_times)
                    {
                        _dequeueTimer = null;
                        HandleTimesQueueChange();
                    }
                }, null, timeSpan, System.Threading.Timeout.InfiniteTimeSpan);
            }
        }
    }

    private void HandleTimesQueueChange()
    {
        _times.Dequeue();
        while (_times.Count > 0 && _times.Peek() < DateTime.UtcNow)
            _times.Dequeue();

        while (_actions.Count > 0 && _times.Count < _maxActions)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);
            var actionState = _actions.Dequeue();
            actionState.Action(actionState.State);
        }

        CreateDequeueTimerIfNeeded();
    }
}
person Brian Rice    schedule 28.12.2016