.NET Core-统一认证平台之定制Ocelot(2)

这篇文章,我们将从Ocelot的中间件源码分析,目前Ocelot已经实现那些功能,还有那些功能在我们实际项目中暂时还未实现,如果我们要使用这些功能,应该如何改造等方面来说明。

一、Ocelot源码解读

在使用一个组件前,最好我们要了解其中的一些原理,否则在使用过程中遇到问题,也无从下手,今天我带着大家一起来解读下Ocelot源码,并梳理出具体实现的原理和流程,便于我们根据需求扩展应用。
Ocelot源码地址[https://github.com/ThreeMammals/Ocelot],
Ocelot文档地址[https://ocelot.readthedocs.io/en/latest/]

查看.NETCORE相关中间件源码,我们优先找到入口方法,比如Ocelot中间件使用的是app.UseOcelot(),我们直接搜索UserOcelot,我们会找到OcelotMiddlewareExtensions方法,里面是Ocelot中间件实际运行的方式和流程。
定制Ocelot来满足需求

然后继续顺藤摸瓜,查看详细的实现,我们会发现如下代码

public static async Task<IApplicationBuilder> UseOcelot(this IApplicationBuilder builder, OcelotPipelineConfiguration pipelineConfiguration)        {                //创建配置信息             var configuration = await CreateConfiguration(builder);            //监听配置信息             ConfigureDiagnosticListener(builder);            //创建执行管道             return CreateOcelotPipeline(builder, pipelineConfiguration);         }

然后我们继续跟踪到创建管道方法,可以发现Ocelot的执行流程已经被找到,现在问题变的简单了,直接查看

private static IApplicationBuilder CreateOcelotPipeline(IApplicationBuilder builder, OcelotPipelineConfiguration pipelineConfiguration){    var pipelineBuilder = new OcelotPipelineBuilder(builder.ApplicationServices);    //详细创建的管道顺序在此方法     pipelineBuilder.BuildOcelotPipeline(pipelineConfiguration);    var firstDelegate = pipelineBuilder.Build();    /*             inject first delegate into first piece of asp.net middleware..maybe not like this             then because we are updating the http context in ocelot it comes out correct for             rest of asp.net..             */      builder.Properties["analysis.NextMiddlewareName"] = "TransitionToOcelotMiddleware";      builder.Use(async (context, task) =>                 {                    var downstreamContext = new DownstreamContext(context);                    await firstDelegate.Invoke(downstreamContext);                 });    return builder; }

管道创建流程及实现,会不会感觉到摸到大动脉了,核心的功能及原理基本找到了,那以后动手术也就可以避开一些坑了,我们可以对着这个执行顺序,再查看详细的源码,按照这个执行顺序查看源码,您就会发现整个思路非常清晰,每一步的实现一目了然。为了更直观的介绍源码的解读方式,这里我们就拿我们后续要操刀的中间件来讲解下中间件的具体实现。

public static class OcelotPipelineExtensions     {        public static OcelotRequestDelegate BuildOcelotPipeline(this IOcelotPipelineBuilder builder,             OcelotPipelineConfiguration pipelineConfiguration)         {            // This is registered to catch any global exceptions that are not handled             // It also sets the Request Id if anything is set globally             builder.UseExceptionHandlerMiddleware();            // If the request is for websockets upgrade we fork into a different pipeline             builder.MapWhen(context => context.HttpContext.WebSockets.IsWebSocketRequest,                 app =>                 {                    app.UseDownstreamRouteFinderMiddleware();                    app.UseDownstreamRequestInitialiser();                    app.UseLoadBalancingMiddleware();                    app.UseDownstreamUrlCreatorMiddleware();                    app.UseWebSocketsProxyMiddleware();                 });            // Allow the user to respond with absolutely anything they want.             builder.UseIfNotNull(pipelineConfiguration.PreErrorResponderMiddleware);            // This is registered first so it can catch any errors and issue an appropriate response             builder.UseResponderMiddleware();            // Then we get the downstream route information             builder.UseDownstreamRouteFinderMiddleware();            // This security module, IP whitelist blacklist, extended security mechanism             builder.UseSecurityMiddleware();            //Expand other branch pipes             if (pipelineConfiguration.MapWhenOcelotPipeline != null)             {                foreach (var pipeline in pipelineConfiguration.MapWhenOcelotPipeline)                 {                    builder.MapWhen(pipeline);                 }             }            // Now we have the ds route we can transform headers and stuff?             builder.UseHttpHeadersTransformationMiddleware();            // Initialises downstream request             builder.UseDownstreamRequestInitialiser();            // We check whether the request is ratelimit, and if there is no continue processing             builder.UseRateLimiting();            // This adds or updates the request id (initally we try and set this based on global config in the error handling middleware)             // If anything was set at global level and we have a different setting at re route level the global stuff will be overwritten             // This means you can get a scenario where you have a different request id from the first piece of middleware to the request id middleware.             builder.UseRequestIdMiddleware();            // Allow pre authentication logic. The idea being people might want to run something custom before what is built in.             builder.UseIfNotNull(pipelineConfiguration.PreAuthenticationMiddleware);            // Now we know where the client is going to go we can authenticate them.             // We allow the ocelot middleware to be overriden by whatever the             // user wants             if (pipelineConfiguration.AuthenticationMiddleware == null)             {                builder.UseAuthenticationMiddleware();             }            else             {                builder.Use(pipelineConfiguration.AuthenticationMiddleware);             }            // The next thing we do is look at any claims transforms in case this is important for authorisation             builder.UseClaimsToClaimsMiddleware();            // Allow pre authorisation logic. The idea being people might want to run something custom before what is built in.             builder.UseIfNotNull(pipelineConfiguration.PreAuthorisationMiddleware);            // Now we have authenticated and done any claims transformation we              // can authorise the request             // We allow the ocelot middleware to be overriden by whatever the             // user wants             if (pipelineConfiguration.AuthorisationMiddleware == null)             {//使用自定义认证,移除默认的认证方式                 //builder.UseAuthorisationMiddleware();             }             else             {                builder.Use(pipelineConfiguration.AuthorisationMiddleware);             }            // Now we can run the claims to headers transformation middleware             builder.UseClaimsToHeadersMiddleware();            // Allow the user to implement their own query string manipulation logic             builder.UseIfNotNull(pipelineConfiguration.PreQueryStringBuilderMiddleware);            // Now we can run any claims to query string transformation middleware             builder.UseClaimsToQueryStringMiddleware();            // Get the load balancer for this request             builder.UseLoadBalancingMiddleware();            // This takes the downstream route we retrieved earlier and replaces any placeholders with the variables that should be used             builder.UseDownstreamUrlCreatorMiddleware();            // Not sure if this is the best place for this but we use the downstream url              // as the basis for our cache key.             builder.UseOutputCacheMiddleware();            //We fire off the request and set the response on the scoped data repo             builder.UseHttpRequesterMiddleware();            return builder.Build();         }    private static void UseIfNotNull(this IOcelotPipelineBuilder builder,             Func<DownstreamContext, Func<Task>, Task> middleware)         {            if (middleware != null)             {                builder.Use(middleware);             }         }     }

限流中间件实现解析

实现代码如下builder.UseRateLimiting();,我们转到定义,得到如下代码,详细的实现逻辑在ClientRateLimitMiddleware方法里,继续转定义到这个方法,我把方法里用到的内容注释了下。

public static class RateLimitMiddlewareExtensions{    public static IOcelotPipelineBuilder UseRateLimiting(this IOcelotPipelineBuilder builder)    {        return builder.UseMiddleware<ClientRateLimitMiddleware>();     } }public class ClientRateLimitMiddleware : OcelotMiddleware{        private readonly OcelotRequestDelegate _next;        private readonly IRateLimitCounterHandler _counterHandler;        private readonly ClientRateLimitProcessor _processor;        public ClientRateLimitMiddleware(OcelotRequestDelegate next,             IOcelotLoggerFactory loggerFactory,             IRateLimitCounterHandler counterHandler)                 :base(loggerFactory.CreateLogger<ClientRateLimitMiddleware>())        {             _next = next;             _counterHandler = counterHandler;             _processor = new ClientRateLimitProcessor(counterHandler);         }        //熟悉的Tnvoke方法,所有的逻辑都在此方法里。         public async Task Invoke(DownstreamContext context)        {            var options = context.DownstreamReRoute.RateLimitOptions;            // 校验是否启用限流配置             if (!context.DownstreamReRoute.EnableEndpointEndpointRateLimiting)             {//未启用直接进入下一个中间件                 Logger.LogInformation($"EndpointRateLimiting is not enabled for {context.DownstreamReRoute.DownstreamPathTemplate.Value}");                await _next.Invoke(context);                return;             }            // 获取配置的校验客户端的方式             var identity = SetIdentity(context.HttpContext, options);            // 校验是否为白名单             if (IsWhitelisted(identity, options))             {//白名单直接放行                 Logger.LogInformation($"{context.DownstreamReRoute.DownstreamPathTemplate.Value} is white listed from rate limiting");                await _next.Invoke(context);                return;             }            var rule = options.RateLimitRule;            if (rule.Limit > 0)             {//限流数是否大于0                 // 获取当前客户端请求情况,这里需要注意_processor是从哪里注入的,后续重                 var counter = _processor.ProcessRequest(identity, options);                // 校验请求数是否大于限流数                 if (counter.TotalRequests > rule.Limit)                 {                    //获取下次有效请求的时间,就是避免每次请求,都校验一次                     var retryAfter = _processor.RetryAfterFrom(counter.Timestamp, rule);                    // 写入日志                     LogBlockedRequest(context.HttpContext, identity, counter, rule, context.DownstreamReRoute);                    var retrystring = retryAfter.ToString(System.Globalization.CultureInfo.InvariantCulture);                    // 抛出超出限流异常并把下次可请求时间写入header里。                     await ReturnQuotaExceededResponse(context.HttpContext, options, retrystring);                    return;                 }             }            //如果启用了限流头部             if (!options.DisableRateLimitHeaders)             {                var headers = _processor.GetRateLimitHeaders(context.HttpContext, identity, options);                 context.HttpContext.Response.OnStarting(SetRateLimitHeaders, state: headers);             }            //进入下一个中间件             await _next.Invoke(context);         }        public virtual ClientRequestIdentity SetIdentity(HttpContext httpContext, RateLimitOptions option)        {            var clientId = "client";            if (httpContext.Request.Headers.Keys.Contains(option.ClientIdHeader))             {                 clientId = httpContext.Request.Headers[option.ClientIdHeader].First();             }            return new ClientRequestIdentity(                 clientId,                 httpContext.Request.Path.ToString().ToLowerInvariant(),                 httpContext.Request.Method.ToLowerInvariant()                 );         }        public bool IsWhitelisted(ClientRequestIdentity requestIdentity, RateLimitOptions option)        {            if (option.ClientWhitelist.Contains(requestIdentity.ClientId))             {                return true;             }            return false;         }        public virtual void LogBlockedRequest(HttpContext httpContext, ClientRequestIdentity identity, RateLimitCounter counter, RateLimitRule rule, DownstreamReRoute downstreamReRoute)        {             Logger.LogInformation(                $"Request {identity.HttpVerb}:{identity.Path} from ClientId {identity.ClientId} has been blocked, quota {rule.Limit}/{rule.Period} exceeded by {counter.TotalRequests}. Blocked by rule { downstreamReRoute.UpstreamPathTemplate.OriginalValue }, TraceIdentifier {httpContext.TraceIdentifier}.");         }        public virtual Task ReturnQuotaExceededResponse(HttpContext httpContext, RateLimitOptions option, string retryAfter)        {            var message = string.IsNullOrEmpty(option.QuotaExceededMessage) ? $"API calls quota exceeded! maximum admitted {option.RateLimitRule.Limit} per {option.RateLimitRule.Period}." : option.QuotaExceededMessage;            if (!option.DisableRateLimitHeaders)             {                 httpContext.Response.Headers["Retry-After"] = retryAfter;             }              httpContext.Response.StatusCode = option.HttpStatusCode;            return httpContext.Response.WriteAsync(message);         }        private Task SetRateLimitHeaders(object rateLimitHeaders)        {            var headers = (RateLimitHeaders)rateLimitHeaders;              headers.Context.Response.Headers["X-Rate-Limit-Limit"] = headers.Limit;             headers.Context.Response.Headers["X-Rate-Limit-Remaining"] = headers.Remaining;             headers.Context.Response.Headers["X-Rate-Limit-Reset"] = headers.Reset;            return Task.CompletedTask;         }    }

通过源码解析,发现实现一个限流还是很简单的吗!再进一步解析,IRateLimitCounterHandler ClientRateLimitProcessor里的相关接口又是怎么实现的呢?这时候我们就需要了解下.NETCORE 的运行原理,其中ConfigureServices方法实现了依赖注入(DI)的配置。这时候我们看下Ocelot是在哪里进行注入的呢?

services.AddOcelot()是不是印象深刻呢?原来所有的注入信息都写在这里,那么问题简单了,Ctrl+F查找AddOcelot方法,马上就能定位到ServiceCollectionExtensions方法,然后再转到定义OcelotBuilder

public static class ServiceCollectionExtensions{    public static IOcelotBuilder AddOcelot(this IServiceCollection services)    {        var service = services.First(x => x.ServiceType == typeof(IConfiguration));        var configuration = (IConfiguration)service.ImplementationInstance;        return new OcelotBuilder(services, configuration);     }    public static IOcelotBuilder AddOcelot(this IServiceCollection services, IConfiguration configuration)    {        return new OcelotBuilder(services, configuration);     } }

又摸到大动脉啦,现在问题迎刃而解,原来所有的注入都写在这里,从这里可以找下我们熟悉的几个接口注入。

public OcelotBuilder(IServiceCollection services, IConfiguration configurationRoot) {     Configuration = configurationRoot;     Services = services;     Services.Configure<FileConfiguration>(configurationRoot);      Services.TryAddSingleton<IOcelotCache<FileConfiguration>, InMemoryCache<FileConfiguration>>();     Services.TryAddSingleton<IOcelotCache<CachedResponse>, InMemoryCache<CachedResponse>>();     Services.TryAddSingleton<IHttpResponseHeaderReplacer, HttpResponseHeaderReplacer>();     Services.TryAddSingleton<IHttpContextRequestHeaderReplacer, HttpContextRequestHeaderReplacer>();     Services.TryAddSingleton<IHeaderFindAndReplaceCreator, HeaderFindAndReplaceCreator>();     Services.TryAddSingleton<IInternalConfigurationCreator, FileInternalConfigurationCreator>();     Services.TryAddSingleton<IInternalConfigurationRepository, InMemoryInternalConfigurationRepository>();     Services.TryAddSingleton<IConfigurationValidator, FileConfigurationFluentValidator>();     Services.TryAddSingleton<HostAndPortValidator>();     Services.TryAddSingleton<IReRoutesCreator, ReRoutesCreator>();     Services.TryAddSingleton<IAggregatesCreator, AggregatesCreator>();     Services.TryAddSingleton<IReRouteKeyCreator, ReRouteKeyCreator>();     Services.TryAddSingleton<IConfigurationCreator, ConfigurationCreator>();     Services.TryAddSingleton<IDynamicsCreator, DynamicsCreator>();     Services.TryAddSingleton<ILoadBalancerOptionsCreator, LoadBalancerOptionsCreator>();     Services.TryAddSingleton<ReRouteFluentValidator>();     Services.TryAddSingleton<FileGlobalConfigurationFluentValidator>();     Services.TryAddSingleton<FileQoSOptionsFluentValidator>();     Services.TryAddSingleton<IClaimsToThingCreator, ClaimsToThingCreator>();     Services.TryAddSingleton<IAuthenticationOptionsCreator, AuthenticationOptionsCreator>();     Services.TryAddSingleton<IUpstreamTemplatePatternCreator, UpstreamTemplatePatternCreator>();     Services.TryAddSingleton<IRequestIdKeyCreator, RequestIdKeyCreator>();     Services.TryAddSingleton<IServiceProviderConfigurationCreator,ServiceProviderConfigurationCreator>();     Services.TryAddSingleton<IQoSOptionsCreator, QoSOptionsCreator>();     Services.TryAddSingleton<IReRouteOptionsCreator, ReRouteOptionsCreator>();     Services.TryAddSingleton<IRateLimitOptionsCreator, RateLimitOptionsCreator>();     Services.TryAddSingleton<IBaseUrlFinder, BaseUrlFinder>();     Services.TryAddSingleton<IRegionCreator, RegionCreator>();     Services.TryAddSingleton<IFileConfigurationRepository, DiskFileConfigurationRepository>();     Services.TryAddSingleton<IFileConfigurationSetter, FileAndInternalConfigurationSetter>();     Services.TryAddSingleton<IServiceDiscoveryProviderFactory, ServiceDiscoveryProviderFactory>();     Services.TryAddSingleton<ILoadBalancerFactory, LoadBalancerFactory>();     Services.TryAddSingleton<ILoadBalancerHouse, LoadBalancerHouse>();     Services.TryAddSingleton<IOcelotLoggerFactory, AspDotNetLoggerFactory>();     Services.TryAddSingleton<IRemoveOutputHeaders, RemoveOutputHeaders>();     Services.TryAddSingleton<IClaimToThingConfigurationParser, ClaimToThingConfigurationParser>();     Services.TryAddSingleton<IClaimsAuthoriser, ClaimsAuthoriser>();     Services.TryAddSingleton<IScopesAuthoriser, ScopesAuthoriser>();     Services.TryAddSingleton<IAddClaimsToRequest, AddClaimsToRequest>();     Services.TryAddSingleton<IAddHeadersToRequest, AddHeadersToRequest>();     Services.TryAddSingleton<IAddQueriesToRequest, AddQueriesToRequest>();     Services.TryAddSingleton<IClaimsParser, ClaimsParser>();     Services.TryAddSingleton<IUrlPathToUrlTemplateMatcher, RegExUrlMatcher>();     Services.TryAddSingleton<IPlaceholderNameAndValueFinder, UrlPathPlaceholderNameAndValueFinder>();     Services.TryAddSingleton<IDownstreamPathPlaceholderReplacer, DownstreamTemplatePathPlaceholderReplacer>();     Services.TryAddSingleton<IDownstreamRouteProvider, DownstreamRouteFinder>();     Services.TryAddSingleton<IDownstreamRouteProvider, DownstreamRouteCreator>();     Services.TryAddSingleton<IDownstreamRouteProviderFactory, DownstreamRouteProviderFactory>();     Services.TryAddSingleton<IHttpRequester, HttpClientHttpRequester>();     Services.TryAddSingleton<IHttpResponder, HttpContextResponder>();     Services.TryAddSingleton<IErrorsToHttpStatusCodeMapper, ErrorsToHttpStatusCodeMapper>();     Services.TryAddSingleton<IRateLimitCounterHandler, MemoryCacheRateLimitCounterHandler>();     Services.TryAddSingleton<IHttpClientCache, MemoryHttpClientCache>();     Services.TryAddSingleton<IRequestMapper, RequestMapper>();     Services.TryAddSingleton<IHttpHandlerOptionsCreator, HttpHandlerOptionsCreator>();     Services.TryAddSingleton<IDownstreamAddressesCreator, DownstreamAddressesCreator>();     Services.TryAddSingleton<IDelegatingHandlerHandlerFactory, DelegatingHandlerHandlerFactory>();     Services.TryAddSingleton<IHttpRequester, HttpClientHttpRequester>();      // see this for why we register this as singleton http://stackoverflow.com/questions/37371264/invalidoperationexception-unable-to-resolve-service-for-type-microsoft-aspnetc     // could maybe use a scoped data repository     Services.TryAddSingleton<IHttpContextAccessor, HttpContextAccessor>();     Services.TryAddSingleton<IRequestScopedDataRepository, HttpDataRepository>();     Services.AddMemoryCache();     Services.TryAddSingleton<OcelotDiagnosticListener>();     Services.TryAddSingleton<IMultiplexer, Multiplexer>();     Services.TryAddSingleton<IResponseAggregator, SimpleJsonResponseAggregator>();     Services.TryAddSingleton<ITracingHandlerFactory, TracingHandlerFactory>();     Services.TryAddSingleton<IFileConfigurationPollerOptions, InMemoryFileConfigurationPollerOptions>();     Services.TryAddSingleton<IAddHeadersToResponse, AddHeadersToResponse>();     Services.TryAddSingleton<IPlaceholders, Placeholders>();     Services.TryAddSingleton<IResponseAggregatorFactory, InMemoryResponseAggregatorFactory>();     Services.TryAddSingleton<IDefinedAggregatorProvider, ServiceLocatorDefinedAggregatorProvider>();     Services.TryAddSingleton<IDownstreamRequestCreator, DownstreamRequestCreator>();     Services.TryAddSingleton<IFrameworkDescription, FrameworkDescription>();     Services.TryAddSingleton<IQoSFactory, QoSFactory>();     Services.TryAddSingleton<IExceptionToErrorMapper, HttpExeptionToErrorMapper>();      //add security      this.AddSecurity();      //add asp.net services..     var assembly = typeof(FileConfigurationController).GetTypeInfo().Assembly;      Services.AddMvcCore()         .AddApplicationPart(assembly)         .AddControllersAsServices()         .AddAuthorization()         .AddJsonFormatters();      Services.AddLogging();     Services.AddMiddlewareAnalysis();     Services.AddWebEncoders(); }

至此Ocelot源码解析就到这里了,其他的具体实现代码就根据流程一个一个查看即可,这里就不详细讲解了,因为我们已经掌握整个Ocelot代码的运行原理和实现方式及流程,项目里其他的一大堆的代码都是围绕这个流程去一步一步实现的。

有没有感觉添加一个中间件不是很复杂呢,是不是都跃跃欲试,准备尝试开发自己的自定义中间件啦,本篇就不介绍中间件的具体开发流程了,后续实战中会包含部分项目中需要用到的中间件,到时候会详细讲解如何规划和开发一个满足自己项目需求的中间件。

二、结合项目梳理功能

在完整学习完Ocelot文档和源码后,我们基本掌握了Ocelot目前已经实现的功能,再结合我们实际项目需求,我们梳理下还有哪些功能可能需要自己扩展实现。

项目设计网关基本需求包括路由、认证、授权、限流、缓存,仔细学习文档和源码后发现功能都已经存在,那是不是我们就可以直接拿来使用呢?这时候我们需要拿出一些复杂业务场景来对号入座,看能否实现复杂场景的一些应用。

1、授权

能否为每一个客户端设置独立的访问权限,如果客户端A可以访问服务A、服务B,客户端B只能访问服务A,从网关层面直接授权,不满足需求不路由到具体服务。从文档和代码分析后发现暂时未实现。

2、限流

能否为每一个客户端设置不能限流规则,例如客户端A为我们内容应用,我希望对服务A不启用限流,客户端B为第三方接入应用,我需要B访问服务A访问进行单独限流(30次/分钟),看能否通过配置实现自定义限流。从文档和代码分析后发现暂时未实现。

3、缓存

通过代码发现目前缓存实现的只是Dictionary方式实现的缓存,不能实现分布式结构的应用。

通过分析我们发现列举的5个基本需求,尽然有3个在我们实际项目应用中可能会存在问题,如果不解决这些问题,很难直接拿这个完美的网关项目应用到正式项目,所以我们到通过扩展Ocelot方法来实现我们的目的。

如何扩展呢

为了满足我们项目应用的需要,我们需要为每一个路由进行单独设置,如果还采用配置文件的方式,肯定无法满足需求,且后续网关动态增加路由、授权、限流等无法控制,所以我们需要把网关配置信息从配置文件中移到数据库中,由数据库中的路由表、限流表、授权表等方式记录当前网关的应用,且后续扩展直接在数据库中增加或减少相关配置,然后动态更新网关配置实现网关的高可用。

想一想是不是有点小激动,原来只要稍微改造下宝骏瞬间变宝马,那接下来的课程就是网关改造之旅,我会从设计、思想、编码等方面讲解下如何实现我们的第一辆宝马。


未经允许不得转载:996ICU » .NET Core-统一认证平台之定制Ocelot(2)

赞 (0) 打赏