注:本文隶属于《理解ASP.NET Core》系列文章,请查看置顶博客或点击此处查看全文目录

概述

在微服务化的架构设计中,网关扮演着重要的看门人角色,它所提供的功能之一就是限流。而对于众多非微服务化的系统来说,可能并不会部署网关(无论是因为成本还是复杂度),在这种场景下,为了实现限流,微软在 .NET 7 中提供了官方的限流中间件。下面我们一起来看一下。

注册限流策略

首先,确保你的应用依赖的 SDK 版本 >= 7,接着通过AddRateLimiter扩展方法注册限流服务,并添加限流策略,然后通过UseRateLimiter启用限流中间件,最后配置某个路由的请求使用限流策略:

builder.Services.AddRateLimiter(limiterOptions =>
{
    // 配置限流策略
});

app.UseRateLimiter();

app.MapGet("LimitTest", async () =>
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    return Results.Ok($"Limiter");
}).RequireRateLimiting("my_policy");

微软为我们提供了 4 种常用的限流算法:

  • FixedWindowLimiter:固定窗口限流器
  • SlidingWindowLimiter:滑动窗口限流器
  • TokenBucketLimiter:令牌桶限流器
  • ConcurrencyLimiter:并发限流器

我们通常会注册一个命名限流策略,并在该策略内指定限流算法,以及其他限流逻辑。

另外,需要关注一下UseRateLimiter的调用位置。若限流行为作用于特定路由,则限流中间件必须放置在UseRouting之后。

FixedWindowLimiter

固定窗口限流器是一种简单的限流方式:

  • 工作原理:使用固定的时间长度来限制请求数量。假设固定窗口长度为10s,则每10s就会切换(销毁并创建)一个新的窗口,在每个单独的窗口内,限制请求流量。
  • 特点:
    • 优点:实现简单,占用内存低
    • 缺点:
      • 当窗口中请求流量到达阈值时,流量会被瞬间切断,不能平滑地处理突发流量(实际应用中理想效果是让流量平滑地进入系统中)
      • 窗口切换时可能会出现 2 倍请求流量。比如窗口大小为 1s,阈值为100,窗口 1 在后 500ms 内处理了 100 个请求,窗口 2 在前 500ms 内也处理了 100 个请求,这样就导致在 1s 内处理了 200 个请求
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddFixedWindowLimiter(policyName: "fixed", fixedOptions =>
    {
        fixedOptions.PermitLimit = 4;
        fixedOptions.Window = TimeSpan.FromSeconds(60);
        fixedOptions.QueueLimit = 2;
        fixedOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        fixedOptions.AutoReplenishment = true;
    });
});

public sealed class FixedWindowRateLimiterOptions
{
    public TimeSpan Window { get; set; } = TimeSpan.Zero;

    public bool AutoReplenishment { get; set; } = true;

    public int PermitLimit { get; set; }

    public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;

    public int QueueLimit { get; set; }
}

如上所示,我们通过AddFixedWindowLimiter添加了一个固定窗口限流策略,并指定策略名为fixed。它的含义是窗口时间长度为60s,在每个窗口时间范围内,最多允许4个请求被处理。

各配置项含义如下:

  • PermitLimit:窗口阈值,即每个窗口时间范围内,最多允许的请求个数。这里指定为最多允许4个请求。该值必须 > 0
  • Window:窗口大小,即时间长度。这里设置为 60 s。该值必须 > TimeSpan.Zero
  • QueueLimit
    • 当窗口请求数达到最大时,后续请求会进入排队,用于设置队列的大小(即允许几个请求在里面排队等待)
    • 这里设置为队列中最多允许 2 个请求排队,也就是说,在这个窗口内,可以最多有6个请求,4个会被处理,2个则在排队,其他的则会在一定时间后拒绝返回 RejectionStatusCode
    • 该值必须 >= 0
  • QueueProcessingOrder:排队请求的处理顺序。这里设置为优先处理先来的请求
  • AutoReplenishment:指示开启新窗口时是否自动重置请求限制,该值默认为true。如果设置为false,则需要手动调用 FixedWindowRateLimiter.TryReplenish来重置

SlidingWindowLimiter

滑动窗口限流器是固定窗口限流器的升级版:

  • 工作原理:
    • 在固定窗口限流器的基础上,它再将每个窗口划分为多个段,每经过一个段的时间间隔(= 窗口时间 / 窗口段的个数),窗口就会向后滑动一段,所以称为滑动窗口(窗口大小仍是固定的)。
    • 当窗口滑动后,会“吃进”一个段(称为当前段),并“吐出”一个段(称为过期段),过期段会被回收,回收的请求数可以用于当前段。
  • 特点:
    • 优点:按段滑动处理,相对于固定窗口来说,可以对流量进行更精准的控制,更平滑的处理突发流量,并且段划分的越多,移动更平滑。
    • 缺点:对时间精度要求高,比固定窗口实现复杂,内存占用更高
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddSlidingWindowLimiter(policyName: "sliding", slidingOptions =>
    {
        slidingOptions.PermitLimit = 100;
        slidingOptions.Window = TimeSpan.FromSeconds(30);
        slidingOptions.QueueLimit = 2;
        slidingOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        slidingOptions.AutoReplenishment = true;
        slidingOptions.SegmentsPerWindow = 3;
    });
});

public sealed class SlidingWindowRateLimiterOptions
{
    public TimeSpan Window { get; set; } = TimeSpan.Zero;

    public int SegmentsPerWindow { get; set; }

    public bool AutoReplenishment { get; set; } = true;

    public int PermitLimit { get; set; }

    public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;

    public int QueueLimit { get; set; }
}

如上所示,我们通过AddSlidingWindowLimiter添加了一个滑动窗口限流策略,并指定策略名为sliding。它的含义是窗口时间长度为30s,在每个窗口时间范围内,最多允许100个请求,窗口段数为 3,每个段的时间间隔为 30s / 3 = 10s,即窗口每 10s 滑动一段。

各配置项含义如下:

  • PermitLimit:窗口阈值,即每个窗口时间范围内,最多允许的请求个数。这里指定为最多允许100个请求。该值必须 > 0
  • Window:窗口大小,即时间长度。这里设置为 30 s。该值必须 > TimeSpan.Zero
  • QueueLimit
    • 当窗口请求数达到最大时,后续请求会进入排队,用于设置队列的大小(即允许几个请求在里面排队等待)
    • 这里设置为队列中最多允许 2 个请求排队
    • 该值必须 >= 0
  • QueueProcessingOrder:排队请求的处理顺序。这里设置为优先处理先来的请求
  • AutoReplenishment:指示开启新窗口时是否自动重置请求限制,该值默认为true。如果设置为false,则需要手动调用 SlidingWindowRateLimiter.TryReplenish来重置
  • SegmentsPerWindow:每个窗口的段的个数,通过它可以计算出每个段滑动的时间间隔。这里设置段数为 3,时间间隔为 10s。该值必须 > 0

