Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
54c624bde6 | |||
1a473d8b3a | |||
348391cd81 |
@ -10,5 +10,6 @@ import (
|
|||||||
|
|
||||||
// mcube
|
// mcube
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
cmd.Start()
|
cmd.Start()
|
||||||
}
|
}
|
||||||
|
@ -107,4 +107,250 @@ func (i *impl) RunJob(ctx context.Context, in *pipeline.Task) (
|
|||||||
|
|
||||||
## 任务状态更新
|
## 任务状态更新
|
||||||
|
|
||||||
|
```go
|
||||||
|
// 更新Job状态
|
||||||
|
func (i *impl) UpdateJobTaskStatus(ctx context.Context, in *task.UpdateJobTaskStatusRequest) (
|
||||||
|
*task.JobTask, error) {
|
||||||
|
ins, err := i.DescribeJobTask(ctx, task.NewDescribeJobTaskRequest(in.Id))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 校验更新合法性
|
||||||
|
err = i.CheckAllowUpdate(ctx, ins, in.UpdateToken, in.ForceUpdateStatus)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
i.log.Debug().Msgf("更新任务状态: %s,当前状态: %s, 更新状态: %s",
|
||||||
|
ins.Spec.TaskId, ins.Status.Stage, in.Stage)
|
||||||
|
// 状态更新
|
||||||
|
ins.Status.UpdateStatus(in)
|
||||||
|
|
||||||
|
// 更新数据库
|
||||||
|
if err := i.updateJobTaskStatus(ctx, ins); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Job Task状态变更回调
|
||||||
|
i.JobTaskStatusChangedCallback(ctx, ins)
|
||||||
|
|
||||||
|
// Pipeline Task 状态变更回调
|
||||||
|
if ins.Spec.PipelineTask != "" {
|
||||||
|
// 如果状态未变化, 不触发流水线更新
|
||||||
|
if !in.ForceTriggerPipeline && !ins.Status.Changed {
|
||||||
|
i.log.Debug().Msgf("task %s status not changed: [%s], skip update pipeline", in.Id, in.Stage)
|
||||||
|
return ins, nil
|
||||||
|
}
|
||||||
|
_, err := i.PipelineTaskStatusChanged(ctx, ins)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ins, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *impl) JobTaskStatusChangedCallback(ctx context.Context, in *task.JobTask) {
|
||||||
|
// 状态未变化不通知
|
||||||
|
if in.Status == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 状态未变化不通知
|
||||||
|
if !in.Status.Changed {
|
||||||
|
i.log.Debug().Msgf("task %s status not changed [%s], skip status callback", in.Spec.TaskId, in.Status.Stage)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
i.log.Debug().Msgf("task %s 执行状态变化回调...", in.Spec.TaskId)
|
||||||
|
|
||||||
|
// 个人通知
|
||||||
|
for index := range in.Spec.MentionUsers {
|
||||||
|
mu := in.Spec.MentionUsers[index]
|
||||||
|
i.TaskMention(ctx, mu, in)
|
||||||
|
}
|
||||||
|
if len(in.Spec.MentionUsers) > 0 {
|
||||||
|
i.updateJobTaskMentionUser(ctx, in.Spec.TaskId, in.Spec.MentionUsers)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 群组通知
|
||||||
|
imRobotHooks := in.Spec.MatchedImRobotNotify(in.Status.Stage.String())
|
||||||
|
i.log.Debug().Msgf("task %s 群组通知: %v", in.Spec.TaskId, imRobotHooks)
|
||||||
|
i.hook.SendTaskStatus(ctx, imRobotHooks, in)
|
||||||
|
if len(imRobotHooks) > 0 {
|
||||||
|
i.updateJobTaskImRobotNotify(ctx, in.Spec.TaskId, imRobotHooks)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WebHook回调
|
||||||
|
webhooks := in.Spec.MatchedWebHooks(in.Status.Stage.String())
|
||||||
|
i.log.Debug().Msgf("task %s WebHook通知: %v", in.Spec.TaskId, webhooks)
|
||||||
|
i.hook.SendTaskStatus(ctx, webhooks, in)
|
||||||
|
if len(webhooks) > 0 {
|
||||||
|
i.updateJobTaskWebHook(ctx, in.Spec.TaskId, webhooks)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Pipeline 事件触发
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Pipeline中任务有变化时,
|
||||||
|
// 如果执行成功则 继续执行, 如果失败则标记Pipeline结束
|
||||||
|
// 当所有任务成功结束时标记Pipeline执行成功
|
||||||
|
func (i *impl) PipelineTaskStatusChanged(ctx context.Context, in *task.JobTask) (
|
||||||
|
*task.PipelineTask, error) {
|
||||||
|
if in == nil || in.Status == nil {
|
||||||
|
return nil, exception.NewBadRequest("job task or job task status is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if in.Spec.PipelineTask == "" {
|
||||||
|
return nil, exception.NewBadRequest("Pipeline Id参数缺失")
|
||||||
|
}
|
||||||
|
|
||||||
|
runErrorJobTasks := []*task.UpdateJobTaskStatusRequest{}
|
||||||
|
// 获取Pipeline Task, 因为Job Task是先保存在触发的回调, 这里获取的Pipeline Task是最新的
|
||||||
|
descReq := task.NewDescribePipelineTaskRequest(in.Spec.PipelineTask)
|
||||||
|
p, err := i.DescribePipelineTask(ctx, descReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
// 更新当前任务的pipeline task状态
|
||||||
|
i.mustUpdatePipelineStatus(ctx, p)
|
||||||
|
|
||||||
|
// 如果JobTask正常执行, 则等待回调更新, 如果执行失败 则需要立即更新JobTask状态
|
||||||
|
for index := range runErrorJobTasks {
|
||||||
|
_, err = i.UpdateJobTaskStatus(ctx, runErrorJobTasks[index])
|
||||||
|
if err != nil {
|
||||||
|
p.MarkedFailed(err)
|
||||||
|
i.mustUpdatePipelineStatus(ctx, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 更新Pipeline Task 运行时环境变量
|
||||||
|
p.Status.RuntimeEnvs.Merge(in.RuntimeRunParams()...)
|
||||||
|
|
||||||
|
switch in.Status.Stage {
|
||||||
|
case task.STAGE_PENDDING,
|
||||||
|
task.STAGE_SCHEDULING,
|
||||||
|
task.STAGE_CREATING,
|
||||||
|
task.STAGE_ACTIVE,
|
||||||
|
task.STAGE_CANCELING:
|
||||||
|
// Task状态无变化
|
||||||
|
return p, nil
|
||||||
|
case task.STAGE_CANCELED:
|
||||||
|
// 任务取消, pipeline 取消执行
|
||||||
|
p.MarkedCanceled()
|
||||||
|
return p, nil
|
||||||
|
case task.STAGE_FAILED:
|
||||||
|
// 任务执行结果确认失败
|
||||||
|
if in.IsConfirmEnabled() && in.IsConfirming() {
|
||||||
|
// 状态确认中
|
||||||
|
i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName)
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
// 任务执行失败, 更新Pipeline状态为失败
|
||||||
|
if !in.Spec.RunParams.IgnoreFailed {
|
||||||
|
p.MarkedFailed(in.Status.MessageToError())
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
case task.STAGE_SUCCEEDED:
|
||||||
|
// 任务执行结果确认失败
|
||||||
|
if in.IsConfirmEnabled() && in.IsConfirming() {
|
||||||
|
// 状态确认中
|
||||||
|
i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName)
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
// 任务运行成功, pipeline继续执行
|
||||||
|
i.log.Info().Msgf("task: %s run successed", in.Spec.TaskId)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
task执行成功或者忽略执行失败, 此时pipeline 仍然处于运行中, 需要获取下一个任务执行
|
||||||
|
*/
|
||||||
|
nexts, err := p.NextRun()
|
||||||
|
if err != nil {
|
||||||
|
p.MarkedFailed(err)
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果没有需要执行的任务, Pipeline执行结束, 更新Pipeline状态为成功
|
||||||
|
if nexts == nil || nexts.Len() == 0 {
|
||||||
|
p.MarkedSuccess()
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果有需要执行的JobTask, 继续执行
|
||||||
|
for index := range nexts.Items {
|
||||||
|
item := nexts.Items[index]
|
||||||
|
// 如果任务执行成功则等待任务的回调更新任务状态
|
||||||
|
// 如果任务执行失败, 直接更新任务状态
|
||||||
|
t, err := i.RunJob(ctx, item.Spec)
|
||||||
|
if err != nil {
|
||||||
|
updateT := task.NewUpdateJobTaskStatusRequest(item.Spec.TaskId)
|
||||||
|
updateT.UpdateToken = item.Spec.UpdateToken
|
||||||
|
updateT.MarkError(err)
|
||||||
|
runErrorJobTasks = append(runErrorJobTasks, updateT)
|
||||||
|
} else {
|
||||||
|
item.Status = t.Status
|
||||||
|
item.Job = t.Job
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
如何获取下一组需要执行的任务: 任务执行调度
|
||||||
|
```go
|
||||||
|
// 返回下个需要执行的JobTask, 允许一次并行执行多个(批量执行)
|
||||||
|
// Task DryRun属性要继承PipelineTask
|
||||||
|
func (p *PipelineTask) NextRun() (*JobTaskSet, error) {
|
||||||
|
set := NewJobTaskSet()
|
||||||
|
var stage *StageStatus
|
||||||
|
|
||||||
|
if p.Status == nil || p.Pipeline == nil {
|
||||||
|
return set, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 需要未执行完成的Job Tasks
|
||||||
|
stages := p.Status.StageStatus
|
||||||
|
for i := range stages {
|
||||||
|
stage = stages[i]
|
||||||
|
|
||||||
|
// 找出Stage中未执行完的Job Task
|
||||||
|
set = stage.UnCompleteJobTask(p.Params.DryRun)
|
||||||
|
set.UpdateFromPipelineTask(p)
|
||||||
|
// 如果找到 直接Break
|
||||||
|
if set.Len() > 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果所有Stage寻找完,都没找到, 表示PipelineTask执行完成
|
||||||
|
if set.Len() == 0 {
|
||||||
|
return set, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: 如果这些未执行当中的Job Task 有处于运行中的, 不会执行下个一个任务
|
||||||
|
if set.HasStage(STAGE_ACTIVE) {
|
||||||
|
return set, exception.NewConflict("Stage 还处于运行中")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 当未执行的任务中,没有运行中的时,剩下的就是需要被执行的任务
|
||||||
|
tasks := set.GetJobTaskByStage(STAGE_PENDDING)
|
||||||
|
|
||||||
|
nextTasks := NewJobTaskSet()
|
||||||
|
stageSpec := p.Pipeline.GetStage(stage.Name)
|
||||||
|
if stageSpec.IsParallel {
|
||||||
|
// 并行任务 返回该Stage所有等待执行的job
|
||||||
|
nextTasks.Add(tasks...)
|
||||||
|
} else {
|
||||||
|
// 串行任务取第一个
|
||||||
|
nextTasks.Add(tasks[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
return nextTasks.UpdateFromPipelineTask(p), nil
|
||||||
|
}
|
||||||
|
```
|
47
skills/interview/README.md
Normal file
47
skills/interview/README.md
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
# 面试
|
||||||
|
|
||||||
|
## 项目准备
|
||||||
|
|
||||||
|
基础课: 能够自己写项目, 不能去问很基础的技术问题, 你需要问题的业务问题,一定要让人知道你有做项目的基础技术
|
||||||
|
|
||||||
|
关于你基础知识的加分项:
|
||||||
|
1. 博客
|
||||||
|
2. 项目: 代码,基础的练习代码 (基本的算法模型, 基本工具)
|
||||||
|
3. 准备一个 吃透的 基本原理
|
||||||
|
1. Go调度模型,其他语言是怎么设计(k8s 调度器), 如果理解后 融合到自己的项目中去,比如Job执行系统
|
||||||
|
2. 内存分配, Go语言的内存具体分配情况, 对比Rust, 内存泄露,如何自己操作内存, 比例都次copy,节省性能开销 "8" -> 8, 描述转化过程,描述内存开销 var a string = "8" 4byte
|
||||||
|
4. 提前涮一些算法, 性能优化: 去掉无用操作, (10 io, 1 io), 避免对象的重复分配
|
||||||
|
|
||||||
|
项目课:
|
||||||
|
1. book api, 脚本项目: 可用使用Go来开发一些 基本工具(运维)
|
||||||
|
2. 接口开发能力, crud项目的开发能力, vblog 项目, 合理组织工程架构
|
||||||
|
3. 微服务开发能力: rpc, grpc, kafka, 缓存,中央化认证, ...
|
||||||
|
4. 业务分析能力: 如何拆分业务模块, 业务定义(需要收集和分析)
|
||||||
|
|
||||||
|
## 面试
|
||||||
|
|
||||||
|
+ 筛选公司, 针对你筛选公司,做简历定制化, 做公司背调, 了解公司的产品
|
||||||
|
+ 把JD 需要的技能加强,不需要的做精简
|
||||||
|
+ 了解公司的产品, 非技术面 共同的话题
|
||||||
|
|
||||||
|
+ 简历的编写
|
||||||
|
+ 后端: 工程体现, 架构, 开发方向
|
||||||
|
+ Web开发(API, 类似单体服务, 初级岗位, RESTful接口)
|
||||||
|
+ 微服务开发(中台, MQ, RPC/GRPC)
|
||||||
|
+ 前端
|
||||||
|
+ 大前端(h5开发): pc/小程序/web
|
||||||
|
+ Web开发(JS体系, vue2/3, react, js/css)
|
||||||
|
+ 专业前端(ios/andriod)
|
||||||
|
+ 全栈: 人员不够, 团队初期
|
||||||
|
+ 一个人 就是一个团队 (运维开发)
|
||||||
|
+ 前端(vue3) + 后端(单体/分布式)
|
||||||
|
|
||||||
|
+ 投递简历:
|
||||||
|
+ 晚上8点过后投递
|
||||||
|
+ 面试 不要提前(5 ~ 10分钟)
|
||||||
|
|
||||||
|
+ 面试: 随机性很大(面试官因数很大)
|
||||||
|
+ 面试前 多做准备(提前1个月刷算法题)
|
||||||
|
+ 多面,多总结(好的面试都能发现自己的不足,然后提升自己)
|
||||||
|
+ 面试官和眼缘(职位的等级匹配)
|
||||||
|
+ 没有合适的机会的话,是可以去外部(转行,确的是实践经验(1~2年,再找下一份工作会非常容易))
|
12
skills/interview/basic_test.go
Normal file
12
skills/interview/basic_test.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
package interview_test
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
// 变长 1 ~ 3
|
||||||
|
func TestBy(t *testing.T) {
|
||||||
|
|
||||||
|
// int64 (1)
|
||||||
|
// int8(1)
|
||||||
|
t.Log([]byte("1024"))
|
||||||
|
// int16 1024
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user