迁移项目

master v1.0.0
loser 2 years ago
parent 12ea86b2fb
commit 5d4d02d679

@ -1,73 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
(a) You must give any other recipients of the Work or Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License.
You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@ -1,3 +1,11 @@
# loki-client-go
# Loki Go client (Experimental. DO NOT USE IT)
loki-client-go
An HTTP client to send logs to Loki server
## Install
## Usage
## License
Apache License 2.0, see [LICENSE](LICENSE).

@ -0,0 +1,20 @@
module git.echol.cn/loser/loki-client-go
go 1.15
require (
github.com/blang/semver v3.5.1+incompatible
github.com/go-kit/kit v0.10.0
github.com/gogo/protobuf v1.3.1
github.com/golang/snappy v0.0.2
github.com/json-iterator/go v1.1.11
github.com/mitchellh/mapstructure v1.2.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.30.0
github.com/prometheus/prometheus v1.8.2-0.20201028100903-3245b3267b24
github.com/stretchr/testify v1.6.1
google.golang.org/grpc v1.32.0
gopkg.in/yaml.v2 v2.4.0
)

@ -0,0 +1,107 @@
package loki
import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
json "github.com/json-iterator/go"
"github.com/lixh00/loki-client-go/pkg/logproto"
)
// batch holds pending log streams waiting to be sent to Loki, and it's used
// to reduce the number of push requests to Loki aggregating multiple log streams
// and entries in a single batch request. In case of multi-tenant Promtail, log
// streams for each tenant are stored in a dedicated batch.
type batch struct {
streams map[string]*logproto.Stream
bytes int
createdAt time.Time
}
func newBatch(entries ...entry) *batch {
b := &batch{
streams: map[string]*logproto.Stream{},
bytes: 0,
createdAt: time.Now(),
}
// Add entries to the batch
for _, entry := range entries {
b.add(entry)
}
return b
}
// add an entry to the batch
func (b *batch) add(entry entry) {
b.bytes += len(entry.Line)
// Append the entry to an already existing stream (if any)
labels := entry.labels.String()
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, entry.Entry)
return
}
// Add the entry as a new stream
b.streams[labels] = &logproto.Stream{
Labels: labels,
Entries: []logproto.Entry{entry.Entry},
}
}
// sizeBytes returns the current batch size in bytes
func (b *batch) sizeBytes() int {
return b.bytes
}
// sizeBytesAfter returns the size of the batch after the input entry
// will be added to the batch itself
func (b *batch) sizeBytesAfter(entry entry) int {
return b.bytes + len(entry.Line)
}
// age of the batch since its creation
func (b *batch) age() time.Duration {
return time.Since(b.createdAt)
}
// encode the batch as snappy-compressed push request, and returns
// the encoded bytes and the number of encoded entries
func (b *batch) encode() ([]byte, int, error) {
req, entriesCount := b.createPushRequest()
buf, err := proto.Marshal(req)
if err != nil {
return nil, 0, err
}
buf = snappy.Encode(nil, buf)
return buf, entriesCount, nil
}
// encode the batch as json push request, and returns
// the encoded bytes and the number of encoded entries
func (b *batch) encodeJSON() ([]byte, int, error) {
req, entriesCount := b.createPushRequest()
buf, err := json.Marshal(req)
if err != nil {
return nil, 0, err
}
return buf, entriesCount, nil
}
// creates push request and returns it, together with number of entries
func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
req := logproto.PushRequest{
Streams: make([]logproto.Stream, 0, len(b.streams)),
}
entriesCount := 0
for _, stream := range b.streams {
req.Streams = append(req.Streams, *stream)
entriesCount += len(stream.Entries)
}
return &req, entriesCount
}

