package logproto import ( "fmt" "io" "time" jsoniter "github.com/json-iterator/go" "github.com/prometheus/prometheus/promql/parser" ) // Stream contains a unique labels set as a string and a set of entries for it. // We are not using the proto generated version but this custom one so that we // can improve serialization see benchmark. type Stream struct { Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"` } // MarshalJSON implements the json.Marshaler interface. func (r *PushRequest) MarshalJSON() ([]byte, error) { stream := jsoniter.ConfigDefault.BorrowStream(nil) defer jsoniter.ConfigDefault.ReturnStream(stream) stream.WriteObjectStart() stream.WriteObjectField("streams") stream.WriteArrayStart() for i, s := range r.Streams { stream.WriteObjectStart() stream.WriteObjectField("stream") stream.WriteObjectStart() lbs, err := parser.ParseMetric(s.Labels) if err != nil { continue } for i, lb := range lbs { stream.WriteObjectField(lb.Name) stream.WriteStringWithHTMLEscaped(lb.Value) if i != len(lbs)-1 { stream.WriteMore() } } stream.WriteObjectEnd() stream.WriteMore() stream.WriteObjectField("values") stream.WriteArrayStart() for i, entry := range s.Entries { stream.WriteArrayStart() stream.WriteRaw(fmt.Sprintf(`"%d"`, entry.Timestamp.UnixNano())) stream.WriteMore() stream.WriteStringWithHTMLEscaped(entry.Line) stream.WriteArrayEnd() if i != len(s.Entries)-1 { stream.WriteMore() } } stream.WriteArrayEnd() stream.WriteObjectEnd() if i != len(r.Streams)-1 { stream.WriteMore() } } stream.WriteArrayEnd() stream.WriteObjectEnd() return stream.Buffer(), nil } // Entry is a log entry with a timestamp. type Entry struct { Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"` Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"` } func (m *Stream) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } return dAtA[:n], nil } func (m *Stream) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if len(m.Labels) > 0 { dAtA[i] = 0xa i++ i = encodeVarintLogproto(dAtA, i, uint64(len(m.Labels))) i += copy(dAtA[i:], m.Labels) } if len(m.Entries) > 0 { for _, msg := range m.Entries { dAtA[i] = 0x12 i++ i = encodeVarintLogproto(dAtA, i, uint64(msg.Size())) n, err := msg.MarshalTo(dAtA[i:]) if err != nil { return 0, err } i += n } } return i, nil } func (m *Entry) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } return dAtA[:n], nil } func (m *Entry) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l dAtA[i] = 0xa i++ i = encodeVarintLogproto(dAtA, i, uint64(SizeOfStdTime(m.Timestamp))) n5, err := StdTimeMarshalTo(m.Timestamp, dAtA[i:]) if err != nil { return 0, err } i += n5 if len(m.Line) > 0 { dAtA[i] = 0x12 i++ i = encodeVarintLogproto(dAtA, i, uint64(len(m.Line))) i += copy(dAtA[i:], m.Line) } return i, nil } func (m *Stream) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx var wire uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowLogproto } if iNdEx >= l { return io.ErrUnexpectedEOF } b := dAtA[iNdEx] iNdEx++ wire |= uint64(b&0x7F) << shift if b < 0x80 { break } } fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { return fmt.Errorf("proto: Stream: wiretype end group for non-group") } if fieldNum <= 0 { return fmt.Errorf("proto: Stream: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowLogproto } if iNdEx >= l { return io.ErrUnexpectedEOF } b := dAtA[iNdEx] iNdEx++ stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } } intStringLen := int(stringLen) if intStringLen < 0 { return ErrInvalidLengthLogproto } postIndex := iNdEx + intStringLen if postIndex < 0 { return ErrInvalidLengthLogproto } if postIndex > l { return io.ErrUnexpectedEOF } m.Labels = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) } var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowLogproto } if iNdEx >= l { return io.ErrUnexpectedEOF } b := dAtA[iNdEx] iNdEx++ msglen |= int(b&0x7F) << shift if b < 0x80 { break } } if msglen < 0 { return ErrInvalidLengthLogproto } postIndex := iNdEx + msglen if postIndex < 0 { return ErrInvalidLengthLogproto } if postIndex > l { return io.ErrUnexpectedEOF } m.Entries = append(m.Entries, Entry{}) if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipLogproto(dAtA[iNdEx:]) if err != nil { return err } if skippy < 0 { return ErrInvalidLengthLogproto } if (iNdEx + skippy) < 0 { return ErrInvalidLengthLogproto } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } iNdEx += skippy } } if iNdEx > l { return io.ErrUnexpectedEOF } return nil } func (m *Entry) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx var wire uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowLogproto } if iNdEx >= l { return io.ErrUnexpectedEOF } b := dAtA[iNdEx] iNdEx++ wire |= uint64(b&0x7F) << shift if b < 0x80 { break } } fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { return fmt.Errorf("proto: Entry: wiretype end group for non-group") } if fieldNum <= 0 { return fmt.Errorf("proto: Entry: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) } var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowLogproto } if iNdEx >= l { return io.ErrUnexpectedEOF } b := dAtA[iNdEx] iNdEx++ msglen |= int(b&0x7F) << shift if b < 0x80 { break } } if msglen < 0 { return ErrInvalidLengthLogproto } postIndex := iNdEx + msglen if postIndex < 0 { return ErrInvalidLengthLogproto } if postIndex > l { return io.ErrUnexpectedEOF } if err := StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowLogproto } if iNdEx >= l { return io.ErrUnexpectedEOF } b := dAtA[iNdEx] iNdEx++ stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } } intStringLen := int(stringLen) if intStringLen < 0 { return ErrInvalidLengthLogproto } postIndex := iNdEx + intStringLen if postIndex < 0 { return ErrInvalidLengthLogproto } if postIndex > l { return io.ErrUnexpectedEOF } m.Line = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipLogproto(dAtA[iNdEx:]) if err != nil { return err } if skippy < 0 { return ErrInvalidLengthLogproto } if (iNdEx + skippy) < 0 { return ErrInvalidLengthLogproto } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } iNdEx += skippy } } if iNdEx > l { return io.ErrUnexpectedEOF } return nil } func (m *Stream) Size() (n int) { if m == nil { return 0 } var l int _ = l l = len(m.Labels) if l > 0 { n += 1 + l + sovLogproto(uint64(l)) } if len(m.Entries) > 0 { for _, e := range m.Entries { l = e.Size() n += 1 + l + sovLogproto(uint64(l)) } } return n } func (m *Entry) Size() (n int) { if m == nil { return 0 } var l int _ = l l = SizeOfStdTime(m.Timestamp) n += 1 + l + sovLogproto(uint64(l)) l = len(m.Line) if l > 0 { n += 1 + l + sovLogproto(uint64(l)) } return n } func (m *Stream) Equal(that interface{}) bool { if that == nil { return m == nil } that1, ok := that.(*Stream) if !ok { that2, ok := that.(Stream) if ok { that1 = &that2 } else { return false } } if that1 == nil { return m == nil } else if m == nil { return false } if m.Labels != that1.Labels { return false } if len(m.Entries) != len(that1.Entries) { return false } for i := range m.Entries { if !m.Entries[i].Equal(that1.Entries[i]) { return false } } return true } func (m *Entry) Equal(that interface{}) bool { if that == nil { return m == nil } that1, ok := that.(*Entry) if !ok { that2, ok := that.(Entry) if ok { that1 = &that2 } else { return false } } if that1 == nil { return m == nil } else if m == nil { return false } if !m.Timestamp.Equal(that1.Timestamp) { return false } if m.Line != that1.Line { return false } return true }