tjtjtjのメモ

自分のためのメモです

go-kit を試す #1

ここを参考に go-kit を試した。http で受けたメッセージを amqp に publish する。

tech.orylab.com

サービスのインタフェース定義

サービスはインタフェースで定義する

type PublishService interface {
    Publish(message string) (string, error)
}

サービスを実装

type publishService struct{}

func (publishService) Publish(s string) (string, error) {
    if s == "" {
        return "", errors.New("empty")
    }
    err := pub("testqueue", s)
    if err != nil {
        return "", err
    }
    return "Done", nil
}

func pub(qname string, message string) error {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Print("failed Dial")
        return err
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Print("failed Channel")
        return err
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        qname, // name
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    )
    if err != nil {
        log.Print("failed QueueDeclare")
        return err
    }

    err = ch.Publish(
        "",     // exchange
        q.Name, //q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        })
    if err != nil {
        log.Print("failed Publish")
        return err
    }

    log.Printf("pulished %s : %s", qname, message)
    return nil
}

メッセージ関係

type publishRequest struct {
    S string `json:"s"`
}

type publishResponse struct {
    V   string `json:"v"`
    Err string `json:"err,omitempty"`
}

func decodePublishRequest(_ context.Context, r *http.Request) (interface{}, error) {
    var request publishRequest
    if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
        return nil, err
    }
    return request, nil
}

func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
    return json.NewEncoder(w).Encode(response)
}

エンドポイント

func makePublishEndpoint(svc PublishService) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(publishRequest)
        v, err := svc.Publish(req.S)
        if err != nil {
            return publishResponse{v, err.Error()}, nil
        }
        return publishResponse{v, ""}, nil
    }
}

main()

func main() {
    svc := publishService{}
    publishHandler := httptransport.NewServer(
        makePublishEndpoint(svc),
        decodePublishRequest,
        encodeResponse,
    )
    http.Handle("/publish", publishHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

curl

$ curl -s -X POST -d'{"s":"hello, world1"}' localhost:8080/publish
{"v":"Done"}

$ curl -s -X POST -d'{"s":"hello, world2"}' localhost:8080/publish
{"v":"Done"}

log

$go run main.go
2020/02/20 20:29:06 pulished testqueue : hello, world1
2020/02/20 20:29:08 pulished testqueue : hello, world2

ここらへん

後で試す

https://github.com/go-kit/kit/tree/master/transport/amqp