为了更好地理解滑动窗口限流器的工作原理,下面我会借用官方文档提供的一张图来详细解释一下:

image

假设:限制每个窗口的请求数为 100,窗口时间为 30s,每个窗口的段数为 3,那么每个段的时间间隔就是 30s / 3 = 10s。
定义:当前段结存请求数 = 当前段可用请求数 - 处理请求数 + 回收请求数

限流器工作流程:

  1. 在第 1 个段时(0s~10s),当前段可用请求数为 100,处理了 20 个请求,回收请求 0 个,那么结存请求数 = 100 - 20 + 0 = 80
  2. 在第 2 个段时(10s~20s),当前段可用请求数为 80,处理了 30 个请求,回收请求 0 个,那么结存请求数 = 80 - 30 + 0 = 50
  3. 在第 3 个段时(20s~30s),当前段可用请求数为 50,处理了 40 个请求,回收请求 0 个,那么结存请求数 = 50 - 40 + 0 = 10
  4. 在第 4 个段时(30s~40s),当前段可用请求数为 10,处理了 30 个请求,回收第 1 个段的请求 20 个,那么结存请求数 = 10 - 30 + 20 = 0
  5. 在第 5 个段时(40s~50s),当前段可用请求数为 0,处理了 10 个请求,回收第 2 个段的请求 30 个,那么结存请求数 = 0 - 10 + 30 = 20
  6. 在第 6 个段时(50s~60s),当前段可用请求数为 20,处理了 10 个请求,回收第 3 个段的请求 40 个,那么结存请求数 = 20 - 10 + 40 = 50

TokenBucketLimiter

令牌桶限流器是一种限制数据平均传输速率的限流算法:

  • 工作原理:想象有一个桶,每个固定时间段会向桶内放入固定数量的令牌(token),当桶内令牌装满时,新的令牌将会被丢弃。当请求流量进入时,会先从桶内拿 1 个令牌,拿到了则该请求会被处理,没拿到则会在队列中等待,若队列已满,则会被限流拒绝处理。
  • 特点:可以限制数据的平均传输速率,还可以一次性耗尽令牌应对突发流量,并平滑地处理后续流量,是一种通用的算法

以下图为例,桶内有 3 个令牌(token),进来了 5 个请求,前三个请求可以拿到令牌(token),它们会被处理,后面两个就只能排队或被限流拒绝。

builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddTokenBucketLimiter(policyName: "token_bucket", tokenBucketOptions =>
    {
        tokenBucketOptions.TokenLimit = 4;
        tokenBucketOptions.ReplenishmentPeriod = TimeSpan.FromSeconds(10);
        tokenBucketOptions.TokensPerPeriod = 2;
        tokenBucketOptions.QueueLimit = 2;
        tokenBucketOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        tokenBucketOptions.AutoReplenishment = true;
    });
});

public sealed class TokenBucketRateLimiterOptions
{
    public TimeSpan ReplenishmentPeriod { get; set; } = TimeSpan.Zero;

    public int TokensPerPeriod { get; set; }

    public bool AutoReplenishment { get; set; } = true;

    public int TokenLimit { get; set; }

    public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;

    public int QueueLimit { get; set; }
}

如上所示,我们通过AddTokenBucketLimiter添加了一个令牌桶限流策略,并指定策略名为token_bucket。它的含义是桶最多可以装 4 个令牌,每 10s 发放一次令牌,每次发放 2 个令牌,所以在一个发放周期内,最多可以处理 4 个请求,至少可以处理 2 个请求

各配置项含义如下:

  • TokenLimit:桶最多可以装的令牌数,发放的多余令牌会被丢弃。这里设置为最多装 4 个令牌。该值必须 > 0
  • ReplenishmentPeriod:令牌发放周期,即多长时间发放一次令牌。这里设置为 10 s。该值必须 > TimeSpan.Zero
  • TokensPerPeriod:每个周期发放的令牌数,即每个周期向桶内放入的令牌数(若超过桶可装令牌数的最大值,则会被丢弃)。这里设置为 2 个。该值必须 > 0
  • QueueLimit
    • 当桶内的令牌全部被拿完(token 数为 0)时,后续请求会进入排队,用于设置队列的大小(即允许几个请求在里面排队等待)
    • 这里设置为队列中最多允许 2 个请求排队
    • 该值必须 >= 0
  • QueueProcessingOrder:排队请求的处理顺序。这里设置为优先处理先来的请求
  • AutoReplenishment:指示当进入新的令牌发放周期时,是否自动发放令牌,该值默认为true。如果设置为false,则需要手动调用 TokenBucketRateLimiter.TryReplenish来发放

ConcurrencyLimiter

并发限流器不是限制一段时间内的最大请求数,而是限制并发数:

  • 工作原理:限制同一时刻并发请求的数量
  • 特点:可以充分利用服务器性能,当出现突发流量时,服务器负载可能会持续过高。
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddConcurrencyLimiter(policyName: "concurrency", concurrencyOptions =>
    {
        concurrencyOptions.PermitLimit = 4;
        concurrencyOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        concurrencyOptions.QueueLimit = 2;
    });
});

public sealed class ConcurrencyLimiterOptions
{
    public int PermitLimit { get; set; }

    public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;

    public int QueueLimit { get; set; }
}

如上所示,我们通过AddConcurrencyLimiter添加了一个并发限流策略,并指定策略名为concurrency。它的含义是最多可以并发4个请求被处理。

各配置项含义如下:

  • PermitLimit:最多并发的请求数。该值必须 > 0
  • QueueLimit
    • 当并发请求数达到最大时,后续请求会进入排队,用于设置队列的大小(即允许几个请求在里面排队等待)
    • 这里设置为队列中最多允许 2 个请求排队
    • 该值必须 >= 0
  • QueueProcessingOrder:排队请求的处理顺序。这里设置为优先处理先来的请求

RateLimiterOptions

上面已经把常用的限流算法介绍完了,下面来看一下可以通过limiterOptions进行哪些配置:

public sealed class RateLimiterOptions
{
    // 仅保留了常用的配置项,其他相关代码均忽略

    // 全局限流器
    public PartitionedRateLimiter<HttpContext>? GlobalLimiter { get; set; }

