From 8ad64181d995a2a17d388da8e1ce3431caa478f7 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Thu, 7 Jan 2016 15:15:35 -0500 Subject: [PATCH 01/16] Upgrade metric name handling functions Deal with different file name suffixes such as .wsp and .tsj. Handle the fact that the file portion of the metric file name uses a "." to end the name and begin meta information about the archive. --- metrics/metrics.go | 70 +++++++++++++++++++++++++++-------------- metrics/metrics_test.go | 10 +++--- 2 files changed, 52 insertions(+), 28 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index af3c1fed..bd19467b 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -7,11 +7,20 @@ import ( "path" "path/filepath" "regexp" + "sort" "strings" "sync" "time" ) +// TimeSeries is the in memory and transit representation of time series +// data. +type TimeSeries struct { + Epoch int64 `json: epoch` + Interval int64 `json: interval` + Values []float64 `json: values` +} + type MetricsCacheType struct { metrics []string timestamp int64 @@ -30,26 +39,29 @@ func init() { } // MetricToPath takes a metric name and return an absolute path -// using the --prefix flag. -func MetricToPath(metric string) string { - p := MetricToRelative(metric) +// using the --prefix flag. You must supply the metric file's path suffix +// such as ".wsp". +func MetricToPath(metric, suffix string) string { + p := MetricToRelative(metric, suffix) return path.Join(Prefix, p) } // MetricToRelative take a metric name and returns a relative path // to the Whisper DB. This path combined with the root path to the -// DB store would create a proper absolute path. -func MetricToRelative(metric string) string { - p := strings.Replace(metric, ".", "/", -1) + ".wsp" +// DB store would create a proper absolute path. You must supply the file's +// path suffix such as ".wsp". +func MetricToRelative(metric, suffix string) string { + p := strings.Replace(metric, ".", "/", -1) + suffix return path.Clean(p) } // MetricsToPaths operates on a slice of metric names and returns a -// slice of absolute paths using the --prefix flag. -func MetricsToPaths(metrics []string) []string { +// slice of absolute paths using the --prefix flag. Suffix such as ".wsp" +// is required. +func MetricsToPaths(metrics []string, suffix string) []string { p := make([]string, 0) for _, m := range metrics { - p = append(p, MetricToPath(m)) + p = append(p, MetricToPath(m, suffix)) } return p @@ -67,9 +79,7 @@ func PathToMetric(p string) string { if strings.HasPrefix(p, "/") { p = p[1:] } - - p = strings.Replace(p, ".wsp", "", 1) - return strings.Replace(p, "/", ".", -1) + return RelativeToMetric(p) } // RelativeToMetric takes a relative path from the root of your DB store @@ -77,8 +87,11 @@ func PathToMetric(p string) string { // transformed. func RelativeToMetric(p string) string { p = path.Clean(p) - p = strings.Replace(p, ".wsp", "", 1) - return strings.Replace(p, "/", ".", -1) + dir, file := path.Split(p) + dir = strings.Replace(dir, "/", ".", -1) + // Any "." in the file begins meta information, like .wsp for Whisper DBs + file = strings.Split(file, ".")[0] + return dir + file } // PathsToMetrics operates on a slice of absolute paths prefixed with @@ -127,8 +140,8 @@ func FilterRegex(regex string, metrics []string) ([]string, error) { return result, nil } -// checkWalk is a helper function to sanity check for *.wsp files in a -// file tree walk. If the file is valid, normal *.wsp nil is returned. 
+// checkWalk is a helper function to sanity check for timeseries DB files in a
+// file tree walk. It returns true and a nil error for a valid timeseries DB.
 // Otherwise a non-nil error value is returned.
 func checkWalk(path string, info os.FileInfo, err error) (bool, error) {
 	// Did the Walk function hit an error on this file?
@@ -150,12 +163,15 @@ func checkWalk(path string, info os.FileInfo, err error) (bool, error) {
 		// Not a regular file
 		return false, nil
 	}
-	if !strings.HasSuffix(path, ".wsp") {
-		// Not a Whisper Database
-		return false, nil
+	if strings.HasSuffix(path, ".tsj") {
+		return true, nil
+	}
+	if strings.HasSuffix(path, ".wsp") {
+		return true, nil
 	}
 
-	return true, nil
+	// Not a Whisper Database
+	return false, nil
 }
 
 // NewMetricsCache creates and returns a MetricsCacheType object
@@ -166,13 +182,13 @@ func NewMetricsCache() *MetricsCacheType {
 	return m
 }
 
-// IsAvailable returns a boolean true value if the MetricsCache is avaliable
+// IsAvailable returns a boolean true value if the MetricsCache is available
 // for use. Rebuilding the cache can take some time.
 func (m *MetricsCacheType) IsAvailable() bool {
 	return m.metrics != nil && !m.updating
 }
 
-// TimedOut returns true if the cache hasn't been refresed recently.
+// TimedOut returns true if the cache hasn't been refreshed recently.
 func (m *MetricsCacheType) TimedOut() bool {
 	// 1 hour cache timeout
 	return time.Now().Unix()-m.timestamp > 3600
@@ -191,7 +207,6 @@ func (m *MetricsCacheType) RefreshCache() error {
 			return err
 		}
 		if ok {
-			//log.Printf("Found %s or %s", path, PathToMetric(path))
 			m.metrics = append(m.metrics, PathToMetric(path))
 		}
 		return nil
@@ -206,6 +221,15 @@ func (m *MetricsCacheType) RefreshCache() error {
 		log.Printf("Scan returned an Error: %s", err)
 	}
 
+	// Sort and Unique
+	sort.Strings(m.metrics)
+	for i := 0; i < len(m.metrics)-1; i++ {
+		if m.metrics[i] == m.metrics[i+1] {
+			m.metrics = append(m.metrics[:i], m.metrics[i+1:]...)
+			i--
+		}
+	}
+
 	m.timestamp = time.Now().Unix()
 	m.updating = false
 	m.lock.Unlock()
diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go
index 3ee0d0c7..ffb16bac 100644
--- a/metrics/metrics_test.go
+++ b/metrics/metrics_test.go
@@ -9,18 +9,18 @@ var testMetrics = map[string]string{
 }
 
 func TestMetricToPath(t *testing.T) {
-	path := MetricToPath("bobby.sue.foo.bar")
-	if path != "/opt/graphite/storage/whisper/bobby/sue/foo/bar.wsp" {
+	path := MetricToPath("bobby.sue.foo.bar", ".foo")
+	if path != "/opt/graphite/storage/whisper/bobby/sue/foo/bar.foo" {
 		t.Errorf("MetricToPath returned %s for %s, rather than %s",
-			path, "bobby.sue.foo.bar", "/opt/graphite/storage/whisper/sue/foo/bar.wsp")
+			path, "bobby.sue.foo.bar", "/opt/graphite/storage/whisper/bobby/sue/foo/bar.foo")
 	}
 }
 
 func TestPathToMetric(t *testing.T) {
-	metric := PathToMetric("/opt/graphite/storage/whisper/bobby/sue/foo/bar.wsp")
+	metric := PathToMetric("/opt/graphite/storage/whisper/bobby/sue/foo/bar.2011.wsp")
 	if metric != "bobby.sue.foo.bar" {
 		t.Errorf("PathToMetric returned %s for %s, rather than %s",
-			metric, "/opt/graphite/storage/whisper/bobby/sue/foo/bar.wsp",
+			metric, "/opt/graphite/storage/whisper/bobby/sue/foo/bar.2011.wsp",
 			"bobby.sue.foo.bar")
 	}
 }

From 703f0a1189ddf84316931c4d9bf5dc2e7aa9a6d0 Mon Sep 17 00:00:00 2001
From: Jack Neely
Date: Thu, 7 Jan 2016 15:19:45 -0500
Subject: [PATCH 02/16] XXX: Hard code tar command for Whisper DBs

This needs to be addressed but for the short term (hopefully) this gets
us working code.
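
For reference before the diff below: the PATCH 01 helpers this code calls
now take the file suffix explicitly, and the first "." in the file portion
of a path begins meta information about the archive. A quick sketch of the
behavior, with results taken from the updated tests (the root path assumes
the default --prefix):

    package main

    import (
    	"fmt"

    	"github.com/jjneely/buckytools/metrics"
    )

    func main() {
    	// Join the metric name on "/" and append the suffix verbatim.
    	fmt.Println(metrics.MetricToRelative("bobby.sue.foo.bar", ".wsp"))
    	// -> bobby/sue/foo/bar.wsp

    	// Going the other way, ".2011.wsp" is meta information on the
    	// file name, so it drops out of the metric name.
    	fmt.Println(metrics.PathToMetric(
    		"/opt/graphite/storage/whisper/bobby/sue/foo/bar.2011.wsp"))
    	// -> bobby.sue.foo.bar
    }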
---
 bucky/tar.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/bucky/tar.go b/bucky/tar.go
index be22b5bf..319ec1e1 100644
--- a/bucky/tar.go
+++ b/bucky/tar.go
@@ -67,7 +67,8 @@ func writeTar(workOut chan *MetricData, wg *sync.WaitGroup) {
 	for work := range workOut {
 		log.Printf("Writing %s...", work.Name)
 		th := new(tar.Header)
-		th.Name = metrics.MetricToRelative(work.Name)
+		// XXX: Hard coded for Whisper DBs!
+		th.Name = metrics.MetricToRelative(work.Name, ".wsp")
 		th.Size = work.Size
 		th.Mode = work.Mode
 		th.ModTime = time.Unix(work.ModTime, 0)

From cca70764f4227357e230868cbbaf3f89c226578b Mon Sep 17 00:00:00 2001
From: Jack Neely
Date: Sun, 10 Jan 2016 15:32:11 -0500
Subject: [PATCH 03/16] Work testing/experimenting with TS Journals

---
 REST_API_NOTES.md    |  21 ++++++-
 buckyd/main.go       |   1 +
 buckyd/metrics.go    |   3 +-
 buckyd/schema.go     |  30 ++++++++++
 buckyd/timeseries.go | 137 +++++++++++++++++++++++++++++++++++++++++++
 metrics/metrics.go   |  29 +++++++++
 6 files changed, 217 insertions(+), 4 deletions(-)
 create mode 100644 buckyd/schema.go
 create mode 100644 buckyd/timeseries.go

diff --git a/REST_API_NOTES.md b/REST_API_NOTES.md
index b991b73e..f0bd2b85 100644
--- a/REST_API_NOTES.md
+++ b/REST_API_NOTES.md
@@ -4,9 +4,9 @@ Buckyd REST API Specification
 /metrics
 --------
 
-Returns a JSON array listing the metrics on the local host. May return a status
-code of 202 Accepted when the internal cache is being rebuilt. In that case
-the client should sleep and try again.
+Returns a JSON array listing the metrics on the local host. May return a
+status code of 202 Accepted when the internal cache is being rebuilt. In that
+case the client should sleep and try again.
 
 Methods:
 
@@ -40,6 +40,21 @@ Methods:
   data point is null. See Carbonate's whisper-fill.py.
 * DELETE - Remove this metric from the file system.
 
+/timeseries/<metric.key>
+------------------------
+
+Operates on timeseries data contained within the Graphite style metric.key.
+These operations accept/return a JSON dict with integer values for the keys
+'epoch' and 'interval', and a list of floats stored as 'values'.
+
+XXX: Protobufs in the future
+
+Methods:
+* POST - commit these data points to disk. 200 OK is returned after a
+  successful write operation. No caching at this layer.
+* GET - Supply query parameters of 'from' and optionally 'until' to retrieve
+  a set of values from on disk storage.
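+
+Example exchange (a sketch; the metric name, timestamps, and values are
+hypothetical):
+
+    GET /timeseries/1min.example.metric?from=1452500040&until=1452500220
+
+    {"epoch": 1452500040, "interval": 60, "values": [1.0, 2.0, 3.0]}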
+ /hashring --------- diff --git a/buckyd/main.go b/buckyd/main.go index dde59a02..b1dfd2b9 100644 --- a/buckyd/main.go +++ b/buckyd/main.go @@ -124,6 +124,7 @@ func main() { http.HandleFunc("/metrics", listMetrics) http.HandleFunc("/metrics/", serveMetrics) http.HandleFunc("/hashring", listHashring) + http.HandleFunc("/timeseries/", serveTimeSeries) log.Printf("Starting server on %s", bindAddress) err = http.ListenAndServe(bindAddress, nil) diff --git a/buckyd/metrics.go b/buckyd/metrics.go index d216352b..c2c3dcd7 100644 --- a/buckyd/metrics.go +++ b/buckyd/metrics.go @@ -81,7 +81,8 @@ func serveMetrics(w http.ResponseWriter, r *http.Request) { logRequest(r) metric := r.URL.Path[len("/metrics/"):] - path := MetricToPath(metric) + // XXX: Hardcoded for Whisper DBs + path := MetricToPath(metric, ".wsp") if len(metric) == 0 { http.Error(w, "Metric name missing.", http.StatusBadRequest) return diff --git a/buckyd/schema.go b/buckyd/schema.go new file mode 100644 index 00000000..e1e063dc --- /dev/null +++ b/buckyd/schema.go @@ -0,0 +1,30 @@ +package main + +import ( + "strings" +) + +// MetricInterval examines the given Graphite style metric name and returns +// the associated interval in seconds. XXX: This should be based off a +// configuration source. +func MetricInterval(metric string) int64 { + switch { + case strings.HasPrefix(metric, "1sec."): + return 1 + case strings.HasPrefix(metric, "1min."): + return 60 + case strings.HasPrefix(metric, "5min."): + return 300 + case strings.HasPrefix(metric, "10min."): + return 600 + case strings.HasPrefix(metric, "15min."): + return 900 + case strings.HasPrefix(metric, "hourly."): + return 3600 + case strings.HasPrefix(metric, "daily."): + return 86400 + } + + // Default + return 60 +} diff --git a/buckyd/timeseries.go b/buckyd/timeseries.go new file mode 100644 index 00000000..9ade6f2f --- /dev/null +++ b/buckyd/timeseries.go @@ -0,0 +1,137 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "log" + "net/http" + "os" + "strconv" + "time" +) + +import "github.com/jjneely/buckytools/metrics" +import "github.com/jjneely/journal/timeseries" +import "github.com/jjneely/journal" + +func serveTimeSeries(w http.ResponseWriter, r *http.Request) { + logRequest(r) + + metric := r.URL.Path[len("/timeseries/"):] + if len(metric) == 0 { + http.Error(w, "Metric name missing.", http.StatusBadRequest) + return + } + + switch r.Method { + case "GET": + getTimeSeries(w, r, metric) + case "POST": + postTimeSeries(w, r, metric) + default: + http.Error(w, "Bad method request.", http.StatusBadRequest) + } +} + +func getTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { + // XXX: Need to know about the data partitions we have on disk + // XXX: Support Whisper DB fallback? 
+ path := metrics.MetricToPath(metric, ".tsj") + if r.FormValue("from") == "" { + http.Error(w, "No from timestamp.", http.StatusBadRequest) + return + } + from, err := strconv.ParseInt(r.FormValue("from"), 0, 64) + if err != nil { + http.Error(w, "from: "+err.Error(), http.StatusBadRequest) + return + } + var until int64 + if r.FormValue("until") == "" { + until = time.Now().Unix() + } else { + until, err = strconv.ParseInt(r.FormValue("until"), 0, 64) + if err != nil { + http.Error(w, "until: "+err.Error(), http.StatusBadRequest) + return + } + } + if from >= until { + http.Error(w, "Bad time range request", http.StatusBadRequest) + return + } + + j, err := timeseries.Open(path) + if os.IsNotExist(err) { + http.Error(w, "File not found.", http.StatusNotFound) + return + } + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Printf("Error opening journal: %s", err) + return + } + + ret, err := metrics.JournalFetch(j, from, until) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Printf("Error reading journal: %s", err) + return + } + + // Marshal the data back as a JSON blob + blob, err := json.Marshal(ret) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Printf("Error marshaling data: %s", err) + } else { + w.Header().Set("Content-Type", "application/json") + w.Write(blob) + } +} + +func postTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { + // XXX: Need to know about the data partitions we have on disk + // XXX: Support Whisper DB fallback? + path := metrics.MetricToPath(metric, ".tsj") + + // Does this request look sane? + if r.Header.Get("Content-Type") != "application/octet-stream" { + http.Error(w, "Content-Type must be application/octet-stream.", + http.StatusBadRequest) + log.Printf("postTimeSeries: content-type of %s, abort!", + r.Header.Get("Content-Type")) + return + } + + blob, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Printf("Error reading body in postTimeSeries: %s", err) + return + } + ts := new(metrics.TimeSeries) + err = json.Unmarshal(blob, ts) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + log.Printf("Error unmarshalling json: %s", err) + return + } + + j, err := timeseries.Open(path) + if os.IsNotExist(err) { + j, err = timeseries.Create(path, MetricInterval(metric), + journal.NewFloat64ValueType(), make([]int64, 0)) + } + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Printf("Error opening/creating timeseries journal: %s", err) + return + } + err = metrics.JournalUpdate(j, ts) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Printf("Error updating journal: %s", err) + return + } +} diff --git a/metrics/metrics.go b/metrics/metrics.go index bd19467b..93010ebe 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "flag" + "fmt" "log" "os" "path" @@ -13,6 +14,9 @@ import ( "time" ) +import "github.com/jjneely/journal/timeseries" +import "github.com/jjneely/journal" + // TimeSeries is the in memory and transit representation of time series // data. 
type TimeSeries struct { @@ -38,6 +42,31 @@ func init() { "The root of the whisper database store.") } +// JournalFetch is a convienance wrapper around reading from timeseries +// journals +func JournalFetch(j timeseries.Journal, from, until int64) (*TimeSeries, error) { + // Integer division and inclusive! + n := int((until - from) / j.Interval()) + values, err := j.Read(from, n) + if err != nil { + return nil, err + } + + ret := new(TimeSeries) + ret.Epoch = from - (from % j.Interval()) + ret.Interval = j.Interval() + ret.Values = []float64(values.(journal.Float64Values)) + + return ret, nil +} + +func JournalUpdate(j timeseries.Journal, ts *TimeSeries) error { + if ts.Interval != j.Interval() { + return fmt.Errorf("Interval mismatch.") + } + return j.Write(ts.Epoch, journal.Float64Values(ts.Values)) +} + // MetricToPath takes a metric name and return an absolute path // using the --prefix flag. You must supply the metric file's path suffix // such as ".wsp". From c7e81792bb04f392018ce294a84a5c962de81c9e Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Mon, 11 Jan 2016 08:38:01 -0500 Subject: [PATCH 04/16] Convert Whisper files to the bucky tsj store --- bucky-convert/main.go | 156 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 bucky-convert/main.go diff --git a/bucky-convert/main.go b/bucky-convert/main.go new file mode 100644 index 00000000..90025684 --- /dev/null +++ b/bucky-convert/main.go @@ -0,0 +1,156 @@ +package main + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "os" + "path/filepath" + "sort" + "strings" + "time" +) + +import ( + "github.com/jjneely/buckytools" + "github.com/jjneely/buckytools/metrics" + "github.com/jjneely/buckytools/whisper" +) + +var httpClient = new(http.Client) +var host string + +func usage() { + fmt.Printf("%s [options] buckydaemon:port prefix\n", os.Args[0]) + fmt.Printf("Version: %s\n\n", buckytools.Version) + fmt.Printf("Given prefix is walked to find .wsp files and the path is\n") + fmt.Printf("to generate a metric name. This metric name and associated\n") + fmt.Printf("data points are committed to the Bucky TSJ store.") + fmt.Printf("\n\n") + flag.PrintDefaults() +} + +func FindValidDataPoints(wsp *whisper.Whisper) (*whisper.TimeSeries, error) { + retentions := whisper.RetentionsByPrecision{wsp.Retentions()} + sort.Sort(retentions) + + start := int(time.Now().Unix()) + from := 0 + for _, r := range retentions.Iterator() { + from = int(time.Now().Unix()) - r.MaxRetention() + + ts, err := wsp.Fetch(from, start) + if err != nil { + return nil, err + } + return ts, nil + } + + // we don't get here. + return nil, nil +} + +func examine(path string, info os.FileInfo, err error) error { + // Did the Walk function hit an error on this file? 
+ if err != nil { + log.Printf("%s\n", err) + return nil + } + + // Sanity check our file + if info.IsDir() { + if strings.HasPrefix(path, ".") { + return filepath.SkipDir + } + return nil + } + if !info.Mode().IsRegular() { + // Not a regular file + return nil + } + if !strings.HasSuffix(path, ".wsp") { + // Not a Whisper Database + return nil + } + + wsp, err := whisper.Open(path) + if err != nil { + log.Printf("%s\n", err) + return err + } + defer wsp.Close() + + metric := metrics.PathToMetric(path) + ts, err := FindValidDataPoints(wsp) + if err != nil { + log.Printf("%s\n", err) + return err + } + + commitTimeSeries(ts, metric) + return nil +} + +func commitTimeSeries(ts *whisper.TimeSeries, metric string) { + tsj := new(metrics.TimeSeries) + tsj.Epoch = int64(ts.FromTime()) + tsj.Interval = int64(ts.Step()) + tsj.Values = ts.Values() + + u := &url.URL{ + Scheme: "http", + Host: host, + Path: "/timeseries/" + metric, + } + + blob, err := json.Marshal(tsj) + buf := bytes.NewBuffer(blob) + r, err := httpClient.Post(u.String(), "application/json", buf) + if err != nil { + log.Printf("HTTP POST Failed: %s", err) + return + } + if r.StatusCode != 200 { + log.Printf("Failed: %s", metric) + body, err := ioutil.ReadAll(r.Body) + if err == nil { + log.Printf("%s: %s", r.Status, string(body)) + } else { + log.Printf("%s: No body", r.Status) + } + } + os.Exit(2) +} + +func main() { + var version bool + flag.Usage = usage + flag.BoolVar(&version, "version", false, "Display version information.") + flag.Parse() + + if version { + fmt.Printf("Buckytools version: %s\n", buckytools.Version) + os.Exit(0) + } + + if flag.NArg() < 2 { + usage() + return + } + + if flag.NArg() >= 2 { + // Start our walk + host = flag.Arg(0) + for i := 1; i < flag.NArg(); i++ { + err := filepath.Walk(flag.Arg(i), examine) + if err != nil { + log.Printf("%s\n", err) + } + } + } +} From 48d450a33c07b3d2efb656c1444ab2203fbcf567 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Mon, 11 Jan 2016 18:10:12 -0500 Subject: [PATCH 05/16] Experimental converter to a Journal daemon This works as an experiment except for proper handling of NaN values. JSON isn't capable here and we need Protobufs to do this properly. 
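
The core of the problem in miniature: encoding/json simply refuses IEEE 754
NaN, which the Whisper fetches above use for missing data points. A
standalone sketch:

    package main

    import (
    	"encoding/json"
    	"fmt"
    	"math"
    )

    func main() {
    	// Prints: json: unsupported value: NaN
    	_, err := json.Marshal([]float64{1.0, math.NaN()})
    	fmt.Println(err)
    }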
--- bucky-convert/main.go | 108 +++++++++++++++++++++++++++++++++++------- buckyd/timeseries.go | 30 ++++++++---- metrics/metrics.go | 46 ++++++++++++++++-- 3 files changed, 153 insertions(+), 31 deletions(-) diff --git a/bucky-convert/main.go b/bucky-convert/main.go index 90025684..d401cd6f 100644 --- a/bucky-convert/main.go +++ b/bucky-convert/main.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "log" + "math" "net/http" "net/url" "os" @@ -24,6 +25,7 @@ import ( var httpClient = new(http.Client) var host string +var confirm bool func usage() { fmt.Printf("%s [options] buckydaemon:port prefix\n", os.Args[0]) @@ -35,24 +37,37 @@ func usage() { flag.PrintDefaults() } -func FindValidDataPoints(wsp *whisper.Whisper) (*whisper.TimeSeries, error) { +func FindValidDataPoints(wsp *whisper.Whisper) (*metrics.TimeSeries, error) { retentions := whisper.RetentionsByPrecision{wsp.Retentions()} sort.Sort(retentions) start := int(time.Now().Unix()) from := 0 - for _, r := range retentions.Iterator() { - from = int(time.Now().Unix()) - r.MaxRetention() + r := retentions.Iterator()[0] + from = int(time.Now().Unix()) - r.MaxRetention() - ts, err := wsp.Fetch(from, start) - if err != nil { - return nil, err - } - return ts, nil + ts, err := wsp.Fetch(from, start) + if err != nil { + return nil, err + } + + tsj := new(metrics.TimeSeries) + tsj.Epoch = int64(ts.FromTime()) + tsj.Interval = int64(ts.Step()) + tsj.Values = make([]metrics.MarshalFloat64, 0) + values := ts.Values() + for math.IsNaN(values[0]) { + tsj.Epoch = tsj.Epoch + tsj.Interval + values = values[1:] + } + for math.IsNaN(values[len(values)-1]) { + values = values[:len(values)-1] } - // we don't get here. - return nil, nil + for _, f := range values { + tsj.Values = append(tsj.Values, metrics.MarshalFloat64(f)) + } + return tsj, nil } func examine(path string, info os.FileInfo, err error) error { @@ -92,15 +107,15 @@ func examine(path string, info os.FileInfo, err error) error { return err } + log.Printf("Converting : %s", path) + log.Printf("Metric name: %s", metric) commitTimeSeries(ts, metric) return nil } -func commitTimeSeries(ts *whisper.TimeSeries, metric string) { - tsj := new(metrics.TimeSeries) - tsj.Epoch = int64(ts.FromTime()) - tsj.Interval = int64(ts.Step()) - tsj.Values = ts.Values() +func commitTimeSeries(tsj *metrics.TimeSeries, metric string) { + log.Printf("Committing: Epoch = %d, Int = %d, len(values) = %d", + tsj.Epoch, tsj.Interval, len(tsj.Values)) u := &url.URL{ Scheme: "http", @@ -109,6 +124,10 @@ func commitTimeSeries(ts *whisper.TimeSeries, metric string) { } blob, err := json.Marshal(tsj) + if err != nil { + log.Printf("Error marshaling JSON data: %s", err) + return + } buf := bytes.NewBuffer(blob) r, err := httpClient.Post(u.String(), "application/json", buf) if err != nil { @@ -124,13 +143,70 @@ func commitTimeSeries(ts *whisper.TimeSeries, metric string) { log.Printf("%s: No body", r.Status) } } - os.Exit(2) + if confirm { + confirmData(metric, tsj) + } +} + +func confirmData(metric string, data *metrics.TimeSeries) { + u := &url.URL{ + Scheme: "http", + Host: host, + Path: "/timeseries/" + metric, + } + r, err := httpClient.Get(u.String()) + if err != nil { + log.Printf("Error GET'ing back data: %s", err) + return + } + + // This is a test, we assume that we are converting data to the new format + // so when we do a GET expecting all TSJ data to be returned, we should + // get back the same JSON blob, right? 
+ + body, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Printf("Error reading body to confirm data: %s", err) + return + } + if r.StatusCode != 200 { + log.Printf("GET request failed with: %s: %s", r.Status, string(body)) + return + } + tsj := new(metrics.TimeSeries) + err = json.Unmarshal(body, tsj) + if err != nil { + log.Printf("Could not unmarshal remote's JSON to compare: %s", err) + return + } + + if tsj.Epoch != data.Epoch || tsj.Interval != data.Interval { + log.Printf("Warning: Data returns has unmatched E/I: %d / %d", + tsj.Epoch, tsj.Interval) + return + } + if len(tsj.Values) != len(data.Values) { + log.Printf("Warning: Value slices not the same length: %d / %d", + len(tsj.Values), len(data.Values)) + return + } + var flag = true + for i, _ := range tsj.Values { + if tsj.Values[i] != data.Values[i] { + log.Printf("Index: %d: %f != %f", i, tsj.Values[i], data.Values[i]) + flag = false + } + } + if !flag { + log.Printf("Warning: Data in series doesn't match") + } } func main() { var version bool flag.Usage = usage flag.BoolVar(&version, "version", false, "Display version information.") + flag.BoolVar(&confirm, "confirm", false, "Query time series data back and compare.") flag.Parse() if version { diff --git a/buckyd/timeseries.go b/buckyd/timeseries.go index 9ade6f2f..3781efdc 100644 --- a/buckyd/timeseries.go +++ b/buckyd/timeseries.go @@ -36,17 +36,19 @@ func serveTimeSeries(w http.ResponseWriter, r *http.Request) { func getTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { // XXX: Need to know about the data partitions we have on disk // XXX: Support Whisper DB fallback? + var from, until int64 + var err error path := metrics.MetricToPath(metric, ".tsj") if r.FormValue("from") == "" { - http.Error(w, "No from timestamp.", http.StatusBadRequest) - return - } - from, err := strconv.ParseInt(r.FormValue("from"), 0, 64) - if err != nil { - http.Error(w, "from: "+err.Error(), http.StatusBadRequest) - return + from = 0 + } else { + from, err = strconv.ParseInt(r.FormValue("from"), 0, 64) + if err != nil { + http.Error(w, "from: "+err.Error(), http.StatusBadRequest) + return + } } - var until int64 + if r.FormValue("until") == "" { until = time.Now().Unix() } else { @@ -71,6 +73,7 @@ func getTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { log.Printf("Error opening journal: %s", err) return } + defer j.Close() ret, err := metrics.JournalFetch(j, from, until) if err != nil { @@ -96,8 +99,8 @@ func postTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { path := metrics.MetricToPath(metric, ".tsj") // Does this request look sane? 
- if r.Header.Get("Content-Type") != "application/octet-stream" { - http.Error(w, "Content-Type must be application/octet-stream.", + if r.Header.Get("Content-Type") != "application/json" { + http.Error(w, "Accepted Content-Type: application/json", http.StatusBadRequest) log.Printf("postTimeSeries: content-type of %s, abort!", r.Header.Get("Content-Type")) @@ -105,6 +108,7 @@ func postTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { } blob, err := ioutil.ReadAll(r.Body) + log.Printf("Body: %v", string(blob[:256])) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) log.Printf("Error reading body in postTimeSeries: %s", err) @@ -118,6 +122,10 @@ func postTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { return } + for i := 0; i < 20 && i < len(ts.Values); i++ { + log.Printf("Saw %v", ts.Values[i]) + } + j, err := timeseries.Open(path) if os.IsNotExist(err) { j, err = timeseries.Create(path, MetricInterval(metric), @@ -128,6 +136,8 @@ func postTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { log.Printf("Error opening/creating timeseries journal: %s", err) return } + defer j.Close() + err = metrics.JournalUpdate(j, ts) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/metrics/metrics.go b/metrics/metrics.go index 93010ebe..a3d263c3 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -4,11 +4,13 @@ import ( "flag" "fmt" "log" + "math" "os" "path" "path/filepath" "regexp" "sort" + "strconv" "strings" "sync" "time" @@ -20,11 +22,13 @@ import "github.com/jjneely/journal" // TimeSeries is the in memory and transit representation of time series // data. type TimeSeries struct { - Epoch int64 `json: epoch` - Interval int64 `json: interval` - Values []float64 `json: values` + Epoch int64 `json: epoch` + Interval int64 `json: interval` + Values []MarshalFloat64 `json: values` } +type MarshalFloat64 float64 + type MetricsCacheType struct { metrics []string timestamp int64 @@ -42,10 +46,35 @@ func init() { "The root of the whisper database store.") } +func (f MarshalFloat64) XXXUnmarshalJSON(buf []byte) error { + var err error + var v float64 + + if string(buf) == "null" { + f = MarshalFloat64(math.NaN()) + } else { + v, err = strconv.ParseFloat(string(buf), 64) + f = MarshalFloat64(v) + } + //log.Printf("Unmarshaled: %v", f) + return err +} + +func (f MarshalFloat64) MarshalJSON() ([]byte, error) { + if math.IsNaN(float64(f)) { + f = 0 + } + ret := []byte(strconv.FormatFloat(float64(f), 'E', -1, 64)) + return ret, nil +} + // JournalFetch is a convienance wrapper around reading from timeseries // journals func JournalFetch(j timeseries.Journal, from, until int64) (*TimeSeries, error) { // Integer division and inclusive! 
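+	// Reads before the journal's first data point are not meaningful, so
+	// clamp the requested start to the journal's epoch.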
+ if from < j.Epoch() { + from = j.Epoch() + } n := int((until - from) / j.Interval()) values, err := j.Read(from, n) if err != nil { @@ -55,7 +84,10 @@ func JournalFetch(j timeseries.Journal, from, until int64) (*TimeSeries, error) ret := new(TimeSeries) ret.Epoch = from - (from % j.Interval()) ret.Interval = j.Interval() - ret.Values = []float64(values.(journal.Float64Values)) + ret.Values = make([]MarshalFloat64, 0) + for _, f := range []float64(values.(journal.Float64Values)) { + ret.Values = append(ret.Values, MarshalFloat64(f)) + } return ret, nil } @@ -64,7 +96,11 @@ func JournalUpdate(j timeseries.Journal, ts *TimeSeries) error { if ts.Interval != j.Interval() { return fmt.Errorf("Interval mismatch.") } - return j.Write(ts.Epoch, journal.Float64Values(ts.Values)) + floats := make([]float64, 0) + for _, f := range ts.Values { + floats = append(floats, float64(f)) + } + return j.Write(ts.Epoch, journal.Float64Values(floats)) } // MetricToPath takes a metric name and return an absolute path From 7466cb01cac9cd7b42bff7e9ca66e748354946aa Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Mon, 11 Jan 2016 21:23:06 -0800 Subject: [PATCH 06/16] Ditch JSON and use protobufs --- bucky-convert/main.go | 35 +++++++++++--------------- buckyd/timeseries.go | 14 +++++------ metrics/metrics.go | 39 +---------------------------- metrics/timeseries.pb.go | 53 ++++++++++++++++++++++++++++++++++++++++ metrics/timeseries.proto | 8 ++++++ 5 files changed, 83 insertions(+), 66 deletions(-) create mode 100644 metrics/timeseries.pb.go create mode 100644 metrics/timeseries.proto diff --git a/bucky-convert/main.go b/bucky-convert/main.go index d401cd6f..a3eb9ff9 100644 --- a/bucky-convert/main.go +++ b/bucky-convert/main.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "encoding/json" "flag" "fmt" "io/ioutil" @@ -17,6 +16,8 @@ import ( "time" ) +import "github.com/golang/protobuf/proto" + import ( "github.com/jjneely/buckytools" "github.com/jjneely/buckytools/metrics" @@ -54,19 +55,15 @@ func FindValidDataPoints(wsp *whisper.Whisper) (*metrics.TimeSeries, error) { tsj := new(metrics.TimeSeries) tsj.Epoch = int64(ts.FromTime()) tsj.Interval = int64(ts.Step()) - tsj.Values = make([]metrics.MarshalFloat64, 0) - values := ts.Values() - for math.IsNaN(values[0]) { + tsj.Values = ts.Values() + for math.IsNaN(tsj.Values[0]) { tsj.Epoch = tsj.Epoch + tsj.Interval - values = values[1:] + tsj.Values = tsj.Values[1:] } - for math.IsNaN(values[len(values)-1]) { - values = values[:len(values)-1] + for math.IsNaN(tsj.Values[len(tsj.Values)-1]) { + tsj.Values = tsj.Values[:len(tsj.Values)-1] } - for _, f := range values { - tsj.Values = append(tsj.Values, metrics.MarshalFloat64(f)) - } return tsj, nil } @@ -123,13 +120,13 @@ func commitTimeSeries(tsj *metrics.TimeSeries, metric string) { Path: "/timeseries/" + metric, } - blob, err := json.Marshal(tsj) + blob, err := proto.Marshal(tsj) if err != nil { - log.Printf("Error marshaling JSON data: %s", err) + log.Printf("Error marshaling protobuf data: %s", err) return } buf := bytes.NewBuffer(blob) - r, err := httpClient.Post(u.String(), "application/json", buf) + r, err := httpClient.Post(u.String(), "application/protobuf", buf) if err != nil { log.Printf("HTTP POST Failed: %s", err) return @@ -160,10 +157,6 @@ func confirmData(metric string, data *metrics.TimeSeries) { return } - // This is a test, we assume that we are converting data to the new format - // so when we do a GET expecting all TSJ data to be returned, we should - // get back the same JSON blob, right? 
- body, err := ioutil.ReadAll(r.Body) if err != nil { log.Printf("Error reading body to confirm data: %s", err) @@ -174,9 +167,9 @@ func confirmData(metric string, data *metrics.TimeSeries) { return } tsj := new(metrics.TimeSeries) - err = json.Unmarshal(body, tsj) + err = proto.Unmarshal(body, tsj) if err != nil { - log.Printf("Could not unmarshal remote's JSON to compare: %s", err) + log.Printf("Could not unmarshal remote's protobuf to compare: %s", err) return } @@ -192,8 +185,8 @@ func confirmData(metric string, data *metrics.TimeSeries) { } var flag = true for i, _ := range tsj.Values { - if tsj.Values[i] != data.Values[i] { - log.Printf("Index: %d: %f != %f", i, tsj.Values[i], data.Values[i]) + if !math.IsNaN(tsj.Values[i]) && !math.IsNaN(data.Values[i]) && tsj.Values[i] != data.Values[i] { + log.Printf("Index: %d: %v != %v", i, tsj.Values[i], data.Values[i]) flag = false } } diff --git a/buckyd/timeseries.go b/buckyd/timeseries.go index 3781efdc..e3d88f24 100644 --- a/buckyd/timeseries.go +++ b/buckyd/timeseries.go @@ -1,7 +1,6 @@ package main import ( - "encoding/json" "io/ioutil" "log" "net/http" @@ -10,6 +9,8 @@ import ( "time" ) +import "github.com/golang/protobuf/proto" + import "github.com/jjneely/buckytools/metrics" import "github.com/jjneely/journal/timeseries" import "github.com/jjneely/journal" @@ -83,12 +84,12 @@ func getTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { } // Marshal the data back as a JSON blob - blob, err := json.Marshal(ret) + blob, err := proto.Marshal(ret) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) log.Printf("Error marshaling data: %s", err) } else { - w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Type", "application/protobuf") w.Write(blob) } } @@ -99,8 +100,8 @@ func postTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { path := metrics.MetricToPath(metric, ".tsj") // Does this request look sane? - if r.Header.Get("Content-Type") != "application/json" { - http.Error(w, "Accepted Content-Type: application/json", + if r.Header.Get("Content-Type") != "application/protobuf" { + http.Error(w, "Accepted Content-Type: application/prtobuf", http.StatusBadRequest) log.Printf("postTimeSeries: content-type of %s, abort!", r.Header.Get("Content-Type")) @@ -108,14 +109,13 @@ func postTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { } blob, err := ioutil.ReadAll(r.Body) - log.Printf("Body: %v", string(blob[:256])) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) log.Printf("Error reading body in postTimeSeries: %s", err) return } ts := new(metrics.TimeSeries) - err = json.Unmarshal(blob, ts) + err = proto.Unmarshal(blob, ts) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) log.Printf("Error unmarshalling json: %s", err) diff --git a/metrics/metrics.go b/metrics/metrics.go index a3d263c3..7cdcf09e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -4,13 +4,11 @@ import ( "flag" "fmt" "log" - "math" "os" "path" "path/filepath" "regexp" "sort" - "strconv" "strings" "sync" "time" @@ -19,16 +17,6 @@ import ( import "github.com/jjneely/journal/timeseries" import "github.com/jjneely/journal" -// TimeSeries is the in memory and transit representation of time series -// data. 
-type TimeSeries struct { - Epoch int64 `json: epoch` - Interval int64 `json: interval` - Values []MarshalFloat64 `json: values` -} - -type MarshalFloat64 float64 - type MetricsCacheType struct { metrics []string timestamp int64 @@ -46,28 +34,6 @@ func init() { "The root of the whisper database store.") } -func (f MarshalFloat64) XXXUnmarshalJSON(buf []byte) error { - var err error - var v float64 - - if string(buf) == "null" { - f = MarshalFloat64(math.NaN()) - } else { - v, err = strconv.ParseFloat(string(buf), 64) - f = MarshalFloat64(v) - } - //log.Printf("Unmarshaled: %v", f) - return err -} - -func (f MarshalFloat64) MarshalJSON() ([]byte, error) { - if math.IsNaN(float64(f)) { - f = 0 - } - ret := []byte(strconv.FormatFloat(float64(f), 'E', -1, 64)) - return ret, nil -} - // JournalFetch is a convienance wrapper around reading from timeseries // journals func JournalFetch(j timeseries.Journal, from, until int64) (*TimeSeries, error) { @@ -84,10 +50,7 @@ func JournalFetch(j timeseries.Journal, from, until int64) (*TimeSeries, error) ret := new(TimeSeries) ret.Epoch = from - (from % j.Interval()) ret.Interval = j.Interval() - ret.Values = make([]MarshalFloat64, 0) - for _, f := range []float64(values.(journal.Float64Values)) { - ret.Values = append(ret.Values, MarshalFloat64(f)) - } + ret.Values = []float64(values.(journal.Float64Values)) return ret, nil } diff --git a/metrics/timeseries.pb.go b/metrics/timeseries.pb.go new file mode 100644 index 00000000..26404b5c --- /dev/null +++ b/metrics/timeseries.pb.go @@ -0,0 +1,53 @@ +// Code generated by protoc-gen-go. +// source: timeseries.proto +// DO NOT EDIT! + +/* +Package metrics is a generated protocol buffer package. + +It is generated from these files: + timeseries.proto + +It has these top-level messages: + TimeSeries +*/ +package metrics + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. 
+const _ = proto.ProtoPackageIsVersion1 + +type TimeSeries struct { + Epoch int64 `protobuf:"varint,1,opt,name=epoch" json:"epoch,omitempty"` + Interval int64 `protobuf:"varint,2,opt,name=interval" json:"interval,omitempty"` + Values []float64 `protobuf:"fixed64,3,rep,name=values" json:"values,omitempty"` +} + +func (m *TimeSeries) Reset() { *m = TimeSeries{} } +func (m *TimeSeries) String() string { return proto.CompactTextString(m) } +func (*TimeSeries) ProtoMessage() {} +func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func init() { + proto.RegisterType((*TimeSeries)(nil), "metrics.TimeSeries") +} + +var fileDescriptor0 = []byte{ + // 111 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x12, 0x28, 0xc9, 0xcc, 0x4d, + 0x2d, 0x4e, 0x2d, 0xca, 0x4c, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xcf, 0x4d, + 0x2d, 0x29, 0xca, 0x4c, 0x2e, 0x56, 0xb2, 0xe5, 0xe2, 0x0a, 0x01, 0x4a, 0x06, 0x83, 0x25, 0x85, + 0x78, 0xb9, 0x58, 0x53, 0x0b, 0xf2, 0x93, 0x33, 0x24, 0x18, 0x15, 0x18, 0x35, 0x98, 0x85, 0x04, + 0xb8, 0x38, 0x32, 0xf3, 0x4a, 0x52, 0x8b, 0xca, 0x12, 0x73, 0x24, 0x98, 0xc0, 0x22, 0x7c, 0x5c, + 0x6c, 0x40, 0x4e, 0x69, 0x6a, 0xb1, 0x04, 0xb3, 0x02, 0xb3, 0x06, 0x63, 0x12, 0x1b, 0xd8, 0x38, + 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa6, 0xc3, 0x30, 0xc0, 0x62, 0x00, 0x00, 0x00, +} diff --git a/metrics/timeseries.proto b/metrics/timeseries.proto new file mode 100644 index 00000000..2a823259 --- /dev/null +++ b/metrics/timeseries.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; +package metrics; + +message TimeSeries { + int64 epoch = 1; + int64 interval = 2; + repeated double values = 3; +} From 99992deb410ed5486333197ccb10a2f01570e142 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Tue, 12 Jan 2016 23:26:31 +0000 Subject: [PATCH 07/16] Whisper now reads files without requiring write access --- whisper/whisper.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/whisper/whisper.go b/whisper/whisper.go index 42b5568f..2ffda3be 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -229,6 +229,9 @@ func validateRetentions(retentions Retentions) error { */ func Open(path string) (whisper *Whisper, err error) { file, err := os.OpenFile(path, os.O_RDWR, 0666) + if os.IsPermission(err) { + file, err = os.OpenFile(path, os.O_RDONLY, 0666) + } if err != nil { return nil, err } From cab4713614f9424c5b6d489acaf94981e2439708 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Tue, 12 Jan 2016 23:29:31 +0000 Subject: [PATCH 08/16] Use the Interval from the incoming TimeSeries rather than trying to parse and guess. 
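
With the wire format now the proto3 message above, NaN needs no special
casing: proto3 doubles are marshaled as their raw IEEE 754 bits. A minimal
round trip sketch (field values hypothetical):

    package main

    import (
    	"fmt"
    	"log"
    	"math"

    	"github.com/golang/protobuf/proto"
    	"github.com/jjneely/buckytools/metrics"
    )

    func main() {
    	ts := &metrics.TimeSeries{
    		Epoch:    1452500040,
    		Interval: 60,
    		Values:   []float64{1.0, math.NaN(), 3.0},
    	}
    	blob, err := proto.Marshal(ts)
    	if err != nil {
    		log.Fatal(err)
    	}
    	out := new(metrics.TimeSeries)
    	if err := proto.Unmarshal(blob, out); err != nil {
    		log.Fatal(err)
    	}
    	// NaN survives the round trip; compare with math.IsNaN, not ==.
    	fmt.Println(math.IsNaN(out.Values[1])) // true
    }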
--- buckyd/timeseries.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buckyd/timeseries.go b/buckyd/timeseries.go index e3d88f24..03669e7e 100644 --- a/buckyd/timeseries.go +++ b/buckyd/timeseries.go @@ -128,7 +128,7 @@ func postTimeSeries(w http.ResponseWriter, r *http.Request, metric string) { j, err := timeseries.Open(path) if os.IsNotExist(err) { - j, err = timeseries.Create(path, MetricInterval(metric), + j, err = timeseries.Create(path, ts.Interval, journal.NewFloat64ValueType(), make([]int64, 0)) } if err != nil { From 28637ba96378202f7e801249b7218c9815d30d27 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Tue, 12 Jan 2016 23:30:13 +0000 Subject: [PATCH 09/16] Handle metrics that are empty --- bucky-convert/main.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bucky-convert/main.go b/bucky-convert/main.go index a3eb9ff9..c7a67fd7 100644 --- a/bucky-convert/main.go +++ b/bucky-convert/main.go @@ -56,11 +56,11 @@ func FindValidDataPoints(wsp *whisper.Whisper) (*metrics.TimeSeries, error) { tsj.Epoch = int64(ts.FromTime()) tsj.Interval = int64(ts.Step()) tsj.Values = ts.Values() - for math.IsNaN(tsj.Values[0]) { + for len(tsj.Values) > 0 && math.IsNaN(tsj.Values[0]) { tsj.Epoch = tsj.Epoch + tsj.Interval tsj.Values = tsj.Values[1:] } - for math.IsNaN(tsj.Values[len(tsj.Values)-1]) { + for len(tsj.Values) > 0 && math.IsNaN(tsj.Values[len(tsj.Values)-1]) { tsj.Values = tsj.Values[:len(tsj.Values)-1] } @@ -103,6 +103,10 @@ func examine(path string, info os.FileInfo, err error) error { log.Printf("%s\n", err) return err } + if len(ts.Values) == 0 { + log.Printf("Empty Metric: %s", path) + return nil + } log.Printf("Converting : %s", path) log.Printf("Metric name: %s", metric) From bdd99e3a0f996d81842612cb8abb7d05aa66f24e Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Wed, 20 Jan 2016 11:47:23 -0500 Subject: [PATCH 10/16] WIP: carbon-cache implementation Non-functional WIP --- buckyd/carbon.go | 131 ++++++++++++++++++++++++++++++ buckyd/heap.go | 201 +++++++++++++++++++++++++++++++++++++++++++++++ buckyd/schema.go | 30 ------- 3 files changed, 332 insertions(+), 30 deletions(-) create mode 100644 buckyd/carbon.go create mode 100644 buckyd/heap.go delete mode 100644 buckyd/schema.go diff --git a/buckyd/carbon.go b/buckyd/carbon.go new file mode 100644 index 00000000..8dbabeec --- /dev/null +++ b/buckyd/carbon.go @@ -0,0 +1,131 @@ +package main + +import ( + "bytes" + "io" + "log" + "net" + "strconv" +) + +type TimeSeriesPoint struct { + Metric string + Timestamp int64 + Value float64 +} + +func runCarbonServer(bind string) { + cache := runCache() + carbon := carbonServer(bind, stop) + + for m := range carbon { + cache <- m + } +} + +// carbonServer starts a TCP listener and handles incoming Graphite line +// protocol data. 
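+// Each accepted connection is handled in its own goroutine; parsed data
+// points are delivered on the returned channel, which is closed when a
+// shutdown signal arrives.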
+func carbonServer(bind string) chan *TimeSeriesPoint { + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, os.Kill) + ln, err := net.Listen("tcp", bind) + if err != nil { + log.Fatalf("Error listening for TCP carbon connections: %s", err) + } + + c := make(chan *TimeSeriesPoint) + go func() { + defer ln.Close() + defer close(c) + for { + select { + case <-sig: + log.Printf("Received signal, shutting down carbon server") + return + default: + } + ln.SetDeadline(time.Now().Add(time.Second)) + conn, err := ln.Accept() + if err, ok := err.(net.Error); ok && err.Timeout() { + // Deadline timeout, continue loop + continue + } + if err != nil { + log.Printf("Accepting TCP connection failed: %s", err) + continue + } + + go handleCarbon(conn, c) + } + }() + + return c +} + +func handleCarbon(conn net.Conn, c chan *TimeSeriesPoint) { + // Each connection has a 1KiB buffer for reading / parsing incoming data + buf := make([]byte, 1024, 1024) + offset := 0 + + defer conn.Close() + for { + n, err := conn.Read(buf[offset:]) + if n > 0 { + lines := bytes.Split(buf[:n], []byte{'\n'}) + last := len(lines) - 1 + for i, line := range lines { + if i == last && err != io.EOF { + copy(buf, line) + offset = len(line) + } else { + dp := parseCarbonLine(line) + if dp != nil { + c <- dp + } + } + } + } + if err == io.EOF { + return + } + if err != nil { + log.Printf("Error reading TCP connection: %s", err) + return + } + } +} + +func parseCarbonLine(buf []byte) *TimeSeriesPoint { + // XXX: Sanity check the metric name, or do we let that happen at + // the relay level? + var i int64 + var f float64 + var err error + + fields := bytes.Split(buf, []byte{' '}) + if len(fields) != 3 { + log.Printf("Illegal metric: %s", string(buf)) + return nil + } + + dp := new(TimeSeriesPoint) + dp.Metric = string(fields[0]) + i, err = strconv.ParseInt(string(fields[1]), 10, 64) + if err != nil { + f, err = strconv.ParseFloat(string(fields[1]), 64) + i = int64(f) + } + if err != nil { + log.Printf("Illegal metric: %s", string(buf)) + return nil + } + dp.Timestamp = i + f, err = strconv.ParseFloat(string(fields[2]), 64) + if err != nil { + log.Printf("Illegal metric: %s", string(buf)) + return nil + } + dp.Value = f + + return dp +} diff --git a/buckyd/heap.go b/buckyd/heap.go new file mode 100644 index 00000000..0191d732 --- /dev/null +++ b/buckyd/heap.go @@ -0,0 +1,201 @@ +package main + +import ( + "container/heap" + //"log" + "math" + "os" + "sort" +) + +import "github.com/jjneely/buckytools/metrics" +import "github.com/jjneely/journal" +import "github.com/jjneely/journal/timeseries" + +const ( + // MAX_CACHE is the maximum number of data points (which take 8 bytes) + // that we will store in the internal cache. This should create a 1G + // cache. + MAX_CACHE = 1024 * 1024 * 1024 / 8 +) + +// CacheHeap is a Heap object that forms a priority queue to manage +// writing data points to disk. +type CacheHeap []*CacheItem + +// CacheStore is an ordered slice of timeseries in memory. Sorted by +// metric name. +type CacheStore []*CacheItem + +type CacheItem struct { + metric string + index int + ts *metrics.TimeSeries // Epoch, Interval, Values +} + +func (c CacheStore) Len() int { + return len(c) +} + +func (c CacheStore) Swap(i, j int) { + c[i], c[j] = c[j], c[i] +} + +func (c CacheStore) Less(i, j int) bool { + return c[i].metric < c[j].metric +} + +// Search performs a binary search over the CacheStore to locate a metric. 
+func (c CacheStore) Search(metric string) int { + cmp := func(i int) bool { return c[i].metric < metric } + return sort.Search(c.Len(), cmp) +} + +func (c CacheHeap) Len() int { + return len(c) +} + +func (c CacheHeap) Swap(i, j int) { + c[i], c[j] = c[j], c[i] +} + +func (c CacheHeap) Less(i, j int) bool { + // This is backwards to create a MaxHeap. We write to disk timeseries + // cache with the most points first. + return len(c[j].ts.Values) < len(c[i].ts.Values) +} + +func (c *CacheHeap) Push(x interface{}) { + n := len(*c) + item := x.(*CacheItem) + item.index = n + *c = append(*c, item) +} + +func (c *CacheHeap) Pop() interface{} { + old := *c + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *c = old[0 : n-1] + return item +} + +func (c *CacheHeap) update(item *CacheItem, dp *TimeSeriesPoint) { + // Adjust timestamp for regular intervals + timestamp := dp.Timestamp - (dp.Timestamp % item.ts.Interval) + i := (item.ts.Epoch - timestamp) % item.ts.Interval + length := int64(len(item.ts.Values)) // do casting only once + + switch { + case i < 0: + // Handle change of epoch back in time + values := make([]float64, -i) + values[0] = dp.Value + for j := 1; j < len(values); j++ { + values[j] = math.NaN() + } + item.ts.Values = append(values, item.ts.Values...) + item.ts.Epoch = timestamp + case i < length: + item.ts.Values[i] = dp.Value + case i == length: + item.ts.Values = append(item.ts.Values, dp.Value) + default: // i > len(items.ts.Values) + values := make([]float64, i-length) + values[len(values)-1] = dp.Value + for j := 0; j < len(values)-1; j++ { + values[j] = math.NaN() + } + item.ts.Values = append(item.ts.Values, values...) + } + + heap.Fix(c, item.index) +} + +func newCacheItem(metric *TimeSeriesPoint) *CacheItem { + c := new(CacheItem) + ts := new(metric.TimeSeries) + + ts.Epoch = metric.Timestamp + ts.Interval = 60 // XXX: Figure out schema + ts.Values = make([]float64, 1) + ts.Values[0] = metric.Value + + c.metric = metric.Metric + c.ts = ts + return c +} + +func runCache() chan *TimeSeriesPoint { + c := make(chan *TimeSeriesPoint) + cache := make(CacheHeap, 0) + search := make(CacheStore, 0) + heap.Init(&cache) + + go func() { + defer close(c) + for m := range c { + i := search.Search(m.Metric) + switch { + case i == search.Len(): + item := newCacheItem(m) + search = append(search, item) + heap.Push(cache, item) + case search[i].metric == m.Metric: + cache.update(search[i], m) + case search[i] != m.Metric: + item := newCacheItem(m) + search = append(search, nil) + copy(search[i+1:], search[i:]) + search[i] = item + heap.Push(cache, item) + } + if len(cache[len(cache)-1].ts.Values) > int(math.Sqrt(MAX_CACHE)) { + item = heap.Pop(cache) + i = search.Search(item.metric) + copy(search[i:], search[i+1:]) + evictItem(item) + } + if cache.Len() > int(math.Sqrt(MAX_CACHE)) { + item = heap.Pop(cache) + i = search.Search(item.metric) + copy(search[i:], search[i+1:]) + evictItem(item) + } + } + + evictAll(cache) + }() + + return c +} + +func evictItem(item *CacheItem) { + // XXX: If this fails it tosses metrics on the floor + path := metrics.MetricToPath(item.metric, ".tsj") + j, err := timeseries.Open(path) + if os.IsNotExist(err) { + j, err = timeseries.Create(path, ts.Interval, + journal.NewFloat64ValueType(), make([]int64, 0)) + } + if err != nil { + log.Printf("Error opening/creating timeseries journal: %s", err) + return + } + defer j.Close() + + err = metrics.JournalUpdate(j, item.ts) + if err != nil { + log.Printf("Error updating journal: %s", err) + return 
+ } +} + +func evictAll(cache CacheHeap) { + // XXX: If this fails it tosses metrics on the floor + for cache.Len() > 0 { + item = heap.Pop(cache).(*CacheItem) + evictItem(item) + } +} diff --git a/buckyd/schema.go b/buckyd/schema.go deleted file mode 100644 index e1e063dc..00000000 --- a/buckyd/schema.go +++ /dev/null @@ -1,30 +0,0 @@ -package main - -import ( - "strings" -) - -// MetricInterval examines the given Graphite style metric name and returns -// the associated interval in seconds. XXX: This should be based off a -// configuration source. -func MetricInterval(metric string) int64 { - switch { - case strings.HasPrefix(metric, "1sec."): - return 1 - case strings.HasPrefix(metric, "1min."): - return 60 - case strings.HasPrefix(metric, "5min."): - return 300 - case strings.HasPrefix(metric, "10min."): - return 600 - case strings.HasPrefix(metric, "15min."): - return 900 - case strings.HasPrefix(metric, "hourly."): - return 3600 - case strings.HasPrefix(metric, "daily."): - return 86400 - } - - // Default - return 60 -} From 228c3b3e176714103aa785c297423f0ac9d1b0cc Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Thu, 21 Jan 2016 09:45:07 -0500 Subject: [PATCH 11/16] WIP: carbon-cache implementation --- buckyd/carbon.go | 11 +++++- buckyd/heap.go | 99 ++++++++++++++++++++++++++++++------------------ 2 files changed, 72 insertions(+), 38 deletions(-) diff --git a/buckyd/carbon.go b/buckyd/carbon.go index 8dbabeec..20cd21a7 100644 --- a/buckyd/carbon.go +++ b/buckyd/carbon.go @@ -5,7 +5,10 @@ import ( "io" "log" "net" + "os" + "os/signal" "strconv" + "time" ) type TimeSeriesPoint struct { @@ -16,7 +19,7 @@ type TimeSeriesPoint struct { func runCarbonServer(bind string) { cache := runCache() - carbon := carbonServer(bind, stop) + carbon := carbonServer(bind) for m := range carbon { cache <- m @@ -28,7 +31,11 @@ func runCarbonServer(bind string) { func carbonServer(bind string) chan *TimeSeriesPoint { sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, os.Kill) - ln, err := net.Listen("tcp", bind) + tcpAddr, err := net.ResolveTCPAddr("tcp", bind) + if err != nil { + log.Fatal(err) + } + ln, err := net.ListenTCP("tcp", tcpAddr) if err != nil { log.Fatalf("Error listening for TCP carbon connections: %s", err) } diff --git a/buckyd/heap.go b/buckyd/heap.go index 0191d732..9bb181ae 100644 --- a/buckyd/heap.go +++ b/buckyd/heap.go @@ -2,10 +2,12 @@ package main import ( "container/heap" - //"log" + "log" "math" "os" + "os/signal" "sort" + "time" ) import "github.com/jjneely/buckytools/metrics" @@ -17,6 +19,10 @@ const ( // that we will store in the internal cache. This should create a 1G // cache. MAX_CACHE = 1024 * 1024 * 1024 / 8 + + // EVICT_TIME is the number of milliseconds to wait before running a + // cache eviction after receiving a metric. 
+ EVICT_TIME = 500 ) // CacheHeap is a Heap object that forms a priority queue to manage @@ -115,7 +121,7 @@ func (c *CacheHeap) update(item *CacheItem, dp *TimeSeriesPoint) { func newCacheItem(metric *TimeSeriesPoint) *CacheItem { c := new(CacheItem) - ts := new(metric.TimeSeries) + ts := new(metrics.TimeSeries) ts.Epoch = metric.Timestamp ts.Interval = 60 // XXX: Figure out schema @@ -128,6 +134,16 @@ func newCacheItem(metric *TimeSeriesPoint) *CacheItem { } func runCache() chan *TimeSeriesPoint { + // Signal handling + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, os.Kill) + + // Limits on data in cache + timer := time.NewTimer(0) + limit := int(math.Sqrt(MAX_CACHE)) + var timerCh <-chan time.Time + + // Data structures and resulting channel c := make(chan *TimeSeriesPoint) cache := make(CacheHeap, 0) search := make(CacheStore, 0) @@ -135,48 +151,60 @@ func runCache() chan *TimeSeriesPoint { go func() { defer close(c) - for m := range c { - i := search.Search(m.Metric) - switch { - case i == search.Len(): - item := newCacheItem(m) - search = append(search, item) - heap.Push(cache, item) - case search[i].metric == m.Metric: - cache.update(search[i], m) - case search[i] != m.Metric: - item := newCacheItem(m) - search = append(search, nil) - copy(search[i+1:], search[i:]) - search[i] = item - heap.Push(cache, item) + defer evictAll(search, cache) + for { + select { + case m := <-c: + updateCache(m, search, cache) + if timerCh == nil { + timer.Reset(EVICT_TIME * time.Millisecond) + timerCh = timer.C + } + case <-timerCh: + evictItem(search, cache) + timerCh = nil + i := len(cache) + for i > 0 && (i > limit || len(cache[i-1].ts.Values) > limit) { + evictItem(search, cache) + } + case <-sig: + return } - if len(cache[len(cache)-1].ts.Values) > int(math.Sqrt(MAX_CACHE)) { - item = heap.Pop(cache) - i = search.Search(item.metric) - copy(search[i:], search[i+1:]) - evictItem(item) - } - if cache.Len() > int(math.Sqrt(MAX_CACHE)) { - item = heap.Pop(cache) - i = search.Search(item.metric) - copy(search[i:], search[i+1:]) - evictItem(item) - } - } - evictAll(cache) + } }() return c } -func evictItem(item *CacheItem) { +func updateCache(m *TimeSeriesPoint, search CacheStore, cache CacheHeap) { + i := search.Search(m.Metric) + switch { + case i == search.Len(): + item := newCacheItem(m) + search = append(search, item) + heap.Push(cache, item) + case search[i].metric == m.Metric: + cache.update(search[i], m) + case search[i] != m.Metric: + item := newCacheItem(m) + search = append(search, nil) + copy(search[i+1:], search[i:]) + search[i] = item + heap.Push(cache, item) + } +} + +func evictItem(search CacheStore, cache CacheHeap) { // XXX: If this fails it tosses metrics on the floor + item := heap.Pop(cache).(*CacheItem) + i := search.Search(item.metric) + copy(search[i:], search[i+1:]) + path := metrics.MetricToPath(item.metric, ".tsj") j, err := timeseries.Open(path) if os.IsNotExist(err) { - j, err = timeseries.Create(path, ts.Interval, + j, err = timeseries.Create(path, item.ts.Interval, journal.NewFloat64ValueType(), make([]int64, 0)) } if err != nil { @@ -192,10 +220,9 @@ func evictItem(item *CacheItem) { } } -func evictAll(cache CacheHeap) { +func evictAll(search CacheStore, cache CacheHeap) { // XXX: If this fails it tosses metrics on the floor for cache.Len() > 0 { - item = heap.Pop(cache).(*CacheItem) - evictItem(item) + evictItem(search, cache) } } From 91cd5a531ff813afd7f25fb7b4d7fdfaea6a5ec6 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Thu, 21 Jan 2016 10:43:38 
From 91cd5a531ff813afd7f25fb7b4d7fdfaea6a5ec6 Mon Sep 17 00:00:00 2001
From: Jack Neely
Date: Thu, 21 Jan 2016 10:43:38 -0500
Subject: [PATCH 12/16] WIP: carbon-cache implementation: this at least compiles

---
 buckyd/heap.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/buckyd/heap.go b/buckyd/heap.go
index 9bb181ae..da9152be 100644
--- a/buckyd/heap.go
+++ b/buckyd/heap.go
@@ -183,21 +183,21 @@ func updateCache(m *TimeSeriesPoint, search CacheStore, cache CacheHeap) {
 	case i == search.Len():
 		item := newCacheItem(m)
 		search = append(search, item)
-		heap.Push(cache, item)
+		heap.Push(&cache, item)
 	case search[i].metric == m.Metric:
 		cache.update(search[i], m)
-	case search[i] != m.Metric:
+	case search[i].metric != m.Metric:
 		item := newCacheItem(m)
 		search = append(search, nil)
 		copy(search[i+1:], search[i:])
 		search[i] = item
-		heap.Push(cache, item)
+		heap.Push(&cache, item)
 	}
 }
 
 func evictItem(search CacheStore, cache CacheHeap) {
 	// XXX: If this fails it tosses metrics on the floor
-	item := heap.Pop(cache).(*CacheItem)
+	item := heap.Pop(&cache).(*CacheItem)
 	i := search.Search(item.metric)
 	copy(search[i:], search[i+1:])
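
updateCache keeps the CacheStore sorted by metric name so that Search can
binary search it. The insert idiom it uses (append a placeholder, shift
the tail right, drop the new item into the hole) is easy to misread, so
here is the same pattern in isolation, with plain strings standing in for
*CacheItem. Note the >= predicate: sort.Search expects the predicate to
flip from false to true across an ascending slice, and [PATCH 14/16]
below makes the matching correction to CacheStore.Search. This is a
sketch, not code from the repository:

package main

import (
	"fmt"
	"sort"
)

func insertSorted(s []string, v string) []string {
	// Find the first index whose element is >= v
	i := sort.Search(len(s), func(i int) bool { return s[i] >= v })
	if i < len(s) && s[i] == v {
		return s // already present
	}
	s = append(s, "")    // grow the slice by one
	copy(s[i+1:], s[i:]) // shift the tail right to open a hole
	s[i] = v             // place the new value
	return s
}

func main() {
	s := []string{"a.count", "c.count"}
	s = insertSorted(s, "b.count")
	fmt.Println(s) // [a.count b.count c.count]
}
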
From 04a849b491ab48126aa0afe9f635f26e2d77d723 Mon Sep 17 00:00:00 2001
From: Jack Neely
Date: Thu, 21 Jan 2016 11:29:19 -0500
Subject: [PATCH 13/16] WIP: wire up the carbon-cache implementation

---
 buckyd/carbon.go |  1 +
 buckyd/heap.go   | 15 ++++++---
 buckyd/main.go   |  7 +++++++
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/buckyd/carbon.go b/buckyd/carbon.go
index 20cd21a7..88b06f93 100644
--- a/buckyd/carbon.go
+++ b/buckyd/carbon.go
@@ -24,6 +24,7 @@ func runCarbonServer(bind string) {
 	for m := range carbon {
 		cache <- m
 	}
+	close(cache)
 }
 
 // carbonServer starts a TCP listener and handles incoming Graphite line
diff --git a/buckyd/heap.go b/buckyd/heap.go
index da9152be..53cf52ff 100644
--- a/buckyd/heap.go
+++ b/buckyd/heap.go
@@ -5,7 +5,6 @@ import (
 	"log"
 	"math"
 	"os"
-	"os/signal"
 	"sort"
 	"time"
 )
@@ -134,16 +133,13 @@ func newCacheItem(metric *TimeSeriesPoint) *CacheItem {
 }
 
 func runCache() chan *TimeSeriesPoint {
-	// Signal handling
-	sig := make(chan os.Signal, 1)
-	signal.Notify(sig, os.Interrupt, os.Kill)
-
 	// Limits on data in cache
 	timer := time.NewTimer(0)
 	limit := int(math.Sqrt(MAX_CACHE))
 	var timerCh <-chan time.Time
 
 	// Data structures and resulting channel
+	// XXX: cache/search need to be global or shared via reference better
 	c := make(chan *TimeSeriesPoint)
 	cache := make(CacheHeap, 0)
 	search := make(CacheStore, 0)
@@ -154,7 +150,11 @@
 		defer evictAll(search, cache)
 		for {
 			select {
-			case m := <-c:
+			case m, ok := <-c:
+				if !ok {
+					// channel closed
+					return
+				}
 				updateCache(m, search, cache)
 				if timerCh == nil {
 					timer.Reset(EVICT_TIME * time.Millisecond)
@@ -167,10 +167,7 @@
 				for i > 0 && (i > limit || len(cache[i-1].ts.Values) > limit) {
 					evictItem(search, cache)
 				}
-			case <-sig:
-				return
 			}
-		}
 	}()
 
diff --git a/buckyd/main.go b/buckyd/main.go
index b1dfd2b9..16cdb448 100644
--- a/buckyd/main.go
+++ b/buckyd/main.go
@@ -89,6 +89,7 @@ func main() {
 	var replicas int
 	var hashType string
 	var bindAddress string
+	var graphiteBind string
 	hostname, err := os.Hostname()
 	if err != nil {
 		hostname = "UNKNOWN"
@@ -111,6 +112,8 @@
 		fmt.Sprintf("Consistent Hash algorithm to use: %v", SupportedHashTypes))
 	flag.IntVar(&replicas, "replicas", 1,
 		"Number of copies of each metric in the cluster.")
+	flag.StringVar(&graphiteBind, "graphite", "",
+		"Run a Graphite line protocol server at the given bind address:port")
 	flag.Parse()
 
 	i := sort.SearchStrings(SupportedHashTypes, hashType)
@@ -126,6 +129,10 @@
 	http.HandleFunc("/hashring", listHashring)
 	http.HandleFunc("/timeseries/", serveTimeSeries)
 
+	if graphiteBind != "" {
+		log.Printf("Starting Graphite server on %s", graphiteBind)
+		go runCarbonServer(graphiteBind)
+	}
 	log.Printf("Starting server on %s", bindAddress)
 	err = http.ListenAndServe(bindAddress, nil)
 	if err != nil {
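
With the -graphite flag wired up, buckyd can be fed the plain text
Graphite line protocol: one "metric value timestamp" triple per
newline-terminated line. A small smoke test client follows, assuming the
daemon was started with something like -graphite 127.0.0.1:2003; the
address and port are assumptions for the example, not defaults set by
this code:

package main

import (
	"fmt"
	"log"
	"net"
	"time"
)

func main() {
	// Connect to the line protocol listener started by -graphite
	conn, err := net.Dial("tcp", "127.0.0.1:2003")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Send one data point: metric name, float value, Unix timestamp
	_, err = fmt.Fprintf(conn, "test.bucky.value 42.0 %d\n",
		time.Now().Unix())
	if err != nil {
		log.Fatal(err)
	}
}

The same test works from a shell with netcat, for example:
echo "test.bucky.value 42 $(date +%s)" | nc 127.0.0.1 2003
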
From f23dad74d92be5b3d83f477027415cc3932f0e37 Mon Sep 17 00:00:00 2001
From: Jack Neely
Date: Thu, 21 Jan 2016 18:05:30 -0500
Subject: [PATCH 14/16] WIP: carbon ingestion works reasonably

This is actually working code, and shows a lot of promise for being
able to ingest graphite line protocol metrics.
---
 buckyd/carbon.go | 29 +++++++++++++--------
 buckyd/heap.go   | 68 ++++++++++++++++++++++++++++++++----------------
 2 files changed, 63 insertions(+), 34 deletions(-)

diff --git a/buckyd/carbon.go b/buckyd/carbon.go
index 88b06f93..932291b8 100644
--- a/buckyd/carbon.go
+++ b/buckyd/carbon.go
@@ -21,6 +21,7 @@ func runCarbonServer(bind string) {
 	cache := runCache()
 	carbon := carbonServer(bind)
 
+	// carbon is closed when signals are received
 	for m := range carbon {
 		cache <- m
 	}
@@ -79,7 +80,7 @@ func handleCarbon(conn net.Conn, c chan *TimeSeriesPoint) {
 	for {
 		n, err := conn.Read(buf[offset:])
 		if n > 0 {
-			lines := bytes.Split(buf[:n], []byte{'\n'})
+			lines := bytes.Split(buf[:n+offset], []byte{'\n'})
 			last := len(lines) - 1
 			for i, line := range lines {
 				if i == last && err != io.EOF {
@@ -94,6 +95,7 @@
 			}
 		}
 		if err == io.EOF {
+			log.Printf("Client closed connection")
 			return
 		}
 		if err != nil {
@@ -112,28 +114,33 @@ func parseCarbonLine(buf []byte) *TimeSeriesPoint {
 	fields := bytes.Split(buf, []byte{' '})
 	if len(fields) != 3 {
-		log.Printf("Illegal metric: %s", string(buf))
+		log.Printf("Illegal protocol line: %s", string(buf))
 		return nil
 	}
-
 	dp := new(TimeSeriesPoint)
+
+	// Metric Name
 	dp.Metric = string(fields[0])
-	i, err = strconv.ParseInt(string(fields[1]), 10, 64)
+
+	// Metric Value
+	f, err = strconv.ParseFloat(string(fields[1]), 64)
+	if err != nil {
+		log.Printf("Illegal protocol line: %s", string(buf))
+		return nil
+	}
+	dp.Value = f
+
+	// Metric Timestamp
+	i, err = strconv.ParseInt(string(fields[2]), 10, 64)
 	if err != nil {
-		f, err = strconv.ParseFloat(string(fields[1]), 64)
+		f, err = strconv.ParseFloat(string(fields[2]), 64)
 		i = int64(f)
 	}
 	if err != nil {
-		log.Printf("Illegal metric: %s", string(buf))
+		log.Printf("Illegal protocol line: %s", string(buf))
 		return nil
 	}
 	dp.Timestamp = i
 
-	f, err = strconv.ParseFloat(string(fields[2]), 64)
-	if err != nil {
-		log.Printf("Illegal metric: %s", string(buf))
-		return nil
-	}
-	dp.Value = f
 	return dp
 }
diff --git a/buckyd/heap.go b/buckyd/heap.go
index 53cf52ff..6b8a4b44 100644
--- a/buckyd/heap.go
+++ b/buckyd/heap.go
@@ -21,7 +21,7 @@ const (
 
 	// EVICT_TIME is the number of milliseconds to wait before running a
 	// cache eviction after receiving a metric.
-	EVICT_TIME = 500
+	EVICT_TIME = 250
 )
 
 // CacheHeap is a Heap object that forms a priority queue to manage
@@ -38,6 +38,12 @@ type CacheItem struct {
 	ts *metrics.TimeSeries // Epoch, Interval, Values
 }
 
+// cache is our global heap / priority queue for persisting metrics to disk
+var cache CacheHeap
+
+// search is our global ordered search list for finding metrics in the cache
+var search CacheStore
+
 func (c CacheStore) Len() int {
 	return len(c)
 }
@@ -52,7 +58,7 @@
 
 // Search performs a binary search over the CacheStore to locate a metric.
 func (c CacheStore) Search(metric string) int {
-	cmp := func(i int) bool { return c[i].metric < metric }
+	cmp := func(i int) bool { return c[i].metric >= metric }
 	return sort.Search(c.Len(), cmp)
 }
 
@@ -62,6 +68,8 @@ func (c CacheHeap) Len() int {
 
 func (c CacheHeap) Swap(i, j int) {
 	c[i], c[j] = c[j], c[i]
+	c[i].index = i
+	c[j].index = j
 }
 
 func (c CacheHeap) Less(i, j int) bool {
@@ -89,7 +97,7 @@ func (c *CacheHeap) Pop() interface{} {
 func (c *CacheHeap) update(item *CacheItem, dp *TimeSeriesPoint) {
 	// Adjust timestamp for regular intervals
 	timestamp := dp.Timestamp - (dp.Timestamp % item.ts.Interval)
-	i := (item.ts.Epoch - timestamp) % item.ts.Interval
+	i := (timestamp - item.ts.Epoch) / item.ts.Interval
 	length := int64(len(item.ts.Values)) // do casting only once
 
 	switch {
@@ -139,34 +147,40 @@ func runCache() chan *TimeSeriesPoint {
 	var timerCh <-chan time.Time
 
 	// Data structures and resulting channel
-	// XXX: cache/search need to be global or shared via reference better
 	c := make(chan *TimeSeriesPoint)
-	cache := make(CacheHeap, 0)
-	search := make(CacheStore, 0)
+	cache = make(CacheHeap, 0)
+	search = make(CacheStore, 0)
 	heap.Init(&cache)
 
+	// close c to stop processing metrics
 	go func() {
-		defer evictAll(search, cache)
+		defer evictAll()
 		for {
 			select {
+			case <-timerCh:
+				if len(cache) == 0 {
+					timerCh = nil
+				} else {
+					evictItem()
+					timer.Reset(EVICT_TIME * time.Millisecond)
+				}
 			case m, ok := <-c:
 				if !ok {
 					// channel closed
 					return
 				}
-				updateCache(m, search, cache)
-				if timerCh == nil {
+				updateCache(m)
+
+				// We block processing new metrics if our cache is full
+				for len(cache) > limit || len(cache[len(cache)-1].ts.Values) > limit {
+					evictItem()
+				}
+
+				// Setup timer to purge cache
+				if timerCh == nil && len(cache) > 0 {
 					timer.Reset(EVICT_TIME * time.Millisecond)
 					timerCh = timer.C
 				}
-			case <-timerCh:
-				evictItem(search, cache)
-				timerCh = nil
-				i := len(cache)
-				for i > 0 && (i > limit || len(cache[i-1].ts.Values) > limit) {
-					evictItem(search, cache)
-				}
 			}
 		}
 	}()
@@ -174,7 +188,8 @@
 	return c
 }
 
-func updateCache(m *TimeSeriesPoint, search CacheStore, cache CacheHeap) {
+func updateCache(m *TimeSeriesPoint) {
+	log.Printf("Updating heap: '%s' %v %v", m.Metric, m.Value, m.Timestamp)
 	i := search.Search(m.Metric)
 	switch {
 	case i == search.Len():
@@ -192,11 +207,17 @@
 	}
 }
 
-func evictItem(search CacheStore, cache CacheHeap) {
+func evictItem() {
 	// XXX: If this fails it tosses metrics on the floor
 	item := heap.Pop(&cache).(*CacheItem)
+	log.Printf("Evict: '%s' with %d values", item.metric, len(item.ts.Values))
 	i := search.Search(item.metric)
-	copy(search[i:], search[i+1:])
+	search[i] = nil
+	if len(search) > 1 {
+		search = append(search[:i], search[i+1:]...)
+	} else {
+		search = search[0:0]
+	}
 
 	path := metrics.MetricToPath(item.metric, ".tsj")
 	j, err := timeseries.Open(path)
@@ -213,13 +234,14 @@
 	err = metrics.JournalUpdate(j, item.ts)
 	if err != nil {
 		log.Printf("Error updating journal: %s", err)
+		log.Printf("Journal: Epoch %d; Int: %d; Last: %d", j.Epoch(), j.Interval(), j.Last())
+		log.Printf("TimeSeries: Epoch: %d; Int: %d; Values: %d", item.ts.Epoch, item.ts.Interval, len(item.ts.Values))
 		return
 	}
 }
 
-func evictAll(search CacheStore, cache CacheHeap) {
-	// XXX: If this fails it tosses metrics on the floor
+func evictAll() {
 	for cache.Len() > 0 {
-		evictItem(search, cache)
+		evictItem()
 	}
 }
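
The corrected slot arithmetic in update() deserves a worked example: the
data point's timestamp is first quantized down to an interval boundary,
and the slot index is then the count of whole intervals between the
series Epoch and that boundary. The previous expression had the operands
reversed and used modulus rather than division, so it could never address
a slot beyond Interval-1. The values below are illustrative only, with an
interval-aligned epoch assumed:

package main

import "fmt"

func main() {
	var epoch, interval int64 = 1453399980, 60

	for _, ts := range []int64{1453399990, 1453400045, 1453400160} {
		quantized := ts - (ts % interval)      // align down to the interval
		slot := (quantized - epoch) / interval // whole intervals past epoch
		fmt.Printf("ts=%d quantized=%d slot=%d\n", ts, quantized, slot)
	}
	// Output:
	// ts=1453399990 quantized=1453399980 slot=0
	// ts=1453400045 quantized=1453400040 slot=1
	// ts=1453400160 quantized=1453400160 slot=3
}
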
From 65e9a9fe7c8996f7de5f9cb954dbc103f074b98a Mon Sep 17 00:00:00 2001
From: Jack Neely
Date: Tue, 22 Mar 2016 13:43:56 -0400
Subject: [PATCH 15/16] bucky-pickle-relay: More verbose debugging

---
 bucky-pickle-relay/main.go | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/bucky-pickle-relay/main.go b/bucky-pickle-relay/main.go
index 6b33cba3..2c3848c8 100644
--- a/bucky-pickle-relay/main.go
+++ b/bucky-pickle-relay/main.go
@@ -169,13 +169,17 @@ func handleConn(c chan []string, conn net.Conn) {
 		// Pickle is preceded by an unsigned long integer of 4 bytes (!L)
 		err := readSlice(conn, sizeBuf)
 		if err == io.EOF {
-			// Remote end closed connection
+			if debug {
+				log.Printf("Normal connection close from %s", conn.RemoteAddr().String())
+			}
 			return
 		} else if neterr, ok := err.(*net.OpError); ok && strings.Contains(neterr.Error(), "connection reset by peer") {
-			// XXX: This used to work in Go 1.4  neterr.Err == syscall.ECONNRESET
 			// Connection reset by peer between Pickles
 			// or TCP probe health check
 			// at this point in the proto we ignore
+			if debug {
+				log.Printf("Connection reset: %s", conn.RemoteAddr().String())
+			}
 			return
 		} else if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
 			// Timeout waiting for data on connection
@@ -206,6 +210,11 @@
 
 		seenPickles++
 		metrics := decodePickle(dataBuf)
+		if debug {
+			for i := range metrics {
+				log.Printf("Found Metric: %s", metrics[i])
+			}
+		}
 		if metrics != nil && len(metrics) > 0 {
 			c <- metrics
 		}
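
For context on the framing handleConn is logging around: each pickle on
the wire is preceded by a 4 byte unsigned big-endian length, Python
struct's "!L", and only then the payload. A minimal sketch of consuming
one such frame; readFrame here is a simplified stand-in for the relay's
readSlice, not the actual implementation, and it does no sanity checking
of the advertised size:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readFrame reads a single !L length-prefixed payload from r.
func readFrame(r io.Reader) ([]byte, error) {
	sizeBuf := make([]byte, 4)
	if _, err := io.ReadFull(r, sizeBuf); err != nil {
		return nil, err
	}
	size := binary.BigEndian.Uint32(sizeBuf)
	payload := make([]byte, size)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

func main() {
	// Frame up a 5 byte payload the same way a pickle sender would
	var buf bytes.Buffer
	binary.Write(&buf, binary.BigEndian, uint32(5))
	buf.WriteString("hello")

	frame, err := readFrame(&buf)
	fmt.Println(string(frame), err) // hello <nil>
}
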
strings.Split(n, ":") if len(fields) < 2 {