@ -0,0 +1,139 @@
package loki
import (
"fmt"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/lixh00/loki-client-go/pkg/logproto"
)
func TestBatch_add(t *testing.T) {
t.Parallel()
tests := map[string]struct {
inputEntries []entry
expectedSizeBytes int
}{
"empty batch": {
inputEntries: []entry{},
expectedSizeBytes: 0,
},
"single stream with single log entry": {
inputEntries: []entry{
{"tenant", model.LabelSet{}, logEntries[0].Entry},
},
expectedSizeBytes: len(logEntries[0].Entry.Line),
},
"single stream with multiple log entries": {
inputEntries: []entry{
{"tenant", model.LabelSet{}, logEntries[0].Entry},
{"tenant", model.LabelSet{}, logEntries[1].Entry},
},
expectedSizeBytes: len(logEntries[0].Entry.Line) + len(logEntries[1].Entry.Line),
},
"multiple streams with multiple log entries": {
inputEntries: []entry{
{"tenant", model.LabelSet{"type": "a"}, logEntries[0].Entry},
{"tenant", model.LabelSet{"type": "a"}, logEntries[1].Entry},
{"tenant", model.LabelSet{"type": "b"}, logEntries[2].Entry},
},
expectedSizeBytes: len(logEntries[0].Entry.Line) + len(logEntries[1].Entry.Line) + len(logEntries[2].Entry.Line),
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
b := newBatch()
for _, entry := range testData.inputEntries {
b.add(entry)
}
assert.Equal(t, testData.expectedSizeBytes, b.sizeBytes())
})
}
}
func TestBatch_encode(t *testing.T) {
t.Parallel()
tests := map[string]struct {
inputBatch *batch
expectedEntriesCount int
}{
"empty batch": {
inputBatch: newBatch(),
expectedEntriesCount: 0,
},
"single stream with single log entry": {
inputBatch: newBatch(
entry{"tenant", model.LabelSet{}, logEntries[0].Entry},
),
expectedEntriesCount: 1,
},
"single stream with multiple log entries": {
inputBatch: newBatch(
entry{"tenant", model.LabelSet{}, logEntries[0].Entry},
entry{"tenant", model.LabelSet{}, logEntries[1].Entry},
),
expectedEntriesCount: 2,
},
"multiple streams with multiple log entries": {
inputBatch: newBatch(
entry{"tenant", model.LabelSet{"type": "a"}, logEntries[0].Entry},
entry{"tenant", model.LabelSet{"type": "a"}, logEntries[1].Entry},
entry{"tenant", model.LabelSet{"type": "b"}, logEntries[2].Entry},
),
expectedEntriesCount: 3,
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
t.Parallel()
_, entriesCount, err := testData.inputBatch.encode()
require.NoError(t, err)
assert.Equal(t, testData.expectedEntriesCount, entriesCount)
})
}
}
func TestHashCollisions(t *testing.T) {
b := newBatch()
ls1 := model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}
ls2 := model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}
require.False(t, ls1.Equal(ls2))
require.Equal(t, ls1.FastFingerprint(), ls2.FastFingerprint())
const entriesPerLabel = 10
for i := 0; i < entriesPerLabel; i++ {
b.add(entry{labels: ls1, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
b.add(entry{labels: ls2, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
}
// make sure that colliding labels are stored properly as independent streams
req, entries := b.createPushRequest()
assert.Len(t, req.Streams, 2)
assert.Equal(t, 2*entriesPerLabel, entries)
if req.Streams[0].Labels == ls1.String() {
assert.Equal(t, ls1.String(), req.Streams[0].Labels)
assert.Equal(t, ls2.String(), req.Streams[1].Labels)
} else {
assert.Equal(t, ls2.String(), req.Streams[0].Labels)
assert.Equal(t, ls1.String(), req.Streams[1].Labels)
}
}

@ -0,0 +1,403 @@
package loki
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/lixh00/loki-client-go/pkg/backoff"
"github.com/prometheus/prometheus/promql/parser"
"github.com/lixh00/loki-client-go/pkg/metric"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"github.com/lixh00/loki-client-go/pkg/helpers"
"github.com/lixh00/loki-client-go/pkg/logproto"
)
const (
protoContentType = "application/x-protobuf"
JSONContentType = "application/json"
maxErrMsgLen = 1024
// Label reserved to override the tenant ID while processing
// pipeline stages
ReservedLabelTenantID = "__tenant_id__"
LatencyLabel = "filename"
HostLabel = "host"
)
var (
encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
}, []string{HostLabel})
sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
}, []string{HostLabel})
droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel})
sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Help: "Number of log entries sent to the ingester.",
}, []string{HostLabel})
droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Help: "Duration of send requests.",
}, []string{"status_code", HostLabel})
batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "batch_retries_total",
Help: "Number of times batches has had to be retried.",
}, []string{HostLabel})
streamLag *metric.Gauges
countersWithHost = []*prometheus.CounterVec{
encodedBytes, sentBytes, droppedBytes, sentEntries, droppedEntries,
}
UserAgent = fmt.Sprintf("promtail/%s", version.Version)
)
func init() {
prometheus.MustRegister(encodedBytes)
prometheus.MustRegister(sentBytes)
prometheus.MustRegister(droppedBytes)
prometheus.MustRegister(sentEntries)
prometheus.MustRegister(droppedEntries)
prometheus.MustRegister(requestDuration)
prometheus.MustRegister(batchRetries)
var err error
streamLag, err = metric.NewGauges("promtail_stream_lag_seconds",
"Difference between current time and last batch timestamp for successful sends",
metric.GaugeConfig{Action: "set"},
int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric.
)
if err != nil {
panic(err)
}
prometheus.MustRegister(streamLag)
}
// Client for pushing logs in snappy-compressed protos over HTTP.
type Client struct {
logger log.Logger
cfg Config
client *http.Client
quit chan struct{}
once sync.Once
entries chan entry
wg sync.WaitGroup
externalLabels model.LabelSet
}
type entry struct {
tenantID string
labels model.LabelSet
logproto.Entry
}
// New makes a new Client from config
func New(cfg Config) (*Client, error) {
logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowWarn())
return NewWithLogger(cfg, logger)
}
// NewWithDefault creates a new client with default configuration.
func NewWithDefault(url string) (*Client, error) {
cfg, err := NewDefaultConfig(url)
if err != nil {
return nil, err
}
return New(cfg)
}
// NewWithLogger makes a new Client from a logger and a config
func NewWithLogger(cfg Config, logger log.Logger) (*Client, error) {
if cfg.URL.URL == nil {
return nil, errors.New("client needs target URL")
}
c := &Client{
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
quit: make(chan struct{}),
entries: make(chan entry),
externalLabels: cfg.ExternalLabels.LabelSet,
}
err := cfg.Client.Validate()
if err != nil {
return nil, err
}
c.client, err = config.NewClientFromConfig(cfg.Client, "promtail", config.WithKeepAlivesDisabled(), config.WithHTTP2Disabled())
if err != nil {
return nil, err
}
c.client.Timeout = cfg.Timeout
// Initialize counters to 0 so the metrics are exported before the first
// occurrence of incrementing to avoid missing metrics.
for _, counter := range countersWithHost {
counter.WithLabelValues(c.cfg.URL.Host).Add(0)
}
c.wg.Add(1)
go c.run()
return c, nil
}
func (c *Client) run() {
batches := map[string]*batch{}
// Given the client handles multiple batches (1 per tenant) and each batch
// can be created at a different point in time, we look for batches whose
// max wait time has been reached every 10 times per BatchWait, so that the
// maximum delay we have sending batches is 10% of the max waiting time.
// We apply a cap of 10ms to the ticker, to avoid too frequent checks in
// case the BatchWait is very low.
minWaitCheckFrequency := 10 * time.Millisecond
maxWaitCheckFrequency := c.cfg.BatchWait / 10
if maxWaitCheckFrequency < minWaitCheckFrequency {
maxWaitCheckFrequency = minWaitCheckFrequency
}
maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)
defer func() {
// Send all pending batches
for tenantID, batch := range batches {
c.sendBatch(tenantID, batch)
}
c.wg.Done()
}()
for {
select {
case <-c.quit:
return
case e := <-c.entries:
batch, ok := batches[e.tenantID]
// If the batch doesn't exist yet, we create a new one with the entry
if !ok {
batches[e.tenantID] = newBatch(e)
break
}
// If adding the entry to the batch will increase the size over the max
// size allowed, we do send the current batch and then create a new one
if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
c.sendBatch(e.tenantID, batch)
batches[e.tenantID] = newBatch(e)
break
}
// The max size of the batch isn't reached, so we can add the entry
batch.add(e)
case <-maxWaitCheck.C:
// Send all batches whose max wait time has been reached
for tenantID, batch := range batches {
if batch.age() < c.cfg.BatchWait {
continue
}
c.sendBatch(tenantID, batch)
delete(batches, tenantID)
}
}
}
}
func (c *Client) sendBatch(tenantID string, batch *batch) {
var (
err error
buf []byte
entriesCount int
)
if c.cfg.EncodeJson {
buf, entriesCount, err = batch.encodeJSON()
} else {
buf, entriesCount, err = batch.encode()
}
if err != nil {
level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
return
}
bufBytes := float64(len(buf))
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
ctx := context.Background()
backoff := backoff.New(ctx, c.cfg.BackoffConfig)
var status int
for backoff.Ongoing() {
start := time.Now()
status, err = c.send(ctx, tenantID, buf)
requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
for _, s := range batch.streams {
lbls, err := parser.ParseMetric(s.Labels)
if err != nil {
// is this possible?
level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)
return
}
var lblSet model.LabelSet
for i := range lbls {
if lbls[i].Name == LatencyLabel {
lblSet = model.LabelSet{
model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host),
model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value),
}
}
}
if lblSet != nil {
streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
}
}
return
}
// Only retry 429s, 500s and connection-level errors.
if status > 0 && status != 429 && status/100 != 5 {
break
}
level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
backoff.Wait()
}
if err != nil {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
}
}
func (c *Client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
if err != nil {
return -1, err
}
req = req.WithContext(ctx)
req.Header.Set("Content-Type", protoContentType)
if c.cfg.EncodeJson {
req.Header.Set("Content-Type", JSONContentType)
}
req.Header.Set("User-Agent", UserAgent)
// If the tenant ID is not empty promtail is running in multi-tenant mode, so
// we should send it to Loki
if tenantID != "" {
req.Header.Set("X-Scope-OrgID", tenantID)
}
resp, err := c.client.Do(req)
if err != nil {
return -1, err
}
defer helpers.LogError(c.logger, "closing response body", resp.Body.Close)
if resp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
}
return resp.StatusCode, err
}
func (c *Client) getTenantID(labels model.LabelSet) string {
// Check if it has been overridden while processing the pipeline stages
if value, ok := labels[ReservedLabelTenantID]; ok {
return string(value)
}
// Check if has been specified in the config
if c.cfg.TenantID != "" {
return c.cfg.TenantID
}
// Defaults to an empty string, which means the X-Scope-OrgID header
// will not be sent
return ""
}
// Stop the client.
func (c *Client) Stop() {
c.once.Do(func() { close(c.quit) })
c.wg.Wait()
}
// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *Client) Handle(ls model.LabelSet, t time.Time, s string) error {
if len(c.externalLabels) > 0 {
ls = c.externalLabels.Merge(ls)
}
// Get the tenant ID in case it has been overridden while processing
// the pipeline stages, then remove the special label
tenantID := c.getTenantID(ls)
if _, ok := ls[ReservedLabelTenantID]; ok {
// Clone the label set to not manipulate the input one
ls = ls.Clone()
delete(ls, ReservedLabelTenantID)
}
c.entries <- entry{tenantID, ls, logproto.Entry{
Timestamp: t,
Line: s,
}}
return nil
}
func (c *Client) UnregisterLatencyMetric(labels model.LabelSet) {
labels[HostLabel] = model.LabelValue(c.cfg.URL.Host)
streamLag.Delete(labels)
}