    // 当请求被限流拒绝时执行
    public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; set; }

    // 当期你去被限流拒绝时的 Http 响应状态码
    public int RejectionStatusCode { get; set; } = StatusCodes.Status503ServiceUnavailable;
}

GlobalLimiter

通过GlobalLimiter,我们可以设置全局限流器,更准确的说法是全局分区限流器,该限流器会应用于所有请求。执行顺序为先执行全局限流器,再执行特定于路由终结点的限流器(如果存在的话)。

需要注意的是,相对于上面注册的限流策略来说,GlobalLimiter已经是一个限流器实例了,所以需要分配给他一个分区限流器实例,通过PartitionedRateLimiter.Create来创建。

builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, IPAddress>(context =>
    {
        IPAddress? remoteIpAddress = context.Connection.RemoteIpAddress;

        // 针对非回环地址限流
        if (!IPAddress.IsLoopback(remoteIpAddress!))
        {
            return RateLimitPartition.GetTokenBucketLimiter
            (remoteIpAddress!, _ =>
                new TokenBucketRateLimiterOptions
                {
                    TokenLimit = 4,
                    QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                    QueueLimit = 2,
                    ReplenishmentPeriod = TimeSpan.FromSeconds(10),
                    TokensPerPeriod = 10,
                    AutoReplenishment = true
                });
        }

        // 若为回环地址,则不限流
        return RateLimitPartition.GetNoLimiter(IPAddress.Loopback);
    });
});

链式组合的限流器

它并不是一个新类型的限流器,而是可以将我们上面提到的分区限流器进行组合而得到一个新的分区限流器。

例如我可以将包含固定窗口限流逻辑的分区限流器和将包含并发限流逻辑的分区限流器组合进行组合,那么应用该限流器的请求就会先被固定窗口限流器处理,再被并发限流器处理,任意一个被限流,就会被拒绝。

var chainedLimiter = PartitionedRateLimiter.CreateChained(
    PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
    {
        var userAgent = httpContext.Request.Headers.UserAgent.ToString();

        return RateLimitPartition.GetFixedWindowLimiter
        (userAgent, _ =>
            new FixedWindowRateLimiterOptions
            {
                AutoReplenishment = true,
                PermitLimit = 4,
                Window = TimeSpan.FromSeconds(2)
            });
    }),
    PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
    {
        var userAgent = httpContext.Request.Headers.UserAgent.ToString();

        return RateLimitPartition.GetConcurrencyLimiter
        (userAgent, _ =>
            new ConcurrencyLimiterOptions
            {
                PermitLimit = 4,
                QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                QueueLimit = 2
            });
    })
);

目前链式组合的限流器只能用于全局限流器,而不能用于终结点限流器。

RejectionStatusCode

通过RejectionStatusCode,我们可以设置请求被限流拒绝后,http默认的响应状态码。默认为 503 服务不可用,我们可以指定为 429 过多的请求。

builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.RejectionStatusCode = StatusCodes.Status429TooManyRequests;
});

另外,该状态码可以在OnRejected中被重写,具体参见下小节。

OnRejected

当请求被限流时,会触发回调OnRejected,通过该委托我们可以针对 http 响应进行自定义配置:

  • RetryAfter:设置响应头Retry-After,指示多长时间后重试请求。需要注意的是,并发限流器无法获取到 RetryAfter,因为它不是时间段的限流,而是限制的并发数
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.OnRejected = (context, cancellationToken) =>
    {
        if (context.Lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter))
        {
            context.HttpContext.Response.Headers.RetryAfter =
                ((int)retryAfter.TotalSeconds).ToString(NumberFormatInfo.InvariantInfo);
        }

        // 可以重新设置响应状态码,会覆盖掉上面设置的 limiterOptions.RejectionStatusCod
        context.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;
        context.HttpContext.RequestServices.GetService<ILoggerFactory>()?
            .CreateLogger("Microsoft.AspNetCore.RateLimitingMiddleware")
            .LogWarning("OnRejected: {GetUserEndPoint}", GetUserEndPoint(context.HttpContext));

        return ValueTask.CompletedTask;
    };
});

自定义限流策略

上述提到的限流策略,并不能满足我们所有的需求,所以了解如何自定义限流策略是我们的必修课。

在开始编码之前,你需要了解以下内容:

  • 上述使用AddXXXLimiter添加的限流策略,内部实际上调用了AddPolicy(后面的部分会详细介绍)
  • 上述使用AddXXXLimiter添加的限流策略,每种策略只有一个分区,即使用了该限流策略的路由共享一个分区。例如通过AddFixedWindowLimiter添加了限流策略“fixed”,窗口阈值为 10,并有 10 个路由使用了该策略,那么在一个窗口内,这 10 个路由总的请求数达到 10,那这 10 个路由后续的请求都会被限流。

下面我们就借助AddPolicy,分别使用两种方式添加一个自定义策略“my_policy”:一个用户一个分区,匿名用户共享一个分区

通过委托创建自定义限流策略

builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddPolicy(policyName: "my_policy", httpcontext =>
    {
        var userId = "anonymous user";
        if (httpcontext.User.Identity?.IsAuthenticated is true)
        {
            userId = httpcontext.User.Claims.First(c => c.Type == "id").Value;
        }

        return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new 
            FixedWindowRateLimiterOptions
            {
                PermitLimit = 3,
                Window = TimeSpan.FromSeconds(60),
                QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                QueueLimit = 0
            });
    });
});

通过实现 IRateLimiterPolicy 创建自定义限流策略

public interface IRateLimiterPolicy<TPartitionKey>
{
    // 若不为空,则执行它(不会执行全局的),如果它为空,则执行全局的
    Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }

    // 获取限流分区
    RateLimitPartition<TPartitionKey> GetPartition(HttpContext httpContext);
}

public class MyRateLimiterPolicy : IRateLimiterPolicy<string>
{
    // 可以通过依赖注入参数
    public MyRateLimiterPolicy(ILogger<MyRateLimiterPolicy> logger)
    {
        // 可以设置自己的限流拒绝回调逻辑,而不使用上面全局设置的 limiterOptions.OnRejected
        OnRejected = (ctx, token) =>
        {
            ctx.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;

            logger.LogWarning($"Request rejected by {nameof(MyRateLimiterPolicy)}");

            return ValueTask.CompletedTask;
        };
    }

    public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }

    public RateLimitPartition<string> GetPartition(HttpContext httpContext)
    {
        var userId = "anonymous user";
        if (httpContext.User.Identity?.IsAuthenticated is true)
        {
            userId = httpContext.User.Claims.First(c => c.Type == "id").Value;
        }

        return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new 
            FixedWindowRateLimiterOptions
            {
                PermitLimit = 3,
                Window = TimeSpan.FromSeconds(60),
                QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                QueueLimit = 0
            });
    }
}

