diff --git a/core/consumer/consumer.go b/core/consumer/consumer.go index 6170d63e..0cd54a53 100644 --- a/core/consumer/consumer.go +++ b/core/consumer/consumer.go @@ -61,6 +61,7 @@ type consumer struct { logger *zap.SugaredLogger metricsScope tally.Scope registry TopicRegistry + classifiers []errs.Classifier mu sync.Mutex stopped bool @@ -76,12 +77,22 @@ type activeSubscription struct { } // New creates a new consumer. -// registry provides queue and subscription config for topics. -func New(logger *zap.SugaredLogger, scope tally.Scope, registry TopicRegistry) Consumer { +// +// registry provides queue and subscription config for topics. classifiers are +// the per-backend error classifiers used to decide whether an error returned +// by a controller is retryable (nack for redelivery) or non-retryable (reject +// to DLQ). The consumer runs errs.Classify(err, classifiers...) exactly once +// per failing delivery and then drives ack/nack/reject from the resulting +// chain via plain errs.IsRetryable type checks. Pass any backend-specific +// classifiers the controllers rely on (e.g. core/errs/mysql.Classifier); +// passing none is fine for tests where controllers always return explicit +// framework-wrapped errors. +func New(logger *zap.SugaredLogger, scope tally.Scope, registry TopicRegistry, classifiers ...errs.Classifier) Consumer { return &consumer{ logger: logger, metricsScope: scope.SubScope("consumer"), registry: registry, + classifiers: classifiers, subscriptions: make(map[TopicKey]*activeSubscription), } } @@ -370,6 +381,12 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d metrics.NamedTimer(controllerScope, opName, "controller_latency", elapsed, metrics.NewTag("success", successTag)) if err != nil { + // Single explicit classification pass: if err's chain does not already + // carry a framework wrap, ask the configured classifiers and prepend + // the matching framework type. Downstream errs.IsRetryable / IsUserError + // calls then only do a plain type check on the result. + err = errs.Classify(err, m.classifiers...) + // Check if the error is non-retryable (poison pill message) if !errs.IsRetryable(err) { m.logger.Errorw("non-retryable controller error, rejecting message", diff --git a/core/errs/README.md b/core/errs/README.md index 84ff3f47..1f51f4a1 100644 --- a/core/errs/README.md +++ b/core/errs/README.md @@ -18,30 +18,109 @@ Errors are classified along two axes: **Infra by default.** Any error that is not explicitly wrapped with `NewUserError` is an infra error. There is no `NewInfraError` constructor — infra is the default classification. -## Who Classifies Errors +## Two Routes to a Classification -**Extensions return plain Go errors.** Extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values. They may define their own domain-specific sentinel errors (e.g. `storage.ErrNotFound`, `storage.ErrVersionMismatch`) but they do not classify errors as user or infra. +A returned `error` reaches `IsUserError` / `IsRetryable` / `IsDependencyError` carrying one of the framework types (`*userError` / `*infraError`). It gets there one of two ways: -**Service controllers classify errors.** The controller that calls an extension decides whether the error is user-caused or infrastructure-caused, and whether it should be retried: +1. **Explicit wrap by the controller** — the controller knows the meaning of the failure and wraps the cause with `NewUserError`, `NewRetryableError`, `NewDependencyError`, or `NewRetryableDependencyError` before returning. +2. **Automatic wrap by `Classify`** — the controller returns a raw driver/library/sentinel error, and a per-backend `Classifier` recognises it later in the pipeline (typically inside the consumer) and adds the appropriate framework wrap. + +Both routes feed the same downstream helpers; the chain that reaches `IsRetryable` looks identical regardless of who wrapped it. + +## `Classify` and the `Classifier` Interface + +`Classifier` inspects a **single error node** and returns a `Verdict`: ```go -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - // Extension returns a plain error - result, err := c.mergeChecker.Check(ctx, queue, change) - if err != nil { - // Controller classifies: merge checker failure is infra, worth retrying - return errs.NewRetryableError(fmt.Errorf("merge check failed: %w", err)) - } +type Classifier interface { + Classify(err error) Verdict +} +``` + +Verdicts: `Unknown` (this node carries no signal), `User`, `Infra`, `InfraRetryable`, `InfraDependency`, `InfraDependencyRetryable`. + +`Classify(err, classifiers...)` is the single, explicit pass that turns a raw chain into a wrapped one. It is called **exactly once per chain** — typically by the consumer immediately after the controller returns. After that point, callers use only the `IsXxx` helpers, which are pure type checks. + +`Classify` walks the chain twice: + +1. **Pass 1 — framework-wrap check.** A cheap type switch looks for an existing `*userError` / `*infraError` anywhere in the chain. If found, the chain is already interpretable and `Classify` returns `err` unchanged. **No classifier is invoked.** +2. **Pass 2 — classifier walk.** From outermost to innermost node, each registered classifier is asked for a verdict. The first non-`Unknown` verdict wins and `err` is wrapped with the matching framework constructor. + +If no classifier recognises anything, `err` is returned unchanged — and behaves as non-retryable infra at the helper layer. + +## Adding a Backend-Specific Classifier + +Backend classifiers live alongside the extension they classify, under `core/errs//`. The canonical examples are `core/errs/mysql` (MySQL driver errors) and `core/errs/generic` (transport-agnostic concerns such as `context.Canceled`). + +A classifier: + +- Inspects exactly one node — the `err` argument passed in. **Do not call `errors.Is` / `errors.As`** from inside `Classify`; the framework owns the chain walk. Calling it yourself can shadow a deeper-but-different verdict and breaks the controller-override rules described below. +- Returns `Unknown` for anything it does not recognise, so the surrounding walker can continue. +- Is stateless. The convention is to expose a package-level singleton value rather than a constructor: + +```go +// core/errs/foo/foo.go +package foo + +import "github.com/uber/submitqueue/core/errs" - if !result.Mergeable { - // Controller classifies: not mergeable is a user error, never retry - return errs.NewUserError(fmt.Errorf("not mergeable: %s", result.Reason)) +var Classifier errs.Classifier = classifier{} + +type classifier struct{} + +func (classifier) Classify(err error) errs.Verdict { + // Type-assert / sentinel-compare on err directly, never errors.As / errors.Is. + if fe, ok := err.(*FooError); ok { + return classifyFooCode(fe.Code) } + return errs.Unknown +} +``` + +Servers wire each classifier into the consumer as a vararg. Order matters only when two classifiers might both match a node — earlier classifiers win: + +```go +import ( + genericerrs "github.com/uber/submitqueue/core/errs/generic" + mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" +) + +c := consumer.New(logger, scope, registry, + genericerrs.Classifier, + mysqlerrs.Classifier, +) +``` + +Tests follow the same shape: assert per-node behaviour against `Classifier.Classify(node)` directly, and assert end-to-end behaviour by running `errs.Classify(err, Classifier)` and checking the helpers (`IsRetryable`, `IsUserError`, …) on the result. See `core/errs/mysql/mysql_test.go` and `core/errs/generic/generic_test.go`. - // ... +## Overriding Classification from a Controller + +Because pass 1 short-circuits on the first framework wrap it finds, **an explicit wrap by the controller always wins over any classifier**. Use this when the controller has context the classifier cannot — typically when the same low-level error means different things in different call sites. + +```go +result, err := c.storage.Get(ctx, id) +if errors.Is(err, storage.ErrNotFound) { + // This caller treats "not found" as a user error: the user asked for an + // unknown resource. The mysql classifier never gets a vote because the + // framework wrap short-circuits pass 1. + return errs.NewUserError(fmt.Errorf("request %s: %w", id, err)) +} +if err != nil { + // Hand the raw error to Classify — the mysql classifier will recognise + // deadlocks, lock-wait timeouts, etc. and wrap them as retryable infra. + return fmt.Errorf("get %s: %w", id, err) } ``` +Two practical rules fall out of the short-circuit semantics: + +- **Wrap with a framework constructor as soon as the controller knows the right verdict.** Any wrap added later in the chain still wins, but wrapping early keeps the intent close to the decision. +- **A wrap anywhere in the chain blocks all classifiers — including for nodes deeper than the wrap.** If you want a classifier to still get a look at the cause, do not wrap above it. (In practice this is rare: controllers wrap because they have the final answer.) + +## Extensions Return Plain Go Errors + +Extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values. They may define their own domain-specific sentinel errors (e.g. `storage.ErrNotFound`, `storage.ErrVersionMismatch`) but they do **not** classify errors as user or infra — that is the controller's (and `Classify`'s) job. + This separation keeps extensions reusable across contexts. The same `storage.ErrNotFound` might be a user error in one controller (user requested a non-existent resource) and an infra error in another (expected record is missing). ## Error Chain Compatibility @@ -71,3 +150,5 @@ errors.Is(err, ErrNotFound) // true — cause is in the chain | `IsUserError(err)` | `err` is or wraps a `userError` | | `IsRetryable(err)` | `err` is or wraps an infra error with the retryable flag set | | `IsDependencyError(err)` | `err` is or wraps an infra error marked as dependency | + +All three are type-only checks. They do not invoke classifiers — pair them with a preceding `Classify` call when the controller's error may not carry an explicit wrap. diff --git a/core/errs/errs.go b/core/errs/errs.go index c18e0319..667b6f95 100644 --- a/core/errs/errs.go +++ b/core/errs/errs.go @@ -15,7 +15,6 @@ package errs import ( - "context" "errors" ) @@ -107,34 +106,147 @@ func (e *infraError) Is(target error) bool { return ok } -// IsUserError checks if err is or wraps a user error. +// Verdict is the classification of a single error node, returned by a +// Classifier. Unknown means the node carries no signal and the chain walker +// should keep looking; every other value names a terminal classification. +type Verdict int + +const ( + // Unknown means this node carries no classification. The chain walker + // will move on to the next node in the unwrap chain. + Unknown Verdict = iota + // User means the error is caused by the user's input or action (e.g. a + // merge conflict or invalid request) and must not be retried. + User + // Infra means a non-retryable infrastructure failure: something below the + // caller broke in a way that retrying will not fix (e.g. a schema or + // programmer bug). This is the implicit verdict for an unclassified chain, + // so Classify does not add a wrap for it. + Infra + // InfraRetryable means a transient infrastructure failure that is + // expected to succeed on retry (e.g. a deadlock, lock-wait timeout, or + // dropped connection). + InfraRetryable + // InfraDependency means a non-retryable failure originating in a + // downstream dependency outside the caller's control (e.g. an external + // service rejecting the request). + InfraDependency + // InfraDependencyRetryable means a transient failure originating in a + // downstream dependency (e.g. an external service is briefly unavailable) + // that is expected to succeed on retry. + InfraDependencyRetryable +) + +// Classifier inspects a single error node (not the whole chain) and returns a +// Verdict. Implementations should return Unknown for nodes they do not +// recognize so the chain walker can continue down the unwrap chain. +// +// Classifiers must not call errors.As / errors.Is themselves, which would walk +// the chain and could shadow a classification carried by an outer node (such +// as a controller's explicit NewUserError wrap). The package-level Classify +// function owns the walk. +// +// Classifiers are typically stateless; the canonical convention is to expose a +// package-level singleton value (e.g. mysqlerrs.Classifier) rather than a +// constructor. +type Classifier interface { + Classify(err error) Verdict +} + +// Classify is the single, explicit classification pass. It is intended to be +// called exactly once per error chain — typically by the consumer immediately +// after a controller returns — and produces a chain that subsequent IsUserError +// / IsRetryable / IsDependencyError calls can interpret with simple type +// checks (no further classifier walks). +// +// Semantics: +// +// - nil in, nil out. +// - If err's chain already carries a framework classification (*userError or +// *infraError anywhere in the chain), returns err unchanged — the chain is +// already interpretable by IsUserError / IsRetryable / IsDependencyError. +// - Otherwise, walks the chain from outermost to innermost, asking each +// classifier per node. The FIRST non-Unknown verdict wins; the outermost +// such node determines the wrap. err is wrapped with the framework +// constructor matching that verdict (User -> NewUserError, InfraRetryable +// -> NewRetryableError, etc.) and the wrapped error is returned. +// - Verdict Infra means "non-retryable infra" — which is already the default +// behavior for an unwrapped chain, so no wrap is added. +// - If no classifier recognises anything, err is returned unchanged. +// +// Implementation: two passes over the chain. Pass 1 is a cheap type check +// looking for an existing framework wrap and short-circuits if one is found — +// no classifier is invoked. Pass 2 runs the configured classifiers per node. +// Walking the chain is cheap relative to a classifier call, so this avoids +// running classifiers whenever the chain is already classified deeper down. +// +// NOTE: this central classifier model cannot disambiguate errors of the same +// underlying type produced by different extensions (e.g. a net.OpError from a +// mysql connection vs the same type from an HTTP caller would both match the +// mysql classifier here). Resolving that requires per-extension provenance +// tagging; intentionally deferred. +func Classify(err error, classifiers ...Classifier) error { + if err == nil { + return nil + } + + // Pass 1 — cheap framework-wrap check. If any node already carries a + // framework type, the chain is interpretable as-is and classifiers are + // never invoked. + for cur := err; cur != nil; cur = errors.Unwrap(cur) { + switch cur.(type) { + case *userError, *infraError: + return err + } + } + + // Pass 2 — run classifiers per node from outermost to innermost. Stop at + // the first non-Unknown verdict. + var verdict Verdict + for cur := err; cur != nil && verdict == Unknown; cur = errors.Unwrap(cur) { + for _, c := range classifiers { + if v := c.Classify(cur); v != Unknown { + verdict = v + break + } + } + } + + switch verdict { + case User: + return NewUserError(err) + case InfraRetryable: + return NewRetryableError(err) + case InfraDependency: + return NewDependencyError(err) + case InfraDependencyRetryable: + return NewRetryableDependencyError(err) + } + // Unknown or Infra — no wrap needed; the existing chain already behaves as + // non-retryable infra at the IsRetryable / IsUserError layer. + return err +} + +// IsUserError reports whether err is or wraps a user error, i.e. an error +// produced by NewUserError. Inspects only the framework types in the chain. func IsUserError(err error) bool { - var target *userError - return errors.As(err, &target) + var ue *userError + return errors.As(err, &ue) } -// IsRetryable checks if err is retryable. Returns true when err is or -// wraps an infrastructure error whose retryable flag is set or when err is context.Canceled. User errors are -// never retryable. A generic error (not wrapped) returns false, consistent -// with the convention that unclassified errors are non-retryable. +// IsRetryable reports whether err is or wraps an infra error marked +// retryable, i.e. an error produced by NewRetryableError or +// NewRetryableDependencyError. Inspects only the framework types in the chain. func IsRetryable(err error) bool { - if errors.Is(err, context.Canceled) { - return true - } var ie *infraError - if errors.As(err, &ie) { - return ie.retryable - } - return false + return errors.As(err, &ie) && ie.retryable } -// IsDependencyError checks if err is or wraps an infra error that originated -// in a downstream dependency. Returns false for user errors, generic errors, -// and infra errors not marked as dependency. +// IsDependencyError reports whether err is or wraps an infra error marked as +// originating in a downstream dependency, i.e. an error produced by +// NewDependencyError or NewRetryableDependencyError. Inspects only the +// framework types in the chain. func IsDependencyError(err error) bool { var ie *infraError - if errors.As(err, &ie) { - return ie.dependency - } - return false + return errors.As(err, &ie) && ie.dependency } diff --git a/core/errs/generic/BUILD.bazel b/core/errs/generic/BUILD.bazel new file mode 100644 index 00000000..10561c9b --- /dev/null +++ b/core/errs/generic/BUILD.bazel @@ -0,0 +1,19 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "generic", + srcs = ["generic.go"], + importpath = "github.com/uber/submitqueue/core/errs/generic", + visibility = ["//visibility:public"], + deps = ["//core/errs"], +) + +go_test( + name = "generic_test", + srcs = ["generic_test.go"], + embed = [":generic"], + deps = [ + "//core/errs", + "@com_github_stretchr_testify//assert", + ], +) diff --git a/core/errs/generic/generic.go b/core/errs/generic/generic.go new file mode 100644 index 00000000..47e794c0 --- /dev/null +++ b/core/errs/generic/generic.go @@ -0,0 +1,53 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package generic provides an errs.Classifier for errors that are not tied to +// any particular backend. Servers should wire this classifier in alongside +// any backend-specific classifiers (e.g. core/errs/mysql). +package generic + +import ( + "context" + + "github.com/uber/submitqueue/core/errs" +) + +// Classifier recognises generic, non-backend-specific errors and returns +// errs.Unknown for anything it does not recognise so the surrounding +// errs.Classify walker can keep looking down the unwrap chain. +// +// The classifier is stateless; this package-level singleton is the canonical +// handle. Pass it into consumer.New as a vararg. +var Classifier errs.Classifier = classifier{} + +type classifier struct{} + +// Classify inspects a single node. Per the errs.Classifier contract, this +// must not call errors.Is / errors.As — errs.Classify owns the chain walk. +func (classifier) Classify(err error) errs.Verdict { + // Cancellation signals that the caller aborted the work in flight + // (process shutdown, deadline on the inbound RPC, parent operation gone) — + // it is not a statement about the work itself being invalid. The same + // message handed to a fresh process with an uncancelled context is + // expected to succeed, so nacking for redelivery is the correct response. + // Cases where cancellation truly means "do not run this again" are + // caller-specific and should be expressed by wrapping with an explicit + // NewUserError / NewDependencyError before returning; the pass-1 + // framework-wrap check in errs.Classify will then short-circuit before + // this classifier is consulted. + if err == context.Canceled { + return errs.InfraRetryable + } + return errs.Unknown +} diff --git a/core/errs/generic/generic_test.go b/core/errs/generic/generic_test.go new file mode 100644 index 00000000..048f2efd --- /dev/null +++ b/core/errs/generic/generic_test.go @@ -0,0 +1,76 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package generic + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/uber/submitqueue/core/errs" +) + +func TestClassifier_ContextCanceled(t *testing.T) { + assert.Equal(t, errs.InfraRetryable, Classifier.Classify(context.Canceled)) +} + +func TestClassifier_Unknown(t *testing.T) { + tests := []struct { + name string + err error + }{ + // Per-node contract — Classifier should NOT match a wrapped + // context.Canceled; the surrounding errs.Classify walk will reach the + // inner node and ask Classifier again there. + {"wrapped context.Canceled", fmt.Errorf("op: %w", context.Canceled)}, + {"deadline exceeded", context.DeadlineExceeded}, + {"plain error", errors.New("anything")}, + {"nil", nil}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, errs.Unknown, Classifier.Classify(tt.err)) + }) + } +} + +func TestClassifier_AppliedViaClassify(t *testing.T) { + t.Run("bare context.Canceled becomes retryable infra", func(t *testing.T) { + out := errs.Classify(context.Canceled, Classifier) + assert.True(t, errs.IsRetryable(out)) + }) + + t.Run("wrapped context.Canceled becomes retryable infra", func(t *testing.T) { + // The chain walker reaches the inner context.Canceled node and the + // classifier matches there. + wrapped := fmt.Errorf("process: %w", context.Canceled) + out := errs.Classify(wrapped, Classifier) + assert.True(t, errs.IsRetryable(out)) + }) + + t.Run("framework wrap in chain wins", func(t *testing.T) { + // A controller explicitly classified the shutdown as non-retryable. + // The pass-1 framework-wrap check short-circuits before Classifier + // runs. + err := errs.NewUserError(context.Canceled) + out := errs.Classify(err, Classifier) + assert.Same(t, err, out) + assert.False(t, errs.IsRetryable(out)) + assert.True(t, errs.IsUserError(out)) + }) +} diff --git a/core/errs/mysql/BUILD.bazel b/core/errs/mysql/BUILD.bazel new file mode 100644 index 00000000..332ad6d4 --- /dev/null +++ b/core/errs/mysql/BUILD.bazel @@ -0,0 +1,23 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mysql", + srcs = ["mysql.go"], + importpath = "github.com/uber/submitqueue/core/errs/mysql", + visibility = ["//visibility:public"], + deps = [ + "//core/errs", + "@com_github_go_sql_driver_mysql//:mysql", + ], +) + +go_test( + name = "mysql_test", + srcs = ["mysql_test.go"], + embed = [":mysql"], + deps = [ + "//core/errs", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_stretchr_testify//assert", + ], +) diff --git a/core/errs/mysql/mysql.go b/core/errs/mysql/mysql.go new file mode 100644 index 00000000..7753366b --- /dev/null +++ b/core/errs/mysql/mysql.go @@ -0,0 +1,167 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mysql provides an errs.Classifier for errors originating from the +// go-sql-driver/mysql driver and the standard database/sql + net packages +// commonly seen when talking to a MySQL backend. +// +// The classifier inspects a single error node at a time, as required by the +// errs.Classifier contract. It returns errs.Unknown for nodes it does not +// recognise so the surrounding errs.Classify chain walk can continue to +// deeper nodes. +package mysql + +import ( + "database/sql" + "database/sql/driver" + "net" + + gomysql "github.com/go-sql-driver/mysql" + "github.com/uber/submitqueue/core/errs" +) + +// Classifier implements errs.Classifier for MySQL-related errors. It recognises: +// +// - *gomysql.MySQLError values, dispatching on the server-reported error +// number to one of InfraRetryable (transient server / lock / connection +// issues) or Infra (everything else the driver reports — schema bugs, +// constraint violations, programmer errors). +// - driver.ErrBadConn and sql.ErrConnDone — pooled connection failures that +// are safe to retry on a fresh connection. +// - sql.ErrTxDone — a programming error, non-retryable. +// - net.Error values (including *net.OpError, *net.DNSError) — transient +// network failures while talking to the server, retryable. +// +// Anything else returns errs.Unknown so the surrounding errs.Classify walker +// can keep looking down the unwrap chain. +// +// The classifier never returns errs.User. Constraint violations and similar +// codes that a caller might want to surface as user errors must be wrapped +// explicitly with errs.NewUserError at the controller — only the controller +// knows whether a duplicate key, FK violation, etc. reflects bad input from +// the user or an internal invariant being broken. The framework-wrap check +// in errs.Classify short-circuits before this classifier runs, so an +// explicit controller wrap always wins. +// +// The classifier is stateless; this package-level singleton is the canonical +// handle. Pass it into errs.Classify (typically as a vararg to consumer.New). +var Classifier errs.Classifier = classifier{} + +type classifier struct{} + +// Classify dispatches a single error node to one of the recognisers above. +// See the Classifier var docs for the full list. +func (classifier) Classify(err error) errs.Verdict { + // MySQL server-reported errors. We do the type assertion directly on the + // current node (not errors.As) so an outer framework wrap (e.g. an explicit + // NewUserError from the controller) keeps winning — errs.Classify owns the + // chain walk and stops at any *userError / *infraError it sees first. + if me, ok := err.(*gomysql.MySQLError); ok { + return classifyMySQLNumber(me.Number) + } + + // Pooled-connection lifecycle errors from database/sql. + switch err { + case driver.ErrBadConn, sql.ErrConnDone: + return errs.InfraRetryable + case sql.ErrTxDone: + // Using a transaction after Commit/Rollback is a programmer bug, not + // a transient failure — retrying will not help. + return errs.Infra + } + + // Any network-layer failure while talking to the server is transient by + // nature: connection reset, timeout, DNS hiccup, etc. The net.Error + // interface covers *net.OpError, *net.DNSError, and other concrete types. + if _, ok := err.(net.Error); ok { + return errs.InfraRetryable + } + + return errs.Unknown +} + +// classifyMySQLNumber maps a server-reported MySQL error number to a Verdict. +// Codes not listed here return Unknown so the chain walker can continue. +// +// This classifier never returns User. Constraint violations (duplicate key, +// FK violation, check constraint) are mapped to Infra because only the +// calling controller knows whether the violation reflects bad user input or +// an internal invariant being broken — controllers must wrap with +// errs.NewUserError explicitly when the former applies. +// +// References: +// - https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html +// - https://dev.mysql.com/doc/mysql-errors/8.0/en/client-error-reference.html +func classifyMySQLNumber(number uint16) errs.Verdict { + switch number { + // --- Transient: server load / locking / connection-level failures. --- + case + 1040, // ER_CON_COUNT_ERROR — too many connections + 1042, // ER_BAD_HOST_ERROR — can't get hostname (DNS / lookup) + 1043, // ER_HANDSHAKE_ERROR + 1053, // ER_SERVER_SHUTDOWN + 1077, // ER_NORMAL_SHUTDOWN + 1078, // ER_GOT_SIGNAL + 1079, // ER_SHUTDOWN_COMPLETE + 1080, // ER_FORCING_CLOSE + 1129, // ER_HOST_IS_BLOCKED + 1130, // ER_HOST_NOT_PRIVILEGED — usually a transient ACL replication lag + 1158, // ER_NET_READ_ERROR_FROM_PIPE + 1159, // ER_NET_READ_INTERRUPTED + 1160, // ER_NET_ERROR_ON_WRITE + 1161, // ER_NET_WRITE_INTERRUPTED + 1205, // ER_LOCK_WAIT_TIMEOUT + 1213, // ER_LOCK_DEADLOCK + 1290, // ER_OPTION_PREVENTS_STATEMENT — read-only mode (failover in progress) + 1317, // ER_QUERY_INTERRUPTED + 1836, // ER_READ_ONLY_MODE + 2002, // CR_CONNECTION_ERROR — can't connect via socket + 2003, // CR_CONN_HOST_ERROR — can't connect via TCP + 2004, // CR_IPSOCK_ERROR — can't create TCP/IP socket + 2005, // CR_UNKNOWN_HOST + 2006, // CR_SERVER_GONE_ERROR + 2013, // CR_SERVER_LOST + 2055: // CR_SERVER_LOST_EXTENDED + return errs.InfraRetryable + + // --- Non-retryable: schema bugs, constraint violations, programmer errors. + // Controllers wrap with errs.NewUserError when a constraint violation + // reflects bad user input; otherwise these stay infra. --- + case + 1022, // ER_DUP_KEY (older duplicate-key code) + 1044, // ER_DBACCESS_DENIED_ERROR — auth/grants misconfiguration + 1045, // ER_ACCESS_DENIED_ERROR + 1049, // ER_BAD_DB_ERROR — unknown database + 1054, // ER_BAD_FIELD_ERROR — unknown column + 1062, // ER_DUP_ENTRY — duplicate value for a unique key + 1064, // ER_PARSE_ERROR — SQL syntax + 1146, // ER_NO_SUCH_TABLE + 1149, // ER_SYNTAX_ERROR + 1169, // ER_DUP_UNIQUE + 1216, // ER_NO_REFERENCED_ROW — FK constraint fails (insert/update child) + 1217, // ER_ROW_IS_REFERENCED — FK constraint fails (delete parent) + 1364, // ER_NO_DEFAULT_FOR_FIELD + 1366, // ER_TRUNCATED_WRONG_VALUE_FOR_FIELD — encoding / type mismatch + 1406, // ER_DATA_TOO_LONG + 1451, // ER_ROW_IS_REFERENCED_2 + 1452, // ER_NO_REFERENCED_ROW_2 + 1557, // ER_FOREIGN_DUPLICATE_KEY + 3819: // ER_CHECK_CONSTRAINT_VIOLATED + return errs.Infra + } + + // Unrecognised number — defer to the surrounding chain walk and/or default + // non-retryable infra treatment in the caller. + return errs.Unknown +} diff --git a/core/errs/mysql/mysql_test.go b/core/errs/mysql/mysql_test.go new file mode 100644 index 00000000..788c5f57 --- /dev/null +++ b/core/errs/mysql/mysql_test.go @@ -0,0 +1,229 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "database/sql" + "database/sql/driver" + "errors" + "fmt" + "net" + "testing" + + gomysql "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + "github.com/uber/submitqueue/core/errs" +) + +func TestClassifier_MySQLErrorNumbers(t *testing.T) { + tests := []struct { + name string + number uint16 + want errs.Verdict + }{ + // Constraint violations -> Infra. The classifier never returns User; + // controllers wrap with NewUserError explicitly when the violation + // reflects bad user input. + {"duplicate entry (1062)", 1062, errs.Infra}, + {"FK insert (1452)", 1452, errs.Infra}, + {"FK delete (1451)", 1451, errs.Infra}, + {"check constraint (3819)", 3819, errs.Infra}, + + // Transient server / lock / connection -> InfraRetryable. + {"deadlock (1213)", 1213, errs.InfraRetryable}, + {"lock wait timeout (1205)", 1205, errs.InfraRetryable}, + {"too many connections (1040)", 1040, errs.InfraRetryable}, + {"server shutdown (1053)", 1053, errs.InfraRetryable}, + {"read-only mode (1290)", 1290, errs.InfraRetryable}, + {"server gone (2006)", 2006, errs.InfraRetryable}, + {"server lost (2013)", 2013, errs.InfraRetryable}, + + // Programmer / schema bugs -> Infra. + {"unknown column (1054)", 1054, errs.Infra}, + {"syntax error (1064)", 1064, errs.Infra}, + {"no such table (1146)", 1146, errs.Infra}, + {"truncated value (1366)", 1366, errs.Infra}, + + // Unknown numbers -> Unknown so the chain walker keeps looking. + {"unrecognized number (9999)", 9999, errs.Unknown}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := &gomysql.MySQLError{Number: tt.number, Message: "test"} + assert.Equal(t, tt.want, Classifier.Classify(err)) + }) + } +} + +func TestClassifier_ConnectionLifecycle(t *testing.T) { + tests := []struct { + name string + err error + want errs.Verdict + }{ + {"driver bad conn", driver.ErrBadConn, errs.InfraRetryable}, + {"sql conn done", sql.ErrConnDone, errs.InfraRetryable}, + {"sql tx done", sql.ErrTxDone, errs.Infra}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, Classifier.Classify(tt.err)) + }) + } +} + +func TestClassifier_NetErrors(t *testing.T) { + tests := []struct { + name string + err error + }{ + { + name: "net.OpError", + err: &net.OpError{ + Op: "dial", + Net: "tcp", + Err: errors.New("connection refused"), + }, + }, + { + name: "net.DNSError", + err: &net.DNSError{Err: "no such host", Name: "mysql"}, + }, + { + name: "timeoutError", + err: netTimeoutErr{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, errs.InfraRetryable, Classifier.Classify(tt.err)) + }) + } +} + +func TestClassifier_Unknown(t *testing.T) { + // A plain, unrecognised error must yield Unknown so the package-level + // errs.Classify walker can move on to the next node in the chain rather + // than locking in a verdict. + assert.Equal(t, errs.Unknown, Classifier.Classify(errors.New("anything"))) + assert.Equal(t, errs.Unknown, Classifier.Classify(nil)) +} + +func TestClassifier_AppliedViaClassify(t *testing.T) { + // End-to-end behavior: the consumer's call site is + // `err = errs.Classify(err, mysqlerrs.Classifier)`, followed by + // errs.IsUserError / IsRetryable / IsDependencyError type checks. These + // tests pin that contract — given an err from a controller, the returned + // chain answers the right question. + + t.Run("mysql connection error surfaces as retryable infra", func(t *testing.T) { + // Simulates the queue or storage layer returning a wrapped net.OpError + // for a failed connection to MySQL. + netErr := &net.OpError{Op: "read", Net: "tcp", Err: errors.New("reset")} + wrapped := fmt.Errorf("publish: %w", netErr) + + out := errs.Classify(wrapped, Classifier) + assert.True(t, errs.IsRetryable(out)) + assert.False(t, errs.IsUserError(out)) + }) + + t.Run("mysql deadlock surfaces as retryable infra", func(t *testing.T) { + dl := &gomysql.MySQLError{Number: 1213, Message: "deadlock"} + wrapped := fmt.Errorf("update: %w", dl) + + out := errs.Classify(wrapped, Classifier) + assert.True(t, errs.IsRetryable(out)) + }) + + t.Run("mysql schema error is non-retryable infra", func(t *testing.T) { + se := &gomysql.MySQLError{Number: 1054, Message: "unknown column"} + wrapped := fmt.Errorf("select: %w", se) + + // Verdict Infra means "non-retryable infra" — the default for an + // unwrapped chain — so Classify leaves the chain alone. + out := errs.Classify(wrapped, Classifier) + assert.False(t, errs.IsRetryable(out)) + assert.False(t, errs.IsUserError(out)) + assert.Same(t, wrapped, out, "Infra verdict should not add a wrap") + }) + + t.Run("mysql duplicate key is non-retryable infra", func(t *testing.T) { + // The classifier never returns User — a controller that wants to surface + // a duplicate-key as a user error must wrap with errs.NewUserError + // explicitly (see the controller-override test below). + dup := &gomysql.MySQLError{Number: 1062, Message: "duplicate"} + out := errs.Classify(dup, Classifier) + assert.False(t, errs.IsRetryable(out)) + assert.False(t, errs.IsUserError(out)) + assert.Same(t, dup, out, "Infra verdict should not add a wrap") + }) + + t.Run("controller User wrap turns duplicate key into a user error", func(t *testing.T) { + dup := &gomysql.MySQLError{Number: 1062, Message: "duplicate"} + err := errs.NewUserError(fmt.Errorf("create: %w", dup)) + + out := errs.Classify(err, Classifier) + assert.Same(t, err, out) + assert.True(t, errs.IsUserError(out)) + assert.False(t, errs.IsRetryable(out)) + }) + + t.Run("controller-level User wrap beats deeper mysql classification", func(t *testing.T) { + // Even though the underlying cause is a transient mysql deadlock, the + // outermost frame is an explicit NewUserError. Classify sees a framework + // wrap already in the chain and returns the chain untouched. + deadlock := &gomysql.MySQLError{Number: 1213, Message: "deadlock"} + err := errs.NewUserError(fmt.Errorf("conflict: %w", deadlock)) + + out := errs.Classify(err, Classifier) + assert.Same(t, err, out) + assert.True(t, errs.IsUserError(out)) + assert.False(t, errs.IsRetryable(out)) + }) + + t.Run("controller-level Retryable wrap beats deeper mysql schema verdict", func(t *testing.T) { + // Inverse: a schema error would normally be non-retryable infra, but the + // controller chose to mark it retryable anyway. The outer framework wrap + // is already in the chain, so Classify is a no-op. + schemaErr := &gomysql.MySQLError{Number: 1054, Message: "unknown column"} + err := errs.NewRetryableError(fmt.Errorf("query: %w", schemaErr)) + + out := errs.Classify(err, Classifier) + assert.Same(t, err, out) + assert.True(t, errs.IsRetryable(out)) + }) + + t.Run("unwrapped non-mysql error stays unclassified", func(t *testing.T) { + raw := errors.New("something else") + out := errs.Classify(raw, Classifier) + assert.Same(t, raw, out) + assert.False(t, errs.IsRetryable(out)) + assert.False(t, errs.IsUserError(out)) + }) +} + +// netTimeoutErr satisfies net.Error so we can verify the net.Error branch of +// Classifier.Classify without having to provoke a real network timeout. +type netTimeoutErr struct{} + +func (netTimeoutErr) Error() string { return "i/o timeout" } +func (netTimeoutErr) Timeout() bool { return true } +func (netTimeoutErr) Temporary() bool { return true } + +// Sanity check: net.Error is the interface we rely on. +var _ net.Error = netTimeoutErr{} diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index f7538e42..9912c570 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -12,6 +12,8 @@ go_library( visibility = ["//visibility:private"], deps = [ "//core/consumer", + "//core/errs/generic", + "//core/errs/mysql", "//core/httpclient", "//entity", "//extension/buildrunner", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 48ec4994..53fc315d 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -31,6 +31,8 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + genericerrs "github.com/uber/submitqueue/core/errs/generic" + mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" "github.com/uber/submitqueue/core/httpclient" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/buildrunner" @@ -197,8 +199,14 @@ func run() error { return fmt.Errorf("failed to create topic registry: %w", err) } - // Create consumer - c := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry) + // Create consumer. + c := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + genericerrs.Classifier, + // Storage (extension/storage/mysql) and queue (extension/queue/mysql) + // both run on the same MySQL driver, so a single classifier covers + // errors surfaced from either backend. + mysqlerrs.Classifier, + ) // Create merge checker mc, err := newMergeChecker(logger, scope)