@ -0,0 +1,356 @@
package loki
import (
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/lixh00/loki-client-go/pkg/backoff"
"github.com/lixh00/loki-client-go/pkg/httputil"
"github.com/lixh00/loki-client-go/pkg/labelutil"
"github.com/lixh00/loki-client-go/pkg/urlutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/lixh00/loki-client-go/pkg/logproto"
)
var logEntries = []entry{
{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},
{labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}},
{labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}},
{labels: model.LabelSet{"__tenant_id__": "tenant-2"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
}
type receivedReq struct {
tenantID string
pushReq logproto.PushRequest
}
func TestClient_Handle(t *testing.T) {
tests := map[string]struct {
clientBatchSize int
clientBatchWait time.Duration
clientMaxRetries int
clientTenantID string
serverResponseStatus int
inputEntries []entry
inputDelay time.Duration
expectedReqs []receivedReq
expectedMetrics string
}{
"batch log entries together until the batch size is reached": {
clientBatchSize: 10,
clientBatchWait: 100 * time.Millisecond,
clientMaxRetries: 3,
serverResponseStatus: 200,
inputEntries: []entry{logEntries[0], logEntries[1], logEntries[2]},
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
},
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 3.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 0
`,
},
"batch log entries together until the batch wait time is reached": {
clientBatchSize: 10,
clientBatchWait: 100 * time.Millisecond,
clientMaxRetries: 3,
serverResponseStatus: 200,
inputEntries: []entry{logEntries[0], logEntries[1]},
inputDelay: 110 * time.Millisecond,
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[1].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 2.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 0
`,
},
"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {
clientBatchSize: 10,
clientBatchWait: 10 * time.Millisecond,
clientMaxRetries: 3,
serverResponseStatus: 500,
inputEntries: []entry{logEntries[0]},
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
`,
},
"do not retry send a batch in case the server responds with a 4xx": {
clientBatchSize: 10,
clientBatchWait: 10 * time.Millisecond,
clientMaxRetries: 3,
serverResponseStatus: 400,
inputEntries: []entry{logEntries[0]},
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
`,
},
"do retry sending a batch in case the server responds with a 429": {
clientBatchSize: 10,
clientBatchWait: 10 * time.Millisecond,
clientMaxRetries: 3,
serverResponseStatus: 429,
inputEntries: []entry{logEntries[0]},
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
`,
},
"batch log entries together honoring the client tenant ID": {
clientBatchSize: 100,
clientBatchWait: 100 * time.Millisecond,
clientMaxRetries: 3,
clientTenantID: "tenant-default",
serverResponseStatus: 200,
inputEntries: []entry{logEntries[0], logEntries[1]},
expectedReqs: []receivedReq{
{
tenantID: "tenant-default",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 2.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 0
`,
},
"batch log entries together honoring the tenant ID overridden while processing the pipeline stages": {
clientBatchSize: 100,
clientBatchWait: 100 * time.Millisecond,
clientMaxRetries: 3,
clientTenantID: "tenant-default",
serverResponseStatus: 200,
inputEntries: []entry{logEntries[0], logEntries[3], logEntries[4], logEntries[5]},
expectedReqs: []receivedReq{
{
tenantID: "tenant-default",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
{
tenantID: "tenant-1",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[3].Entry, logEntries[4].Entry}}}},
},
{
tenantID: "tenant-2",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[5].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 4.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 0
`,
},
}
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()
// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan receivedReq, 10)
// Start a local HTTP server
server := httptest.NewServer(createServerHandler(receivedReqsChan, testData.serverResponseStatus))
require.NotNil(t, server)
defer server.Close()
// Get the URL at which the local test server is listening to
serverURL := urlutil.URLValue{}
err := serverURL.Set(server.URL)
require.NoError(t, err)
// Instance the client
cfg := Config{
URL: serverURL,
BatchWait: testData.clientBatchWait,
BatchSize: testData.clientBatchSize,
Client: config.HTTPClientConfig{},
BackoffConfig: backoff.BackoffConfig{MinBackoff: 1 * time.Millisecond, MaxBackoff: 2 * time.Millisecond, MaxRetries: testData.clientMaxRetries},
ExternalLabels: labelutil.LabelSet{},
Timeout: 1 * time.Second,
TenantID: testData.clientTenantID,
}
c, err := NewWithLogger(cfg, log.NewNopLogger())
require.NoError(t, err)
// Send all the input log entries
for i, logEntry := range testData.inputEntries {
err = c.Handle(logEntry.labels, logEntry.Timestamp, logEntry.Line)
require.NoError(t, err)
if testData.inputDelay > 0 && i < len(testData.inputEntries)-1 {
time.Sleep(testData.inputDelay)
}
}
// Wait until the expected push requests are received (with a timeout)
deadline := time.Now().Add(1 * time.Second)
for len(receivedReqsChan) < len(testData.expectedReqs) && time.Now().Before(deadline) {
time.Sleep(5 * time.Millisecond)
}
// Stop the client: it waits until the current batch is sent
c.Stop()
close(receivedReqsChan)
// Get all push requests received on the server side
receivedReqs := make([]receivedReq, 0)
for req := range receivedReqsChan {
receivedReqs = append(receivedReqs, req)
}
// Due to implementation details (maps iteration ordering is random) we just check
// that the expected requests are equal to the received requests, without checking
// the exact order which is not guaranteed in case of multi-tenant
require.ElementsMatch(t, testData.expectedReqs, receivedReqs)
expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
}
func createServerHandler(receivedReqsChan chan receivedReq, status int) http.HandlerFunc {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// Parse the request
var pushReq logproto.PushRequest
if err := httputil.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, httputil.RawSnappy); err != nil {
rw.WriteHeader(500)
return
}
receivedReqsChan <- receivedReq{
tenantID: req.Header.Get("X-Scope-OrgID"),
pushReq: pushReq,
}
rw.WriteHeader(status)
})
}
type roundTripFunc func(r *http.Request) (*http.Response, error)
func (s roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return s(r)
}
func TestClient_EncodeJSON(t *testing.T) {
c, err := NewWithDefault("http://loki.com/loki/api/v1/push")
require.NoError(t, err)
c.cfg.EncodeJson = true
c.client.Transport = roundTripFunc(func(r *http.Request) (*http.Response, error) {
require.Equal(t, r.Header.Get("Content-Type"), JSONContentType)
require.Equal(t, r.URL.Path, "/loki/api/v1/push")
b, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Equal(t, `{"streams":[{"stream":{"foo":"bar"},"values":[["1","11"],["2","22"]]},{"stream":{"foo":"buzz"},"values":[["3","33"],["4","44"]]}]}`, string(b))
return &http.Response{StatusCode: 200, Body: http.NoBody}, nil
})
c.sendBatch("",
newBatch(
entry{labels: model.LabelSet{"foo": "bar"}, Entry: logproto.Entry{Timestamp: time.Unix(0, 1), Line: "11"}},
entry{labels: model.LabelSet{"foo": "bar"}, Entry: logproto.Entry{Timestamp: time.Unix(0, 2), Line: "22"}},
entry{labels: model.LabelSet{"foo": "buzz"}, Entry: logproto.Entry{Timestamp: time.Unix(0, 3), Line: "33"}},
entry{labels: model.LabelSet{"foo": "buzz"}, Entry: logproto.Entry{Timestamp: time.Unix(0, 4), Line: "44"}},
),
)
}

@ -0,0 +1,109 @@
package loki
import (
"flag"
"time"
"github.com/lixh00/loki-client-go/pkg/backoff"
"github.com/lixh00/loki-client-go/pkg/labelutil"
"github.com/lixh00/loki-client-go/pkg/urlutil"
"github.com/prometheus/common/config"
)
// NOTE the helm chart for promtail and fluent-bit also have defaults for these values, please update to match if you make changes here.
const (
BatchWait = 1 * time.Second
BatchSize int = 1024 * 1024
MinBackoff = 500 * time.Millisecond
MaxBackoff = 5 * time.Minute
MaxRetries int = 10
Timeout = 10 * time.Second
)
// Config describes configuration for a HTTP pusher client.
type Config struct {
URL urlutil.URLValue
BatchWait time.Duration
BatchSize int
Client config.HTTPClientConfig `yaml:",inline"`
BackoffConfig backoff.BackoffConfig `yaml:"backoff_config"`
// The labels to add to any time series or alerts when communicating with loki
ExternalLabels labelutil.LabelSet `yaml:"external_labels,omitempty"`
Timeout time.Duration `yaml:"timeout"`
// The tenant ID to use when pushing logs to Loki (empty string means
// single tenant mode)
TenantID string `yaml:"tenant_id"`
// Use Loki JSON api as opposed to the snappy protobuf.
EncodeJson bool `yaml:"encode_json"`
}
// NewDefaultConfig creates a default configuration for a given target Loki URL.
func NewDefaultConfig(url string) (Config, error) {
var cfg Config
var u urlutil.URLValue
f := &flag.FlagSet{}
cfg.RegisterFlags(f)
if err := f.Parse(nil); err != nil {
return cfg, err
}
if err := u.Set(url); err != nil {
return cfg, err
}
cfg.URL = u
return cfg, nil
}
// RegisterFlags with prefix registers flags where every name is prefixed by
// prefix. If prefix is a non-empty string, prefix should end with a period.
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Var(&c.URL, prefix+"client.url", "URL of log server")
f.DurationVar(&c.BatchWait, prefix+"client.batch-wait", BatchWait, "Maximum wait period before sending batch.")
f.IntVar(&c.BatchSize, prefix+"client.batch-size-bytes", BatchSize, "Maximum batch size to accrue before sending. ")
// Default backoff schedule: 0.5s, 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s(4.267m) For a total time of 511.5s(8.5m) before logs are lost
f.IntVar(&c.BackoffConfig.MaxRetries, prefix+"client.max-retries", MaxRetries, "Maximum number of retires when sending batches.")
f.DurationVar(&c.BackoffConfig.MinBackoff, prefix+"client.min-backoff", MinBackoff, "Initial backoff time between retries.")
f.DurationVar(&c.BackoffConfig.MaxBackoff, prefix+"client.max-backoff", MaxBackoff, "Maximum backoff time between retries.")
f.DurationVar(&c.Timeout, prefix+"client.timeout", Timeout, "Maximum time to wait for server to respond to a request")
f.Var(&c.ExternalLabels, prefix+"client.external-labels", "list of external labels to add to each log (e.g: --client.external-labels=lb1=v1,lb2=v2)")
f.StringVar(&c.TenantID, prefix+"client.tenant-id", "", "Tenant ID to use when pushing logs to Loki.")
f.BoolVar(&c.EncodeJson, prefix+"client.encode-json", false, "Encode payload in JSON, default to snappy protobuf")
}
// RegisterFlags registers flags.
func (c *Config) RegisterFlags(flags *flag.FlagSet) {
c.RegisterFlagsWithPrefix("", flags)
}
// UnmarshalYAML implement Yaml Unmarshaler
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
type raw Config
var cfg raw
if c.URL.URL != nil {
// we used flags to set that value, which already has sane default.
cfg = raw(*c)
} else {
// force sane defaults.
cfg = raw{
BackoffConfig: backoff.BackoffConfig{
MaxBackoff: MaxBackoff,
MaxRetries: MaxRetries,
MinBackoff: MinBackoff,
},
BatchSize: BatchSize,
BatchWait: BatchWait,
Timeout: Timeout,
}
}
if err := unmarshal(&cfg); err != nil {
return err
}
*c = Config(cfg)
return nil
}

@ -0,0 +1,94 @@
package loki
import (
"net/url"
"reflect"
"testing"
"time"
"github.com/lixh00/loki-client-go/pkg/backoff"
"github.com/lixh00/loki-client-go/pkg/urlutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)
var clientConfig = Config{}
var clientDefaultConfig = (`
url: http://localhost:3100/loki/api/v1/push
`)
var clientCustomConfig = `
url: http://localhost:3100/loki/api/v1/push
backoff_config:
max_retries: 20
min_period: 5s
max_period: 1m
batchwait: 5s
batchsize: 204800
timeout: 5s
`
func Test_Config(t *testing.T) {
u, err := url.Parse("http://localhost:3100/loki/api/v1/push")
require.NoError(t, err)
tests := []struct {
configValues string
expectedConfig Config
}{
{
clientDefaultConfig,
Config{
URL: urlutil.URLValue{
URL: u,
},
BackoffConfig: backoff.BackoffConfig{
MaxBackoff: MaxBackoff,
MaxRetries: MaxRetries,
MinBackoff: MinBackoff,
},
BatchSize: BatchSize,
BatchWait: BatchWait,
Timeout: Timeout,
},
},
{
clientCustomConfig,
Config{
URL: urlutil.URLValue{
URL: u,
},
BackoffConfig: backoff.BackoffConfig{
MaxBackoff: 1 * time.Minute,
MaxRetries: 20,
MinBackoff: 5 * time.Second,
},
BatchSize: 100 * 2048,
BatchWait: 5 * time.Second,
Timeout: 5 * time.Second,
},
},
}
for _, tc := range tests {
err := yaml.Unmarshal([]byte(tc.configValues), &clientConfig)
require.NoError(t, err)
if !reflect.DeepEqual(tc.expectedConfig, clientConfig) {
t.Errorf("Configs does not match, expected: %v, received: %v", tc.expectedConfig, clientConfig)
}
}
}
func TestDefaultConfig(t *testing.T) {
cfg, err := NewDefaultConfig("http://loki.com")
assert.Nil(t, err)
assert.Equal(t, cfg.BatchSize, BatchSize)
assert.Equal(t, cfg.BatchWait, BatchWait)
assert.Equal(t, cfg.Timeout, Timeout)
assert.Equal(t, cfg.BackoffConfig.MaxBackoff, MaxBackoff)
assert.Equal(t, cfg.BackoffConfig.MinBackoff, MinBackoff)
assert.Equal(t, cfg.BackoffConfig.MaxRetries, MaxRetries)
assert.Equal(t, cfg.URL.URL.String(), "http://loki.com")
}

@ -0,0 +1,117 @@
package backoff
import (
"context"
"flag"
"fmt"
"math/rand"
"time"
)
// BackoffConfig configures a Backoff
type BackoffConfig struct {
MinBackoff time.Duration `yaml:"min_period"` // start backoff at this level
MaxBackoff time.Duration `yaml:"max_period"` // increase exponentially to this level
MaxRetries int `yaml:"max_retries"` // give up after this many; zero means infinite retries
}
// RegisterFlags for BackoffConfig.
func (cfg *BackoffConfig) RegisterFlags(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.MinBackoff, prefix+".backoff-min-period", 100*time.Millisecond, "Minimum delay when backing off.")
f.DurationVar(&cfg.MaxBackoff, prefix+".backoff-max-period", 10*time.Second, "Maximum delay when backing off.")
f.IntVar(&cfg.MaxRetries, prefix+".backoff-retries", 10, "Number of times to backoff and retry before failing.")
}
// Backoff implements exponential backoff with randomized wait times
type Backoff struct {
cfg BackoffConfig
ctx context.Context
numRetries int
nextDelayMin time.Duration
nextDelayMax time.Duration
}
// New creates a Backoff object. Pass a Context that can also terminate the operation.
func New(ctx context.Context, cfg BackoffConfig) *Backoff {
return &Backoff{
cfg: cfg,
ctx: ctx,
nextDelayMin: cfg.MinBackoff,
nextDelayMax: doubleDuration(cfg.MinBackoff, cfg.MaxBackoff),
}
}
// Reset the Backoff back to its initial condition
func (b *Backoff) Reset() {
b.numRetries = 0
b.nextDelayMin = b.cfg.MinBackoff
b.nextDelayMax = doubleDuration(b.cfg.MinBackoff, b.cfg.MaxBackoff)
}
// Ongoing returns true if caller should keep going
func (b *Backoff) Ongoing() bool {
// Stop if Context has errored or max retry count is exceeded
return b.ctx.Err() == nil && (b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries)
}
// Err returns the reason for terminating the backoff, or nil if it didn't terminate
func (b *Backoff) Err() error {
if b.ctx.Err() != nil {
return b.ctx.Err()
}
if b.cfg.MaxRetries != 0 && b.numRetries >= b.cfg.MaxRetries {
return fmt.Errorf("terminated after %d retries", b.numRetries)
}
return nil
}
// NumRetries returns the number of retries so far
func (b *Backoff) NumRetries() int {
return b.numRetries
}
// Wait sleeps for the backoff time then increases the retry count and backoff time
// Returns immediately if Context is terminated
func (b *Backoff) Wait() {
// Increase the number of retries and get the next delay
sleepTime := b.NextDelay()
if b.Ongoing() {
select {
case <-b.ctx.Done():
case <-time.After(sleepTime):
}
}
}
func (b *Backoff) NextDelay() time.Duration {
b.numRetries++
// Handle the edge case the min and max have the same value
// (or due to some misconfig max is < min)
if b.nextDelayMin >= b.nextDelayMax {
return b.nextDelayMin
}
// Add a jitter within the next exponential backoff range
sleepTime := b.nextDelayMin + time.Duration(rand.Int63n(int64(b.nextDelayMax-b.nextDelayMin)))
// Apply the exponential backoff to calculate the next jitter
// range, unless we've already reached the max
if b.nextDelayMax < b.cfg.MaxBackoff {
b.nextDelayMin = doubleDuration(b.nextDelayMin, b.cfg.MaxBackoff)
b.nextDelayMax = doubleDuration(b.nextDelayMax, b.cfg.MaxBackoff)
}
return sleepTime
}
func doubleDuration(value time.Duration, max time.Duration) time.Duration {
value = value * 2
if value <= max {
return value
}
return max
}

@ -0,0 +1,18 @@
package helpers
import (
"io/ioutil"
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"
)
// LoadConfig read YAML-formatted config from filename into cfg.
func LoadConfig(filename string, cfg interface{}) error {
buf, err := ioutil.ReadFile(filename)
if err != nil {
return errors.Wrap(err, "Error reading config file")
}
return yaml.UnmarshalStrict(buf, cfg)
}

@ -0,0 +1,13 @@
package helpers
import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
)
// LogError logs any error returned by f; useful when deferring Close etc.
func LogError(logger log.Logger, message string, f func() error) {
if err := f(); err != nil {
level.Error(logger).Log("message", message, "error", err)
}
}

@ -0,0 +1,9 @@
package helpers
// MinUint32 return the min of a and b.
func MinUint32(a, b uint32) uint32 {
if a < b {
return a
}
return b
}

@ -0,0 +1,164 @@
package httputil
import (
"bytes"
"context"
"encoding/json"
"fmt"
"html/template"
"io"
"net/http"
"strings"
"github.com/blang/semver"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
)
// WriteJSONResponse writes some JSON as a HTTP response.
func WriteJSONResponse(w http.ResponseWriter, v interface{}) {
data, err := json.Marshal(v)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err = w.Write(data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
}
// RenderHTTPResponse either responds with json or a rendered html page using the passed in template
// by checking the Accepts header
func RenderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Template, r *http.Request) {
accept := r.Header.Get("Accept")
if strings.Contains(accept, "application/json") {
WriteJSONResponse(w, v)
return
}
err := t.Execute(w, v)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
// CompressionType for encoding and decoding requests and responses.
type CompressionType int
// Values for CompressionType
const (
NoCompression CompressionType = iota
FramedSnappy
RawSnappy
)
var rawSnappyFromVersion = semver.MustParse("0.1.0")
// CompressionTypeFor a given version of the Prometheus remote storage protocol.
// See https://github.com/prometheus/prometheus/issues/2692.
func CompressionTypeFor(version string) CompressionType {
ver, err := semver.Make(version)
if err != nil {
return FramedSnappy
}
if ver.GTE(rawSnappyFromVersion) {
return RawSnappy
}
return FramedSnappy
}
// ParseProtoReader parses a compressed proto from an io.Reader.
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error {
var body []byte
var err error
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]"))
}
var buf bytes.Buffer
if expectedSize > 0 {
if expectedSize > maxSize {
return fmt.Errorf("message expected size larger than max (%d vs %d)", expectedSize, maxSize)
}
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
}
switch compression {
case NoCompression:
// Read from LimitReader with limit max+1. So if the underlying
// reader is over limit, the result will be bigger than max.
_, err = buf.ReadFrom(io.LimitReader(reader, int64(maxSize)+1))
body = buf.Bytes()
case FramedSnappy:
_, err = buf.ReadFrom(io.LimitReader(snappy.NewReader(reader), int64(maxSize)+1))
body = buf.Bytes()
case RawSnappy:
_, err = buf.ReadFrom(reader)
body = buf.Bytes()
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[decompress]"),
otlog.Int("size", len(body)))
}
if err == nil && len(body) <= maxSize {
body, err = snappy.Decode(nil, body)
}
}
if err != nil {
return err
}
if len(body) > maxSize {
return fmt.Errorf("received message larger than max (%d vs %d)", len(body), maxSize)
}
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[unmarshal]"),
otlog.Int("size", len(body)))
}
// We re-implement proto.Unmarshal here as it calls XXX_Unmarshal first,
// which we can't override without upsetting golint.
req.Reset()
if u, ok := req.(proto.Unmarshaler); ok {
err = u.Unmarshal(body)
} else {
err = proto.NewBuffer(body).Unmarshal(req)
}
if err != nil {
return err
}
return nil
}
// SerializeProtoResponse serializes a protobuf response into an HTTP response.
func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compression CompressionType) error {
data, err := proto.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return fmt.Errorf("error marshaling proto response: %v", err)
}
switch compression {
case NoCompression:
case FramedSnappy:
buf := bytes.Buffer{}
writer := snappy.NewBufferedWriter(&buf)
if _, err := writer.Write(data); err != nil {
return err
}
writer.Close()
data = buf.Bytes()
case RawSnappy:
data = snappy.Encode(nil, data)
}
if _, err := w.Write(data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return fmt.Errorf("error sending proto response: %v", err)
}
return nil
}

@ -0,0 +1,99 @@
package labelutil
import (
"bytes"
"encoding/csv"
"fmt"
"strings"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
)
// LabelSet is a labelSet that can be used as a flag.
type LabelSet struct {
model.LabelSet `yaml:",inline"`
}
// String implements flag.Value
// Format: a=1,b=2
func (v LabelSet) String() string {
if v.LabelSet == nil {
return ""
}
records := make([]string, 0, len(v.LabelSet)>>1)
for k, v := range v.LabelSet {
records = append(records, string(k)+"="+string(v))
}
var buf bytes.Buffer
w := csv.NewWriter(&buf)
if err := w.Write(records); err != nil {
panic(err)
}
w.Flush()
return "[" + strings.TrimSpace(buf.String()) + "]"
}
// Set implements flag.Value
func (v *LabelSet) Set(s string) error {
var ss []string
n := strings.Count(s, "=")
switch n {
case 0:
return fmt.Errorf("%s must be formatted as key=value", s)
case 1:
ss = append(ss, strings.Trim(s, `"`))
default:
r := csv.NewReader(strings.NewReader(s))
var err error
ss, err = r.Read()
if err != nil {
return err
}
}
out := model.LabelSet{}
for _, pair := range ss {
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 {
return fmt.Errorf("%s must be formatted as key=value", pair)
}
out[model.LabelName(kv[0])] = model.LabelValue(kv[1])
}
if err := out.Validate(); err != nil {
return err
}
v.LabelSet = out
return nil
}
// UnmarshalYAML the Unmarshaler interface of the yaml pkg.
func (v *LabelSet) UnmarshalYAML(unmarshal func(interface{}) error) error {
lbSet := model.LabelSet{}
err := unmarshal(&lbSet)
if err != nil {
return err
}
v.LabelSet = lbSet
return nil
}
// MarshalYAML implements yaml.Marshaller.
func (v LabelSet) MarshalYAML() (interface{}, error) {
out, err := yaml.Marshal(ModelLabelSetToMap(v.LabelSet))
if err != nil {
return nil, err
}
return string(out), nil
}
// ModelLabelSetToMap convert a model.LabelSet to a map[string]string
func ModelLabelSetToMap(m model.LabelSet) map[string]string {
result := map[string]string{}
for k, v := range m {
result[string(k)] = string(v)
}
return result
}