// 记得注册它
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddPolicy<string, MyRateLimiterPolicy>(policyName: "my_policy");
}

应用限流策略

RequireRateLimiting & DisableRateLimiting

可以一次性为所有 controller 应用限流策略

app.MapControllers().RequireRateLimiting("fixed");

也可以为指定路由应用限流策略

app.MapGet("LimitTest", () =>{ }).RequireRateLimiting("fixed");

实质上,RequireRateLimitingDisableRateLimiting是通过向终结点元数据中EnableRateLimitingDisableRateLimiting两个特性来实现的。

public static class RateLimiterEndpointConventionBuilderExtensions
{
    public static TBuilder RequireRateLimiting<TBuilder>(this TBuilder builder, string policyName) where TBuilder : IEndpointConventionBuilder
    {
        builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(policyName)));
        return builder;
    }

    public static TBuilder RequireRateLimiting<TBuilder, TPartitionKey>(this TBuilder builder, IRateLimiterPolicy<TPartitionKey> policy) where TBuilder : IEndpointConventionBuilder
    {
        builder.Add(endpointBuilder =>
        {
            endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(new 
                DefaultRateLimiterPolicy(
                    RateLimiterOptions.ConvertPartitioner<TPartitionKey>(null, policy.GetPartition), policy.OnRejected)));
        });
        return builder;
    }
        
    public static TBuilder DisableRateLimiting<TBuilder>(this TBuilder builder) where TBuilder : IEndpointConventionBuilder
    {
        builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(DisableRateLimitingAttribute.Instance));
        return builder;
    }
}

EnableRateLimitingAttribute & DisableRateLimitingAttribute

Controller层面,我们可以方便的使用特性来标注使用或禁用限流策略。这两个特性可以标注在Controller类上,也可以标注在类的方法上。

但需要注意的时,如果前面使用了RequireRateLimitingDisableRateLimiting扩展方法,由于它们在元数据中添加特性比直接使用特性标注要晚,所以它们的优先级很高,会覆盖掉这里使用的策略。建议不要针对所有 Controller 使用RequireRateLimitingDisableRateLimiting

下面是一个应用示例:

[EnableRateLimiting("fixed")]   // 针对整个 Controller 使用限流策略 fixed
public class WeatherForecastController : ControllerBase
{
    // 会使用 Controller 类上标注的 fixed 限流策略
    [HttpGet(Name = "GetWeatherForecast")]
    public string Get() => "Get";
    
    [HttpGet("Hello")]
    [EnableRateLimiting("my_policy")]   // 会使用 my_policy 限流策略,而不会使用 fixed
    public string Hello() => "Hello";
    
    [HttpGet("disable")]
    [DisableRateLimiting]   // 禁用任何限流策略
    public string Disable() => "Disable";
}

设计原理

为了方便理解接下来的内容,先明确几个容易混淆的类型的概念:

  • RateLimitPartition:限流分区,TKey表示分区的 Key,被同一限流分区作用的请求会互相影响,不同限流分区则不影响。
  • RateLimitPartition:非泛型的,它只是个静态类,用来快速创建限流分区RateLimitPartition<TKey>
  • PartitionedRateLimiter:分区限流器,即包含了限流分区的限流器,内部会使用各个限流分区对不同请求进行限流。TResource表示被限流的资源类型,比如 Http 请求类型为HttpContext。限流中间件就是通过它来进行限流操作的。
  • PartitionedRateLimiter:非泛型的,同样只是个静态类,用来快速创建分区限流器PartitionedRateLimiter<TResource>

篇幅所限,下方示例列出的源码会忽略一部分非核心代码。

AddRateLimiter

AddRateLimiter很简单,只是单纯的进行选项配置:

public static class RateLimiterServiceCollectionExtensions
{
    public static IServiceCollection AddRateLimiter(this IServiceCollection services, Action<RateLimiterOptions> configureOptions)
    {
        services.Configure(configureOptions);
        return services;
    }
}

AddXXXLimiter

以下仅以AddFixedWindowLimiter为例进行讲解,其他三个都是类似的。

public static class RateLimiterOptionsExtensions
{
    public static RateLimiterOptions AddFixedWindowLimiter(this RateLimiterOptions options, string policyName, Action<FixedWindowRateLimiterOptions> configureOptions)
    {
        var key = new PolicyNameKey() { PolicyName = policyName };
        var fixedWindowRateLimiterOptions = new FixedWindowRateLimiterOptions();
        configureOptions.Invoke(fixedWindowRateLimiterOptions);
        fixedWindowRateLimiterOptions.AutoReplenishment = false;
        return options.AddPolicy(policyName, context =>
        {
            return RateLimitPartition.GetFixedWindowLimiter(key,
                _ => fixedWindowRateLimiterOptions);
        });
    }
}

首先是配置选项,可以看到它把AutoReplenishment强制设置为了false,不对啊,如果这样设置岂不是要我来手动调用TryReplenish来重置次数了。其实不然,我们一会看GetFixedWindowLimiter的实现就知道原因了。

接着就是调用AddPolicy,传入策略名和一个委托来添加策略,该委托会返回一个限流分区,分区内可以通过工厂获取限流器实例。可以看到该策略的分区 key 是固定不变的,即该策略共享一个限流分区。

public static class RateLimitPartition
{
    public static RateLimitPartition<TKey> GetFixedWindowLimiter<TKey>(
        TKey partitionKey,
        Func<TKey, FixedWindowRateLimiterOptions> factory)
    {
        return Get(partitionKey, key =>
        {
            FixedWindowRateLimiterOptions options = factory(key);
            if (options.AutoReplenishment is true)
            {
                options = new FixedWindowRateLimiterOptions
                {
                    PermitLimit = options.PermitLimit,
                    QueueProcessingOrder = options.QueueProcessingOrder,
                    QueueLimit = options.QueueLimit,
                    Window = options.Window,
                    AutoReplenishment = false
                };
            }
            return new FixedWindowRateLimiter(options);
        });
    }
    
