796 lines
35 KiB
Go
796 lines
35 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
_ "github.com/go-sql-driver/mysql"
|
||
)
|
||
|
||
func main() {
|
||
dsn := "root:123456@tcp(localhost:3306)/copy_play?charset=utf8mb4&parseTime=True&loc=Local"
|
||
|
||
// 使用 sql.Open() 打开数据库连接
|
||
db, err := sql.Open("mysql", dsn)
|
||
if err != nil {
|
||
log.Fatalf("failed to open database: %v", err)
|
||
}
|
||
defer db.Close() // 养成好习惯,确保连接在函数结束时关闭
|
||
|
||
// 验证数据库连接是否有效
|
||
if err := db.Ping(); err != nil {
|
||
log.Fatalf("failed to connect to database: %v", err)
|
||
}
|
||
fmt.Println("Successfully connected to the database!")
|
||
|
||
err = copy(db, context.Background(), 10900, "_")
|
||
if err != nil {
|
||
log.Fatalf("failed to copy data")
|
||
}
|
||
}
|
||
|
||
// Project carries the l_projects columns that copy reads from the source
// project (id, user_id, name, env, status, state) and writes back as a new row.
type Project struct {
	ID, UserID        uint   // id, user_id
	Name, Env, Status string // name, env, status
	State             []byte // state, kept as raw bytes
}
|
||
|
||
// Experience carries the l_experiences columns that copy duplicates.
//
// Table columns, as given by the schema DDL:
//
//	id               int          NOT NULL AUTO_INCREMENT
//	experience_title varchar(255) NOT NULL     -- experience title
//	version          varchar(50)  DEFAULT NULL -- current version number
//	session_id       bigint       NOT NULL     -- unique ID of the owning session
//	created_at       datetime     NOT NULL     -- creation time
//	updated_at       datetime     NOT NULL     -- update time
//	deleted_at       datetime     DEFAULT NULL -- deletion time (NULL when not deleted)
//	publish_version  varchar(50)  DEFAULT NULL -- build version
//	status           int          DEFAULT NULL -- experience status: 0 offline, 1 online, 2 building
//	description      text                      -- experience description
type Experience struct {
	ID                      uint          // id
	ExperienceTitle         string        // experience_title
	Version, PublishVersion *string       // version, publish_version (nil for NULL)
	ExpStatus               sql.NullInt64 // status
	Description             *string       // description (nil for NULL)
}
|
||
|
||
// Message carries the l_message columns that copy duplicates for a project.
//
// Table columns, as given by the schema DDL:
//
//	id             bigint unsigned NOT NULL AUTO_INCREMENT -- message ID, primary key
//	user_id        varchar(255) NOT NULL     -- user ID
//	project_id     int          NOT NULL     -- project ID
//	chat_id        varchar(255) NOT NULL     -- chat session ID
//	content        text         NOT NULL     -- message content
//	tree_type      varchar(50)  NOT NULL DEFAULT 'message' -- tree type: message or taskpoint
//	source         varchar(100) NOT NULL     -- message source
//	type           varchar(50)  NOT NULL     -- message type
//	task_id        varchar(255) DEFAULT NULL -- task ID
//	parent_task_id varchar(255) DEFAULT NULL -- parent task ID
//	task_status    varchar(255) DEFAULT NULL -- task point status: empty when tree_type is
//	                                            message, "runing" or "finished" for taskpoint;
//	                                            a status change never updates this field, it
//	                                            only adds a message
//	event_type     varchar(255) DEFAULT NULL -- copy of the MQ message's event_type field
//	metadata       json         DEFAULT NULL -- message metadata
//	created_at     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP -- creation time
//	updated_at     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP -- update time
type Message struct {
	ID uint // id

	// NOT NULL text columns: user_id, chat_id, content, tree_type, source, type.
	MsgUserID, ChatID, Content, TreeType, Source, MsgType string

	// Nullable columns (nil for NULL): task_id, parent_task_id, task_status,
	// event_type, metadata.
	TaskID, ParentTaskID, TaskStatus, EventType, Metadata *string
}
|
||
|
||
// System carries the l_systems columns that copy duplicates for an experience.
//
// Table columns, as given by the schema DDL:
//
//	id                  int          NOT NULL AUTO_INCREMENT
//	system_name         varchar(255) NOT NULL     -- system name
//	system_desc         longtext                  -- system description
//	created_at          datetime     NOT NULL     -- creation time
//	updated_at          datetime     NOT NULL     -- update time
//	system_url          varchar(255) NOT NULL     -- system domain
//	experience_id       int          DEFAULT NULL -- experience ID
//	unique_system_id    varchar(255) DEFAULT NULL -- globally unique system ID it belongs to
//	system_pages_desc   longtext                  -- descriptions of all pages under the system
//	system_summary      longtext                  -- system summary
//	matched_labels      json         DEFAULT NULL -- labels associated with the system
//	memo_process_status tinyint(1)   NOT NULL DEFAULT '0' -- memo processed? 0 = no, 1 = yes
type System struct {
	ID           uint    // id
	SystemName   string  // system_name
	SystemDesc   *string // system_desc (nil for NULL)
	SystemURL    string  // system_url
	ExperienceID *uint   // experience_id (nil for NULL)

	// Nullable text columns: unique_system_id, system_pages_desc, system_summary.
	UniqueSystemID, SystemPagesDesc, SystemSummary *string

	MatchedLabels     []byte // matched_labels, raw JSON
	MemoProcessStatus int    // memo_process_status
}
|
||
|
||
// RelationPlugin carries the l_plugin_relation columns that copy duplicates.
// Rows are looked up by experience ID.
//
// TODO: the copied rows should use the new taskpoint_id.
//
// Table columns, as given by the schema DDL:
//
//	id                  int          NOT NULL AUTO_INCREMENT
//	relation_id         varchar(255) NOT NULL
//	user_id             varchar(255) NOT NULL
//	taskpoint_id        varchar(255) NOT NULL
//	experience_id       int          NOT NULL     -- unique ID of the owning experience
//	status              tinyint      DEFAULT NULL -- 0 closed, 1 open
//	version             int          DEFAULT NULL
//	created_at          datetime     NOT NULL     -- creation time
//	updated_at          timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
//	last_heartbeat_time datetime     DEFAULT NULL -- last heartbeat
//	clean_status        int          NOT NULL DEFAULT '0' -- 0 cleaning finished, 1 needs a cleaning
//	                                                         decision, 2 deciding, 3 needs cleaning
//	dig_status          int          NOT NULL DEFAULT '0' -- 0 ready to mine, 1 mining
type RelationPlugin struct {
	UserID, TaskpointID, RelationID string        // user_id, taskpoint_id, relation_id
	Status, Version                 sql.NullInt64 // status, version
	LastHeartbeatTime               *time.Time    // last_heartbeat_time (nil for NULL)
	CleanStatus, DigStatus          int           // clean_status, dig_status
}
|
||
|
||
// Video carries the l_videos columns that copy duplicates for a relation.
//
// Columns declared DEFAULT NULL in the schema are pointer-typed so that a NULL
// value scans to nil instead of failing rows.Scan; database/sql accepts the
// same pointers as query arguments and sends nil as NULL.
//
// Table columns, as given by the schema DDL:
//
//	id              int          NOT NULL AUTO_INCREMENT -- primary key
//	video_id        varchar(100) NOT NULL DEFAULT '' -- video ID (matches the l_video_events table)
//	relation_id     varchar(100) NOT NULL     -- associated experience ID (UUID supported)
//	video_path      varchar(500) DEFAULT NULL -- video file path
//	start_time      bigint       DEFAULT NULL -- video start timestamp in milliseconds
//	end_time        bigint       DEFAULT NULL -- video end timestamp in milliseconds
//	upload_time     datetime     DEFAULT NULL -- upload time
//	created_at      datetime     DEFAULT CURRENT_TIMESTAMP -- record creation time
//	action_sequence json         DEFAULT NULL -- action sequence
//	system_name     varchar(100) DEFAULT NULL -- system name
type Video struct {
	VideoID    string     // video_id
	RelationID string     // relation_id
	VideoPath  *string    // video_path (nil for NULL)
	StartTime  *int64     // start_time (nil for NULL)
	EndTime    *int64     // end_time (nil for NULL)
	UploadTime *time.Time // upload_time (nil for NULL)
	// ActionSeq is the raw action_sequence JSON: the set of IDs from the user
	// action table. It is nil when the column is NULL.
	ActionSeq  []byte
	SystemName *string // system_name (nil for NULL)
}
|
||
|
||
// MonitoringLogParseResult carries the l_monitoring_log_parse_result columns
// that copy duplicates for a relation.
//
// Table columns, as given by the schema DDL:
//
//	id            bigint unsigned NOT NULL AUTO_INCREMENT -- auto-increment ID
//	parse_result  json        NOT NULL     -- parse result (JSON)
//	relation_id   varchar(64) NOT NULL     -- collection order ID (string)
//	session_id    bigint      NOT NULL     -- session ID
//	clean_plan_id int         DEFAULT NULL -- associated cleaning plan ID
//	created_at    timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP -- creation time
//	status        tinyint     NOT NULL DEFAULT '0' -- parse task status: 0 pending,
//	                                                  1 processing, 2 done, 3 failed
type MonitoringLogParseResult struct {
	// ParseResult is the raw parse_result JSON document.
	ParseResult []byte
	// RelationID is the relation_id column.
	RelationID string
	// SessionID is the session_id column.
	SessionID uint
	// CleanPlanID is the clean_plan_id column; nil for NULL.
	CleanPlanID *uint
	// Status is the status column.
	Status int
}
|
||
|
||
// SubjectKnowledge carries the l_extracted_subjective_knowledges columns that
// copy duplicates for an experience.
//
// Table columns, as given by the schema DDL:
//
//	id                                  int          NOT NULL AUTO_INCREMENT
//	extracted_subjective_knowledge_desc text                      -- subjective knowledge description
//	session_id                          bigint       NOT NULL     -- session ID
//	dialog_ids                          json         DEFAULT NULL -- list of associated dialog IDs
//	experience_id                       int          NOT NULL     -- unique ID of the owning experience
//	version                             varchar(100) DEFAULT NULL -- version of the owning experience
//	created_at                          datetime     NOT NULL     -- creation time
//	updated_at                          datetime     NOT NULL     -- update time
//	status                              int          DEFAULT NULL -- 0 produced by mining but not yet
//	                                                                 practiced, 1 passed practice,
//	                                                                 2 failed practice
//	taskpoint_id                        varchar(200) DEFAULT NULL -- task point ID
//	relation_id                         varchar(200) DEFAULT NULL
//	system_ids                          json         DEFAULT NULL -- list of system IDs
//	page_ids                            json         DEFAULT NULL -- list of page IDs
//	category_tags                       varchar(255) DEFAULT NULL -- category tags
//	is_global_associated                int          DEFAULT NULL -- globally associated? 0 = no, 1 = yes
type SubjectKnowledge struct {
	ExtractedSubjectiveKnowledgeDesc *string // extracted_subjective_knowledge_desc
	SessionID                        uint    // session_id
	DialogIDs                        []byte  // dialog_ids, raw JSON
	ExperienceID                     uint    // experience_id
	Version                          *string // version
	Status                           *int    // status
	TaskpointID, RelationID          *string // taskpoint_id, relation_id
	SystemIDs, PageIDs               []byte  // system_ids, page_ids, raw JSON
	CategoryTags                     *string // category_tags
	IsGlobalAssociated               *int    // is_global_associated
}
|
||
|
||
// UserAction carries the l_user_actions columns that copy duplicates for a page.
//
// Status, UserID and ExperienceID map to columns the schema allows to be NULL,
// so they are pointer-typed: a NULL scans to nil instead of failing rows.Scan,
// and database/sql sends a nil pointer back as NULL.
//
// NOTE(review): page_id and system_id are nullable in the schema as well, but
// they stay plain uint because copy selects rows by page_id and uses SystemID
// as a map key. A row with a NULL system_id would still fail to scan.
//
// Table columns, as given by the schema DDL:
//
//	id                  int           NOT NULL AUTO_INCREMENT -- unique ID of the collected action
//	relation_id         varchar(255)  NOT NULL     -- unique ID of the plugin/system communication relation
//	created_at          datetime(3)   NOT NULL     -- creation time (when the action was reported)
//	updated_at          datetime(3)   NOT NULL     -- update time
//	system_name         varchar(255)  NOT NULL     -- name of the system the action happened in
//	page_url            varchar(1024) NOT NULL     -- URL of the page the action happened on
//	element_text        text                       -- text of the element acted on
//	event_type          varchar(64)   NOT NULL     -- kind of action performed (e.g. click, input)
//	page_html_path      varchar(512)  DEFAULT NULL -- page HTML storage path (OSS path)
//	screenshot_path     varchar(512)  DEFAULT NULL -- page screenshot storage path (OSS path)
//	element_tag         varchar(64)   DEFAULT NULL -- tag of the element acted on (e.g. input, button)
//	status              varchar(8)    DEFAULT '0'  -- takes part in objective-knowledge generation? 0 = no, 1 = yes
//	extra_data          json          DEFAULT NULL -- collection of user behaviour data
//	user_id             int           DEFAULT NULL
//	experience_id       int           DEFAULT NULL -- experience ID
//	page_id             int           DEFAULT NULL -- owning page
//	system_id           int           DEFAULT NULL -- owning system
//	log_dig_status      int           NOT NULL DEFAULT '0' -- 0 waiting to be mined, 1 mining, 2 mined
//	clean_page_desc     int           NOT NULL DEFAULT '0' -- 0 not cleaned, 1 cleaned
//	clean_system_desc   int           NOT NULL DEFAULT '0' -- 0 not cleaned, 1 cleaned
//	clean_html_diff     int           NOT NULL DEFAULT '0' -- 0 not cleaned, 1 cleaned
//	clean_elements_json int           NOT NULL DEFAULT '0' -- 0 not cleaned, 1 cleaned
//	clean_page_inputs   int           NOT NULL DEFAULT '0' -- 0 not cleaned, 1 cleaned
//	clean_page_stats    int           NOT NULL DEFAULT '0' -- 0 not cleaned, 1 cleaned
//	processing_delay    int           DEFAULT NULL
type UserAction struct {
	ID                uint    // id
	RelationID        string  // relation_id
	SystemName        string  // system_name
	PageUrl           string  // page_url
	ElementText       *string // element_text (nil for NULL)
	EventType         string  // event_type
	PageHtmlPath      *string // page_html_path (nil for NULL)
	ScreenshotPath    *string // screenshot_path (nil for NULL)
	ElementTag        *string // element_tag (nil for NULL)
	Status            *string // status (nil for NULL)
	ExtraData         []byte  // extra_data, raw JSON
	UserID            *uint   // user_id (nil for NULL)
	ExperienceID      *uint   // experience_id (nil for NULL)
	PageID            uint    // page_id
	SystemID          uint    // system_id
	LogDigStatus      int     // log_dig_status
	CleanPageDesc     int     // clean_page_desc
	CleanSystemDesc   int     // clean_system_desc
	CleanHtmlDiff     int     // clean_html_diff
	CleanElementsJson int     // clean_elements_json
	CleanPageInputs   int     // clean_page_inputs
	CleanPageStats    int     // clean_page_stats
	ProcessingDelay   *int    // processing_delay (nil for NULL)
}
|
||
|
||
// Page carries the l_pages columns that copy duplicates for an experience.
//
// PageDesc and SystemName map to columns the schema allows to be NULL, so they
// are pointer-typed: a NULL scans to nil instead of failing rows.Scan, and
// database/sql sends a nil pointer back as NULL.
//
// Table columns, as given by the schema DDL:
//
//	id                  int          NOT NULL AUTO_INCREMENT
//	page_url            json         NOT NULL DEFAULT (json_array()) -- page URLs (JSON array)
//	page_desc           text                      -- page description
//	elements_json       json         DEFAULT NULL -- page element structure
//	statistics          json         DEFAULT NULL -- page statistics and condition relations
//	system_id           int          NOT NULL     -- unique ID of the owning system
//	system_name         varchar(255) DEFAULT NULL -- name of the owning system
//	created_at          datetime     NOT NULL     -- creation time
//	updated_at          datetime     NOT NULL     -- update time
//	input_json          longtext
//	page_summary        longtext                  -- page summary
//	clickable           json         NOT NULL DEFAULT (json_array()) -- clickable elements, each
//	                                                                    formatted <tag>_<text>,
//	                                                                    stored as a JSON array
//	experience_id       int          DEFAULT NULL -- experience ID
//	page_type           json         DEFAULT NULL -- page type and analysis result
//	unique_page_id      varchar(255) DEFAULT NULL -- globally unique page ID it belongs to
//	unique_system_id    varchar(255) DEFAULT NULL -- globally unique system ID it belongs to
//	memo_process_status tinyint(1)   NOT NULL DEFAULT '0' -- memo processed? 0 = no, 1 = yes
type Page struct {
	ID                uint    // id
	PageUrl           []byte  // page_url, raw JSON
	PageDesc          *string // page_desc (nil for NULL)
	ElementsJson      []byte  // elements_json, raw JSON
	Statistics        []byte  // statistics, raw JSON
	SystemID          uint    // system_id
	SystemName        *string // system_name (nil for NULL)
	InputJson         *string // input_json (nil for NULL)
	PageSummary       *string // page_summary (nil for NULL)
	Clickable         []byte  // clickable, raw JSON
	PageType          []byte  // page_type, raw JSON
	UniquePageID      *string // unique_page_id (nil for NULL)
	UniqueSystemID    *string // unique_system_id (nil for NULL)
	MemoProcessStatus int     // memo_process_status
}
|
||
|
||
func copy(db *sql.DB, ctx context.Context, projectID uint, _taskPointID string) error {
|
||
tx, err := db.Begin()
|
||
if err != nil {
|
||
log.Fatalf("failed to begin transaction: %v", err)
|
||
return err
|
||
}
|
||
defer tx.Rollback()
|
||
var project Project
|
||
query := "select id, user_id,name,env,status,state from l_projects lp where lp.id = ?"
|
||
row := tx.QueryRowContext(ctx, query, projectID)
|
||
if err := row.Scan(&project.ID, &project.UserID, &project.Name, &project.Env, &project.Status, &project.State); err != nil {
|
||
log.Fatalf("failed to scan row: %v", err)
|
||
return err
|
||
}
|
||
|
||
oldProjectID := project.ID
|
||
|
||
// 向project表插入这条查询出来的数据,然后记录返回的ID
|
||
xsql := "INSERT INTO l_projects (user_id, name, env, status, state, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)"
|
||
result, err := tx.ExecContext(ctx, xsql, project.UserID, project.Name, project.Env, project.Status, project.State, time.Now(), time.Now())
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data: %v", err)
|
||
return err
|
||
}
|
||
newProjectID, err := result.LastInsertId()
|
||
if err != nil {
|
||
log.Fatalf("failed to get last insert id: %v", err)
|
||
return err
|
||
}
|
||
|
||
println("新项目ID:", newProjectID)
|
||
|
||
var experience Experience
|
||
query = "select id, experience_title, version, publish_version, status, description from l_experiences le where le.session_id = ?"
|
||
row = tx.QueryRowContext(ctx, query, oldProjectID)
|
||
|
||
if err := row.Scan(&experience.ID, &experience.ExperienceTitle, &experience.Version, &experience.PublishVersion, &experience.ExpStatus, &experience.Description); err != nil {
|
||
log.Fatalf("failed to scan row: %v", err)
|
||
return err
|
||
}
|
||
|
||
oldExperienceID := experience.ID
|
||
|
||
// 插入一条新的经验体
|
||
xsql = "INSERT INTO l_experiences (experience_title, version, session_id, created_at, updated_at, publish_version, status, description) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
|
||
result, err = tx.ExecContext(ctx, xsql, experience.ExperienceTitle, experience.Version, newProjectID, time.Now(), time.Now(), experience.PublishVersion, experience.ExpStatus, experience.Description)
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data into l_experiences: %v", err)
|
||
return err
|
||
}
|
||
|
||
// 获取新的ID
|
||
newExperienceID, err := result.LastInsertId()
|
||
if err != nil {
|
||
log.Fatalf("failed to get last insert id for l_experiences: %v", err)
|
||
}
|
||
|
||
println("新经验ID:", newExperienceID)
|
||
|
||
// 查询一个项目ID下面的所有消息
|
||
// TODO 这里chat——id需要使用更新后的,而不是查询得到的
|
||
// TODO task_id parent_task_id 也需要更新
|
||
query = "select id, user_id, chat_id, content, tree_type, source, type, task_id, parent_task_id, task_status, event_type, metadata from l_message lm where lm.project_id = ?"
|
||
rows, err := tx.QueryContext(ctx, query, oldProjectID)
|
||
if err != nil {
|
||
log.Fatalf("failed to query l_message: %v", err)
|
||
return err
|
||
}
|
||
|
||
messages := make([]Message, 0)
|
||
for rows.Next() {
|
||
var message Message
|
||
if err := rows.Scan(&message.ID, &message.MsgUserID, &message.ChatID, &message.Content, &message.TreeType, &message.Source, &message.MsgType, &message.TaskID, &message.ParentTaskID, &message.TaskStatus, &message.EventType, &message.Metadata); err != nil {
|
||
log.Fatalf("failed to scan row from l_message: %v", err)
|
||
return err
|
||
}
|
||
|
||
messages = append(messages, message)
|
||
}
|
||
rows.Close()
|
||
|
||
msgIDRel := make(map[uint]uint)
|
||
|
||
// 单独处理消息插入,需要记录新旧ID的关系
|
||
for _, msg := range messages {
|
||
xsql = "INSERT INTO l_message (user_id, project_id, chat_id, content, tree_type, source, type, task_id, parent_task_id, task_status, event_type, metadata, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||
result, err = tx.ExecContext(ctx, xsql, msg.MsgUserID, newProjectID, msg.ChatID, msg.Content, msg.TreeType, msg.Source, msg.MsgType, msg.TaskID, msg.ParentTaskID, msg.TaskStatus, msg.EventType, msg.Metadata, time.Now(), time.Now())
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data into l_message: %v", err)
|
||
return err
|
||
}
|
||
newMsgID, err := result.LastInsertId()
|
||
if err != nil {
|
||
log.Fatalf("failed to get last insert id for l_message: %v", err)
|
||
return err
|
||
}
|
||
msgIDRel[msg.ID] = uint(newMsgID)
|
||
}
|
||
|
||
for oldID, newID := range msgIDRel {
|
||
println("旧消息ID:", oldID, "新消息ID:", newID)
|
||
}
|
||
|
||
// 查询经验体ID下的系统
|
||
query = "select id, system_name, system_desc, system_url, unique_system_id, system_pages_desc, system_summary, matched_labels, memo_process_status from l_systems ls where ls.experience_id = ?"
|
||
rows, err = tx.QueryContext(ctx, query, oldExperienceID)
|
||
if err != nil {
|
||
log.Fatalf("failed to query l_systems: %v", err)
|
||
return err
|
||
}
|
||
systems := make([]System, 0)
|
||
for rows.Next() {
|
||
var system System
|
||
if err := rows.Scan(&system.ID, &system.SystemName, &system.SystemDesc, &system.SystemURL, &system.UniqueSystemID, &system.SystemPagesDesc, &system.SystemSummary, &system.MatchedLabels, &system.MemoProcessStatus); err != nil {
|
||
log.Fatalf("failed to scan row from l_systems: %v", err)
|
||
return err
|
||
}
|
||
systems = append(systems, system)
|
||
}
|
||
rows.Close()
|
||
|
||
sysRel := make(map[uint]uint)
|
||
for _, sys := range systems {
|
||
xsql = "INSERT INTO l_systems (system_name, system_desc, system_url, experience_id, unique_system_id, system_pages_desc, system_summary, matched_labels, memo_process_status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||
result, err = tx.ExecContext(ctx, xsql, sys.SystemName, sys.SystemDesc, sys.SystemURL, newExperienceID, sys.UniqueSystemID, sys.SystemPagesDesc, sys.SystemSummary, sys.MatchedLabels, sys.MemoProcessStatus, time.Now(), time.Now())
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data into l_systems: %v", err)
|
||
return err
|
||
}
|
||
|
||
newSysID, err := result.LastInsertId()
|
||
if err != nil {
|
||
log.Fatalf("failed to get last insert id for l_systems: %v", err)
|
||
return err
|
||
}
|
||
sysRel[sys.ID] = uint(newSysID)
|
||
}
|
||
|
||
for oldID, newID := range sysRel {
|
||
println("旧系统ID:", oldID, "新系统ID:", newID)
|
||
}
|
||
|
||
query = "select relation_id, user_id, taskpoint_id, status, version, last_heartbeat_time, clean_status, dig_status from l_plugin_relation where experience_id = ?"
|
||
rows, err = tx.QueryContext(ctx, query, oldExperienceID)
|
||
if err != nil {
|
||
log.Fatalf("failed to query l_plugin_relation: %v", err)
|
||
return err
|
||
}
|
||
relations := make([]RelationPlugin, 0)
|
||
for rows.Next() {
|
||
var relation RelationPlugin
|
||
if err := rows.Scan(&relation.RelationID, &relation.UserID, &relation.TaskpointID, &relation.Status, &relation.Version, &relation.LastHeartbeatTime, &relation.CleanStatus, &relation.DigStatus); err != nil {
|
||
log.Fatalf("failed to scan row from l_plugin_relation: %v", err)
|
||
return err
|
||
}
|
||
relations = append(relations, relation)
|
||
}
|
||
rows.Close()
|
||
|
||
newRelIDs := make([]string, 0)
|
||
relRelIDs := make(map[string]string)
|
||
for _, rel := range relations {
|
||
oldRelID := rel.RelationID
|
||
newRelID := rel.RelationID + "-" + strconv.FormatUint(uint64(newProjectID), 10)
|
||
xsql := "INSERT INTO l_plugin_relation (relation_id, user_id, taskpoint_id, status, version, last_heartbeat_time, experience_id, clean_status, dig_status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||
_, err = tx.ExecContext(ctx, xsql, newRelID, rel.UserID, rel.TaskpointID, rel.Status, rel.Version, rel.LastHeartbeatTime, newExperienceID, rel.CleanStatus, rel.DigStatus, time.Now(), time.Now())
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data into l_plugin_relation: %v", err)
|
||
return err
|
||
}
|
||
println("旧关联ID:", oldRelID, "新关联ID:", newRelID)
|
||
newRelIDs = append(newRelIDs, newRelID)
|
||
relRelIDs[oldRelID] = newRelID
|
||
// 通过关联ID来获取video的内容
|
||
query = "select video_id, video_path, start_time, end_time, upload_time, action_sequence, system_name from l_videos where relation_id = ?"
|
||
rows, err = tx.QueryContext(ctx, query, oldRelID)
|
||
if err != nil {
|
||
log.Fatalf("failed to query l_video: %v", err)
|
||
return err
|
||
}
|
||
videos := make([]Video, 0)
|
||
for rows.Next() {
|
||
var video Video
|
||
if err := rows.Scan(&video.VideoID, &video.VideoPath, &video.StartTime, &video.EndTime, &video.UploadTime, &video.ActionSeq, &video.SystemName); err != nil {
|
||
log.Fatalf("failed to scan row from l_video: %v", err)
|
||
return err
|
||
}
|
||
videos = append(videos, video)
|
||
}
|
||
rows.Close()
|
||
|
||
// video可以支持批量插入
|
||
valueStrings := make([]string, 0, len(videos))
|
||
valueArgs := make([]any, 0, len(videos)*9) // 每个项目有9个字段
|
||
|
||
for _, video := range videos {
|
||
oldVideoID := video.VideoID
|
||
newVideoID := oldVideoID + "-" + strconv.FormatUint(uint64(newProjectID), 10)
|
||
valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?)")
|
||
valueArgs = append(valueArgs, newVideoID, newRelID, video.VideoPath, video.StartTime, video.EndTime, video.UploadTime, video.ActionSeq, video.SystemName, time.Now())
|
||
}
|
||
|
||
xsql = fmt.Sprintf("INSERT INTO l_videos (video_id, relation_id, video_path, start_time, end_time, upload_time, action_sequence, system_name, created_at) VALUES %s", strings.Join(valueStrings, ","))
|
||
result, err = tx.ExecContext(ctx, xsql, valueArgs...)
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data into l_videos: %v", err)
|
||
return err
|
||
}
|
||
|
||
x, err := result.RowsAffected()
|
||
if err != nil {
|
||
log.Fatalf("failed to get rows affected for l_video: %v", err)
|
||
return err
|
||
}
|
||
println("插入视频:", x)
|
||
|
||
// 根据旧的relation_ID获取monitor_parse_result的内容
|
||
query = "select parse_result, session_id, clean_plan_id, status from l_monitoring_log_parse_result where relation_id = ?"
|
||
rows, err = tx.QueryContext(ctx, query, oldRelID)
|
||
if err != nil {
|
||
log.Fatalf("failed to query l_monitoring_log_parse_result: %v", err)
|
||
return err
|
||
}
|
||
parses := make([]MonitoringLogParseResult, 0)
|
||
for rows.Next() {
|
||
var parse MonitoringLogParseResult
|
||
if err := rows.Scan(&parse.ParseResult, &parse.SessionID, &parse.CleanPlanID, &parse.Status); err != nil {
|
||
log.Fatalf("failed to scan row from l_monitoring_log_parse_result: %v", err)
|
||
return err
|
||
}
|
||
parses = append(parses, parse)
|
||
}
|
||
rows.Close()
|
||
// 使用新的relation_id和新的project_id插入新记录
|
||
valueStrings = make([]string, 0, len(parses))
|
||
valueArgs = make([]any, 0, len(parses)*6) // 每个项目有6个字段
|
||
|
||
for _, parse := range parses {
|
||
valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?)")
|
||
valueArgs = append(valueArgs, parse.ParseResult, newRelID, newProjectID, parse.CleanPlanID, parse.Status, time.Now())
|
||
}
|
||
|
||
xsql = fmt.Sprintf("INSERT INTO l_monitoring_log_parse_result (parse_result, relation_id, session_id, clean_plan_id, status, created_at) VALUES %s", strings.Join(valueStrings, ","))
|
||
result, err = tx.ExecContext(ctx, xsql, valueArgs...)
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data into l_monitoring_log_parse_result: %v", err)
|
||
return err
|
||
}
|
||
|
||
x, err = result.RowsAffected()
|
||
if err != nil {
|
||
log.Fatalf("failed to get rows affected for l_monitoring_log_parse_result: %v", err)
|
||
return err
|
||
}
|
||
println("插入日志解析结果:", x)
|
||
}
|
||
|
||
// 根据旧的经验体ID读取pages内容
|
||
query = "select id, page_url, page_desc, elements_json, statistics, system_id, system_name, input_json, page_summary, clickable, page_type, unique_page_id, unique_system_id, memo_process_status from l_pages where experience_id = ?"
|
||
rows, err = tx.QueryContext(ctx, query, oldExperienceID)
|
||
if err != nil {
|
||
log.Fatalf("failed to query l_pages: %v", err)
|
||
return err
|
||
}
|
||
pages := make([]Page, 0)
|
||
for rows.Next() {
|
||
var page Page
|
||
if err := rows.Scan(&page.ID, &page.PageUrl, &page.PageDesc, &page.ElementsJson, &page.Statistics, &page.SystemID, &page.SystemName, &page.InputJson, &page.PageSummary, &page.Clickable, &page.PageType, &page.UniquePageID, &page.UniqueSystemID, &page.MemoProcessStatus); err != nil {
|
||
log.Fatalf("failed to scan row from l_pages: %v", err)
|
||
return err
|
||
}
|
||
pages = append(pages, page)
|
||
}
|
||
rows.Close()
|
||
|
||
pageIDRel := make(map[uint]uint)
|
||
actionIDRel := make(map[uint]uint)
|
||
for _, page := range pages {
|
||
oldPageID := page.ID
|
||
// 每个page单独插入
|
||
newSysID, ok := sysRel[page.SystemID]
|
||
if !ok {
|
||
log.Fatalf("failed to find new system ID for old system ID: %d", page.SystemID)
|
||
return fmt.Errorf("failed to find new system ID for old system ID: %d", page.SystemID)
|
||
}
|
||
|
||
xsql = "INSERT INTO l_pages (page_url, page_desc, elements_json, statistics, system_id, system_name, input_json, page_summary, clickable, page_type, experience_id, unique_page_id, unique_system_id, memo_process_status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||
result, err = tx.ExecContext(ctx, xsql, page.PageUrl, page.PageDesc, page.ElementsJson, page.Statistics, newSysID, page.SystemName, page.InputJson, page.PageSummary, page.Clickable, page.PageType, newExperienceID, page.UniquePageID, page.UniqueSystemID, page.MemoProcessStatus, time.Now(), time.Now())
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data into l_pages: %v", err)
|
||
return err
|
||
}
|
||
newPageID, err := result.LastInsertId()
|
||
if err != nil {
|
||
log.Fatalf("failed to get rows affected for l_pages: %v", err)
|
||
return err
|
||
}
|
||
println("插入页面:", newPageID)
|
||
pageIDRel[oldPageID] = uint(newPageID)
|
||
|
||
// 使用旧的PageID获取user_action表中的数据
|
||
query = "select id, relation_id, system_name, page_url, element_text, event_type, page_html_path, screenshot_path, element_tag, status, extra_data, user_id, experience_id, page_id, system_id, log_dig_status, clean_page_desc, clean_system_desc, clean_html_diff, clean_elements_json, clean_page_inputs, clean_page_stats, processing_delay from l_user_actions where page_id = ?"
|
||
rows, err = tx.QueryContext(ctx, query, oldPageID)
|
||
if err != nil {
|
||
log.Fatalf("failed to query l_user_actions: %v", err)
|
||
return err
|
||
}
|
||
actions := make([]UserAction, 0)
|
||
for rows.Next() {
|
||
var action UserAction
|
||
if err := rows.Scan(&action.ID, &action.RelationID, &action.SystemName, &action.PageUrl, &action.ElementText, &action.EventType, &action.PageHtmlPath, &action.ScreenshotPath, &action.ElementTag, &action.Status, &action.ExtraData, &action.UserID, &action.ExperienceID, &action.PageID, &action.SystemID, &action.LogDigStatus, &action.CleanPageDesc, &action.CleanSystemDesc, &action.CleanHtmlDiff, &action.CleanElementsJson, &action.CleanPageInputs, &action.CleanPageStats, &action.ProcessingDelay); err != nil {
|
||
log.Fatalf("failed to scan row from l_user_actions: %v", err)
|
||
return err
|
||
}
|
||
actions = append(actions, action)
|
||
}
|
||
rows.Close()
|
||
|
||
// 需要收集每个新插入的user_action的id,所以要一个一个插入
|
||
for _, action := range actions {
|
||
oldRelID := action.RelationID
|
||
newRelID := oldRelID + "-" + strconv.FormatUint(uint64(newProjectID), 10)
|
||
xsql = "INSERT INTO l_user_actions (relation_id, system_name, page_url, element_text, event_type, page_html_path, screenshot_path, element_tag, status, extra_data, user_id, experience_id, page_id, system_id, log_dig_status, clean_page_desc, clean_system_desc, clean_html_diff, clean_elements_json, clean_page_inputs, clean_page_stats, processing_delay, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||
result, err = tx.ExecContext(ctx, xsql, newRelID, action.SystemName, action.PageUrl, action.ElementText, action.EventType, action.PageHtmlPath, action.ScreenshotPath, action.ElementTag, action.Status, action.ExtraData, action.UserID, newExperienceID, newPageID, sysRel[action.SystemID], action.LogDigStatus, action.CleanPageDesc, action.CleanSystemDesc, action.CleanHtmlDiff, action.CleanElementsJson, action.CleanPageInputs, action.CleanPageStats, action.ProcessingDelay, time.Now(), time.Now())
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data into l_user_actions: %v", err)
|
||
return err
|
||
}
|
||
x, err := result.LastInsertId()
|
||
if err != nil {
|
||
log.Fatalf("failed to get last insert id for l_user_actions: %v", err)
|
||
return err
|
||
}
|
||
actionIDRel[action.ID] = uint(x)
|
||
}
|
||
}
|
||
for old, new := range actionIDRel {
|
||
fmt.Printf("旧ActionID: %d, 新ActionID: %d\n", old, new)
|
||
}
|
||
|
||
updateVideoIDSequence := make(map[uint][]byte)
|
||
for _, newRelID := range newRelIDs {
|
||
videoQuery := "SELECT id, action_sequence FROM l_videos WHERE relation_id = ?"
|
||
videoRows, err := tx.QueryContext(ctx, videoQuery, newRelID)
|
||
if err != nil {
|
||
log.Fatalf("failed to query l_videos: %v", err)
|
||
return err
|
||
}
|
||
|
||
for videoRows.Next() {
|
||
var videoID uint
|
||
var actionSequence string
|
||
if err := videoRows.Scan(&videoID, &actionSequence); err != nil {
|
||
log.Fatalf("failed to scan row from l_videos: %v", err)
|
||
return err
|
||
}
|
||
|
||
// 2. 然后将action_sequence解析为json数组
|
||
var actionIDs []uint
|
||
if err := json.Unmarshal([]byte(actionSequence), &actionIDs); err != nil {
|
||
log.Fatalf("failed to unmarshal action_sequence: %v", err)
|
||
return err
|
||
}
|
||
fmt.Printf("old actions IDs: %+v\n", actionIDs)
|
||
|
||
// 3. 然后根据新旧ID关系来创建好新的json数组
|
||
var newActionIDs []uint
|
||
for _, oldID := range actionIDs {
|
||
if newID, ok := actionIDRel[oldID]; ok {
|
||
newActionIDs = append(newActionIDs, newID)
|
||
}
|
||
}
|
||
fmt.Printf("new actions IDs: %+v\n", newActionIDs)
|
||
|
||
// 4. 根据ID来更新video表
|
||
newActionSequence, err := json.Marshal(newActionIDs)
|
||
if err != nil {
|
||
log.Fatalf("failed to marshal new action_sequence: %v", err)
|
||
return err
|
||
}
|
||
|
||
updateVideoIDSequence[videoID] = newActionSequence
|
||
}
|
||
videoRows.Close()
|
||
}
|
||
updateVideoQuery := "UPDATE l_videos SET action_sequence = ? WHERE id = ?"
|
||
for videoID, newActionSequence := range updateVideoIDSequence {
|
||
if _, err := tx.ExecContext(ctx, updateVideoQuery, newActionSequence, videoID); err != nil {
|
||
log.Fatalf("failed to update l_video: %v", err)
|
||
return err
|
||
}
|
||
println("更新视频ID:", videoID, "的用户动作序列")
|
||
}
|
||
|
||
// 根据旧的经验体ID获取主观知识表
|
||
query = "select extracted_subjective_knowledge_desc, session_id, dialog_ids, experience_id, version, status, taskpoint_id, relation_id, system_ids, page_ids, category_tags, is_global_associated from l_extracted_subjective_knowledges where experience_id = ?"
|
||
rows, err = tx.QueryContext(ctx, query, oldExperienceID)
|
||
if err != nil {
|
||
log.Fatalf("failed to query l_extracted_subjective_knowledges: %v", err)
|
||
return err
|
||
}
|
||
subjectKnowledges := make([]SubjectKnowledge, 0)
|
||
for rows.Next() {
|
||
var sk SubjectKnowledge
|
||
if err := rows.Scan(&sk.ExtractedSubjectiveKnowledgeDesc, &sk.SessionID, &sk.DialogIDs, &sk.ExperienceID, &sk.Version, &sk.Status, &sk.TaskpointID, &sk.RelationID, &sk.SystemIDs, &sk.PageIDs, &sk.CategoryTags, &sk.IsGlobalAssociated); err != nil {
|
||
log.Fatalf("failed to scan row from l_extracted_subjective_knowledges: %v", err)
|
||
return err
|
||
}
|
||
subjectKnowledges = append(subjectKnowledges, sk)
|
||
}
|
||
rows.Close()
|
||
|
||
for _, sk := range subjectKnowledges {
|
||
// 解析DialogIDs
|
||
var dialogIDs []uint
|
||
if err := json.Unmarshal(sk.DialogIDs, &dialogIDs); err != nil {
|
||
log.Fatalf("failed to unmarshal dialog_ids: %v", err)
|
||
return err
|
||
}
|
||
var newDialogIDs []uint
|
||
for _, oldID := range dialogIDs {
|
||
if newID, ok := msgIDRel[oldID]; ok {
|
||
newDialogIDs = append(newDialogIDs, newID)
|
||
}
|
||
}
|
||
newDialogIDsJson, err := json.Marshal(newDialogIDs)
|
||
if err != nil {
|
||
log.Fatalf("failed to marshal new dialog_ids: %v", err)
|
||
return err
|
||
}
|
||
|
||
// 解析systemIDs
|
||
var systemIDs []uint
|
||
if err := json.Unmarshal(sk.SystemIDs, &systemIDs); err != nil {
|
||
log.Fatalf("failed to unmarshal system_ids: %v", err)
|
||
return err
|
||
}
|
||
var newSystemIDs []uint
|
||
for _, oldID := range systemIDs {
|
||
if newID, ok := sysRel[oldID]; ok {
|
||
newSystemIDs = append(newSystemIDs, newID)
|
||
}
|
||
}
|
||
newSystemIDsJson, err := json.Marshal(newSystemIDs)
|
||
if err != nil {
|
||
log.Fatalf("failed to marshal new system_ids: %v", err)
|
||
return err
|
||
}
|
||
|
||
// 解析pageIDs
|
||
var pageIDs []uint
|
||
if err := json.Unmarshal(sk.PageIDs, &pageIDs); err != nil {
|
||
log.Fatalf("failed to unmarshal page_ids: %v", err)
|
||
return err
|
||
}
|
||
var newPageIDs []uint
|
||
for _, oldID := range pageIDs {
|
||
if newID, ok := pageIDRel[oldID]; ok {
|
||
newPageIDs = append(newPageIDs, newID)
|
||
}
|
||
}
|
||
newPageIDsJson, err := json.Marshal(newPageIDs)
|
||
if err != nil {
|
||
log.Fatalf("failed to marshal new page_ids: %v", err)
|
||
return err
|
||
}
|
||
|
||
newRelID := relRelIDs[*sk.RelationID]
|
||
xsql = "INSERT INTO l_extracted_subjective_knowledges (extracted_subjective_knowledge_desc, session_id, dialog_ids, experience_id, version, status, taskpoint_id, relation_id, system_ids, page_ids, category_tags, is_global_associated, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||
result, err = tx.ExecContext(ctx, xsql, sk.ExtractedSubjectiveKnowledgeDesc, newProjectID, newDialogIDsJson, newExperienceID, sk.Version, sk.Status, sk.TaskpointID, newRelID, newSystemIDsJson, newPageIDsJson, sk.CategoryTags, sk.IsGlobalAssociated, time.Now(), time.Now())
|
||
if err != nil {
|
||
log.Fatalf("failed to insert data into l_extracted_subjective_knowledges: %v", err)
|
||
return err
|
||
}
|
||
|
||
x, err := result.RowsAffected()
|
||
if err != nil {
|
||
log.Fatalf("failed to get rows affected for l_extracted_subjective_knowledges: %v", err)
|
||
return err
|
||
}
|
||
println("插入主观知识:", x)
|
||
}
|
||
|
||
err = tx.Commit()
|
||
if err != nil {
|
||
log.Fatalf("failed to commit transaction: %v", err)
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|