@ -0,0 +1,23 @@
package logproto
import "github.com/prometheus/prometheus/pkg/labels"
// Note, this is not very efficient and use should be minimized as it requires label construction on each comparison
type SeriesIdentifiers []SeriesIdentifier
func (ids SeriesIdentifiers) Len() int { return len(ids) }
func (ids SeriesIdentifiers) Swap(i, j int) { ids[i], ids[j] = ids[j], ids[i] }
func (ids SeriesIdentifiers) Less(i, j int) bool {
a, b := labels.FromMap(ids[i].Labels), labels.FromMap(ids[j].Labels)
return labels.Compare(a, b) <= 0
}
type Streams []Stream
func (xs Streams) Len() int { return len(xs) }
func (xs Streams) Swap(i, j int) { xs[i], xs[j] = xs[j], xs[i] }
func (xs Streams) Less(i, j int) bool { return xs[i].Labels <= xs[j].Labels }
func (s Series) Len() int { return len(s.Samples) }
func (s Series) Swap(i, j int) { s.Samples[i], s.Samples[j] = s.Samples[j], s.Samples[i] }
func (s Series) Less(i, j int) bool { return s.Samples[i].Timestamp < s.Samples[j].Timestamp }

File diff suppressed because it is too large Load Diff

@ -0,0 +1,166 @@
syntax = "proto3";
package logproto;
option go_package = "github.com/lixh00/loki-client-go/pkg/logproto";
import "google/protobuf/timestamp.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
service Pusher {
rpc Push(PushRequest) returns (PushResponse) {};
}
service Querier {
rpc Query(QueryRequest) returns (stream QueryResponse) {};
rpc QuerySample(SampleQueryRequest) returns (stream SampleQueryResponse) {};
rpc Label(LabelRequest) returns (LabelResponse) {};
rpc Tail(TailRequest) returns (stream TailResponse) {};
rpc Series(SeriesRequest) returns (SeriesResponse) {};
rpc TailersCount(TailersCountRequest) returns (TailersCountResponse) {};
rpc GetChunkIDs(GetChunkIDsRequest) returns (GetChunkIDsResponse) {}; // GetChunkIDs returns ChunkIDs from the index store holding logs for given selectors and time-range.
}
service Ingester {
rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {};
}
message PushRequest {
repeated StreamAdapter streams = 1 [(gogoproto.jsontag) = "streams", (gogoproto.customtype) = "Stream"];
}
message PushResponse {
}
message QueryRequest {
string selector = 1;
uint32 limit = 2;
google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
Direction direction = 5;
reserved 6;
repeated string shards = 7 [(gogoproto.jsontag) = "shards,omitempty"];
}
message SampleQueryRequest {
string selector = 1;
google.protobuf.Timestamp start = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp end = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
repeated string shards = 4 [(gogoproto.jsontag) = "shards,omitempty"];
}
message SampleQueryResponse {
repeated Series series = 1 [(gogoproto.customtype) = "Series", (gogoproto.nullable) = true];
}
enum Direction {
FORWARD = 0;
BACKWARD = 1;
}
message QueryResponse {
repeated StreamAdapter streams = 1 [(gogoproto.customtype) = "Stream", (gogoproto.nullable) = true];
}
message LabelRequest {
string name = 1;
bool values = 2; // True to fetch label values, false for fetch labels names.
google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true];
google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true];
}
message LabelResponse {
repeated string values = 1;
}
message StreamAdapter {
string labels = 1 [(gogoproto.jsontag) = "labels"];
repeated EntryAdapter entries = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "entries"];
}
message EntryAdapter {
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false, (gogoproto.jsontag) = "ts"];
string line = 2 [(gogoproto.jsontag) = "line"];
}
message Sample {
int64 timestamp = 1 [(gogoproto.jsontag) = "ts"];
double value = 2 [(gogoproto.jsontag) = "value"];
uint64 hash = 3 [(gogoproto.jsontag) = "hash"];
}
message Series {
string labels = 1 [(gogoproto.jsontag) = "labels"];
repeated Sample samples = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "samples"];
}
message TailRequest {
string query = 1;
reserved 2;
uint32 delayFor = 3;
uint32 limit = 4;
google.protobuf.Timestamp start = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message TailResponse {
StreamAdapter stream = 1 [(gogoproto.customtype) = "Stream"];
repeated DroppedStream droppedStreams = 2;
}
message SeriesRequest {
google.protobuf.Timestamp start = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp end = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
repeated string groups = 3;
}
message SeriesResponse {
repeated SeriesIdentifier series = 1 [(gogoproto.nullable) = false];
}
message SeriesIdentifier {
map<string,string> labels = 1;
}
message DroppedStream {
google.protobuf.Timestamp from = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp to = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string labels = 3;
}
message TimeSeriesChunk {
string from_ingester_id = 1;
string user_id = 2;
repeated LabelPair labels = 3;
repeated Chunk chunks = 4;
}
message LabelPair {
string name = 1;
string value = 2;
}
message Chunk {
bytes data = 1;
}
message TransferChunksResponse {
}
message TailersCountRequest {
}
message TailersCountResponse {
uint32 count = 1;
}
message GetChunkIDsRequest {
string matchers = 1;
google.protobuf.Timestamp start = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp end = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message GetChunkIDsResponse {
repeated string chunkIDs = 1;
}

@ -0,0 +1,106 @@
package logproto
import (
"errors"
strconv "strconv"
time "time"
"github.com/gogo/protobuf/types"
)
const (
// Seconds field of the earliest valid Timestamp.
// This is time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
minValidSeconds = -62135596800
// Seconds field just after the latest valid Timestamp.
// This is time.Date(10000, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
maxValidSeconds = 253402300800
)
// validateTimestamp determines whether a Timestamp is valid.
// A valid timestamp represents a time in the range
// [0001-01-01, 10000-01-01) and has a Nanos field
// in the range [0, 1e9).
//
// If the Timestamp is valid, validateTimestamp returns nil.
// Otherwise, it returns an error that describes
// the problem.
//
// Every valid Timestamp can be represented by a time.Time, but the converse is not true.
func validateTimestamp(ts *types.Timestamp) error {
if ts == nil {
return errors.New("timestamp: nil Timestamp")
}
if ts.Seconds < minValidSeconds {
return errors.New("timestamp: " + formatTimestamp(ts) + " before 0001-01-01")
}
if ts.Seconds >= maxValidSeconds {
return errors.New("timestamp: " + formatTimestamp(ts) + " after 10000-01-01")
}
if ts.Nanos < 0 || ts.Nanos >= 1e9 {
return errors.New("timestamp: " + formatTimestamp(ts) + ": nanos not in range [0, 1e9)")
}
return nil
}
// formatTimestamp is equivalent to fmt.Sprintf("%#v", ts)
// but avoids the escape incurred by using fmt.Sprintf, eliminating
// unnecessary heap allocations.
func formatTimestamp(ts *types.Timestamp) string {
if ts == nil {
return "nil"
}
seconds := strconv.FormatInt(ts.Seconds, 10)
nanos := strconv.FormatInt(int64(ts.Nanos), 10)
return "&types.Timestamp{Seconds: " + seconds + ",\nNanos: " + nanos + ",\n}"
}
func SizeOfStdTime(t time.Time) int {
ts, err := timestampProto(t)
if err != nil {
return 0
}
return ts.Size()
}
func StdTimeMarshalTo(t time.Time, data []byte) (int, error) {
ts, err := timestampProto(t)
if err != nil {
return 0, err
}
return ts.MarshalTo(data)
}
func StdTimeUnmarshal(t *time.Time, data []byte) error {
ts := &types.Timestamp{}
if err := ts.Unmarshal(data); err != nil {
return err
}
tt, err := timestampFromProto(ts)
if err != nil {
return err
}
*t = tt
return nil
}
func timestampFromProto(ts *types.Timestamp) (time.Time, error) {
// Don't return the zero value on error, because corresponds to a valid
// timestamp. Instead return whatever time.Unix gives us.
var t time.Time
if ts == nil {
t = time.Unix(0, 0).UTC() // treat nil like the empty Timestamp
} else {
t = time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
}
return t, validateTimestamp(ts)
}
func timestampProto(t time.Time) (types.Timestamp, error) {
ts := types.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
return ts, validateTimestamp(&ts)
}

@ -0,0 +1,475 @@
package logproto
import (
"fmt"
"io"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/prometheus/promql/parser"
)
// Stream contains a unique labels set as a string and a set of entries for it.
// We are not using the proto generated version but this custom one so that we
// can improve serialization see benchmark.
type Stream struct {
Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
}
// MarshalJSON implements the json.Marshaler interface.
func (r *PushRequest) MarshalJSON() ([]byte, error) {
stream := jsoniter.ConfigDefault.BorrowStream(nil)
defer jsoniter.ConfigDefault.ReturnStream(stream)
stream.WriteObjectStart()
stream.WriteObjectField("streams")
stream.WriteArrayStart()
for i, s := range r.Streams {
stream.WriteObjectStart()
stream.WriteObjectField("stream")
stream.WriteObjectStart()
lbs, err := parser.ParseMetric(s.Labels)
if err != nil {
continue
}
for i, lb := range lbs {
stream.WriteObjectField(lb.Name)
stream.WriteStringWithHTMLEscaped(lb.Value)
if i != len(lbs)-1 {
stream.WriteMore()
}
}
stream.WriteObjectEnd()
stream.WriteMore()
stream.WriteObjectField("values")
stream.WriteArrayStart()
for i, entry := range s.Entries {
stream.WriteArrayStart()
stream.WriteRaw(fmt.Sprintf(`"%d"`, entry.Timestamp.UnixNano()))
stream.WriteMore()
stream.WriteStringWithHTMLEscaped(entry.Line)
stream.WriteArrayEnd()
if i != len(s.Entries)-1 {
stream.WriteMore()
}
}
stream.WriteArrayEnd()
stream.WriteObjectEnd()
if i != len(r.Streams)-1 {
stream.WriteMore()
}
}
stream.WriteArrayEnd()
stream.WriteObjectEnd()
return stream.Buffer(), nil
}
// Entry is a log entry with a timestamp.
type Entry struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
}
func (m *Stream) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Stream) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Labels) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintLogproto(dAtA, i, uint64(len(m.Labels)))
i += copy(dAtA[i:], m.Labels)
}
if len(m.Entries) > 0 {
for _, msg := range m.Entries {
dAtA[i] = 0x12
i++
i = encodeVarintLogproto(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func (m *Entry) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Entry) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
dAtA[i] = 0xa
i++
i = encodeVarintLogproto(dAtA, i, uint64(SizeOfStdTime(m.Timestamp)))
n5, err := StdTimeMarshalTo(m.Timestamp, dAtA[i:])
if err != nil {
return 0, err
}
i += n5
if len(m.Line) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintLogproto(dAtA, i, uint64(len(m.Line)))
i += copy(dAtA[i:], m.Line)
}
return i, nil
}
func (m *Stream) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Stream: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Stream: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Labels = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Entries = append(m.Entries, Entry{})
if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipLogproto(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Entry) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Entry: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Entry: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthLogproto
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthLogproto
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Line = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipLogproto(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Stream) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Labels)
if l > 0 {
n += 1 + l + sovLogproto(uint64(l))
}
if len(m.Entries) > 0 {
for _, e := range m.Entries {
l = e.Size()
n += 1 + l + sovLogproto(uint64(l))
}
}
return n
}
func (m *Entry) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = SizeOfStdTime(m.Timestamp)
n += 1 + l + sovLogproto(uint64(l))
l = len(m.Line)
if l > 0 {
n += 1 + l + sovLogproto(uint64(l))
}
return n
}
func (m *Stream) Equal(that interface{}) bool {
if that == nil {
return m == nil
}
that1, ok := that.(*Stream)
if !ok {
that2, ok := that.(Stream)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return m == nil
} else if m == nil {
return false
}
if m.Labels != that1.Labels {
return false
}
if len(m.Entries) != len(that1.Entries) {
return false
}
for i := range m.Entries {
if !m.Entries[i].Equal(that1.Entries[i]) {
return false
}
}
return true
}
func (m *Entry) Equal(that interface{}) bool {
if that == nil {
return m == nil
}
that1, ok := that.(*Entry)
if !ok {
that2, ok := that.(Entry)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return m == nil
} else if m == nil {
return false
}
if !m.Timestamp.Equal(that1.Timestamp) {
return false
}
if m.Line != that1.Line {
return false
}
return true
}

