项目内(预研)在使用Istio
的默认网关envoy
作为service mesh的最要组件。其本身可以配置zipkin或jaeger等opentracing的实现作为业务调用追踪(tracing)监控方案。但如果只依靠于envoy本身的tracing,则会在靠近业务侧中断,使前后请求响应不能串联,所以业务本身需要负责此串联。花了些时间看了一下,使用Python基于opentracing实现了一个串联grpc的方案。
前置资料
背景
- 业界已经有几种Tracing方案,如ebay、点评的ACT方案。
- google的论文Dapper出来后,形成了一个通用的tracing标准,即
OpenTracing
。
OpenTracing基础框架
OpenTracing视角看请求
主要需要了解Tracer, Span, SpanContext, Carrier, Baggage等概念,具体自行查阅资料。
zipkin是一个opentracing的实现,稍了解下其基本框架。
jaeger是基于zipkin的实现,同样实现了opentracing。
业务目标
- 打通envoy及业务的tracing
- 对grpc请求进行tracing
实现
envoy侧
研究envoy的tracing机制,可见其官方文章 。支持的tracing已经挺丰富了。
envoy.lightstep
envoy.zipkin
envoy.dynamic.ot
envoy.tracers.datadog
我们使用jaeger来做追踪,默认配置如下:
1
2
3
4
5
6
| tracing:
http:
name: envoy.zipkin
config:
collector_cluster: jaeger
collector_endpoint: "/api/v1/spans"
|
对于jaeger(zipkin)的Tracing方式,envoy中大概是这样的过程:
When using the Zipkin tracer, Envoy relies on the service to propagate the B3 HTTP headers ( x-b3-traceid, x-b3-spanid, x-b3-parentspanid, x-b3-sampled, and x-b3-flags). The x-b3-sampled header can also be supplied by an external client to either enable or disable tracing for a particular request. In addition, the single b3 header propagation format is supported, which is a more compressed format.
即在头部有如上的一些消息,当然也支持b3的压缩格式: xx:yy:zz:aa。以实际中收包的metadata中打出来的如下:
1
2
3
4
5
6
7
8
9
10
| act_server_1 | x-b3-sampled 1
act_server_1 | x-request-id 91fac7d8-ad0f-9a04-84f6-88a8192d08fe
act_server_1 | x-b3-parentspanid 1a6d063b2e392c71
act_server_1 | x-b3-traceid 1a6d063b2e392c71
act_server_1 | x-envoy-expected-rq-timeout-ms 15000
act_server_1 | user-agent grpc-c++/1.15.0 grpc-c/6.0.0 (linux; chttp2; glider)
act_server_1 | x-forwarded-proto http
act_server_1 | x-envoy-decorator-operation checkAvailability
act_server_1 | server-family ActService
act_server_1 | x-b3-spanid 25e2a308dbb1a295
|
业务侧 Inbound
在envoy正常工作下,现在业务已经能接收到其转发的请求了。请求体中带了span-context,即构造一个SpanContext需要的信息在HTTP header中。如果是普通http则直接header中可取。我们业务是grpc请求,在信息在metadata里。
自己实现并不复杂,网上也有人进行了封装,最后实际代码很简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| from grpc_opentracing import open_tracing_server_interceptor
from grpc_opentracing.grpcext import intercept_server
...
# init a tracer
config = Config(
config={
'sampler': {
'type': 'const',
'param': 1,
},
'local_agent': {
'reporting_host': 'jaeger'
},
'logging': True,
'tags': {
'first_name': 'lin',
'second_name': 'kevin'
}
}, service_name='act1', validate=True)
tracer = config.initialize_tracer()
tracer_interceptor = open_tracing_server_interceptor(tracer)
server = grpc.server(futures.ThreadPoolExecutor( max_workers=10))
# 设置tracing拦截器
server = intercept_server(server, tracer_interceptor)
# 现在接收的所有grpc请求都将会打一条operation=act1的日志。
|
- 注,这里本地不部署jaeger-agent,而直接将上报发给
reporting_host
(见上config配置)
除了找到合适的上报时机,还需要上报正确的内容,这就涉及到grpc本身协议机制(如metadata)和opentracing api的使用了。上面加上后,会发现tracing并没有沿着预想的一路下来,而是另建了一个新的Span,这是为什么呢?
回想一下整个过程: grpc request->server interceptor->tracer??
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| class OpenTracingServerInterceptor(grpcext.UnaryServerInterceptor,
grpcext.StreamServerInterceptor):
...
def _start_span(self, servicer_context, method):
span_context = None
error = None
metadata = servicer_context.invocation_metadata() #这里带的headers信息
try:
if metadata:
span_context = self._tracer.extract(
opentracing.Format.HTTP_HEADERS, dict(metadata)) #注意使用HTTP_HEADERS这个format去提取metadata
except
...
...
span = self._tracer.start_span(
operation_name=method, child_of=span_context, tags=tags) #使用child_of创建子span
...
return span
|
逻辑看起来清晰。而之所以未能继承自header的数据来追踪,问题显然是来自span_context = self._tracer.extract(opentracing.Format.HTTP_HEADERS, dict(metadata))
这段。深入extract()源码(jaeger的实现)后可见:
1
2
3
4
5
| def extract(self, format, carrier):
codec = self.codecs.get(format, None)
if codec is None:
raise UnsupportedFormatException(format)
return codec.extract(carrier)
|
这里解码器codecs是可扩展的多种类型,而jaeger在__init__
中是这样初始化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| self.codecs = {
Format.TEXT_MAP: TextCodec(
url_encoding=False,
trace_id_header=trace_id_header,
baggage_header_prefix=baggage_header_prefix,
debug_id_header=debug_id_header,
),
Format.HTTP_HEADERS: TextCodec(
url_encoding=True,
trace_id_header=trace_id_header,
baggage_header_prefix=baggage_header_prefix,
debug_id_header=debug_id_header,
),
Format.BINARY: BinaryCodec(),
ZipkinSpanFormat: ZipkinCodec(),
}
if extra_codecs:
self.codecs.update(extra_codecs)
|
问题基本浮出水面,Format.HTTP_HEADERS被注册成TextCodec(…)的实例,这个并不能解B3-header系列的头部,所以codec.extract将返回None,于是没有和parent span串联。
最后利用extra_codecs->propagation->config[‘proopagation’]=‘b3’来解决问题,即在初始化Tracer时,传入的config添加 'propagation': 'b3'
即可。与此相关代码:
1
2
3
4
5
6
7
| @property
def propagation(self):
propagation = self.config.get('propagation')
if propagation == 'b3':
# replace the codec with a B3 enabled instance
return {Format.HTTP_HEADERS: B3Codec()}
return {}
|
业务侧 Outbound
业务侧的Outboud(向外部的其它请求)一般需要带上来源span,这样才能继续跟踪调用。有前面的基础后,在发起grpc请求前,创建一个client_interceptor,大概代码形如:
1
2
3
4
5
6
7
| from grpc_opentracing import open_tracing_client_interceptor
from grpc_opentracing.grpcext import intercept_channel
...
def AnGrpcCall(self, request, context):
tracer_client_interceptor = open_tracing_client_interceptor(self_.tracer, active_span_source=ActiveSpanSource(self._tracer, context.invocation_metadata())
channel = intercept_channel(channel, tracer_client_interceptor)
|
这里有个参数active_span_source,需要自行实现返回源Span。我们可以通过grpc调用时的context来产生一个。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| class ActiveSpanSource(six.with_metaclass(abc.ABCMeta)):
"""Provides a way to access an the active span."""
def __init__(self, tracer, metadata):
self._tracer = tracer
self._metadata = metadata
def get_active_span(self):
"""Identifies the active span.
Returns:
An object that implements the opentracing.Span interface.
"""
span_ctx = None
try:
if self._metadata:
span_ctx = self._tracer.extract(
opentracing.Format.HTTP_HEADERS, dict(self._metadata))
except (opentracing.UnsupportedFormatException,
opentracing.InvalidCarrierException,
opentracing.SpanContextCorruptedException) as e:
logging.exception(
'tracer.extract() failed')
return None
return Span(span_ctx, self._tracer, 'Outbound')
|
至此,出去的所有调用都继承了来源的Span,整个链路即可串联。
结果展示
附录:API纵览
- jaeger_client(以python模块举例)
- Tracer模块
- init(service_name, reporter, sampler, …)
- start_span(operation_name, child_of, references, tags, start_time)
- inject(span_context, format, carrier)
- extract(format, carrier)
- close()
- report_span(span)
- random_id()
- is_debug_allowed()
- Config模块
- init(config, metrics, service_name,…)
- @service_name
- @logging()
- @trace_id_header()
- @sampler
- @tags
- initialize_tracer(io_loop)
- new_tracer()
- create_tracer(reporter, sampler,…)
- Span模块
- init(context, tracer, operation_name, tags, start_time)
- set_operation_name(name)
- finish(finish_time)
- set_tag(key, value) – append
- log_kv(key_values, timestamp)
- set_baggage_item(key, value)
- get_baggage_item(key)
- is_sampled()
- is_rpc()
- is_rpc_client()
- @trace_id
- @span_id
- @parent_id
- @flags
- SpanContext模块
- init(trace_id, span_id, parent_id, flags, baggage, debug_id)
- @baggage
- with_baggage_item(key, value) – return SpanContext
- @has_trace
- @debug_id
- with_debug_id(debug_id) – return SpanContext
- Sampler模块
- ConstSampler
- ProbabilisticSampler
- RateLimitingSampler
- RemoteControlledSampler
参考