| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | package internal |
| |
|
| | import ( |
| | "encoding/json" |
| | "fmt" |
| | "log/slog" |
| | "mime/multipart" |
| | "net/http" |
| | "net/url" |
| | "os" |
| | "path/filepath" |
| | "regexp" |
| | "strings" |
| | "time" |
| |
|
| | rtctokenbuilder "github.com/AgoraIO/Tools/DynamicKey/AgoraDynamicKey/go/src/rtctokenbuilder2" |
| | "github.com/gin-gonic/gin" |
| | "github.com/gin-gonic/gin/binding" |
| | "github.com/gogf/gf/crypto/gmd5" |
| | ) |
| |
|
| | type HttpServer struct { |
| | config *HttpServerConfig |
| | } |
| |
|
// HttpServerConfig carries the settings an HttpServer is created with.
type HttpServerConfig struct {
	// AppId is passed to the RTC token builder and echoed in token responses.
	AppId string
	// AppCertificate signs generated tokens; when it is empty the handlers
	// use AppId itself as the token.
	AppCertificate string
	// LogPath is the directory where uploaded files, generated property
	// files and worker log files are written.
	LogPath string
	// Log2Stdout is forwarded to newWorker when a worker is created.
	Log2Stdout bool
	// PropertyJsonFile is not read by the code in this file, which uses the
	// package-level PropertyJsonFile instead.
	// NOTE(review): confirm whether this field is still needed.
	PropertyJsonFile string
	// Port is the TCP port the server listens on.
	Port string
	// WorkersMax is the number of running workers at which /start is refused.
	WorkersMax int
	// WorkerQuitTimeoutSeconds is the worker quit timeout used when a start
	// request does not provide a positive "timeout".
	WorkerQuitTimeoutSeconds int
}
| |
|
// PingReq is the JSON body accepted by the /ping endpoint.
type PingReq struct {
	// RequestId is only used to correlate log lines.
	RequestId string `json:"request_id,omitempty"`
	// ChannelName identifies the worker to look up.
	ChannelName string `json:"channel_name,omitempty"`
}
| |
|
// StartReq is the JSON body accepted by the /start endpoint.
type StartReq struct {
	// RequestId is only used to correlate log lines.
	RequestId string `json:"request_id,omitempty"`
	// ChannelName is the key under which the new worker is registered.
	ChannelName string `json:"channel_name,omitempty"`
	// GraphName selects the predefined graph to run; it is mandatory.
	GraphName string `json:"graph_name,omitempty"`
	// RemoteStreamId is carried in the JSON field "user_uid".
	RemoteStreamId uint32 `json:"user_uid,omitempty"`
	// BotStreamId is carried in the JSON field "bot_uid".
	BotStreamId uint32 `json:"bot_uid,omitempty"`
	// Token is overwritten by processProperty before the worker starts.
	Token string `json:"token,omitempty"`
	// WorkerHttpServerPort is overwritten by handlerStart with the value
	// returned by getHttpServerPort.
	WorkerHttpServerPort int32 `json:"worker_http_server_port,omitempty"`
	// Properties maps an extension (node) name to the property values that
	// should override that node's properties in the selected graph.
	Properties map[string]map[string]any `json:"properties,omitempty"`
	// QuitTimeoutSeconds (JSON "timeout") overrides the configured worker
	// quit timeout when it is greater than zero.
	QuitTimeoutSeconds int `json:"timeout,omitempty"`
}
| |
|
// StopReq is the JSON body accepted by the /stop endpoint.
type StopReq struct {
	// RequestId is used to correlate log lines and is passed to worker.stop.
	RequestId string `json:"request_id,omitempty"`
	// ChannelName identifies the worker to stop.
	ChannelName string `json:"channel_name,omitempty"`
}
| |
|
// GenerateTokenReq is the JSON body accepted by the /token/generate endpoint.
type GenerateTokenReq struct {
	// RequestId is only used to correlate log lines.
	RequestId string `json:"request_id,omitempty"`
	// ChannelName is the channel the token is built for.
	ChannelName string `json:"channel_name,omitempty"`
	// Uid is the user id the token is built for; it is echoed in the response.
	Uid uint32 `json:"uid,omitempty"`
}
| |
|
// VectorDocumentUpdate is the request accepted by the
// /vector/document/update endpoint.
type VectorDocumentUpdate struct {
	// RequestId is used to correlate log lines and is forwarded to the worker.
	RequestId string `json:"request_id,omitempty"`
	// ChannelName identifies the worker that receives the update.
	ChannelName string `json:"channel_name,omitempty"`
	// Collection is forwarded unchanged in the worker update request.
	Collection string `json:"collection,omitempty"`
	// FileName is forwarded unchanged in the worker update request.
	FileName string `json:"file_name,omitempty"`
}
| |
|
// VectorDocumentUpload is the form request accepted by the
// /vector/document/upload endpoint.
type VectorDocumentUpload struct {
	// RequestId is used to correlate log lines and is forwarded to the worker.
	RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"`
	// ChannelName identifies the worker that receives the uploaded file.
	ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"`
	// File is the uploaded document; binding fails when it is absent.
	File *multipart.FileHeader `form:"file" binding:"required"`
}
| |
|
| | func NewHttpServer(httpServerConfig *HttpServerConfig) *HttpServer { |
| | return &HttpServer{ |
| | config: httpServerConfig, |
| | } |
| | } |
| |
|
| | func (s *HttpServer) handlerHealth(c *gin.Context) { |
| | slog.Debug("handlerHealth", logTag) |
| | s.output(c, codeOk, nil) |
| | } |
| |
|
| | func (s *HttpServer) handlerList(c *gin.Context) { |
| | slog.Info("handlerList start", logTag) |
| | |
| | filtered := make([]map[string]interface{}, len(workers.Keys())) |
| | for _, channelName := range workers.Keys() { |
| | worker := workers.Get(channelName).(*Worker) |
| | workerJson := map[string]interface{}{ |
| | "channelName": worker.ChannelName, |
| | "createTs": worker.CreateTs, |
| | } |
| | filtered = append(filtered, workerJson) |
| | } |
| | slog.Info("handlerList end", logTag) |
| | s.output(c, codeSuccess, filtered) |
| | } |
| |
|
| | func (s *HttpServer) handleGraphs(c *gin.Context) { |
| | |
| | |
| | content, err := os.ReadFile(PropertyJsonFile) |
| | if err != nil { |
| | slog.Error("failed to read property.json file", "err", err, logTag) |
| | s.output(c, codeErrReadFileFailed, http.StatusInternalServerError) |
| | return |
| | } |
| |
|
| | var propertyJson map[string]interface{} |
| | err = json.Unmarshal(content, &propertyJson) |
| | if err != nil { |
| | slog.Error("failed to parse property.json file", "err", err, logTag) |
| | s.output(c, codeErrParseJsonFailed, http.StatusInternalServerError) |
| | return |
| | } |
| |
|
| | tenSection, ok := propertyJson["_ten"].(map[string]interface{}) |
| | if !ok { |
| | slog.Error("Invalid format: _ten section missing", logTag) |
| | s.output(c, codeErrParseJsonFailed, http.StatusInternalServerError) |
| | return |
| | } |
| |
|
| | predefinedGraphs, ok := tenSection["predefined_graphs"].([]interface{}) |
| | if !ok { |
| | slog.Error("Invalid format: predefined_graphs missing or not an array", logTag) |
| | s.output(c, codeErrParseJsonFailed, http.StatusInternalServerError) |
| | return |
| | } |
| |
|
| | |
| | var graphs []map[string]interface{} |
| | for _, graph := range predefinedGraphs { |
| | graphMap, ok := graph.(map[string]interface{}) |
| | if ok { |
| | graphs = append(graphs, map[string]interface{}{ |
| | "name": graphMap["name"], |
| | "auto_start": graphMap["auto_start"], |
| | }) |
| | } |
| | } |
| |
|
| | s.output(c, codeSuccess, graphs) |
| | } |
| |
|
| | func (s *HttpServer) handleAddonDefaultProperties(c *gin.Context) { |
| | |
| | baseDir := "./agents/ten_packages/extension" |
| |
|
| | |
| | entries, err := os.ReadDir(baseDir) |
| | if err != nil { |
| | slog.Error("failed to read extension directory", "err", err, logTag) |
| | s.output(c, codeErrReadDirectoryFailed, http.StatusInternalServerError) |
| | return |
| | } |
| |
|
| | |
| | var addons []map[string]interface{} |
| | for _, entry := range entries { |
| | if entry.IsDir() { |
| | addonName := entry.Name() |
| | propertyFilePath := fmt.Sprintf("%s/%s/property.json", baseDir, addonName) |
| | content, err := os.ReadFile(propertyFilePath) |
| | if err != nil { |
| | slog.Warn("failed to read property file", "addon", addonName, "err", err, logTag) |
| | continue |
| | } |
| |
|
| | var properties map[string]interface{} |
| | err = json.Unmarshal(content, &properties) |
| | if err != nil { |
| | slog.Warn("failed to parse property file", "addon", addonName, "err", err, logTag) |
| | continue |
| | } |
| |
|
| | addons = append(addons, map[string]interface{}{ |
| | "addon": addonName, |
| | "property": properties, |
| | }) |
| | } |
| | } |
| |
|
| | s.output(c, codeSuccess, addons) |
| | } |
| |
|
| | func (s *HttpServer) handlerPing(c *gin.Context) { |
| | var req PingReq |
| |
|
| | if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { |
| | slog.Error("handlerPing params invalid", "err", err, logTag) |
| | s.output(c, codeErrParamsInvalid, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | slog.Info("handlerPing start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| |
|
| | if strings.TrimSpace(req.ChannelName) == "" { |
| | slog.Error("handlerPing channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrChannelEmpty, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | if !workers.Contains(req.ChannelName) { |
| | slog.Error("handlerPing channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrChannelNotExisted, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | |
| | worker := workers.Get(req.ChannelName).(*Worker) |
| | worker.UpdateTs = time.Now().Unix() |
| |
|
| | slog.Info("handlerPing end", "worker", worker, "requestId", req.RequestId, logTag) |
| | s.output(c, codeSuccess, nil) |
| | } |
| |
|
| | func (s *HttpServer) handlerStart(c *gin.Context) { |
| | workersRunning := workers.Size() |
| |
|
| | slog.Info("handlerStart start", "workersRunning", workersRunning, logTag) |
| |
|
| | var req StartReq |
| |
|
| | if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { |
| | slog.Error("handlerStart params invalid", "err", err, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrParamsInvalid, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | if strings.TrimSpace(req.ChannelName) == "" { |
| | slog.Error("handlerStart channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrChannelEmpty, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | if workersRunning >= s.config.WorkersMax { |
| | slog.Error("handlerStart workers exceed", "workersRunning", workersRunning, "WorkersMax", s.config.WorkersMax, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrWorkersLimit, http.StatusTooManyRequests) |
| | return |
| | } |
| |
|
| | if workers.Contains(req.ChannelName) { |
| | slog.Error("handlerStart channel existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrChannelExisted, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | |
| | if strings.Contains(req.GraphName, "gemini") { |
| | |
| | graphNameCount := 0 |
| | for _, channelName := range workers.Keys() { |
| | worker := workers.Get(channelName).(*Worker) |
| | if worker.GraphName == req.GraphName { |
| | graphNameCount++ |
| | } |
| | } |
| |
|
| | |
| | if graphNameCount >= MAX_GEMINI_WORKER_COUNT { |
| | slog.Error("handlerStart graphName workers exceed limit", "graphName", req.GraphName, "graphNameCount", graphNameCount, logTag) |
| | s.output(c, codeErrWorkersLimit, http.StatusTooManyRequests) |
| | return |
| | } |
| | } |
| |
|
| | req.WorkerHttpServerPort = getHttpServerPort() |
| | propertyJsonFile, logFile, err := s.processProperty(&req) |
| | if err != nil { |
| | slog.Error("handlerStart process property", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrProcessPropertyFailed, http.StatusInternalServerError) |
| | return |
| | } |
| |
|
| | worker := newWorker(req.ChannelName, logFile, s.config.Log2Stdout, propertyJsonFile) |
| | worker.HttpServerPort = req.WorkerHttpServerPort |
| | worker.GraphName = req.GraphName |
| |
|
| | if req.QuitTimeoutSeconds > 0 { |
| | worker.QuitTimeoutSeconds = req.QuitTimeoutSeconds |
| | } else { |
| | worker.QuitTimeoutSeconds = s.config.WorkerQuitTimeoutSeconds |
| | } |
| |
|
| | if err := worker.start(&req); err != nil { |
| | slog.Error("handlerStart start worker failed", "err", err, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrStartWorkerFailed, http.StatusInternalServerError) |
| | return |
| | } |
| | workers.SetIfNotExist(req.ChannelName, worker) |
| |
|
| | slog.Info("handlerStart end", "workersRunning", workers.Size(), "worker", worker, "requestId", req.RequestId, logTag) |
| | s.output(c, codeSuccess, nil) |
| | } |
| |
|
| | func (s *HttpServer) handlerStop(c *gin.Context) { |
| | var req StopReq |
| |
|
| | if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { |
| | slog.Error("handlerStop params invalid", "err", err, logTag) |
| | s.output(c, codeErrParamsInvalid, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | slog.Info("handlerStop start", "req", req, logTag) |
| |
|
| | if strings.TrimSpace(req.ChannelName) == "" { |
| | slog.Error("handlerStop channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrChannelEmpty, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | if !workers.Contains(req.ChannelName) { |
| | slog.Error("handlerStop channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrChannelNotExisted, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | worker := workers.Get(req.ChannelName).(*Worker) |
| | if err := worker.stop(req.RequestId, req.ChannelName); err != nil { |
| | slog.Error("handlerStop kill app failed", "err", err, "worker", workers.Get(req.ChannelName), "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrStopWorkerFailed, http.StatusInternalServerError) |
| | return |
| | } |
| |
|
| | slog.Info("handlerStop end", "requestId", req.RequestId, logTag) |
| | s.output(c, codeSuccess, nil) |
| | } |
| |
|
| | func (s *HttpServer) handlerGenerateToken(c *gin.Context) { |
| | var req GenerateTokenReq |
| |
|
| | if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { |
| | slog.Error("handlerGenerateToken params invalid", "err", err, logTag) |
| | s.output(c, codeErrParamsInvalid, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | slog.Info("handlerGenerateToken start", "req", req, logTag) |
| |
|
| | if strings.TrimSpace(req.ChannelName) == "" { |
| | slog.Error("handlerGenerateToken channel empty", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrChannelEmpty, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | if s.config.AppCertificate == "" { |
| | s.output(c, codeSuccess, map[string]any{"appId": s.config.AppId, "token": s.config.AppId, "channel_name": req.ChannelName, "uid": req.Uid}) |
| | return |
| | } |
| |
|
| | token, err := rtctokenbuilder.BuildTokenWithRtm(s.config.AppId, s.config.AppCertificate, req.ChannelName, fmt.Sprintf("%d", req.Uid), rtctokenbuilder.RolePublisher, tokenExpirationInSeconds, tokenExpirationInSeconds) |
| | if err != nil { |
| | slog.Error("handlerGenerateToken generate token failed", "err", err, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrGenerateTokenFailed, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | slog.Info("handlerGenerateToken end", "requestId", req.RequestId, logTag) |
| | s.output(c, codeSuccess, map[string]any{"appId": s.config.AppId, "token": token, "channel_name": req.ChannelName, "uid": req.Uid}) |
| | } |
| |
|
| | func (s *HttpServer) handlerVectorDocumentPresetList(c *gin.Context) { |
| | presetList := []map[string]any{} |
| | vectorDocumentPresetList := os.Getenv("VECTOR_DOCUMENT_PRESET_LIST") |
| |
|
| | if vectorDocumentPresetList != "" { |
| | err := json.Unmarshal([]byte(vectorDocumentPresetList), &presetList) |
| | if err != nil { |
| | slog.Error("handlerVectorDocumentPresetList parse json failed", "err", err, logTag) |
| | s.output(c, codeErrParseJsonFailed, http.StatusBadRequest) |
| | return |
| | } |
| | } |
| |
|
| | s.output(c, codeSuccess, presetList) |
| | } |
| |
|
| | func (s *HttpServer) handlerVectorDocumentUpdate(c *gin.Context) { |
| | var req VectorDocumentUpdate |
| |
|
| | if err := c.ShouldBind(&req); err != nil { |
| | slog.Error("handlerVectorDocumentUpdate params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrParamsInvalid, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | if !workers.Contains(req.ChannelName) { |
| | slog.Error("handlerVectorDocumentUpdate channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrChannelNotExisted, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | slog.Info("handlerVectorDocumentUpdate start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| |
|
| | |
| | worker := workers.Get(req.ChannelName).(*Worker) |
| | err := worker.update(&WorkerUpdateReq{ |
| | RequestId: req.RequestId, |
| | ChannelName: req.ChannelName, |
| | Collection: req.Collection, |
| | FileName: req.FileName, |
| | Ten: &WorkerUpdateReqTen{ |
| | Name: "update_querying_collection", |
| | Type: "cmd", |
| | }, |
| | }) |
| | if err != nil { |
| | slog.Error("handlerVectorDocumentUpdate update worker failed", "err", err, "channelName", req.ChannelName, "Collection", req.Collection, "FileName", req.FileName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrUpdateWorkerFailed, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | slog.Info("handlerVectorDocumentUpdate end", "channelName", req.ChannelName, "Collection", req.Collection, "FileName", req.FileName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName}) |
| | } |
| |
|
| | func (s *HttpServer) handlerVectorDocumentUpload(c *gin.Context) { |
| | var req VectorDocumentUpload |
| |
|
| | if err := c.ShouldBind(&req); err != nil { |
| | slog.Error("handlerVectorDocumentUpload params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrParamsInvalid, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | if !workers.Contains(req.ChannelName) { |
| | slog.Error("handlerVectorDocumentUpload channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrChannelNotExisted, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | slog.Info("handlerVectorDocumentUpload start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| |
|
| | file := req.File |
| | uploadFile := fmt.Sprintf("%s/file-%s-%d%s", s.config.LogPath, gmd5.MustEncryptString(req.ChannelName), time.Now().UnixNano(), filepath.Ext(file.Filename)) |
| | if err := c.SaveUploadedFile(file, uploadFile); err != nil { |
| | slog.Error("handlerVectorDocumentUpload save file failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrSaveFileFailed, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | |
| | collection := fmt.Sprintf("a%s_%d", gmd5.MustEncryptString(req.ChannelName), time.Now().UnixNano()) |
| | fileName := filepath.Base(file.Filename) |
| |
|
| | |
| | worker := workers.Get(req.ChannelName).(*Worker) |
| | err := worker.update(&WorkerUpdateReq{ |
| | RequestId: req.RequestId, |
| | ChannelName: req.ChannelName, |
| | Collection: collection, |
| | FileName: fileName, |
| | Path: uploadFile, |
| | Ten: &WorkerUpdateReqTen{ |
| | Name: "file_chunk", |
| | Type: "cmd", |
| | }, |
| | }) |
| | if err != nil { |
| | slog.Error("handlerVectorDocumentUpload update worker failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| | s.output(c, codeErrUpdateWorkerFailed, http.StatusBadRequest) |
| | return |
| | } |
| |
|
| | slog.Info("handlerVectorDocumentUpload end", "channelName", req.ChannelName, "collection", collection, "uploadFile", uploadFile, "requestId", req.RequestId, logTag) |
| | s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName, "collection": collection, "file_name": fileName}) |
| | } |
| |
|
| | func (s *HttpServer) output(c *gin.Context, code *Code, data any, httpStatus ...int) { |
| | if len(httpStatus) == 0 { |
| | httpStatus = append(httpStatus, http.StatusOK) |
| | } |
| |
|
| | c.JSON(httpStatus[0], gin.H{"code": code.code, "msg": code.msg, "data": data}) |
| | } |
| |
|
| | func (s *HttpServer) processProperty(req *StartReq) (propertyJsonFile string, logFile string, err error) { |
| | content, err := os.ReadFile(PropertyJsonFile) |
| | if err != nil { |
| | slog.Error("handlerStart read property.json failed", "err", err, "propertyJsonFile", propertyJsonFile, "requestId", req.RequestId, logTag) |
| | return |
| | } |
| |
|
| | |
| | var propertyJson map[string]interface{} |
| | err = json.Unmarshal(content, &propertyJson) |
| | if err != nil { |
| | slog.Error("handlerStart unmarshal property.json failed", "err", err, "requestId", req.RequestId, logTag) |
| | return |
| | } |
| |
|
| | |
| | graphName := req.GraphName |
| | if graphName == "" { |
| | slog.Error("graph_name is mandatory", "requestId", req.RequestId, logTag) |
| | return |
| | } |
| |
|
| | |
| | req.Token = s.config.AppId |
| | if s.config.AppCertificate != "" { |
| | |
| | req.Token, err = rtctokenbuilder.BuildTokenWithRtm(s.config.AppId, s.config.AppCertificate, req.ChannelName, fmt.Sprintf("%d", 0), rtctokenbuilder.RolePublisher, tokenExpirationInSeconds, tokenExpirationInSeconds) |
| | if err != nil { |
| | slog.Error("handlerStart generate token failed", "err", err, "requestId", req.RequestId, logTag) |
| | return |
| | } |
| | } |
| |
|
| | |
| | tenSection, ok := propertyJson["_ten"].(map[string]interface{}) |
| | if !ok { |
| | slog.Error("Invalid format: _ten section missing", "requestId", req.RequestId, logTag) |
| | return |
| | } |
| |
|
| | predefinedGraphs, ok := tenSection["predefined_graphs"].([]interface{}) |
| | if !ok { |
| | slog.Error("Invalid format: predefined_graphs missing or not an array", "requestId", req.RequestId, logTag) |
| | return |
| | } |
| |
|
| | |
| | var newGraphs []interface{} |
| | for _, graph := range predefinedGraphs { |
| | graphMap, ok := graph.(map[string]interface{}) |
| | if ok && graphMap["name"] == graphName { |
| | newGraphs = append(newGraphs, graph) |
| | } |
| | } |
| |
|
| | if len(newGraphs) == 0 { |
| | slog.Error("handlerStart graph not found", "graph", graphName, "requestId", req.RequestId, logTag) |
| | err = fmt.Errorf("graph not found") |
| | return |
| | } |
| |
|
| | |
| | tenSection["predefined_graphs"] = newGraphs |
| |
|
| | |
| | for _, graph := range newGraphs { |
| | graphMap, _ := graph.(map[string]interface{}) |
| | graphMap["auto_start"] = true |
| | } |
| |
|
| | |
| | for extensionName, props := range req.Properties { |
| | if extensionName != "" { |
| | for prop, val := range props { |
| | |
| | for _, graph := range newGraphs { |
| | graphMap, _ := graph.(map[string]interface{}) |
| | nodes, _ := graphMap["nodes"].([]interface{}) |
| | for _, node := range nodes { |
| | nodeMap, _ := node.(map[string]interface{}) |
| | if nodeMap["name"] == extensionName { |
| | properties := nodeMap["property"].(map[string]interface{}) |
| | properties[prop] = val |
| | } |
| | } |
| | } |
| | } |
| | } |
| | } |
| |
|
| | |
| | for key, props := range startPropMap { |
| | val := getFieldValue(req, key) |
| | if val != "" { |
| | for _, prop := range props { |
| | |
| | for _, graph := range newGraphs { |
| | graphMap, _ := graph.(map[string]interface{}) |
| | nodes, _ := graphMap["nodes"].([]interface{}) |
| | for _, node := range nodes { |
| | nodeMap, _ := node.(map[string]interface{}) |
| | if nodeMap["name"] == prop.ExtensionName { |
| | properties := nodeMap["property"].(map[string]interface{}) |
| | properties[prop.Property] = val |
| | } |
| | } |
| | } |
| | } |
| | } |
| | } |
| |
|
| | |
| | envPattern := regexp.MustCompile(`\${env:([^}|]+)}`) |
| | for _, graph := range newGraphs { |
| | graphMap, _ := graph.(map[string]interface{}) |
| | nodes, ok := graphMap["nodes"].([]interface{}) |
| | if !ok { |
| | slog.Info("No nodes section in the graph", "graph", graphName, "requestId", req.RequestId, logTag) |
| | continue |
| | } |
| | for _, node := range nodes { |
| | nodeMap, _ := node.(map[string]interface{}) |
| | properties, ok := nodeMap["property"].(map[string]interface{}) |
| | if !ok { |
| | |
| | continue |
| | } |
| | for key, val := range properties { |
| | strVal, ok := val.(string) |
| | if !ok { |
| | continue |
| | } |
| | |
| | |
| |
|
| | matches := envPattern.FindAllStringSubmatch(strVal, -1) |
| | |
| | |
| | |
| |
|
| | for _, match := range matches { |
| | if len(match) < 2 { |
| | continue |
| | } |
| | variable := match[1] |
| | exists := os.Getenv(variable) != "" |
| | |
| | if !exists { |
| | slog.Error("Environment variable not found", "variable", variable, "property", key, "requestId", req.RequestId, logTag) |
| | } |
| | } |
| | } |
| |
|
| | } |
| | } |
| |
|
| | |
| | modifiedPropertyJson, err := json.MarshalIndent(propertyJson, "", " ") |
| | if err != nil { |
| | slog.Error("handlerStart marshal modified JSON failed", "err", err, "requestId", req.RequestId, logTag) |
| | return |
| | } |
| |
|
| | ts := time.Now().Format("20060102_150405_000") |
| | propertyJsonFile = fmt.Sprintf("%s/property-%s-%s.json", s.config.LogPath, url.QueryEscape(req.ChannelName), ts) |
| | logFile = fmt.Sprintf("%s/app-%s-%s.log", s.config.LogPath, url.QueryEscape(req.ChannelName), ts) |
| | os.WriteFile(propertyJsonFile, []byte(modifiedPropertyJson), 0644) |
| |
|
| | return |
| | } |
| |
|
| | func (s *HttpServer) Start() { |
| | r := gin.Default() |
| | r.Use(corsMiddleware()) |
| |
|
| | r.GET("/", s.handlerHealth) |
| | r.GET("/health", s.handlerHealth) |
| | r.GET("/list", s.handlerList) |
| | r.POST("/start", s.handlerStart) |
| | r.POST("/stop", s.handlerStop) |
| | r.POST("/ping", s.handlerPing) |
| | r.GET("/graphs", s.handleGraphs) |
| | r.GET("/dev-tmp/addons/default-properties", s.handleAddonDefaultProperties) |
| | r.POST("/token/generate", s.handlerGenerateToken) |
| | r.GET("/vector/document/preset/list", s.handlerVectorDocumentPresetList) |
| | r.POST("/vector/document/update", s.handlerVectorDocumentUpdate) |
| | r.POST("/vector/document/upload", s.handlerVectorDocumentUpload) |
| |
|
| | slog.Info("server start", "port", s.config.Port, logTag) |
| |
|
| | go timeoutWorkers() |
| | r.Run(fmt.Sprintf(":%s", s.config.Port)) |
| | } |
| |
|