@ -0,0 +1,111 @@
package logproto
import (
"testing"
time "time"
"github.com/stretchr/testify/require"
)
var (
now = time.Now().UTC()
line = `level=info ts=2019-12-12T15:00:08.325Z caller=compact.go:441 component=tsdb msg="compact blocks" count=3 mint=1576130400000 maxt=1576152000000 ulid=01DVX9ZHNM71GRCJS7M34Q0EV7 sources="[01DVWNC6NWY1A60AZV3Z6DGS65 01DVWW7XXX75GHA6ZDTD170CSZ 01DVX33N5W86CWJJVRPAVXJRWJ]" duration=2.897213221s`
stream = Stream{
Labels: `{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}`,
Entries: []Entry{
{now, line},
{now.Add(1 * time.Second), line},
{now.Add(2 * time.Second), line},
{now.Add(3 * time.Second), line},
},
}
streamAdapter = StreamAdapter{
Labels: `{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}`,
Entries: []EntryAdapter{
{now, line},
{now.Add(1 * time.Second), line},
{now.Add(2 * time.Second), line},
{now.Add(3 * time.Second), line},
},
}
)
func TestStream(t *testing.T) {
avg := testing.AllocsPerRun(200, func() {
b, err := stream.Marshal()
require.NoError(t, err)
var new Stream
err = new.Unmarshal(b)
require.NoError(t, err)
require.Equal(t, stream, new)
})
t.Log("avg allocs per run:", avg)
}
func TestStreamAdapter(t *testing.T) {
avg := testing.AllocsPerRun(200, func() {
b, err := streamAdapter.Marshal()
require.NoError(t, err)
var new StreamAdapter
err = new.Unmarshal(b)
require.NoError(t, err)
require.Equal(t, streamAdapter, new)
})
t.Log("avg allocs per run:", avg)
}
func TestCompatibility(t *testing.T) {
b, err := stream.Marshal()
require.NoError(t, err)
var adapter StreamAdapter
err = adapter.Unmarshal(b)
require.NoError(t, err)
require.Equal(t, streamAdapter, adapter)
ba, err := adapter.Marshal()
require.NoError(t, err)
require.Equal(t, b, ba)
var new Stream
err = new.Unmarshal(ba)
require.NoError(t, err)
require.Equal(t, stream, new)
}
func BenchmarkStream(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
by, err := stream.Marshal()
if err != nil {
b.Fatal(err)
}
var new Stream
err = new.Unmarshal(by)
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkStreamAdapter(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
by, err := streamAdapter.Marshal()
if err != nil {
b.Fatal(err)
}
var new StreamAdapter
err = new.Unmarshal(by)
if err != nil {
b.Fatal(err)
}
}
}