    public static RateLimitPartition<TKey> Get<TKey>(
        TKey partitionKey,
        Func<TKey, RateLimiter> factory)
    => new RateLimitPartition<TKey>(partitionKey, factory);

可以看到,如果AutoReplenishmenttrue,会重新new一个新选项,这个新的选项仅仅是将AutoReplenishment设置为false。为什么呢?这是因为如果它为true,那么每一个FixedWindowRateLimiters实例(即限流分区)都会有一个自己的定时器来定时补充许可,这无疑是很浪费的。所以将它设置为false,由分区限流器中的的定时器来统一管理其下的所有分区,降低资源消耗,不用担心,微软已经帮我们实现好了(具体在RateLimitingMiddleware小节中会介绍),不需要自己实现。

策略被保存到RateLimiterOptionsPolicyMapUnactivatedPolicyMap中,其中:

  • PolicyMap是指已经创建了创建了策略实例的限流策略集
  • UnactivatedPolicyMap是指还未创建策略实例的限流策略集,它保存的不是策略实例,而是创建策略的委托。这种一般是实现了IRateLimiterPolicy<TPartitionKey>接口的策略,我们需要在运行时向它的构造函数注入一些参数。
    它们俩都是用于提供限流策略,只不过前者已经构造好了实例,可以直接拿来用,后者则需要在运行时创建实例,然后才能用。

我们的固定窗口限流器策略显然是存放到PolicyMap中,:

public sealed class RateLimiterOptions
{
    internal Dictionary<string, DefaultRateLimiterPolicy> PolicyMap { get; }
        = new Dictionary<string, DefaultRateLimiterPolicy>(StringComparer.Ordinal);

    internal Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>> UnactivatedPolicyMap { get; }
        = new Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>>(StringComparer.Ordinal);

    public RateLimiterOptions AddPolicy<TPartitionKey>(string policyName, Func<HttpContext, RateLimitPartition<TPartitionKey>> partitioner)
    {
        // 策略名不能重复
        if (PolicyMap.ContainsKey(policyName) || UnactivatedPolicyMap.ContainsKey(policyName))
        {
            throw ...;
        }

        PolicyMap.Add(policyName, new DefaultRateLimiterPolicy(ConvertPartitioner<TPartitionKey>(policyName, partitioner), null));

        return this;
    }
}

可以看到,承载策略的实例类型均为DefaultRateLimiterPolicy,即使你是注册的IRateLimiterPolicy<TPartitionKey>类型的策略,最终也是会转化为DefaultRateLimiterPolicy

RateLimiter

现在限流器实例的获取方式已经知道了,那接下来详细看一下FixedWindowRateLimiter的详细设计吧。

首先,所有限流器均继承自抽象类RateLimiter

public abstract class RateLimiter : IAsyncDisposable, IDisposable
{
    public abstract RateLimiterStatistics? GetStatistics();

    public abstract TimeSpan? IdleDuration { get; }

    public RateLimitLease AttemptAcquire(int permitCount = 1)
    => AttemptAcquireCore(permitCount);

    protected abstract RateLimitLease AttemptAcquireCore(int permitCount);

    public ValueTask<RateLimitLease> AcquireAsync(int permitCount = 1, CancellationToken cancellationToken = default)
    => AcquireAsyncCore(permitCount, cancellationToken);

    protected abstract ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken);
}

其中,我们需要重点关注以下成员:

  • IdleDuration:空闲周期,即该限流器有多长时间始终保持着最大可用许可数了。
    • 例如,一个限流器在时间点A,重新发放了许可,没有一个请求来获取许可,那么它的空闲周期就是当前时间 - A,当有请求获取许可时,空闲周期就会被置为null
    • 限流管理器会通过它来清理未使用的限流器
  • GetStatistics:获取统计数据,主要包含当前有多少可用的许可、当前排队的请求个数、许可出租成功的总次数以及许可出租失败的总次数。
  • AttemptAcquire:尝试获取许可,当请求获取到许可时,则会被处理,否则会被限流拒绝。
    • 它接收一个permitCount参数,表示想要获取的许可数量,默认值为 1。它所允许的值范围是 >= 0,当传入 0 时,表示查看是否还能获取到许可(不会消耗许可数)。
    • 返回值类型RateLimitLease拥有一个bool IsAcquired属性,表示许可是否获取成功
  • AcquireAsync:异步获取许可,它会一直等待,直到成功获取到许可,或者无法获取足够的许可(比如排队队列装不下),才会返回结果。
    • 它接收一个permitCount参数,表示想要获取的许可数量,默认值为 1。它所允许的值范围是 >= 0,当传入 0 时,它会一直等待,直到可以获取到许可,或者再也不能获取到许可了(不会消耗许可数)。
    • 同样的,返回值类型RateLimitLease拥有一个bool IsAcquired属性,表示许可是否获取成功

接着,对于FixedWindowLimiterSlidingWindowLimiterTokenBucketLimiter来说,它们都是时间范围的限流算法,都具备Replenish性质,所以又抽象出一层ReplenishingRateLimiter

public abstract class ReplenishingRateLimiter : RateLimiter
{
    // 许可发放周期
    public abstract TimeSpan ReplenishmentPeriod { get; }
    
    // 是否自动补充许可
    public abstract bool IsAutoReplenishing { get; }

    // 尝试补充许可
    // 当 AutoReplenishment == true 时,不会执行补充许可的逻辑,因为它是自动的,不允许手动干预
    public abstract bool TryReplenish();
}

ConcurrencyLimiter 直接继承自 RateLimiter

最后具体看一下FixedWindowRateLimiter的详细实现,先来看构造函数以及一些常用属性:


public sealed class FixedWindowRateLimiter : ReplenishingRateLimiter
{
    // 用于重新补充许可的定时器
    private readonly Timer? _renewTimer;
    // 选项,会 clone 一份构造函数传进来的 options
    private readonly FixedWindowRateLimiterOptions _options;

    // 指示许可租赁成功的结果
    private static readonly RateLimitLease SuccessfulLease = new FixedWindowLease(true, null);
    // 指示许可租赁失败的结果
    private static readonly RateLimitLease FailedLease = new FixedWindowLease(false, null);

    // 空闲周期
    public override TimeSpan? IdleDuration => ...;

    // 是否自动补充许可
    public override bool IsAutoReplenishing => _options.AutoReplenishment;

    // 许可发放周期,对于固定窗口来说,就是窗口大小
    public override TimeSpan ReplenishmentPeriod => _options.Window;

    public FixedWindowRateLimiter(FixedWindowRateLimiterOptions options)
    {
        // 省略部分代码...
        
        // 如果 AutoReplenishment == true,则会创建定时器,用于定时补充许可
        // 不过我们从前面可以得知,传递到这里的是 false,所以定时器并不会被创建
        if (_options.AutoReplenishment)
        {
            _renewTimer = new Timer(Replenish, this, _options.Window, _options.Window);
        }
    }
}

接下来是补充许可TryReplenish的实现:

public override bool TryReplenish()
{
    // 当 AutoReplenishment == true 时,不会执行补充许可的逻辑,因为它是自动的,不允许手动干预
    if (_options.AutoReplenishment)
    {
        return false;
    }
    Replenish(this);
    return true;
}

private static void Replenish(object? state)
{
    FixedWindowRateLimiter limiter = (state as FixedWindowRateLimiter)!;

    // 获取当前时间
    long nowTicks = Stopwatch.GetTimestamp();
    limiter!.ReplenishInternal(nowTicks);
}

private void ReplenishInternal(long nowTicks)
{
    // 如果当前时间距离上次许可发放时间还没达到窗口大小,则直接返回
    if (((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks && !_options.AutoReplenishment)
    {
        return;
    }

    int availablePermitCounters = _permitCount;
    if (availablePermitCounters >= _options.PermitLimit)
    {
        // 如果当前可用许可数 >= 限流器配置的最大许可数,则无须重新发放,直接返回
        return;
    }

    // 补充许可
    _permitCount = _options.PermitLimit;

    // 先处理排队的请求
    while (_queue.Count > 0)
    {
        // 根据 QueueProcessingOrder 从队列中找到(Peek)最老或最新的请求
        RequestRegistration nextPendingRequest =
              _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
              ? _queue.PeekHead()
              : _queue.PeekTail();

        // 若请求已完成处理,则只需要将它移出队列(Dequeue),并释放资源即可。
        // 请求已完成可能的原因如下:
        //  1. 已被取消
        //  2. 当 QueueProcessingOrder 设置为 NewestFirst 时,新来的请求把老的踢出了队列
        if (nextPendingRequest.Tcs.Task.IsCompleted)
        {
            nextPendingRequest =
                _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                ? _queue.DequeueHead()
                : _queue.DequeueTail();
            nextPendingRequest.CancellationTokenRegistration.Dispose();
        }
        // 若可用的许可数足够,则从队列中取出请求并处理
        else if (_permitCount >= nextPendingRequest.Count)
        {
            nextPendingRequest =
                _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                ? _queue.DequeueHead()
                : _queue.DequeueTail();

            // 扣减
            _queueCount -= nextPendingRequest.Count;
            _permitCount -= nextPendingRequest.Count;

            // 向请求补充许可
            // 若发放失败,这还原扣减
            if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
            {
                _permitCount += nextPendingRequest.Count;
                _queueCount += nextPendingRequest.Count;
            }
            
            // 释放资源
            nextPendingRequest.CancellationTokenRegistration.Dispose();
        }
        else
        {
            // 请求无法被处理,直接跳出
            break;
        }
    }

    if (_permitCount == _options.PermitLimit)
    {
        // 当可用许可数等于配置的最大许可数,则开始计算空闲周期
        _idleSince = Stopwatch.GetTimestamp();
    }
}

下面一起看一下许可是如何租出去的。由于异步的AcquireAsyncCore基本包含了同步的AttemptAcquireCore的处理逻辑,所以下面就只看AcquireAsyncCore。需要着重说一下的是,同步的AttemptAcquireCore是不会进行入队操作的。

源码里面其实有很多锁,为了便于理解我都删除了。

protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken = default)
{
    // 当申请的许可数 == 0,并且可用许可数 > 0 时,则直接返回 SuccessfulLease,表示限流器还有可用许可
    // 对于同步的 AttemptAcquireCore 方法来说,若此时可用许可数为 0,则会直接返回 FailedLease,表示限流器没有可用许可
    if (permitCount == 0 && _permitCount > 0)
    {
        return new ValueTask<RateLimitLease>(SuccessfulLease);
    }

    // 尝试租赁
    if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
    {
        return new ValueTask<RateLimitLease>(lease);
    }

    // 如果队列装不下要申请许可的所有请求
    if (_options.QueueLimit - _queueCount < permitCount)
    {
        // 如果优先处理新来的,并且要申请许可的请求数没有超过队列的大小限制,
        // 则将队列中老的请求踢出队列,直到为新来的请求留出足够的空间,准备将新来的请求加进去
        if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
        {
            do
            {
                RequestRegistration oldestRequest = _queue.DequeueHead();
                _queueCount -= oldestRequest.Count;
                // 设置老请求申请许可失败
                if (!oldestRequest.Tcs.TrySetResult(FailedLease))
                {
                    _queueCount += oldestRequest.Count;
                }
                oldestRequest.CancellationTokenRegistration.Dispose();
            }
            while (_options.QueueLimit - _queueCount < permitCount);
        }
        else
        {   
            // 如果优先处理后来的,则只能返回 失败
            return new ValueTask<RateLimitLease>(CreateFailedWindowLease(permitCount));
        }
    }

    // 这部分代码不用太在意
    CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
    CancellationTokenRegistration ctr = default;
    if (cancellationToken.CanBeCanceled)
    {
        ctr = cancellationToken.Register(static obj =>
        {
            ((CancelQueueState)obj!).TrySetCanceled();
        }, tcs);
    }

    RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr);
    // 将新请求加入到队尾
    _queue.EnqueueTail(registration);
    _queueCount += permitCount;

    // 异步可等待,直到 Task 执行完成获取到结果(可能是申请成功,也可能是失败)
    return new ValueTask<RateLimitLease>(registration.Tcs.Task);
}

TryLeaseUnsynchronized具体逻辑如下:

private bool TryLeaseUnsynchronized(int permitCount, out RateLimitLease? lease)
{
    // 若可用的许可数足够,且不为 0
    if (_permitCount >= permitCount && _permitCount != 0)
    {
        // 租赁的许可为0,则直接返回 成功
        if (permitCount == 0)
        {
            lease = SuccessfulLease;
            return true;
        }

        // 若:
        //  1. 没有排队的请求
        //  2. 或有排队的请求,但是 QueueProcessingOrder 被设置为 NewestFirst
        // 则租赁成功,其他则租赁失败(因为要先把排队的处理完)
        if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
        {
            // 许可租赁出去了,也就表示该限流器不空闲了
            _idleSince = null;
            _permitCount -= permitCount;
            lease = SuccessfulLease;
            return true;
        }
    }

    // 租赁失败
    lease = null;
    return false;
}

RateLimitingMiddleware

现在,我们已经掌握了限流器补充许可和租赁许可的细节逻辑了,并且也得知并没有使用限流器内部的定时器去定时补充许可,那这是由谁补充的呢?又是由谁为请求申请的许可呢?

没错,这都是RateLimitingMiddleware负责的。

