You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

476 lines
9.7 KiB
Go

package logproto
import (
"fmt"
"io"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/prometheus/promql/parser"
)
// Stream contains a unique labels set as a string and a set of entries for it.
// We are not using the proto generated version but this custom one so that we
// can improve serialization see benchmark.
type Stream struct {
Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
}
// MarshalJSON implements the json.Marshaler interface.
func (r *PushRequest) MarshalJSON() ([]byte, error) {
stream := jsoniter.ConfigDefault.BorrowStream(nil)
defer jsoniter.ConfigDefault.ReturnStream(stream)
stream.WriteObjectStart()
stream.WriteObjectField("streams")
stream.WriteArrayStart()
for i, s := range r.Streams {
stream.WriteObjectStart()
stream.WriteObjectField("stream")
stream.WriteObjectStart()
lbs, err := parser.ParseMetric(s.Labels)
if err != nil {
continue
}
for i, lb := range lbs {
stream.WriteObjectField(lb.Name)
stream.WriteStringWithHTMLEscaped(lb.Value)
if i != len(lbs)-1 {
stream.WriteMore()
}
}
stream.WriteObjectEnd()
stream.WriteMore()
stream.WriteObjectField("values")
stream.WriteArrayStart()
for i, entry := range s.Entries {
stream.WriteArrayStart()
stream.WriteRaw(fmt.Sprintf(`"%d"`, entry.Timestamp.UnixNano()))
stream.WriteMore()
stream.WriteStringWithHTMLEscaped(entry.Line)
stream.WriteArrayEnd()
if i != len(s.Entries)-1 {
stream.WriteMore()
}
}
stream.WriteArrayEnd()
stream.WriteObjectEnd()
if i != len(r.Streams)-1 {
stream.WriteMore()
}
}
stream.WriteArrayEnd()
stream.WriteObjectEnd()
return stream.Buffer(), nil
}
// Entry is a log entry with a timestamp.
type Entry struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
}
func (m *Stream) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Stream) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Labels) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintLogproto(dAtA, i, uint64(len(m.Labels)))
i += copy(dAtA[i:], m.Labels)
}
if len(m.Entries) > 0 {
for _, msg := range m.Entries {
dAtA[i] = 0x12
i++
i = encodeVarintLogproto(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func (m *Entry) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Entry) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
dAtA[i] = 0xa
i++
i = encodeVarintLogproto(dAtA, i, uint64(SizeOfStdTime(m.Timestamp)))
n5, err := StdTimeMarshalTo(m.Timestamp, dAtA[i:])
if err != nil {
return 0, err
}
i += n5
if len(m.Line) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintLogproto(dAtA, i, uint64(len(m.Line)))
i += copy(dAtA[i:], m.Line)
}
return i, nil
}
func (m *Stream) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Stream: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Stream: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Labels = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Entries = append(m.Entries, Entry{})
if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipLogproto(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Entry) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Entry: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Entry: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Line = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipLogproto(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Stream) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Labels)
if l > 0 {
n += 1 + l + sovLogproto(uint64(l))
}
if len(m.Entries) > 0 {
for _, e := range m.Entries {
l = e.Size()
n += 1 + l + sovLogproto(uint64(l))
}
}
return n
}
func (m *Entry) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = SizeOfStdTime(m.Timestamp)
n += 1 + l + sovLogproto(uint64(l))
l = len(m.Line)
if l > 0 {
n += 1 + l + sovLogproto(uint64(l))
}
return n
}
func (m *Stream) Equal(that interface{}) bool {
if that == nil {
return m == nil
}
that1, ok := that.(*Stream)
if !ok {
that2, ok := that.(Stream)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return m == nil
} else if m == nil {
return false
}
if m.Labels != that1.Labels {
return false
}
if len(m.Entries) != len(that1.Entries) {
return false
}
for i := range m.Entries {
if !m.Entries[i].Equal(that1.Entries[i]) {
return false
}
}
return true
}
func (m *Entry) Equal(that interface{}) bool {
if that == nil {
return m == nil
}
that1, ok := that.(*Entry)
if !ok {
that2, ok := that.(Entry)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return m == nil
} else if m == nil {
return false
}
if !m.Timestamp.Equal(that1.Timestamp) {
return false
}
if m.Line != that1.Line {
return false
}
return true
}