@ -0,0 +1,117 @@
package metric
import (
"strings"
"time"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
const (
CounterInc = "inc"
CounterAdd = "add"
ErrCounterActionRequired = "counter action must be defined as either `inc` or `add`"
ErrCounterInvalidAction = "action %s is not valid, action must be either `inc` or `add`"
ErrCounterInvalidMatchAll = "`match_all: true` cannot be combined with `value`, please remove `match_all` or `value`"
ErrCounterInvalidCountBytes = "`count_entry_bytes: true` can only be set with `match_all: true`"
ErrCounterInvalidCountBytesAction = "`count_entry_bytes: true` can only be used with `action: add`"
)
type CounterConfig struct {
MatchAll *bool `mapstructure:"match_all"`
CountBytes *bool `mapstructure:"count_entry_bytes"`
Value *string `mapstructure:"value"`
Action string `mapstructure:"action"`
}
func validateCounterConfig(config *CounterConfig) error {
if config.Action == "" {
return errors.New(ErrCounterActionRequired)
}
config.Action = strings.ToLower(config.Action)
if config.Action != CounterInc && config.Action != CounterAdd {
return errors.Errorf(ErrCounterInvalidAction, config.Action)
}
if config.MatchAll != nil && *config.MatchAll && config.Value != nil {
return errors.Errorf(ErrCounterInvalidMatchAll)
}
if config.CountBytes != nil && *config.CountBytes && (config.MatchAll == nil || !*config.MatchAll) {
return errors.New(ErrCounterInvalidCountBytes)
}
if config.CountBytes != nil && *config.CountBytes && config.Action != CounterAdd {
return errors.New(ErrCounterInvalidCountBytesAction)
}
return nil
}
func parseCounterConfig(config interface{}) (*CounterConfig, error) {
cfg := &CounterConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
// Counters is a vec tor of counters for a each log stream.
type Counters struct {
*metricVec
Cfg *CounterConfig
}
// NewCounters creates a new counter vec.
func NewCounters(name, help string, config interface{}, maxIdleSec int64) (*Counters, error) {
cfg, err := parseCounterConfig(config)
if err != nil {
return nil, err
}
err = validateCounterConfig(cfg)
if err != nil {
return nil, err
}
return &Counters{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return &expiringCounter{prometheus.NewCounter(prometheus.CounterOpts{
Help: help,
Name: name,
ConstLabels: labels,
}),
0,
}
}, maxIdleSec),
Cfg: cfg,
}, nil
}
// With returns the counter associated with a stream labelset.
func (c *Counters) With(labels model.LabelSet) prometheus.Counter {
return c.metricVec.With(labels).(prometheus.Counter)
}
type expiringCounter struct {
prometheus.Counter
lastModSec int64
}
// Inc increments the counter by 1. Use Add to increment it by arbitrary
// non-negative values.
func (e *expiringCounter) Inc() {
e.Counter.Inc()
e.lastModSec = time.Now().Unix()
}
// Add adds the given value to the counter. It panics if the value is <
// 0.
func (e *expiringCounter) Add(val float64) {
e.Counter.Add(val)
e.lastModSec = time.Now().Unix()
}
// HasExpired implements Expirable
func (e *expiringCounter) HasExpired(currentTimeSec int64, maxAgeSec int64) bool {
return currentTimeSec-e.lastModSec >= maxAgeSec
}