在构造方法中,我们需要重点关注下CreateEndpointLimiter,它创建了终结点分区限流器,与全局限流器一起提供限流服务。

internal sealed partial class RateLimitingMiddleware
{
    // 默认被限流拒绝回调的委托,取自 options.OnRejected
    private readonly Func<OnRejectedContext, CancellationToken, ValueTask>? _defaultOnRejected;
    // 全局限流器,取自 options.GlobalLimiter
    private readonly PartitionedRateLimiter<HttpContext>? _globalLimiter;
    // 终结点限流器
    private readonly PartitionedRateLimiter<HttpContext> _endpointLimiter;
    // 限流响应状态码,取自 options.RejectionStatusCode
    private readonly int _rejectionStatusCode;
    // 限流策略集,取自 options.PolicyMap 和 options.UnactivatedPolicyMap
    private readonly Dictionary<string, DefaultRateLimiterPolicy> _policyMap;
    
    public RateLimitingMiddleware(RequestDelegate next, ILogger<RateLimitingMiddleware> logger, IOptions<RateLimiterOptions> options, IServiceProvider serviceProvider, RateLimitingMetrics metrics)
    {
        // ...省略一堆代码
        
        _endpointLimiter = CreateEndpointLimiter();
    }
}

CreateEndpointLimiter方法中,创建了分区限流器,里面包含了各种各样的限流分区,用于不同终结点请求的限流。

private PartitionedRateLimiter<HttpContext> CreateEndpointLimiter()
{
    // 创建分区限流器
    return PartitionedRateLimiter.Create<HttpContext, DefaultKeyType>(context =>
    {
        DefaultRateLimiterPolicy? policy;
        var enableRateLimitingAttribute = context.GetEndpoint()?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
        // 如果不需要限流,则返回 NoLimiter
        if (enableRateLimitingAttribute is null)
        {
            return RateLimitPartition.GetNoLimiter<DefaultKeyType>(_defaultPolicyKey);
        }
        
        // 根据限流策略取限流分区
        policy = enableRateLimitingAttribute.Policy;
        if (policy is not null)
        {
            return policy.GetPartition(context);
        }
        var name = enableRateLimitingAttribute.PolicyName;
        if (name is not null)
        {
            if (_policyMap.TryGetValue(name, out policy))
            {
                return policy.GetPartition(context);
            }
            else
            {
                throw new InvalidOperationException($"This endpoint requires a rate limiting policy with name {name}, but no such policy exists.");
            }
        }
        // 虽然策略名或策略不可能为空,但是加一下判断更好
        else
        {
            throw new InvalidOperationException("This endpoint requested a rate limiting policy with a null name.");
        }
    }, new DefaultKeyTypeEqualityComparer());
}

咦?怎么还是没看到在哪自动补充的许可?实际上它就隐藏在PartitionedRateLimiter.Create中的DefaultPartitionedRateLimiter里面,藏得太深了:

public static class PartitionedRateLimiter
{
    public static PartitionedRateLimiter<TResource> Create<TResource, TPartitionKey>(
        Func<TResource, RateLimitPartition<TPartitionKey>> partitioner,
        IEqualityComparer<TPartitionKey>? equalityComparer = null) where TPartitionKey : notnull
    {
        return new DefaultPartitionedRateLimiter<TResource, TPartitionKey>(partitioner, equalityComparer);
    }
}

下面是DefaultPartitionedRateLimiter启动定时器执行心跳的核心代码:

internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : PartitionedRateLimiter<TResource> where TKey : notnull
{
    // 限流器集合
    private readonly Dictionary<TKey, Lazy<RateLimiter>> _limiters;
    // 限流分区委托,可通过资源获取到分区
    private readonly Func<TResource, RateLimitPartition<TKey>> _partitioner;
    // 定时器,主要作用是每 100ms 进行一次心跳,即执行 Heartbeat 方法
    private readonly TimerAwaitable _timer;
    private readonly Task _timerTask;

    public DefaultPartitionedRateLimiter(Func<TResource, RateLimitPartition<TKey>> partitioner,
        IEqualityComparer<TKey>? equalityComparer = null)
    {
        _limiters = new Dictionary<TKey, Lazy<RateLimiter>>(equalityComparer);
        _partitioner = partitioner;

        var timerInterval = TimeSpan.FromMilliseconds(100);
        _timer = new TimerAwaitable(timerInterval, timerInterval);
        _timerTask = RunTimer();
    }
    
    private async Task RunTimer()
    {
        _timer.Start();
        // 只要 timer 不被停止,则一直返回 true,即 timer 仍在运行中
        while (await _timer)
        {
            try
            {
                await Heartbeat().ConfigureAwait(false);
            }
            catch { }
        }
        _timer.Dispose();
    }
}

TimerAwaitable是一个可异步等待的类型(实现了GetAwaiterINotifyCompletionIsCompletedGetResult),内部设计非常有意思。在它内部,启动了一个定时器,每 100ms(传入的timerInterval) Tick 一次,每次 Tick 就会把 IsCompleted设置为true,将任务状态切换为已完成。外部通过await获取结果时(静默调用GetResult),又会将IsCompleted设置为false,再将其转换为未完成状态。外部再配合while以达到定时执行的效果。

为什么不直接用Timer而又弄出一个TimerAwaitable?我认为TimerAwaitable有以下优点:

  1. 优雅的书写异步代码
  2. 它的定时执行不会出现重入。即不会因为上一次定时任务执行耗时超过定时间隔还未完成,这一次又执行了定时任务,导致同时有两个甚至多个线程在执行定时任务。

通过定时器,每 100ms 执行一次心跳,心跳过程中检查各个限流器是否需要补充许可,如果需要,则补充,并回收空闲限流器等。以下是简化的心跳逻辑:

private async Task Heartbeat()
{
    if (_cacheInvalid)
    {
        _cachedLimiters.Clear();
        _cachedLimiters.AddRange(_limiters);
    }

    // 遍历所有缓存的限流器
    foreach (KeyValuePair<TKey, Lazy<RateLimiter>> rateLimiter in _cachedLimiters)
    {
        // 如果限流器还未被实例化,则跳过
        if (!rateLimiter.Value.IsValueCreated) continue;
        
        // 如果限流器空闲周期超过了空闲时间限制(默认10s),则回
        if (rateLimiter.Value.Value.IdleDuration is TimeSpan idleDuration && idleDuration > s_idleTimeLimit)
        {
            lock (Lock)
            {
                // 双重检测,确保限流器确实是空闲的
                idleDuration = rateLimiter.Value.Value.IdleDuration ?? TimeSpan.Zero;
                if (idleDuration > s_idleTimeLimit)
                {
                    _cacheInvalid = true;
                    // 回收该限流器
                    _limiters.Remove(rateLimiter.Key);

                    // 保存下来,后面一起释放资源
                    _limitersToDispose.Add(rateLimiter.Value.Value);
                }
            }
        }
        // 如果限流器可补充许可,则尝试补充
        else if (rateLimiter.Value.Value is ReplenishingRateLimiter replenishingRateLimiter)
        {
            try
            {
                replenishingRateLimiter.TryReplenish();
            }
            catch (Exception ex) { ... }
        }
    }

    // 释放回收的限流器资源
    foreach (RateLimiter limiter in _limitersToDispose)
    {
        try
        {
            await limiter.DisposeAsync().ConfigureAwait(false);
        }
        catch (Exception ex) { ... }
    }
    _limitersToDispose.Clear();
}