@ -0,0 +1,140 @@
package metric
import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)
var (
counterTestTrue = true
counterTestFalse = false
counterTestVal = "some val"
)
func Test_validateCounterConfig(t *testing.T) {
t.Parallel()
tests := []struct {
name string
config CounterConfig
err error
}{
{"invalid action",
CounterConfig{
Action: "del",
},
errors.Errorf(ErrCounterInvalidAction, "del"),
},
{"invalid counter match all",
CounterConfig{
MatchAll: &counterTestTrue,
Value: &counterTestVal,
Action: "inc",
},
errors.New(ErrCounterInvalidMatchAll),
},
{"invalid counter match bytes",
CounterConfig{
MatchAll: nil,
CountBytes: &counterTestTrue,
Action: "add",
},
errors.New(ErrCounterInvalidCountBytes),
},
{"invalid counter match bytes action",
CounterConfig{
MatchAll: &counterTestTrue,
CountBytes: &counterTestTrue,
Action: "inc",
},
errors.New(ErrCounterInvalidCountBytesAction),
},
{"valid counter match bytes",
CounterConfig{
MatchAll: &counterTestTrue,
CountBytes: &counterTestTrue,
Action: "add",
},
nil,
},
{"valid",
CounterConfig{
Value: &counterTestVal,
Action: "inc",
},
nil,
},
{"valid match all is false",
CounterConfig{
MatchAll: &counterTestFalse,
Value: &counterTestVal,
Action: "inc",
},
nil,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
err := validateCounterConfig(&tt.config)
if ((err != nil) && (err.Error() != tt.err.Error())) || (err == nil && tt.err != nil) {
t.Errorf("Metrics stage validation error, expected error = %v, actual error = %v", tt.err, err)
return
}
})
}
}
func TestCounterExpiration(t *testing.T) {
t.Parallel()
cfg := CounterConfig{
Action: "inc",
}
cnt, err := NewCounters("test1", "HELP ME!!!!!", cfg, 1)
assert.Nil(t, err)
// Create a label and increment the counter
lbl1 := model.LabelSet{}
lbl1["test"] = "i don't wanna make this a constant"
cnt.With(lbl1).Inc()
// Collect the metrics, should still find the metric in the map
collect(cnt)
assert.Contains(t, cnt.metrics, lbl1.Fingerprint())
time.Sleep(1100 * time.Millisecond) // Wait just past our max idle of 1 sec
//Add another counter with new label val
lbl2 := model.LabelSet{}
lbl2["test"] = "eat this linter"
cnt.With(lbl2).Inc()
// Collect the metrics, first counter should have expired and removed, second should still be present
collect(cnt)
assert.NotContains(t, cnt.metrics, lbl1.Fingerprint())
assert.Contains(t, cnt.metrics, lbl2.Fingerprint())
}
func collect(c prometheus.Collector) {
done := make(chan struct{})
collector := make(chan prometheus.Metric)
go func() {
defer close(done)
c.Collect(collector)
}()
for {
select {
case <-collector:
case <-done:
return
}
}
}

@ -0,0 +1,136 @@
package metric
import (
"strings"
"time"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
const (
GaugeSet = "set"
GaugeInc = "inc"
GaugeDec = "dec"
GaugeAdd = "add"
GaugeSub = "sub"
ErrGaugeActionRequired = "gauge action must be defined as `set`, `inc`, `dec`, `add`, or `sub`"
ErrGaugeInvalidAction = "action %s is not valid, action must be `set`, `inc`, `dec`, `add`, or `sub`"
)
type GaugeConfig struct {
Value *string `mapstructure:"value"`
Action string `mapstructure:"action"`
}
func validateGaugeConfig(config *GaugeConfig) error {
if config.Action == "" {
return errors.New(ErrGaugeActionRequired)
}
config.Action = strings.ToLower(config.Action)
if config.Action != GaugeSet &&
config.Action != GaugeInc &&
config.Action != GaugeDec &&
config.Action != GaugeAdd &&
config.Action != GaugeSub {
return errors.Errorf(ErrGaugeInvalidAction, config.Action)
}
return nil
}
func parseGaugeConfig(config interface{}) (*GaugeConfig, error) {
cfg := &GaugeConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
// Gauges is a vector of gauges for a each log stream.
type Gauges struct {
*metricVec
Cfg *GaugeConfig
}
// NewGauges creates a new gauge vec.
func NewGauges(name, help string, config interface{}, maxIdleSec int64) (*Gauges, error) {
cfg, err := parseGaugeConfig(config)
if err != nil {
return nil, err
}
err = validateGaugeConfig(cfg)
if err != nil {
return nil, err
}
return &Gauges{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return &expiringGauge{prometheus.NewGauge(prometheus.GaugeOpts{
Help: help,
Name: name,
ConstLabels: labels,
}),
0,
}
}, maxIdleSec),
Cfg: cfg,
}, nil
}
// With returns the gauge associated with a stream labelset.
func (g *Gauges) With(labels model.LabelSet) prometheus.Gauge {
return g.metricVec.With(labels).(prometheus.Gauge)
}
type expiringGauge struct {
prometheus.Gauge
lastModSec int64
}
// Set sets the Gauge to an arbitrary value.
func (g *expiringGauge) Set(val float64) {
g.Gauge.Set(val)
g.lastModSec = time.Now().Unix()
}
// Inc increments the Gauge by 1. Use Add to increment it by arbitrary
// values.
func (g *expiringGauge) Inc() {
g.Gauge.Inc()
g.lastModSec = time.Now().Unix()
}
// Dec decrements the Gauge by 1. Use Sub to decrement it by arbitrary
// values.
func (g *expiringGauge) Dec() {
g.Gauge.Dec()
g.lastModSec = time.Now().Unix()
}
// Add adds the given value to the Gauge. (The value can be negative,
// resulting in a decrease of the Gauge.)
func (g *expiringGauge) Add(val float64) {
g.Gauge.Add(val)
g.lastModSec = time.Now().Unix()
}
// Sub subtracts the given value from the Gauge. (The value can be
// negative, resulting in an increase of the Gauge.)
func (g *expiringGauge) Sub(val float64) {
g.Gauge.Sub(val)
g.lastModSec = time.Now().Unix()
}
// SetToCurrentTime sets the Gauge to the current Unix time in seconds.
func (g *expiringGauge) SetToCurrentTime() {
g.Gauge.SetToCurrentTime()
g.lastModSec = time.Now().Unix()
}
// HasExpired implements Expirable
func (g *expiringGauge) HasExpired(currentTimeSec int64, maxAgeSec int64) bool {
return currentTimeSec-g.lastModSec >= maxAgeSec
}

@ -0,0 +1,40 @@
package metric
import (
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)
func TestGaugeExpiration(t *testing.T) {
t.Parallel()
cfg := GaugeConfig{
Action: "inc",
}
gag, err := NewGauges("test1", "HELP ME!!!!!", cfg, 1)
assert.Nil(t, err)
// Create a label and increment the gauge
lbl1 := model.LabelSet{}
lbl1["test"] = "app"
gag.With(lbl1).Inc()
// Collect the metrics, should still find the metric in the map
collect(gag)
assert.Contains(t, gag.metrics, lbl1.Fingerprint())
time.Sleep(1100 * time.Millisecond) // Wait just past our max idle of 1 sec
//Add another gauge with new label val
lbl2 := model.LabelSet{}
lbl2["test"] = "app2"
gag.With(lbl2).Inc()
// Collect the metrics, first gauge should have expired and removed, second should still be present
collect(gag)
assert.NotContains(t, gag.metrics, lbl1.Fingerprint())
assert.Contains(t, gag.metrics, lbl2.Fingerprint())
}

@ -0,0 +1,79 @@
package metric
import (
"time"
"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
type HistogramConfig struct {
Value *string `mapstructure:"value"`
Buckets []float64 `mapstructure:"buckets"`
}
func validateHistogramConfig(config *HistogramConfig) error {
return nil
}
func parseHistogramConfig(config interface{}) (*HistogramConfig, error) {
cfg := &HistogramConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
// Histograms is a vector of histograms for a each log stream.
type Histograms struct {
*metricVec
Cfg *HistogramConfig
}
// NewHistograms creates a new histogram vec.
func NewHistograms(name, help string, config interface{}, maxIdleSec int64) (*Histograms, error) {
cfg, err := parseHistogramConfig(config)
if err != nil {
return nil, err
}
err = validateHistogramConfig(cfg)
if err != nil {
return nil, err
}
return &Histograms{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return &expiringHistogram{prometheus.NewHistogram(prometheus.HistogramOpts{
Help: help,
Name: name,
ConstLabels: labels,
Buckets: cfg.Buckets,
}),
0,
}
}, maxIdleSec),
Cfg: cfg,
}, nil
}
// With returns the histogram associated with a stream labelset.
func (h *Histograms) With(labels model.LabelSet) prometheus.Histogram {
return h.metricVec.With(labels).(prometheus.Histogram)
}
type expiringHistogram struct {
prometheus.Histogram
lastModSec int64
}
// Observe adds a single observation to the histogram.
func (h *expiringHistogram) Observe(val float64) {
h.Histogram.Observe(val)
h.lastModSec = time.Now().Unix()
}
// HasExpired implements Expirable
func (h *expiringHistogram) HasExpired(currentTimeSec int64, maxAgeSec int64) bool {
return currentTimeSec-h.lastModSec >= maxAgeSec
}

@ -0,0 +1,38 @@
package metric
import (
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)
func TestHistogramExpiration(t *testing.T) {
t.Parallel()
cfg := HistogramConfig{}
hist, err := NewHistograms("test1", "HELP ME!!!!!", cfg, 1)
assert.Nil(t, err)
// Create a label and increment the histogram
lbl1 := model.LabelSet{}
lbl1["test"] = "app"
hist.With(lbl1).Observe(23)
// Collect the metrics, should still find the metric in the map
collect(hist)
assert.Contains(t, hist.metrics, lbl1.Fingerprint())
time.Sleep(1100 * time.Millisecond) // Wait just past our max idle of 1 sec
//Add another histogram with new label val
lbl2 := model.LabelSet{}
lbl2["test"] = "app2"
hist.With(lbl2).Observe(2)
// Collect the metrics, first histogram should have expired and removed, second should still be present
collect(hist)
assert.NotContains(t, hist.metrics, lbl1.Fingerprint())
assert.Contains(t, hist.metrics, lbl2.Fingerprint())
}

@ -0,0 +1,82 @@
package metric
import (
"sync"
"time"
"github.com/lixh00/loki-client-go/pkg/labelutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
// Expirable allows checking if something has exceeded the provided maxAge based on the provided currentTime
type Expirable interface {
HasExpired(currentTimeSec int64, maxAgeSec int64) bool
}
type metricVec struct {
factory func(labels map[string]string) prometheus.Metric
mtx sync.Mutex
metrics map[model.Fingerprint]prometheus.Metric
maxAgeSec int64
}
func newMetricVec(factory func(labels map[string]string) prometheus.Metric, maxAgeSec int64) *metricVec {
return &metricVec{
metrics: map[model.Fingerprint]prometheus.Metric{},
factory: factory,
maxAgeSec: maxAgeSec,
}
}
// Describe implements prometheus.Collector and doesn't declare any metrics on purpose to bypass prometheus validation.
// see https://godoc.org/github.com/prometheus/client_golang/prometheus#hdr-Custom_Collectors_and_constant_Metrics search for "unchecked"
func (c *metricVec) Describe(ch chan<- *prometheus.Desc) {}
// Collect implements prometheus.Collector
func (c *metricVec) Collect(ch chan<- prometheus.Metric) {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, m := range c.metrics {
ch <- m
}
c.prune()
}
// With returns the metric associated with the labelset.
func (c *metricVec) With(labels model.LabelSet) prometheus.Metric {
c.mtx.Lock()
defer c.mtx.Unlock()
fp := labels.Fingerprint()
var ok bool
var metric prometheus.Metric
if metric, ok = c.metrics[fp]; !ok {
metric = c.factory(labelutil.ModelLabelSetToMap(labels))
c.metrics[fp] = metric
}
return metric
}
func (c *metricVec) Delete(labels model.LabelSet) bool {
c.mtx.Lock()
defer c.mtx.Unlock()
fp := labels.Fingerprint()
_, ok := c.metrics[fp]
if ok {
delete(c.metrics, fp)
}
return ok
}
// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
currentTimeSec := time.Now().Unix()
for fp, m := range c.metrics {
if em, ok := m.(Expirable); ok {
if em.HasExpired(currentTimeSec, c.maxAgeSec) {
delete(c.metrics, fp)
}
}
}
}

@ -0,0 +1,59 @@
package urlutil
import "net/url"
// URLValue is a url.URL that can be used as a flag.
type URLValue struct {
*url.URL
}
// String implements flag.Value
func (v URLValue) String() string {
if v.URL == nil {
return ""
}
return v.URL.String()
}
// Set implements flag.Value
func (v *URLValue) Set(s string) error {
u, err := url.Parse(s)
if err != nil {
return err
}
v.URL = u
return nil
}
// UnmarshalYAML implements yaml.Unmarshaler.
func (v *URLValue) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
if err := unmarshal(&s); err != nil {
return err
}
// An empty string means no URL has been configured.
if s == "" {
v.URL = nil
return nil
}
return v.Set(s)
}
// MarshalYAML implements yaml.Marshaler.
func (v URLValue) MarshalYAML() (interface{}, error) {
if v.URL == nil {
return "", nil
}
// Mask out passwords when marshalling URLs back to YAML.
u := *v.URL
if u.User != nil {
if _, set := u.User.Password(); set {
u.User = url.UserPassword(u.User.Username(), "********")
}
}
return u.String(), nil
}
Loading…
Cancel
Save