好了,我们已经了解了限流器的管理,让我们再次回到RateLimitingMiddleware,看看他是如何工作的吧:

public Task Invoke(HttpContext context)
{
    var endpoint = context.GetEndpoint();
    // 如果终结点包含禁用限流标记,则不限流
    if (endpoint?.Metadata.GetMetadata<DisableRateLimitingAttribute>() is not null)
    {
        return _next(context);
    }
    
    var enableRateLimitingAttribute = endpoint?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
    // 如果终结点没有启用限流标记,并且全局限流器也是空的,则同样不限流
    if (enableRateLimitingAttribute is null && _globalLimiter is null)
    {
        return _next(context);
    }

    return InvokeInternal(context, enableRateLimitingAttribute);
}

private async Task InvokeInternal(HttpContext context, EnableRateLimitingAttribute? enableRateLimitingAttribute)
{
    var policyName = enableRateLimitingAttribute?.PolicyName;

    // 尝试获取许可
    using var leaseContext = await TryAcquireAsync(context);

    // 如果获取到了许可,则处理请求
    if (leaseContext.Lease?.IsAcquired == true)
    {
        await _next(context);
    }
    // 没有获取到许可,则限流拒绝
    else
    {
        // 如果请求是被取消的,则不要执行 OnRejected 回调,应该直接返回
        if (leaseContext.RequestRejectionReason == RequestRejectionReason.RequestCanceled)
        {
            return;
        }
        var thisRequestOnRejected = _defaultOnRejected;
        context.Response.StatusCode = _rejectionStatusCode;

        // 如果请求是被终结点限流器限流拒绝的
        if (leaseContext.RequestRejectionReason == RequestRejectionReason.EndpointLimiter)
        {
            // 若策略有自己的 OnRejected,则使用策略的,如果没有,则使用 _defaultOnRejected
            
            // 这里我感觉是个 bug,应该判断 policy?.OnRejected is not null 才赋值
            DefaultRateLimiterPolicy policy = enableRateLimitingAttribute?.Policy;
            if (policy is not null)
            {
                thisRequestOnRejected = policy.OnRejected;
            }
            else
            {
                // 对于策略名,当 OnRejected 不为空时,才使用策略的 OnRejected
                if (policyName is not null && _policyMap.TryGetValue(policyName, out policy) && policy.OnRejected is not null)
                {
                    thisRequestOnRejected = policy.OnRejected;
                }
            }
        }
        
        // 执行回调
        if (thisRequestOnRejected is not null)
        {
            await thisRequestOnRejected(new OnRejectedContext() { HttpContext = context, Lease = leaseContext.Lease! }, context.RequestAborted);
        }
    }
}

TryAcquireAsync会先从全局限流器获取许可,如果获取到了,则会继续在终结点限流器中获取许可,如果获取到了,请求才会被处理:

异步的 CombinedWaitAsync 与同步的 CombinedAcquire 类似,只不过前面调用的是异步方法,后面是同步,故下方仅列出 CombinedAcquire 简化源码。

private async ValueTask<LeaseContext> TryAcquireAsync(HttpContext context, MetricsContext metricsContext)
{
    // 组合获取,即按顺序从全局限流器和终结点限流器中获取许可
    var leaseContext = CombinedAcquire(context);
    // 如果获取到了,则直接返回
    if (leaseContext.Lease?.IsAcquired == true)
    {
        return leaseContext;
    }

    // 异步等待再次获取许可
    return await CombinedWaitAsync(context, context.RequestAborted);
}

private LeaseContext CombinedAcquire(HttpContext context)
{
    // 全局限流器不为空,则先从其中获取许可
    if (_globalLimiter is not null)
    {
        var globalLease = _globalLimiter.AttemptAcquire(context);
        // 未获取许可,直接返回
        if (!globalLease.IsAcquired)
        {
            return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.GlobalLimiter, Lease = globalLease };
        }
    }
    
    // 从终结点限流器中获取许可
    var endpointLease = _endpointLimiter.AttemptAcquire(context);
    // 未获取许可,直接返回
    if (!endpointLease.IsAcquired)
    {
        globalLease?.Dispose();
        return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.EndpointLimiter, Lease = endpointLease };
    }

    return globalLease is null 
        ? new LeaseContext() { Lease = endpointLease } 
        : new LeaseContext() { Lease = new DefaultCombinedLease(globalLease, endpointLease) };
}

总结

  1. ASP.NET Core 为我们提供了限流功能,通过AddRateLimiter注册限流服务,通过UseRateLimiter启用限流功能。
  2. 默认提供了4种限流算法,分别是:
    • FixedWindowLimiter:固定窗口限流器
    • SlidingWindowLimiter:滑动窗口限流器
    • TokenBucketLimiter:令牌桶限流器
    • ConcurrencyLimiter:并发限流器
  3. 可以通过options.AddPolicy添加限流策略,作用于某些终结点,这些策略最终组成的分区限流器称为终结点限流器
    • 可以通过options.AddXXXLimiter的方式快捷添加限流策略
    • 也可以自定义限流策略,限流逻辑可以通过委托直接传入,也可以通过实现接口IRateLimiterPolicy<TPartitionKey>
  4. 可以通过options.GlobalLimiter设置全局限流器,当请求进入应用时,会先执行全局限流器,再执行终结点限流器。
  5. 可以通过options.RejectionStatusCode设置限流拒绝的响应状态码,还可以通过OnRejected编写更多的响应逻辑。
  6. 可以通过PartitionedRateLimiter.CreateChained将多个分区限流器进行链式组合
  7. 目前 ASP.NET Core 提供的限流功能还不够成熟,例如终结点限流器无法进行链式组合、无法为终结点设置多个限流策略等
内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/xiaoxiaotank/p/17560251.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!